Coverage for src/task/runner.py: 0%
67 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-10-13 12:26 +0000
« prev ^ index » next coverage.py v7.9.0, created at 2025-10-13 12:26 +0000
1import logging
2import time
3from collections.abc import Callable
5from celery import Task, shared_task, states
6from celery.events.dispatcher import Event, EventDispatcher
7from celery.result import AsyncResult
8from celery.signals import worker_ready
9from django_celery_results.models import TaskResult
11from task.custom_task import CustomTask, TaskError
12from task.receiver import clear_progress, start_receiver
14logger = logging.getLogger(__name__)
17@worker_ready.connect
18def on_start(sender, **kwargs):
19 TaskResult.objects.all().delete()
22def run_task(
23 function: Callable | CustomTask,
24 *args,
25 **kwargs,
26):
27 # logger.debug(args)
28 running_task = run_as_task.delay(function, *args, **kwargs)
29 logger.debug("Started new task : " + running_task.id)
31 start_receiver(running_task.app)
34def get_active_task():
35 running_task = TaskResult.objects.filter(status=states.STARTED)
36 if len(running_task) == 0:
37 return None
38 return AsyncResult(running_task[0].task_id)
41def recover_from_error(task: CustomTask, error: TaskError):
42 return task.recover(error)
45def cancel_task(task_id):
46 task = AsyncResult(task_id)
47 task.revoke(terminate=True)
48 clear_progress()
51@shared_task(bind=True)
52def run_as_task(
53 self: Task,
54 function: Callable | type[CustomTask],
55 *args,
56 **kwargs,
57): # Dans Task
58 return with_progress(function, self, *args, **kwargs)
61def with_progress(
62 function: Callable | type[CustomTask],
63 self: Task,
64 *args,
65 **kwargs,
66):
67 """
68 Celery tasks run with run_task dom't keep their original names
70 Worakround: use this function and declare the tasks beforehand, the Celery way
71 """
72 dispatcher = EventDispatcher(
73 connection=self.app.connection_for_write(), app=self.app, groups=None
74 )
76 def injected_set_progress(
77 progress: int, progress_text: dict = {}, state=states.STARTED
78 ): # Inside Worker
79 # Injects the task and the dispatcher into progress_callback
80 # Goal : abstract the Task module on the consumer side
81 return set_progress(self, dispatcher, progress, progress_text, state=state)
83 if isinstance(function, type) and issubclass(
84 function, CustomTask
85 ): # function is a CustomTask class
86 function = function()
87 elif isinstance(function, CustomTask): # function is a CustomTask instance
88 pass
89 else: # function is a random callable
90 function = CustomTask(function)
92 function.set_progress_callback(injected_set_progress)
93 function.update_progress()
95 # logger.debug(str(args) + " " + str(kwargs))
96 try:
97 output = function(*args, **kwargs)
98 except TaskError as error:
99 import traceback
101 logger.error("An error occurred while executing a task", exc_info=error)
102 # output = recover_from_error(function, error)
103 # We would probably want to log the failure and serialize the error so that we can try to recover from the failure
104 output = None
105 if function.event_dict:
106 if "data" not in function.event_dict:
107 function.event_dict["data"] = {"message": ""}
108 function.event_dict["data"]["message"] = traceback.format_exc()
109 function.add_history_event()
110 raise error
112 time.sleep(1) # Wait for the last progress update to be sent
113 function.update_progress()
114 function.add_history_event()
115 dispatcher.close()
117 logger.debug("Résultat: " + str(output))
118 return output
121def set_progress(
122 task: Task,
123 dispatcher: EventDispatcher,
124 progress: int,
125 progress_data: dict = {},
126 state: str = states.STARTED,
127): # Dans Task
128 # Not sure if that's useful, it saves the current progress state in the DB, it could spam it i guess?
129 meta = {
130 "name": task.name,
131 "current": progress,
132 "total": 100,
133 "progress_data": progress_data,
134 }
136 task.update_state(state=state, meta=meta)
138 def e(_, *args, **kwargs):
139 return Event(
140 "task-progress",
141 uuid=task.request.id,
142 progress=progress,
143 data=progress_data,
144 task_name=task.name,
145 *args,
146 **kwargs,
147 )
149 dispatcher.send(type="task-progress", Event=e)