Coverage for src/task/runner.py: 0%
66 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-18 12:36 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-18 12:36 +0000
1import time
2from collections.abc import Callable
4from celery import Task, shared_task
5from celery.events.dispatcher import Event, EventDispatcher
6from celery.result import AsyncResult
7from celery.signals import worker_ready
8from django_celery_results.models import TaskResult
10from task.custom_task import CustomTask, TaskError
11from task.receiver import clear_progress, start_receiver
14@worker_ready.connect
15def on_start(sender, **kwargs):
16 TaskResult.objects.all().delete()
19def run_task(
20 function: Callable | CustomTask,
21 *args,
22 name: str | None = None,
23 with_progression: bool = True,
24 **kwargs,
25):
26 print(args)
27 running_task = run_as_task.delay(function, name, with_progression, *args, **kwargs)
28 print(running_task.id)
30 start_receiver(running_task.app)
33def get_active_task():
34 running_task = TaskResult.objects.filter(status="PROGRESS")
35 if len(running_task) == 0:
36 return None
37 return AsyncResult(running_task[0].task_id)
40def recover_from_error(task: CustomTask, error: TaskError):
41 return task.recover(error)
44def cancel_task(task_id):
45 task = AsyncResult(task_id)
46 task.revoke(terminate=True)
47 clear_progress()
50@shared_task(bind=True)
51def run_as_task(
52 self: Task,
53 function: Callable | type[CustomTask],
54 name: str,
55 with_progression: bool,
56 *args,
57 **kwargs,
58): # Dans Task
59 if with_progression:
60 dispatcher = EventDispatcher(
61 connection=self.app.connection_for_write(), app=self.app, groups=None
62 )
64 def injected_set_progress(
65 progress: int, progress_text: str | None = None
66 ): # Inside Worker
67 # Injects the task and the dispatcher into progress_callback
68 # Goal : abstract the Task module on the consumer side
69 return set_progress(self, dispatcher, progress, progress_text)
71 self.name = name
72 if isinstance(function, type) and issubclass(
73 function, CustomTask
74 ): # function is a CustomTask class
75 self.name = name or function.__name__
76 function = function()
77 elif isinstance(function, CustomTask): # function is a CustomTask instance
78 self.name = name or function.__class__.__name__
79 else: # function is a random callable
80 self.name = name or function.__name__
81 function = CustomTask(function)
83 if with_progression:
84 function.set_progress_callback(injected_set_progress)
85 function.update_progress()
87 print(str(args) + " " + str(kwargs))
88 try:
89 output = function(*args, **kwargs)
90 except TaskError as error:
91 print("An error occurred while executing a task")
92 import traceback
94 traceback.print_exc()
95 print(error)
96 # output = recover_from_error(function, error)
97 # We would probably want to log the failure and serialize the error so that we can try to recover from the failure
98 output = None
100 if with_progression:
101 time.sleep(1) # Wait for the last progress update to be sent
102 function.update_progress()
103 dispatcher.close()
105 print("Résultat: " + str(output))
106 return output
109def set_progress(
110 task: Task, dispatcher: EventDispatcher, progress: int, progress_data: dict = {}
111): # Dans Task
112 # Not sure if that's useful, it saves the current progress state in the DB, it could spam it i guess?
113 meta = {
114 "name": task.name,
115 "current": progress,
116 "total": 100,
117 "progress_data": progress_data,
118 }
120 task.update_state(state="PROGRESS", meta=meta)
122 def e(_, *args, **kwargs):
123 return Event(
124 "task-progress",
125 uuid=task.request.id,
126 progress=progress,
127 data=progress_data,
128 task_name=task.name,
129 *args,
130 **kwargs,
131 )
133 dispatcher.send(type="task-progress", Event=e)