Coverage for src/task/receiver.py: 0%

40 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-04-18 12:36 +0000

1import queue 

2import threading 

3 

4from celery import Celery, current_app 

5from celery.events.receiver import EventReceiver 

6 

7receiver = None 

8last_event = None 

9 

10# A list of queues used to send events to our SSE Host 

11queues: list[queue.Queue] = [] 

12 

13 

14def subscribe(): 

15 q = queue.Queue(maxsize=1) 

16 queues.append(q) 

17 

18 # Send the last event to the new subscriber 

19 if last_event and last_event["type"] == "task-progress" and last_event["progress"] != 100: 

20 q.put_nowait(last_event) 

21 

22 return q 

23 

24 

25def start_receiver(app: Celery = current_app): 

26 global receiver 

27 if receiver is None: 

28 # We probably want to handle other event 

29 handlers = {"task-progress": handle_progress_event} 

30 

31 receiver = EventReceiver(channel=app.connection_for_read(), app=app, handlers=handlers) 

32 threading.Thread(target=receiver.capture).start() 

33 

34 

35def get_remaining_task_count(app: Celery = current_app): 

36 with app.connection_or_acquire() as conn: 

37 remaining = conn.default_channel.queue_declare(queue="celery", passive=True).message_count 

38 return remaining 

39 

40 

41def clear_progress(): 

42 event = {"type": "clear-progress"} 

43 handle_progress_event(event) 

44 

45 

46def handle_progress_event(event, save_last_event=True): 

47 # Clean up the event object 

48 for key in ["hostname", "utcoffset", "pid", "clock", "timestamp", "local_received"]: 

49 if key in event: 

50 del event[key] 

51 

52 print("Event sent:", event["type"]) 

53 

54 for q in queues: 

55 try: 

56 q.put_nowait(event) 

57 except queue.Full: 

58 # We can't directly handle the disconnection of the client from the SSE 

59 # https://github.com/django/django/pull/17147 

60 # This pull request fixes it, but we can't do anything since we are not using Django 5 

61 # We are kinda forced to wait for the stream to die and then detect that the queue is not being used anymore 

62 # We could fork our version of Django and fix it ourselves but it needs a little work 

63 # This will do for now 

64 queues.remove(q) 

65 print("A streaming connection got timed out") 

66 

67 if save_last_event: 

68 if "progress" in event and event["progress"] < 0: 

69 return 

70 

71 global last_event 

72 last_event = event