Coverage for src / history / views.py: 0%
192 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-04 10:50 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-04 10:50 +0000
1import json
2from typing import TYPE_CHECKING, Any, Callable
3from urllib.parse import urlencode
5from django.core.paginator import Paginator
6from django.db.models import F
7from django.http import HttpResponse, JsonResponse
8from django.http.response import HttpResponseBadRequest
9from django.urls import reverse_lazy
10from django.utils import timezone
11from django.views import View
12from django.views.generic import TemplateView
13from django.views.generic.base import ContextMixin
14from django.views.generic.detail import BaseDetailView
15from django.views.generic.edit import DeleteView, UpdateView
16from matching_back import views as matching_views
17from opentelemetry import trace
18from ptf import views as ptf_views
19from ptf.exceptions import ServerUnderMaintenance
20from ptf.models.classes.article import Article
21from ptf.models.classes.collection import Collection
22from requests.exceptions import Timeout
24from history.model_data import HistoryEventDict, HistoryEventStatus, HistoryEventType
25from history.models import HistoryEvent, HistoryEventQuerySet
26from history.utils import (
27 get_history_error_warning_counts,
28 get_history_last_event_by,
29 insert_history_event,
30)
32if TYPE_CHECKING:
33 from django import http
34 from ptf.models.classes.resource import Resource
36tracer = trace.get_tracer(__name__)
39def manage_exceptions(
40 event: HistoryEventDict,
41 exception: BaseException,
42):
43 if type(exception).__name__ == "ServerUnderMaintenance":
44 message = " - ".join([str(exception), "Please try again later"])
45 else:
46 message = " ".join([str(exception)])
48 event["message"] = message
50 insert_history_event(event)
53class HistoryEventUpdateView(UpdateView):
54 model = HistoryEvent
55 fields = ["status", "message"]
56 success_url = reverse_lazy("history")
59def matching_decorator(func: Callable[[Any, str], str], is_article):
60 def inner(*args, **kwargs):
61 article: Article
63 if is_article:
64 article = args[0]
65 else:
66 article = args[0].resource.cast()
68 event: HistoryEventDict = {
69 "type": "matching",
70 "pid": article.pid,
71 "col": article.get_top_collection(),
72 "status": HistoryEventStatus.OK,
73 }
74 # Merge matching ids (can be zbl, numdam...)
75 try:
76 id_value = func(*args, **kwargs)
78 insert_history_event(event)
79 return id_value
81 except Timeout as exception:
82 """
83 Exception caused by the requests module: store it as a warning
84 """
85 event["status"] = HistoryEventStatus.WARNING
86 manage_exceptions(event, exception)
87 raise exception
89 except ServerUnderMaintenance as exception:
90 event["status"] = HistoryEventStatus.ERROR
91 event["type"] = "deploy"
92 manage_exceptions(event, exception)
93 raise exception
95 except Exception as exception:
96 event["status"] = HistoryEventStatus.ERROR
97 manage_exceptions(event, exception)
98 raise exception
100 return inner
103def execute_and_record_func(
104 type: HistoryEventType,
105 pid,
106 colid,
107 func,
108 message="",
109 record_error_only=False,
110 user=None,
111 type_error=None,
112 *func_args,
113 **func_kwargs,
114):
115 status = 200
116 func_result = None
117 collection = None
118 if colid not in ["ALL", "numdam"]:
119 collection = Collection.objects.get(pid=colid)
120 event: HistoryEventDict = {
121 "type": type,
122 "pid": pid,
123 "col": collection,
124 "status": HistoryEventStatus.OK,
125 "message": message,
126 }
128 try:
129 func_result = func(*func_args, **func_kwargs)
131 if type_error:
132 event["type_error"] = type_error
134 if not record_error_only:
135 insert_history_event(event)
136 except Timeout as exception:
137 """
138 Exception caused by the requests module: store it as a warning
139 """
140 event["status"] = HistoryEventStatus.WARNING
142 event["message"] = message
144 manage_exceptions(event, exception)
145 raise exception
146 except Exception as exception:
147 event["status"] = HistoryEventStatus.ERROR
149 event["message"] = message
150 manage_exceptions(event, exception)
151 raise exception
152 return func_result, status, message
155def edit_decorator(func):
156 def inner(self, action, *args, **kwargs):
157 resource_obj: Resource = self.resource.cast()
158 pid = resource_obj.pid
159 colid = resource_obj.get_top_collection().pid
161 message = ""
162 if hasattr(self, "obj"):
163 # Edit 1 item (ExtId or BibItemId)
164 obj = self.obj
165 if hasattr(self, "parent"):
166 parent = self.parent
167 if parent:
168 message += "[" + str(parent.sequence) + "] "
169 list_ = obj.id_type.split("-")
170 id_type = obj.id_type if len(list_) == 0 else list_[0]
171 message += id_type + ":" + obj.id_value + " " + action
172 else:
173 message += "All " + action
175 args = (self, action) + args
177 execute_and_record_func(
178 "edit", pid, colid, func, message, False, None, None, *args, **kwargs
179 )
181 return inner
184ptf_views.UpdateExtIdView.update_obj = edit_decorator(ptf_views.UpdateExtIdView.update_obj)
185matching_views.UpdateMatchingView.update_obj = edit_decorator(
186 matching_views.UpdateMatchingView.update_obj
187)
190def getLastHistoryImport(pid):
191 data = get_history_last_event_by(type="import", pid=pid)
192 return data
195class HistoryContextMixin(ContextMixin):
196 def get_context_data(self, **kwargs):
197 context = super().get_context_data(**kwargs)
198 error_count, warning_count = get_history_error_warning_counts()
199 context["warning_count"] = warning_count
200 context["error_count"] = error_count
202 # if isinstance(last_clockss_event, datetime):
203 # now = timezone.now()
204 # td = now - last_clockss_event['created_on']
205 # context['last_clockss_event'] = td.days
206 return context
209class HistoryView(TemplateView, HistoryContextMixin):
210 template_name = "history.html"
211 accepted_params = ["type", "col", "status", "month"]
213 def get_context_data(self, **kwargs):
214 context = super().get_context_data(**kwargs)
216 filters = {}
218 urlparams = {k: v[0] if isinstance(v, list) else v for k, v in self.request.GET.items()}
219 if "page" in urlparams:
220 del urlparams["page"]
221 if "search" in urlparams:
222 del urlparams["search"]
224 for filter in self.accepted_params:
225 value = self.request.GET.get(filter, None)
226 if value:
227 filters[filter] = value
229 # Get current URL params without current filter
230 _params = {**urlparams}
231 if filter in _params:
232 del _params[filter]
234 context[filter + "_link"] = "?" + urlencode(_params)
236 # filter_by_month = False # Ignore months for Now
237 # filters.setdefault("status", Q(status="ERROR") | Q(status="WARNING"))
239 # if filter_by_month:
240 # today = datetime.datetime.today()
241 # filters["created_on__year"] = today.year
242 # filters["created_on__month"] = today.month
243 if "col" in filters:
244 filters["col__pid"] = filters["col"]
245 del filters["col"]
246 qs = HistoryEvent.objects.filter(**filters).order_by("-created_on")
248 paginator = Paginator(qs, 50)
249 try:
250 page = int(self.request.GET.get("page", 1))
251 except ValueError:
252 page = 1
254 if page > paginator.num_pages:
255 page = paginator.num_pages
257 context["page_obj"] = paginator.get_page(page)
258 context["paginator"] = {
259 "page_range": paginator.get_elided_page_range(number=page, on_each_side=2, on_ends=1),
260 }
262 context["now"] = timezone.now()
264 if "col__pid" in filters:
265 del filters["col__pid"]
266 context["collections"] = (
267 HistoryEvent.objects.filter(col__isnull=False, **filters)
268 .values(colid=F("col__pid"), name=F("col__title_html"))
269 .distinct("col__title_html")
270 .order_by("col__title_html")
271 )
272 return context
275class HistoryClearView(DeleteView):
276 model = HistoryEvent
278 template_name_suffix = "_confirm_clear"
279 success_url = reverse_lazy("history")
281 def get_object(self, queryset=None):
282 # for deleting events excluding latest
283 _queryset: HistoryEventQuerySet = self.get_queryset()
284 return _queryset.get_stale_events()
285 # return self.get_queryset()
288class HistoryEventDeleteView(DeleteView):
289 model = HistoryEvent
290 success_url = reverse_lazy("history")
293class HistoryAPIView(BaseDetailView):
294 model = HistoryEvent
296 http_method_names = ["get"]
298 @tracer.start_as_current_span("HistoryAPIView.get")
299 def get(self, request: "http.HttpRequest", pk: int, datatype: str):
300 object: HistoryEvent = self.get_object()
301 if datatype == "message":
302 return HttpResponse(object.message)
303 if datatype == "table":
304 data = []
305 for a in object.children.all().prefetch_related("resource"):
306 entry = {
307 "type": a.type,
308 "status": a.status,
309 "status_message": a.status_message,
310 "message": a.message,
311 "score": a.score,
312 "url": a.url,
313 }
314 if a.resource:
315 entry["resource_pid"] = a.resource.pid
316 data.append(entry)
317 return JsonResponse(
318 {
319 "data": data,
320 "headers": [
321 "resource_pid",
322 "type",
323 "status",
324 "status_message",
325 "message",
326 "score",
327 "url",
328 ],
329 "source": object.source,
330 "pid": object.pid,
331 "col": object.col.pid if object.col else None,
332 "date": object.created_on,
333 }
334 )
335 return HttpResponseBadRequest()
338class HistoryEventInsert(View):
339 def post(self, request: "http.HttpRequest"):
340 # Todo : Sanitarization ?
341 event = json.loads(request.body.decode("utf-8"))
342 if "colid" in event:
343 colid = event["colid"]
344 del event["colid"]
345 collection = Collection.objects.get(pid=colid)
346 event["col"] = collection
347 insert_history_event(event)
348 response = HttpResponse()
349 response.status_code = 201
350 return response