Coverage for src / history / views.py: 0%

192 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-04 10:50 +0000

1import json 

2from typing import TYPE_CHECKING, Any, Callable 

3from urllib.parse import urlencode 

4 

5from django.core.paginator import Paginator 

6from django.db.models import F 

7from django.http import HttpResponse, JsonResponse 

8from django.http.response import HttpResponseBadRequest 

9from django.urls import reverse_lazy 

10from django.utils import timezone 

11from django.views import View 

12from django.views.generic import TemplateView 

13from django.views.generic.base import ContextMixin 

14from django.views.generic.detail import BaseDetailView 

15from django.views.generic.edit import DeleteView, UpdateView 

16from matching_back import views as matching_views 

17from opentelemetry import trace 

18from ptf import views as ptf_views 

19from ptf.exceptions import ServerUnderMaintenance 

20from ptf.models.classes.article import Article 

21from ptf.models.classes.collection import Collection 

22from requests.exceptions import Timeout 

23 

24from history.model_data import HistoryEventDict, HistoryEventStatus, HistoryEventType 

25from history.models import HistoryEvent, HistoryEventQuerySet 

26from history.utils import ( 

27 get_history_error_warning_counts, 

28 get_history_last_event_by, 

29 insert_history_event, 

30) 

31 

32if TYPE_CHECKING: 

33 from django import http 

34 from ptf.models.classes.resource import Resource 

35 

36tracer = trace.get_tracer(__name__) 

37 

38 

39def manage_exceptions( 

40 event: HistoryEventDict, 

41 exception: BaseException, 

42): 

43 if type(exception).__name__ == "ServerUnderMaintenance": 

44 message = " - ".join([str(exception), "Please try again later"]) 

45 else: 

46 message = " ".join([str(exception)]) 

47 

48 event["message"] = message 

49 

50 insert_history_event(event) 

51 

52 

53class HistoryEventUpdateView(UpdateView): 

54 model = HistoryEvent 

55 fields = ["status", "message"] 

56 success_url = reverse_lazy("history") 

57 

58 

59def matching_decorator(func: Callable[[Any, str], str], is_article): 

60 def inner(*args, **kwargs): 

61 article: Article 

62 

63 if is_article: 

64 article = args[0] 

65 else: 

66 article = args[0].resource.cast() 

67 

68 event: HistoryEventDict = { 

69 "type": "matching", 

70 "pid": article.pid, 

71 "col": article.get_top_collection(), 

72 "status": HistoryEventStatus.OK, 

73 } 

74 # Merge matching ids (can be zbl, numdam...) 

75 try: 

76 id_value = func(*args, **kwargs) 

77 

78 insert_history_event(event) 

79 return id_value 

80 

81 except Timeout as exception: 

82 """ 

83 Exception caused by the requests module: store it as a warning 

84 """ 

85 event["status"] = HistoryEventStatus.WARNING 

86 manage_exceptions(event, exception) 

87 raise exception 

88 

89 except ServerUnderMaintenance as exception: 

90 event["status"] = HistoryEventStatus.ERROR 

91 event["type"] = "deploy" 

92 manage_exceptions(event, exception) 

93 raise exception 

94 

95 except Exception as exception: 

96 event["status"] = HistoryEventStatus.ERROR 

97 manage_exceptions(event, exception) 

98 raise exception 

99 

100 return inner 

101 

102 

103def execute_and_record_func( 

104 type: HistoryEventType, 

105 pid, 

106 colid, 

107 func, 

108 message="", 

109 record_error_only=False, 

110 user=None, 

111 type_error=None, 

112 *func_args, 

113 **func_kwargs, 

114): 

115 status = 200 

116 func_result = None 

117 collection = None 

118 if colid not in ["ALL", "numdam"]: 

119 collection = Collection.objects.get(pid=colid) 

120 event: HistoryEventDict = { 

121 "type": type, 

122 "pid": pid, 

123 "col": collection, 

124 "status": HistoryEventStatus.OK, 

125 "message": message, 

126 } 

127 

128 try: 

129 func_result = func(*func_args, **func_kwargs) 

130 

131 if type_error: 

132 event["type_error"] = type_error 

133 

134 if not record_error_only: 

135 insert_history_event(event) 

136 except Timeout as exception: 

137 """ 

138 Exception caused by the requests module: store it as a warning 

139 """ 

140 event["status"] = HistoryEventStatus.WARNING 

141 

142 event["message"] = message 

143 

144 manage_exceptions(event, exception) 

145 raise exception 

146 except Exception as exception: 

147 event["status"] = HistoryEventStatus.ERROR 

148 

149 event["message"] = message 

150 manage_exceptions(event, exception) 

151 raise exception 

152 return func_result, status, message 

153 

154 

155def edit_decorator(func): 

156 def inner(self, action, *args, **kwargs): 

157 resource_obj: Resource = self.resource.cast() 

158 pid = resource_obj.pid 

159 colid = resource_obj.get_top_collection().pid 

160 

161 message = "" 

162 if hasattr(self, "obj"): 

163 # Edit 1 item (ExtId or BibItemId) 

164 obj = self.obj 

165 if hasattr(self, "parent"): 

166 parent = self.parent 

167 if parent: 

168 message += "[" + str(parent.sequence) + "] " 

169 list_ = obj.id_type.split("-") 

170 id_type = obj.id_type if len(list_) == 0 else list_[0] 

171 message += id_type + ":" + obj.id_value + " " + action 

172 else: 

173 message += "All " + action 

174 

175 args = (self, action) + args 

176 

177 execute_and_record_func( 

178 "edit", pid, colid, func, message, False, None, None, *args, **kwargs 

179 ) 

180 

181 return inner 

182 

183 

184ptf_views.UpdateExtIdView.update_obj = edit_decorator(ptf_views.UpdateExtIdView.update_obj) 

185matching_views.UpdateMatchingView.update_obj = edit_decorator( 

186 matching_views.UpdateMatchingView.update_obj 

187) 

188 

189 

190def getLastHistoryImport(pid): 

191 data = get_history_last_event_by(type="import", pid=pid) 

192 return data 

193 

194 

195class HistoryContextMixin(ContextMixin): 

196 def get_context_data(self, **kwargs): 

197 context = super().get_context_data(**kwargs) 

198 error_count, warning_count = get_history_error_warning_counts() 

199 context["warning_count"] = warning_count 

200 context["error_count"] = error_count 

201 

202 # if isinstance(last_clockss_event, datetime): 

203 # now = timezone.now() 

204 # td = now - last_clockss_event['created_on'] 

205 # context['last_clockss_event'] = td.days 

206 return context 

207 

208 

209class HistoryView(TemplateView, HistoryContextMixin): 

210 template_name = "history.html" 

211 accepted_params = ["type", "col", "status", "month"] 

212 

213 def get_context_data(self, **kwargs): 

214 context = super().get_context_data(**kwargs) 

215 

216 filters = {} 

217 

218 urlparams = {k: v[0] if isinstance(v, list) else v for k, v in self.request.GET.items()} 

219 if "page" in urlparams: 

220 del urlparams["page"] 

221 if "search" in urlparams: 

222 del urlparams["search"] 

223 

224 for filter in self.accepted_params: 

225 value = self.request.GET.get(filter, None) 

226 if value: 

227 filters[filter] = value 

228 

229 # Get current URL params without current filter 

230 _params = {**urlparams} 

231 if filter in _params: 

232 del _params[filter] 

233 

234 context[filter + "_link"] = "?" + urlencode(_params) 

235 

236 # filter_by_month = False # Ignore months for Now 

237 # filters.setdefault("status", Q(status="ERROR") | Q(status="WARNING")) 

238 

239 # if filter_by_month: 

240 # today = datetime.datetime.today() 

241 # filters["created_on__year"] = today.year 

242 # filters["created_on__month"] = today.month 

243 if "col" in filters: 

244 filters["col__pid"] = filters["col"] 

245 del filters["col"] 

246 qs = HistoryEvent.objects.filter(**filters).order_by("-created_on") 

247 

248 paginator = Paginator(qs, 50) 

249 try: 

250 page = int(self.request.GET.get("page", 1)) 

251 except ValueError: 

252 page = 1 

253 

254 if page > paginator.num_pages: 

255 page = paginator.num_pages 

256 

257 context["page_obj"] = paginator.get_page(page) 

258 context["paginator"] = { 

259 "page_range": paginator.get_elided_page_range(number=page, on_each_side=2, on_ends=1), 

260 } 

261 

262 context["now"] = timezone.now() 

263 

264 if "col__pid" in filters: 

265 del filters["col__pid"] 

266 context["collections"] = ( 

267 HistoryEvent.objects.filter(col__isnull=False, **filters) 

268 .values(colid=F("col__pid"), name=F("col__title_html")) 

269 .distinct("col__title_html") 

270 .order_by("col__title_html") 

271 ) 

272 return context 

273 

274 

275class HistoryClearView(DeleteView): 

276 model = HistoryEvent 

277 

278 template_name_suffix = "_confirm_clear" 

279 success_url = reverse_lazy("history") 

280 

281 def get_object(self, queryset=None): 

282 # for deleting events excluding latest 

283 _queryset: HistoryEventQuerySet = self.get_queryset() 

284 return _queryset.get_stale_events() 

285 # return self.get_queryset() 

286 

287 

288class HistoryEventDeleteView(DeleteView): 

289 model = HistoryEvent 

290 success_url = reverse_lazy("history") 

291 

292 

293class HistoryAPIView(BaseDetailView): 

294 model = HistoryEvent 

295 

296 http_method_names = ["get"] 

297 

298 @tracer.start_as_current_span("HistoryAPIView.get") 

299 def get(self, request: "http.HttpRequest", pk: int, datatype: str): 

300 object: HistoryEvent = self.get_object() 

301 if datatype == "message": 

302 return HttpResponse(object.message) 

303 if datatype == "table": 

304 data = [] 

305 for a in object.children.all().prefetch_related("resource"): 

306 entry = { 

307 "type": a.type, 

308 "status": a.status, 

309 "status_message": a.status_message, 

310 "message": a.message, 

311 "score": a.score, 

312 "url": a.url, 

313 } 

314 if a.resource: 

315 entry["resource_pid"] = a.resource.pid 

316 data.append(entry) 

317 return JsonResponse( 

318 { 

319 "data": data, 

320 "headers": [ 

321 "resource_pid", 

322 "type", 

323 "status", 

324 "status_message", 

325 "message", 

326 "score", 

327 "url", 

328 ], 

329 "source": object.source, 

330 "pid": object.pid, 

331 "col": object.col.pid if object.col else None, 

332 "date": object.created_on, 

333 } 

334 ) 

335 return HttpResponseBadRequest() 

336 

337 

338class HistoryEventInsert(View): 

339 def post(self, request: "http.HttpRequest"): 

340 # Todo : Sanitarization ? 

341 event = json.loads(request.body.decode("utf-8")) 

342 if "colid" in event: 

343 colid = event["colid"] 

344 del event["colid"] 

345 collection = Collection.objects.get(pid=colid) 

346 event["col"] = collection 

347 insert_history_event(event) 

348 response = HttpResponse() 

349 response.status_code = 201 

350 return response