Coverage for src/task/views.py: 0%
47 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 13:51 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-03 13:51 +0000
1import json
3from celery import Celery
4from django.http import JsonResponse, StreamingHttpResponse
5from django.urls import reverse
6from django.views.generic import ListView, RedirectView, TemplateView, View
7from django_celery_results.models import TaskResult
9from task.receiver import subscribe
10from task.runner import get_active_task
12#
13# base classes / util functions to handle tasks used in Celery
14#
15# To display a progress bar:
16# - an HistoryEvent is needed to store the tasks count.
17# - TaskResult are then used to get the status of each tasks
18#
19# This solution allows you to create a Celery chain:
20# the tasks are created one by one (counting all the TaskResults does not give the total count).
21#
22# Different task (names) can be used in 1 progress bar.
23# For example a task can 1 used at the collection level which creates many issue tasks.
24# To monitor the progress, we need to know the names of the TaskResult to track.
25# You need to write a get_XXX_task_names() function to return the list of task names
27# See gdml/backend/tasks for an example of a chain
30def get_messages_in_task_queue():
31 app = Celery("task")
32 app.config_from_object("django.conf:settings", namespace="CELERY")
34 with app.connection_or_acquire() as conn:
35 remaining = conn.default_channel.queue_declare(queue="celery", passive=True).message_count
36 return remaining
39class ProgressBarMixin:
40 """
41 Mixin used to inject the running_task uuid inside a page that would need it
42 """
44 def get_context_data(self, *args, **kwargs):
45 context = super().get_context_data(*args, **kwargs)
47 context["running_task"] = get_active_task()
49 return context
52class TaskProgressAPIView(View):
53 def get(self, request, *args, task_mod=""):
54 """
55 Returns a JSON object with the progress of the Celery tasks, based on the task name.
56 - task_mod is the python module where the get_XXX_task_names function is defined.
57 Ex: task_mod="backend.tasks"
59 get_XXX_task_names has to return a list of Celery task names to track.
60 Ex: ["backend.tasks.crawl_collection_task", "backend.tasks.crawl_issue_task"]
61 """
63 def stream_response():
64 """
65 Subscribe to the task response queue and everytime we receive something,
66 send it to the client using a StreamingHttpResponse
67 """
68 q = subscribe()
70 while True:
71 result = q.get()
72 yield f"data: {json.dumps(result)}\n\n"
74 response_format = request.GET.get("format", "stream")
75 if response_format == "json":
76 response = JsonResponse({})
77 else:
78 response = StreamingHttpResponse(stream_response(), content_type="text/event-stream")
79 return response
82class TasksView(TemplateView):
83 template_name = "tasks.html"
85 def get_context_data(self, **kwargs):
86 context = super().get_context_data(**kwargs)
87 context["tasks"] = list(TaskResult.objects.all())
88 return context
91class TaskFailedListView(ListView):
92 model = TaskResult
94 def get_queryset(self):
95 qs = super().get_queryset()
96 qs = qs.filter(
97 status="FAILURE",
98 task_name=self.kwargs["task_name"],
99 )
100 return qs
103class TasksDeleteView(RedirectView):
104 def get_redirect_url(self, *args, **kwargs):
105 TaskResult.objects.all().delete()
107 return reverse("tasks")