Coverage for src/task/runner.py: 0%

66 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-04-18 12:36 +0000

1import time 

2from collections.abc import Callable 

3 

4from celery import Task, shared_task 

5from celery.events.dispatcher import Event, EventDispatcher 

6from celery.result import AsyncResult 

7from celery.signals import worker_ready 

8from django_celery_results.models import TaskResult 

9 

10from task.custom_task import CustomTask, TaskError 

11from task.receiver import clear_progress, start_receiver 

12 

13 

@worker_ready.connect
def on_start(sender, **kwargs):
    """Delete every stored ``TaskResult`` row when ``worker_ready`` fires.

    ``sender`` and ``kwargs`` are supplied by the signal and are not used.
    """
    all_results = TaskResult.objects.all()
    all_results.delete()

17 

18 

def run_task(
    function: Callable | CustomTask,
    *args,
    name: str | None = None,
    with_progression: bool = True,
    **kwargs,
):
    """Queue ``function`` through ``run_as_task`` and start the progress receiver.

    Args:
        function: Callable or ``CustomTask`` forwarded to ``run_as_task``.
        *args: Positional arguments forwarded to ``function``.
        name: Optional task name forwarded to ``run_as_task``.
        with_progression: Forwarded to ``run_as_task``; enables progress
            reporting there.
        **kwargs: Keyword arguments forwarded to ``function``.

    Returns:
        The handle returned by ``run_as_task.delay``. Its ``id`` is the value
        ``cancel_task`` expects; previously it was only printed and then lost.
    """
    print(args)
    running_task = run_as_task.delay(function, name, with_progression, *args, **kwargs)
    print(running_task.id)

    start_receiver(running_task.app)
    return running_task

31 

32 

def get_active_task():
    """Return an ``AsyncResult`` for the first result whose status is "PROGRESS".

    Returns ``None`` when no stored ``TaskResult`` has that status.
    """
    candidates = TaskResult.objects.filter(status="PROGRESS")
    if len(candidates) > 0:
        return AsyncResult(candidates[0].task_id)
    return None

38 

39 

def recover_from_error(task: CustomTask, error: TaskError):
    """Delegate recovery to the task: return whatever ``task.recover(error)`` returns."""
    recovery_result = task.recover(error)
    return recovery_result

42 

43 

def cancel_task(task_id):
    """Revoke the task with id ``task_id`` (``terminate=True``), then clear progress."""
    AsyncResult(task_id).revoke(terminate=True)
    clear_progress()

48 

49 

@shared_task(bind=True)
def run_as_task(
    self: Task,
    function: Callable | type[CustomTask],
    name: str,
    with_progression: bool,
    *args,
    **kwargs,
):  # Inside Task
    """Run ``function`` as a ``CustomTask``, optionally reporting progress.

    ``function`` may be a ``CustomTask`` subclass (instantiated here), a
    ``CustomTask`` instance (used as is), or any other callable (wrapped in
    ``CustomTask``). ``self.name`` is set to ``name`` when it is truthy,
    otherwise to a name derived from ``function``; ``set_progress`` reads it.

    Args:
        self: The bound task instance.
        function: The work to execute, in one of the three forms above.
        name: Name to report for the task; a falsy value selects the fallback.
        with_progression: When true, an ``EventDispatcher`` is created and a
            progress callback is injected into the task.
        *args: Positional arguments passed to ``function``.
        **kwargs: Keyword arguments passed to ``function``.

    Returns:
        The value returned by ``function``, or ``None`` if it raised
        ``TaskError``. Any other exception propagates to the caller.
    """
    dispatcher = None
    if with_progression:
        dispatcher = EventDispatcher(
            connection=self.app.connection_for_write(), app=self.app, groups=None
        )
        # NOTE(review): the connection given to the dispatcher is never
        # released explicitly here - confirm whether close() covers it.

    def injected_set_progress(
        progress: int, progress_text: str | None = None
    ):  # Inside Worker
        # Injects the task and the dispatcher into the progress callback.
        # Goal: hide the Task machinery from the consumer side.
        return set_progress(self, dispatcher, progress, progress_text)

    try:
        if isinstance(function, type) and issubclass(
            function, CustomTask
        ):  # function is a CustomTask class
            self.name = name or function.__name__
            function = function()
        elif isinstance(function, CustomTask):  # function is a CustomTask instance
            self.name = name or function.__class__.__name__
        else:  # function is an arbitrary callable
            # Callables such as functools.partial have no __name__; fall back
            # to the type name instead of raising AttributeError.
            self.name = name or getattr(function, "__name__", type(function).__name__)
            function = CustomTask(function)

        if with_progression:
            function.set_progress_callback(injected_set_progress)
            function.update_progress()

        print(str(args) + " " + str(kwargs))
        try:
            output = function(*args, **kwargs)
        except TaskError as error:
            print("An error occurred while executing a task")
            import traceback

            traceback.print_exc()
            print(error)
            # TODO: log the failure and serialize the error so that recovery
            # (see recover_from_error) can be attempted.
            output = None

        if with_progression:
            time.sleep(1)  # Wait for the last progress update to be sent
            function.update_progress()
    finally:
        # Close the dispatcher on every exit path, including exceptions other
        # than TaskError, which previously skipped the close.
        if dispatcher is not None:
            dispatcher.close()

    print("Résultat: " + str(output))
    return output

107 

108 

# Sentinel meaning "progress_data was not passed". It lets an explicit None
# be forwarded unchanged while an omitted argument gets a fresh dict.
_PROGRESS_DATA_UNSET = object()


def set_progress(
    task: Task,
    dispatcher: EventDispatcher,
    progress: int,
    progress_data: dict | None = _PROGRESS_DATA_UNSET,
):  # Inside Task
    """Record the task's progress and emit a "task-progress" event.

    The state is stored via ``task.update_state`` with state "PROGRESS" and
    then sent through ``dispatcher``.

    Args:
        task: Task whose ``name``, ``request.id`` and ``update_state`` are used.
        dispatcher: Dispatcher used to send the "task-progress" event.
        progress: Current progress, stored as "current" against a "total" of 100.
        progress_data: Extra payload attached to the state and the event.
            When omitted, a new empty dict is created for each call instead
            of sharing one mutable default across calls.
    """
    if progress_data is _PROGRESS_DATA_UNSET:
        progress_data = {}

    # NOTE(review): this stores the progress state on every call; it may be
    # chatty for the result backend - confirm it is wanted.
    meta = {
        "name": task.name,
        "current": progress,
        "total": 100,
        "progress_data": progress_data,
    }

    task.update_state(state="PROGRESS", meta=meta)

    def build_event(_, *args, **kwargs):
        # The first positional argument is ignored; the event type is fixed
        # to "task-progress".
        return Event(
            "task-progress",
            uuid=task.request.id,
            progress=progress,
            data=progress_data,
            task_name=task.name,
            *args,
            **kwargs,
        )

    dispatcher.send(type="task-progress", Event=build_event)