Coverage for src/task/custom_task.py: 84%

114 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-04-18 12:36 +0000

1from collections.abc import Callable 

2from typing import Protocol, Self 

3 

4 

5class TaskError(Exception): 

6 task_index: int 

7 sub_error: Self | Exception # Can be a TaskError 

8 

9 def __init__(self, task_index, sub_error): 

10 self.task_index = task_index 

11 self.sub_error = sub_error 

12 

13 def get_deep_error(self): 

14 if isinstance(self.sub_error, TaskError): 

15 return self.sub_error.get_deep_error() 

16 return self.sub_error 

17 

18 def create_subtask_index_chain(self): 

19 if isinstance(self.sub_error, TaskError): 

20 return [str(self.task_index), *self.sub_error.create_subtask_index_chain()] 

21 return [str(self.task_index)] 

22 

23 def __str__(self): 

24 error = self.get_deep_error() 

25 return ( 

26 f"({','.join(self.create_subtask_index_chain())}) {type(error).__name__}: {str(error)}" 

27 ) 

28 

29 

class ProgressCallback(Protocol):
    """
    Signature of the callable that runners invoke to report progress.

    It is injected into the function passed into `run_as_task`.
    It receives the progress value and a dictionary of extra progress data.
    """

    def __call__(self, progress: int, progress_data: dict) -> None:
        ...

37 

38 

class CustomTask:
    """
    A custom task is an object that can be executed inside a Celery Task using the run_as_task function

    It is a complex object because it can contain subtasks which will be executed in a specific order

    This is useful because of the flexibility it offers us. This covers both complex and simple situations since you
    can easily convert a function into a CustomTask:
    ```
    CustomTask(lambda: print("This is a function"))
    ```

    An important feature is the ability to recover from an error.
    By serializing the Error and the arguments used to instantiate the Task, you can easily recover from an error.
    This doesn't mean that the error will disappear by itself, but it means you can fix the error and then
    come back to where the process was without having to restart the whole thing

    CustomTask also allows us to have a much more precise progression tracking since we can track each subtask one
    by one

    If your task is using an external way of tracking progression, you can override the get_progression method
    of your task and implement your own way of keeping track of the progression.
    """

    name: str
    state: str = "pending"  # Either "pending", "running", "finished", "crashed"
    current_index: int = 0  # Current subtask index

    progress_callback: ProgressCallback | None = None

    subtasks: list[Callable | Self] | None = None
    parent: Self | None = None

    def __init__(self, func: Callable | None = None, *args):
        """
        Convert a regular function into a CustomTask

        In the case of inheritance, you can override the do method to define the task
        If you override the __init__ and the class is used as a subtask, you should pay attention to the subtask
        argument passing

        If the __init__ asks for no arguments, they will be passed to the do method
        If the __init__ asks for arguments, they will be passed to the __init__ and the do method will be
        called with no arguments

        If you want to force the do method to be called with the arguments, you can give a CustomTask object instead
        of the class inside _make_subtasks
        """
        # Compare against None so that a callable object that happens to be falsy is still used
        if func is not None:
            self.do = func

    def make_progress_data(self):
        """
        make_progress_data is used to create a dictionary containing the data that will be sent to the frontend at
        each progression update
        """
        return {}

    def get_progression(self, precise=True) -> float:  # -1 or [0..1]
        """
        Return the progression of the task: -1 when crashed, 0 when pending, 1 when finished.

        While running, the progression is the share of steps already done. When `precise` is true, the
        progression of the step currently running (if it is a CustomTask) is added proportionally.
        """
        if self.state == "crashed":
            return -1

        if self.state == "pending":
            return 0

        if self.state == "finished":
            return 1

        # get_tasks always contains at least the do call, so the count is never 0
        subtasks = self.get_tasks()
        subtask_count = len(subtasks)

        done_subtask_count = self.current_index
        progression = done_subtask_count / subtask_count

        if not precise or done_subtask_count >= subtask_count:
            return progression

        # Plain callables (and classes not instantiated yet) cannot report a partial progression
        subtask = subtasks[self.current_index]
        subtask_progression = subtask.get_progression() if isinstance(subtask, CustomTask) else 0

        return progression + subtask_progression / subtask_count

    def do(self): ...

    def update_progress(self):
        """Send the current progression (as a rounded percentage) and the progress data to the callback, if any."""
        if self.progress_callback:
            progression = round(self.get_progression() * 100)
            data = self.make_progress_data()

            self.progress_callback(progression, data)

    def set_progress_callback(self, callback):
        """Register the callback used by update_progress."""
        self.progress_callback = callback

    # do calls count as subtasks
    def get_tasks(self) -> list[Callable | Self | type[Self]]:
        """Return the steps of the task (do followed by the subtasks), building the list on first use."""
        if self.subtasks is None:
            self.subtasks = [self.do, *self._make_subtasks()]
        return self.subtasks

    # Should not be called directly, but should be overridden in subclasses
    def _make_subtasks(
        self,
    ) -> list[Callable | Self | type[Self]]:  # Returns a list containing the subtasks
        # Don't do recursive calls in here
        return []  # This can include random functions or really complex CustomTask, we don't really care

    def recover(self, error: TaskError, *args):
        """
        Run the task again, starting from the step that raised `error`.

        Raises a ValueError if `error` is not a TaskError.
        """
        if not isinstance(error, TaskError):
            raise ValueError("The error should be a TaskError")

        return self(*args, recover_index=error.task_index, recover_error=error)

    def on_error(self, error: Exception):
        """
        Called when an error occurs in the task

        returns True if the error is skippable, False otherwise
        """
        return False

    def __call__(
        self, *args, recover_index: int = 0, recover_error: TaskError | None = None, **kwargs
    ):
        """
        Run every step of the task in order, sending the return value of each step to the next one.

        recover_index is used to recover from an error: the steps placed before it are skipped.
        recover_error is the TaskError we are recovering from. It is only used for the first step that is
        executed, so that a CustomTask subtask can itself resume from where it failed.

        Any error that is not skipped by on_error marks the task as crashed and is re-raised wrapped in a
        TaskError holding the index of the failing step.

        Returns the value returned by the last step, or None if it returned nothing.
        """

        self.state = "running"
        self.current_index = 0

        # This will be used when calling subtasks so that we send the return value of each subtask to the next one
        next_args = args
        try:
            for task in self.get_tasks():
                # We're skipping the subtasks we've already done if we're in recovery mode
                if self.current_index < recover_index:
                    self.current_index += 1
                    continue

                # The goal when calling subtasks is to always have an iterable that we can split to get the arguments
                # Since we work only using positional arguments in tasks, we're converting every value we are given
                # into tuples that we can then split
                #
                # Example:
                # 5 -tuple-> (5,) -split-> 5
                # () -tuple-> () -split-> *nothing*
                # (5,2) -tuple-> (5,2) -split-> 5, 2
                if not isinstance(next_args, tuple):
                    next_args = (next_args,)

                # Creating the dynamic CustomTasks
                if isinstance(task, type):
                    # If the task is a class, we need to instantiate it
                    # If it has an overridden constructor asking for arguments, we're calling it with the next
                    # arguments, otherwise we're calling the class with no arguments
                    constructor = task.__init__
                    # Constructors that are not plain Python functions have no __code__: treat them as
                    # asking for no arguments instead of failing on the attribute lookup
                    constructor_code = getattr(constructor, "__code__", None)
                    asks_for_args = (
                        constructor is not CustomTask.__init__
                        and constructor_code is not None
                        and constructor_code.co_argcount > 1
                    )
                    if asks_for_args:
                        task = task(*next_args, **kwargs)
                        next_args = ()
                        kwargs = {}
                    else:
                        task = task()
                    # We need to update the subtasks list with the instantiated task
                    # Or else we will have issues when updating the progression
                    self.subtasks[self.current_index] = task

                is_custom_task = isinstance(task, CustomTask)
                if is_custom_task:
                    # Useful when calling the subtasks so we can update the parent progression instead of the child one
                    task.update_progress = self.update_progress

                    task.parent = self

                # The recovery error only concerns the step we are resuming from: it is consumed here so that
                # the following steps run normally instead of skipping their own first steps
                deep_error = None
                if recover_error is not None:
                    # We can only recover deeper if the subtask is a CustomTask that failed with a TaskError
                    if is_custom_task and isinstance(recover_error.sub_error, TaskError):
                        deep_error = recover_error.sub_error
                    recover_error = None

                if deep_error is not None:
                    next_args = task.recover(deep_error, *next_args)
                else:
                    try:
                        next_args = task(*next_args, **kwargs)
                    except Exception as error:
                        if not self.on_error(error):
                            raise
                        # The error was skipped, the next step is called with no arguments
                        next_args = ()

                # The keyword arguments are only given to the first step that is executed
                kwargs = {}

                # If the step returned nothing, the next step is called with no arguments
                next_args = next_args if next_args is not None else ()
                self.current_index += 1
                self.update_progress()
        except Exception as error:
            self.state = "crashed"
            raise TaskError(self.current_index, error) from error
        self.state = "finished"

        # Converting empty tuple into None value instead
        # The type is checked first so that we never rely on the equality operator of the returned value
        if isinstance(next_args, tuple) and not next_args:
            next_args = None

        return next_args