Coverage for src/task/views.py: 0%

47 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-03 13:51 +0000

1import json 

2 

3from celery import Celery 

4from django.http import JsonResponse, StreamingHttpResponse 

5from django.urls import reverse 

6from django.views.generic import ListView, RedirectView, TemplateView, View 

7from django_celery_results.models import TaskResult 

8 

9from task.receiver import subscribe 

10from task.runner import get_active_task 

11 

12# 

13# base classes / util functions to handle tasks used in Celery 

14# 

15# To display a progress bar: 

16# - a HistoryEvent is needed to store the task count.

17# - TaskResults are then used to get the status of each task

18# 

19# This solution allows you to create a Celery chain: 

20# the tasks are created one by one (counting all the TaskResults does not give the total count). 

21# 

22# Different task (names) can be used in 1 progress bar. 

23# For example, a task can be used at the collection level which creates many issue tasks.

24# To monitor the progress, we need to know the names of the TaskResult to track. 

25# You need to write a get_XXX_task_names() function to return the list of task names 

26 

27# See gdml/backend/tasks for an example of a chain 

28 

29 

def get_messages_in_task_queue(queue="celery"):
    """
    Return the number of messages currently waiting in a broker queue.

    A Celery app named "task" is created and configured from the Django
    settings (keys in the CELERY namespace), then the queue is declared
    passively on the default channel and its message count is returned.

    - queue: name of the broker queue to inspect. Defaults to "celery",
      the name that used to be hard-coded here, so existing callers are unaffected.
    """
    app = Celery("task")
    app.config_from_object("django.conf:settings", namespace="CELERY")

    with app.connection_or_acquire() as conn:
        # passive=True asks the broker about the queue without creating it.
        # NOTE(review): a missing queue presumably makes the broker raise here -- confirm.
        declared = conn.default_channel.queue_declare(queue=queue, passive=True)
        return declared.message_count

37 

38 

class ProgressBarMixin:
    """
    View mixin exposing the active task to the template context.

    The value returned by get_active_task() is stored under the
    "running_task" key (described upstream as the running task uuid).
    """

    def get_context_data(self, *args, **kwargs):
        """
        Build the parent context, then add the "running_task" entry to it.
        """
        ctx = super().get_context_data(*args, **kwargs)
        ctx.update(running_task=get_active_task())
        return ctx

50 

51 

class TaskProgressAPIView(View):
    """
    Endpoint that forwards task messages to the client.

    The default answer is a "text/event-stream" response; a request made
    with ?format=json receives an empty JSON object instead.
    """

    def get(self, request, *args, task_mod=""):
        """
        Answer with either an empty JSON object or an event stream.

        - request: its "format" query parameter selects the response type:
          "json" for the JSON answer, any other value ("stream" by default)
          for the event stream.
        - task_mod: the python module where the get_XXX_task_names function is defined
          (Ex: task_mod="backend.tasks"). It is accepted but not read by this method.
        """

        def event_stream():
            """
            Generator consumed by the StreamingHttpResponse.

            The subscription is only opened once iteration starts; every item
            taken from the subscribed queue is JSON-encoded and emitted as one
            "data:" event. The loop has no exit condition of its own.
            """
            inbox = subscribe()
            while True:
                message = inbox.get()
                payload = json.dumps(message)
                yield f"data: {payload}\n\n"

        if request.GET.get("format", "stream") == "json":
            return JsonResponse({})

        return StreamingHttpResponse(event_stream(), content_type="text/event-stream")

80 

81 

class TasksView(TemplateView):
    """
    Render "tasks.html" with every stored TaskResult.
    """

    template_name = "tasks.html"

    def get_context_data(self, **kwargs):
        """
        Extend the parent context with "tasks": all TaskResult rows, loaded into a list.
        """
        ctx = super().get_context_data(**kwargs)
        every_result = TaskResult.objects.all()
        # list() evaluates the queryset here instead of leaving it to the template
        ctx.update(tasks=list(every_result))
        return ctx

89 

90 

class TaskFailedListView(ListView):
    """
    List the TaskResult rows in "FAILURE" status for one task name.

    The task name is read from the "task_name" URL keyword argument.
    """

    model = TaskResult

    def get_queryset(self):
        """
        Narrow the default queryset to failed results of the requested task.
        """
        return super().get_queryset().filter(
            status="FAILURE",
            task_name=self.kwargs["task_name"],
        )

101 

102 

class TasksDeleteView(RedirectView):
    """
    Delete every TaskResult, then redirect to the "tasks" URL.
    """

    def get_redirect_url(self, *args, **kwargs):
        """
        Wipe all TaskResult rows and return the URL named "tasks".

        NOTE(review): the deletion runs while the redirect target is computed,
        so it presumably fires for any method RedirectView serves, GET included
        -- confirm this is intended.
        """
        every_result = TaskResult.objects.all()
        every_result.delete()

        destination = reverse("tasks")
        return destination