Coverage for src/task/views.py: 0%
41 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-10-13 12:26 +0000
« prev ^ index » next coverage.py v7.9.0, created at 2025-10-13 12:26 +0000
1import json
3from celery import Celery
4from django.http import JsonResponse, StreamingHttpResponse
5from django.urls import reverse
6from django.views.generic import ListView, RedirectView, TemplateView, View
7from django_celery_results.models import TaskResult
9from task.receiver import get_last_event, last_event, start_receiver, subscribe
10from task.runner import get_active_task
12#
13# base classes / util functions to handle tasks used in Celery
14#
15# To display a progress bar:
16# - an HistoryEvent is needed to store the tasks count.
17# - TaskResult are then used to get the status of each tasks
18#
19# This solution allows you to create a Celery chain:
20# the tasks are created one by one (counting all the TaskResults does not give the total count).
21#
22# Different task (names) can be used in 1 progress bar.
23# For example a task can 1 used at the collection level which creates many issue tasks.
24# To monitor the progress, we need to know the names of the TaskResult to track.
25# You need to write a get_XXX_task_names() function to return the list of task names
27# See gdml/backend/tasks for an example of a chain
30def get_messages_in_task_queue():
31 app = Celery("task")
32 app.config_from_object("django.conf:settings", namespace="CELERY")
34 with app.connection_or_acquire() as conn:
35 remaining = conn.default_channel.queue_declare(queue="celery", passive=True).message_count
36 return remaining
39class ProgressBarMixin:
40 """
41 Mixin used to inject the running_task uuid inside a page that would need it
42 """
44 def get_context_data(self, *args, **kwargs):
45 context = super().get_context_data(*args, **kwargs)
47 context["running_task"] = get_active_task()
49 return context
52class TaskProgressAPIView(View):
53 def get(self, request, task_name: str | None = None):
54 global last_event
55 """
56 Returns a JSON object with the progress of the Celery tasks, based on the task name.
57 - task_mod is the python module where the get_XXX_task_names function is defined.
58 Ex: task_mod="backend.tasks"
60 get_XXX_task_names has to return a list of Celery task names to track.
61 Ex: ["backend.tasks.crawl_collection_task", "backend.tasks.crawl_issue_task"]
62 """
63 start_receiver()
64 # Bypass SSE
65 return JsonResponse(get_last_event())
67 def stream_response():
68 """
69 Subscribe to the task response queue and everytime we receive something,
70 send it to the client using a StreamingHttpResponse
71 """
72 q = subscribe()
74 while True:
75 result = q.get()
76 if not task_name or task_name == result.get("task_name"):
77 yield f"data: {json.dumps(result)}\n\n"
79 response_format = request.GET.get("format", "stream")
81 if response_format == "json":
82 response = JsonResponse(last_event)
83 else:
84 response = StreamingHttpResponse(stream_response(), content_type="text/event-stream")
86 return response
89class TasksView(TemplateView):
90 template_name = "tasks.html"
91 model = TaskResult
93 def get_context_data(self, **kwargs):
94 context = super().get_context_data(**kwargs)
95 context["tasks"] = list(TaskResult.objects.all())
96 return context
99class TaskFailedListView(ListView):
100 model = TaskResult
102 def get_queryset(self):
103 qs = super().get_queryset()
104 qs = qs.filter(
105 status="FAILURE",
106 task_name=self.kwargs["task_name"],
107 )
108 return qs
111class TasksDeleteView(RedirectView):
112 def get_redirect_url(self, *args, **kwargs):
113 TaskResult.objects.all().delete()
115 return reverse("tasks")