Coverage for src/task/receiver.py: 0%
40 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-18 12:36 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-18 12:36 +0000
1import queue
2import threading
4from celery import Celery, current_app
5from celery.events.receiver import EventReceiver
7receiver = None
8last_event = None
10# A list of queues used to send events to our SSE Host
11queues: list[queue.Queue] = []
14def subscribe():
15 q = queue.Queue(maxsize=1)
16 queues.append(q)
18 # Send the last event to the new subscriber
19 if last_event and last_event["type"] == "task-progress" and last_event["progress"] != 100:
20 q.put_nowait(last_event)
22 return q
25def start_receiver(app: Celery = current_app):
26 global receiver
27 if receiver is None:
28 # We probably want to handle other event
29 handlers = {"task-progress": handle_progress_event}
31 receiver = EventReceiver(channel=app.connection_for_read(), app=app, handlers=handlers)
32 threading.Thread(target=receiver.capture).start()
35def get_remaining_task_count(app: Celery = current_app):
36 with app.connection_or_acquire() as conn:
37 remaining = conn.default_channel.queue_declare(queue="celery", passive=True).message_count
38 return remaining
41def clear_progress():
42 event = {"type": "clear-progress"}
43 handle_progress_event(event)
46def handle_progress_event(event, save_last_event=True):
47 # Clean up the event object
48 for key in ["hostname", "utcoffset", "pid", "clock", "timestamp", "local_received"]:
49 if key in event:
50 del event[key]
52 print("Event sent:", event["type"])
54 for q in queues:
55 try:
56 q.put_nowait(event)
57 except queue.Full:
58 # We can't directly handle the disconnection of the client from the SSE
59 # https://github.com/django/django/pull/17147
60 # This pull request fixes it, but we can't do anything since we are not using Django 5
61 # We are kinda forced to wait for the stream to die and then detect that the queue is not being used anymore
62 # We could fork our version of Django and fix it ourselves but it needs a little work
63 # This will do for now
64 queues.remove(q)
65 print("A streaming connection got timed out")
67 if save_last_event:
68 if "progress" in event and event["progress"] < 0:
69 return
71 global last_event
72 last_event = event