Coverage for src/task/runner.py: 0%

67 statements  

« prev     ^ index     » next       coverage.py v7.9.0, created at 2025-10-13 12:26 +0000

1import logging 

2import time 

3from collections.abc import Callable 

4 

5from celery import Task, shared_task, states 

6from celery.events.dispatcher import Event, EventDispatcher 

7from celery.result import AsyncResult 

8from celery.signals import worker_ready 

9from django_celery_results.models import TaskResult 

10 

11from task.custom_task import CustomTask, TaskError 

12from task.receiver import clear_progress, start_receiver 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17@worker_ready.connect 

18def on_start(sender, **kwargs): 

19 TaskResult.objects.all().delete() 

20 

21 

22def run_task( 

23 function: Callable | CustomTask, 

24 *args, 

25 **kwargs, 

26): 

27 # logger.debug(args) 

28 running_task = run_as_task.delay(function, *args, **kwargs) 

29 logger.debug("Started new task : " + running_task.id) 

30 

31 start_receiver(running_task.app) 

32 

33 

34def get_active_task(): 

35 running_task = TaskResult.objects.filter(status=states.STARTED) 

36 if len(running_task) == 0: 

37 return None 

38 return AsyncResult(running_task[0].task_id) 

39 

40 

41def recover_from_error(task: CustomTask, error: TaskError): 

42 return task.recover(error) 

43 

44 

45def cancel_task(task_id): 

46 task = AsyncResult(task_id) 

47 task.revoke(terminate=True) 

48 clear_progress() 

49 

50 

51@shared_task(bind=True) 

52def run_as_task( 

53 self: Task, 

54 function: Callable | type[CustomTask], 

55 *args, 

56 **kwargs, 

57): # Dans Task 

58 return with_progress(function, self, *args, **kwargs) 

59 

60 

61def with_progress( 

62 function: Callable | type[CustomTask], 

63 self: Task, 

64 *args, 

65 **kwargs, 

66): 

67 """ 

68 Celery tasks run with run_task dom't keep their original names 

69 

70 Worakround: use this function and declare the tasks beforehand, the Celery way 

71 """ 

72 dispatcher = EventDispatcher( 

73 connection=self.app.connection_for_write(), app=self.app, groups=None 

74 ) 

75 

76 def injected_set_progress( 

77 progress: int, progress_text: dict = {}, state=states.STARTED 

78 ): # Inside Worker 

79 # Injects the task and the dispatcher into progress_callback 

80 # Goal : abstract the Task module on the consumer side 

81 return set_progress(self, dispatcher, progress, progress_text, state=state) 

82 

83 if isinstance(function, type) and issubclass( 

84 function, CustomTask 

85 ): # function is a CustomTask class 

86 function = function() 

87 elif isinstance(function, CustomTask): # function is a CustomTask instance 

88 pass 

89 else: # function is a random callable 

90 function = CustomTask(function) 

91 

92 function.set_progress_callback(injected_set_progress) 

93 function.update_progress() 

94 

95 # logger.debug(str(args) + " " + str(kwargs)) 

96 try: 

97 output = function(*args, **kwargs) 

98 except TaskError as error: 

99 import traceback 

100 

101 logger.error("An error occurred while executing a task", exc_info=error) 

102 # output = recover_from_error(function, error) 

103 # We would probably want to log the failure and serialize the error so that we can try to recover from the failure 

104 output = None 

105 if function.event_dict: 

106 if "data" not in function.event_dict: 

107 function.event_dict["data"] = {"message": ""} 

108 function.event_dict["data"]["message"] = traceback.format_exc() 

109 function.add_history_event() 

110 raise error 

111 

112 time.sleep(1) # Wait for the last progress update to be sent 

113 function.update_progress() 

114 function.add_history_event() 

115 dispatcher.close() 

116 

117 logger.debug("Résultat: " + str(output)) 

118 return output 

119 

120 

121def set_progress( 

122 task: Task, 

123 dispatcher: EventDispatcher, 

124 progress: int, 

125 progress_data: dict = {}, 

126 state: str = states.STARTED, 

127): # Dans Task 

128 # Not sure if that's useful, it saves the current progress state in the DB, it could spam it i guess? 

129 meta = { 

130 "name": task.name, 

131 "current": progress, 

132 "total": 100, 

133 "progress_data": progress_data, 

134 } 

135 

136 task.update_state(state=state, meta=meta) 

137 

138 def e(_, *args, **kwargs): 

139 return Event( 

140 "task-progress", 

141 uuid=task.request.id, 

142 progress=progress, 

143 data=progress_data, 

144 task_name=task.name, 

145 *args, 

146 **kwargs, 

147 ) 

148 

149 dispatcher.send(type="task-progress", Event=e)