Coverage for src/task/custom_task.py: 84%
114 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-18 12:36 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-18 12:36 +0000
1from collections.abc import Callable
2from typing import Protocol, Self
class TaskError(Exception):
    """
    Error raised when a task crashes

    It remembers the index of the subtask that failed (`task_index`) and the error that made it fail
    (`sub_error`). When the failing subtask was itself reporting a TaskError, the errors are nested, which gives
    the full path of indexes leading to the original error.
    """

    task_index: int
    sub_error: "TaskError | Exception"  # Can be a TaskError

    def __init__(self, task_index, sub_error):
        # Forwarding the arguments to Exception fills `args`, which is what pickle and copy use to rebuild
        # the error. Without this, rebuilding calls the constructor with no arguments and fails
        super().__init__(task_index, sub_error)
        self.task_index = task_index
        self.sub_error = sub_error

    def get_deep_error(self):
        """
        Returns the innermost error, the first one in the chain that is not a TaskError
        """
        if isinstance(self.sub_error, TaskError):
            return self.sub_error.get_deep_error()
        return self.sub_error

    def create_subtask_index_chain(self):
        """
        Returns the indexes (as strings) of the failing subtasks, from this error down to the deepest TaskError
        """
        if isinstance(self.sub_error, TaskError):
            return [str(self.task_index), *self.sub_error.create_subtask_index_chain()]
        return [str(self.task_index)]

    def __str__(self):
        # Format: "(index,index,...) ErrorClassName: error message"
        error = self.get_deep_error()
        return (
            f"({','.join(self.create_subtask_index_chain())}) {type(error).__name__}: {str(error)}"
        )
class ProgressCallback(Protocol):
    """
    Shape of the callback that runners call to update the progress.
    It is injected into the function passed into `run_as_task`
    """

    def __call__(self, progress: int, progress_data: dict) -> None:
        ...
class CustomTask:
    """
    A custom task is an object that can be executed inside a Celery Task using the run_as_task function

    It is a complex object because it can contain subtasks which will be executed in a specific order

    This is useful because of the flexibility it offers us. This covers both complex and simple situations since you
    can easily convert a function into a CustomTask:
    ```
    CustomTask(lambda: print("This is a function"))
    ```

    An important feature is the ability to recover from an error.
    By serializing the Error and the arguments used to instantiate the Task, you can easily recover from an error.
    This doesn't mean that the error will disappear by itself, but it means you can fix the error and then
    come back to where the process was without having to restart the whole thing

    CustomTask also allows us to have a much more precise progression tracking since we can track each subtask one
    by one

    If your task is using an external way of tracking progression, you can override the get_progression method
    of your task and implement your own way of keeping track of the progression.
    """

    name: str
    state: str = "pending"  # Either "pending", "running", "finished", "crashed"
    current_index: int = 0  # Current subtask index

    progress_callback: ProgressCallback | None = None

    subtasks: list[Callable | Self] | None = None
    parent: Self | None = None

    def __init__(self, func: Callable | None = None, *args):
        """
        Convert a regular function into a CustomTask

        In the case of inheritance, you can override the do method to define the task
        If you override the __init__ and the class is used as a subtask, you should pay attention to the subtask
        argument passing

        If the __init__ asks for no arguments, they will be passed to the do method
        If the __init__ asks for arguments, they will be passed to the __init__ and the do method will be
        called with no arguments

        If you want to force the do method to be called with the arguments, you can give a CustomTask object instead
        of the class inside _make_subtasks
        """
        if func:
            self.do = func

    def make_progress_data(self):
        """
        make_progress_data is used to create a dictionary containing the data that will be sent to the frontend at
        each progression update
        """
        return {}

    def get_progression(self, precise=True) -> float:  # -1 or [0..1]
        """
        Returns the progression of the task: -1 if it crashed, 0 if it is pending, 1 if it is finished, and the
        share of subtasks already done while it is running

        When precise is True, the progression of the subtask currently running is added to the result
        """
        if self.state == "crashed":
            return -1

        if self.state == "pending":
            return 0

        if self.state == "finished":
            return 1

        subtasks = self.get_tasks()
        subtask_count = len(subtasks)

        done_subtask_count = self.current_index
        progression = done_subtask_count / subtask_count

        if not precise or done_subtask_count >= subtask_count:
            return progression

        # Subtasks that are not CustomTask objects (functions, classes not instantiated yet) can't report
        # a progression of their own, so they count for nothing until they're done
        subtask = subtasks[self.current_index]
        subtask_progression = subtask.get_progression() if isinstance(subtask, CustomTask) else 0

        progression += subtask_progression / subtask_count

        return progression

    def do(self): ...

    def update_progress(self):
        """
        Sends the progression (as a rounded percentage) and the progress data to the progress callback, if any
        """
        if self.progress_callback:
            progression = self.get_progression()
            progression = round(progression * 100)

            data = self.make_progress_data()

            self.progress_callback(progression, data)

    def set_progress_callback(self, callback):
        self.progress_callback = callback

    # do calls count as subtasks
    def get_tasks(self) -> list[Callable | Self | type[Self]]:
        # The list is built once and then reused: __call__ replaces the classes it contains by their instances
        if self.subtasks is None:
            self.subtasks = [self.do, *self._make_subtasks()]
        return self.subtasks

    # Should not be called directly, but should be overridden in subclasses
    def _make_subtasks(
        self,
    ) -> list[Callable | Self | type[Self]]:  # Returns a list containing the subtasks
        # Don't do recursive calls in here
        return []  # This can include random functions or really complex CustomTask, we don't really care

    def recover(self, error: TaskError, *args):
        """
        Runs the task again, starting from the subtask that raised the given error

        Raises a ValueError if the error is not a TaskError
        """
        if not isinstance(error, TaskError):
            raise ValueError("The error should be a TaskError")

        return self(*args, recover_index=error.task_index, recover_error=error)

    def on_error(self, error: Exception):
        """
        Called when an error occurs in the task

        returns True if the error is skippable, False otherwise
        """
        return False

    def __call__(
        self, *args, recover_index: int = 0, recover_error: TaskError | None = None, **kwargs
    ):
        """
        Runs every subtask in order, sending the return value of each subtask to the next one

        recover_index is used to recover from an error: the subtasks before this index are skipped
        recover_error is the error we are recovering from, it is used to recover deeper inside the subtask
        that crashed

        Any error is wrapped into a TaskError holding the index of the subtask that failed
        """

        self.state = "running"
        self.current_index = 0

        # This will be used when calling subtasks so that we send the return value of each subtask to the next one
        next_args = args
        try:
            for task in self.get_tasks():
                # We're skipping the subtasks we've already done if we're in recovery mode
                if self.current_index < recover_index:
                    self.current_index += 1
                    continue

                # The goal when calling subtasks is to always have an iterable that we can split to get the arguments
                # Kind of tricky, we're kind of fighting against Python syntax here
                # Since we work only using positional arguments in tasks, handling task with no arguments
                # while also working with possible multiple arguments can be tricky
                # This kind of does the work because a split empty tuple will be *nothing*
                # So we're converting every value we are given into tuples that we can then split
                #
                # Example:
                # 5 -tuple-> (5,) -split-> 5
                # () -tuple-> () -split-> *nothing*
                # (5,2) -tuple-> (5,2) -split-> 5, 2
                next_args = (next_args,) if not isinstance(next_args, tuple) else next_args

                # Creating the dynamic CustomTasks
                if isinstance(task, type):
                    # If the task is a class, we need to instantiate it
                    # We're checking if the class has an overridden constructor
                    # If it does, we're calling it with the next arguments (if it asks for some)
                    # If it doesn't, we're calling the class with no arguments
                    has_overridden_constructor = task.__init__ is not CustomTask.__init__
                    asks_for_args = task.__init__.__code__.co_argcount > 1
                    if has_overridden_constructor and asks_for_args:
                        task = task(*next_args, **kwargs)
                        next_args = ()
                        kwargs = {}
                    else:
                        task = task()
                    # We need to update the subtasks list with the instantiated task
                    # Or else we will have issues when updating the progression
                    self.subtasks[self.current_index] = task

                is_custom_task = isinstance(task, CustomTask)
                if is_custom_task:
                    # Useful when calling the subtasks so we can update the parent progression instead of the child one
                    task.update_progress = self.update_progress

                    task.parent = self

                # If we're in recovery mode and the subtask is a CustomTask we can recover deeper
                # This only concerns the subtask that crashed: the ones after it never started, so they have to
                # run from their beginning. The sub error also has to be a TaskError, otherwise there is no
                # index to recover from (the subtask failed before running) and we simply run it again
                sub_error = recover_error.sub_error if recover_error is not None else None
                recover_deeper = (
                    is_custom_task
                    and self.current_index == recover_index
                    and isinstance(sub_error, TaskError)
                )
                if recover_deeper:
                    next_args = task.recover(sub_error, *next_args)
                else:
                    try:
                        next_args = task(*next_args, **kwargs)
                    except Exception as error:
                        skippable = self.on_error(error)
                        if not skippable:
                            raise
                        next_args = ()

                # The keyword arguments are only meant for the first subtask we run
                kwargs = {}

                # If the return value is not a tuple, we're either converting it into an empty tuple or keeping its value
                next_args = next_args if next_args is not None else ()
                self.current_index += 1
                self.update_progress()
        except Exception as error:
            self.state = "crashed"
            # Chaining explicitly so the original error is kept as the cause of the TaskError
            raise TaskError(self.current_index, error) from error
        self.state = "finished"

        # Converting empty tuple into None value instead
        if next_args == ():
            next_args = None

        return next_args