Coverage for src/task/receiver.py: 0%
49 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-10-13 12:26 +0000
« prev ^ index » next coverage.py v7.9.0, created at 2025-10-13 12:26 +0000
1import logging
2import queue
3import threading
4from typing import Literal, NotRequired, TypedDict
6from celery import Celery, current_app
7from celery.events.receiver import EventReceiver
# Module-level logger, named after this module so its output can be filtered.
logger = logging.getLogger(__name__)
12class CeleryTaskEvent(TypedDict):
13 type: Literal["task-progress", "clear-progress"]
14 progress: NotRequired[int]
15 uuid: NotRequired[str]
16 task_name: NotRequired[str]
17 data: NotRequired[dict]
# The single EventReceiver of this process; created lazily by start_receiver().
receiver = None

# Most recent event remembered by handle_progress_event(); empty until the
# first event has been handled.
last_event = {}

# Queues used to send events to our SSE host: one queue per subscriber.
# WARNING: possible memory leak. A queue is only dropped once it is found full
# while broadcasting, so queues of disconnected clients linger until then.
# Detecting the disconnection properly is currently only possible with Django 5.
queues: list[queue.Queue] = []
def subscribe():
    """Register a new subscriber and return its event queue.

    The returned queue holds at most one pending event. If the last remembered
    event is an unfinished ``task-progress`` (progress different from 100), it
    is put on the queue immediately so the new subscriber starts from the
    current state.
    """
    subscriber_queue = queue.Queue(maxsize=1)
    queues.append(subscriber_queue)

    # Send the last event to the new subscriber, but only while a task is
    # still running.
    task_still_running = (
        last_event
        and last_event["type"] == "task-progress"
        and last_event.get("progress") != 100
    )
    if task_still_running:
        subscriber_queue.put_nowait(last_event)

    return subscriber_queue
def get_last_event():
    """Return the module-level ``last_event`` (an empty dict if none was saved yet)."""
    return last_event
def start_receiver(app: Celery = current_app):
    """Create the Celery event receiver and start capturing in a thread.

    Does nothing when a receiver has already been created. Otherwise an
    ``EventReceiver`` is bound to a read connection of ``app`` and its
    ``capture`` method is run in a thread named ``ptf_task_SSE``.
    """
    global receiver
    if receiver is not None:
        return

    # We probably want to handle other events as well.
    handlers = {"task-progress": handle_progress_event}

    receiver = EventReceiver(channel=app.connection_for_read(), app=app, handlers=handlers)
    capture_thread = threading.Thread(target=receiver.capture, name="ptf_task_SSE")
    capture_thread.start()
def get_remaining_task_count(app: Celery = current_app):
    """Return the message count of the ``celery`` queue.

    The queue is declared passively on the default channel of a connection
    obtained from ``app``, and the ``message_count`` of that declaration is
    returned.
    """
    with app.connection_or_acquire() as connection:
        declaration = connection.default_channel.queue_declare(queue="celery", passive=True)
        return declaration.message_count
def clear_progress():
    """Broadcast a ``clear-progress`` event through ``handle_progress_event``."""
    handle_progress_event({"type": "clear-progress"})
def handle_progress_event(event: CeleryTaskEvent, save_last_event=True):
    """Broadcast ``event`` to every subscriber queue.

    The event is first stripped, in place, of the bookkeeping keys listed
    below, then offered to each queue in ``queues`` without blocking. A queue
    that is already full is removed from ``queues``.

    Args:
        event: The event to broadcast. It is mutated in place.
        save_last_event: When true, ``event`` is stored as the module-level
            ``last_event`` after broadcasting.
    """
    global last_event

    # Clean up the event object
    for key in ("hostname", "utcoffset", "pid", "clock", "timestamp", "local_received"):
        event.pop(key, None)

    # The placeholder is required: without it logging fails to format the
    # record as soon as DEBUG is enabled.
    logger.debug("Event sent: %s", event["type"])

    # Iterate over a snapshot. Removing from ``queues`` while looping over it
    # directly would skip the subscriber that follows each removed queue.
    for q in list(queues):
        try:
            q.put_nowait(event)
        except queue.Full:
            # We can't directly handle the disconnection of the client from the SSE.
            # https://github.com/django/django/pull/17147 fixes it, but we can't
            # use it since we are not on Django 5. We have to wait for the stream
            # to die and then notice that its queue is no longer being consumed.
            # Forking Django to fix this ourselves would need more work; this
            # will do for now.
            queues.remove(q)
            logger.debug("A streaming connection got timed out")

    if save_last_event:
        last_event = event