Coverage for src/task/receiver.py: 0%

49 statements  

« prev     ^ index     » next       coverage.py v7.9.0, created at 2025-10-13 12:26 +0000

1import logging 

2import queue 

3import threading 

4from typing import Literal, NotRequired, TypedDict 

5 

6from celery import Celery, current_app 

7from celery.events.receiver import EventReceiver 

8 

9logger = logging.getLogger(__name__) 

10 

11 

12class CeleryTaskEvent(TypedDict): 

13 type: Literal["task-progress", "clear-progress"] 

14 progress: NotRequired[int] 

15 uuid: NotRequired[str] 

16 task_name: NotRequired[str] 

17 data: NotRequired[dict] 

18 

19 

20receiver = None 

21last_event = {} 

22 

23# A list of queues used to send events to our SSE Host 

24# WARN : possible memory leak here. Currently only fixable with django 5... 

25queues: list[queue.Queue] = [] 

26 

27 

28def subscribe(): 

29 q = queue.Queue(maxsize=1) 

30 queues.append(q) 

31 

32 # Send the last event to the new subscriber 

33 if last_event and last_event["type"] == "task-progress" and last_event.get("progress") != 100: 

34 q.put_nowait(last_event) 

35 

36 return q 

37 

38 

39def get_last_event(): 

40 return last_event 

41 

42 

43def start_receiver(app: Celery = current_app): 

44 global receiver 

45 if receiver is None: 

46 # We probably want to handle other event 

47 handlers = {"task-progress": handle_progress_event} 

48 

49 receiver = EventReceiver(channel=app.connection_for_read(), app=app, handlers=handlers) 

50 threading.Thread(target=receiver.capture, name="ptf_task_SSE").start() 

51 

52 

53def get_remaining_task_count(app: Celery = current_app): 

54 with app.connection_or_acquire() as conn: 

55 remaining = conn.default_channel.queue_declare(queue="celery", passive=True).message_count 

56 return remaining 

57 

58 

59def clear_progress(): 

60 event: CeleryTaskEvent = {"type": "clear-progress"} 

61 handle_progress_event(event) 

62 

63 

64def handle_progress_event(event: CeleryTaskEvent, save_last_event=True): 

65 # Clean up the event object 

66 for key in ["hostname", "utcoffset", "pid", "clock", "timestamp", "local_received"]: 

67 if key in event: 

68 del event[key] 

69 

70 logger.debug("Event sent:", event["type"]) 

71 

72 for q in queues: 

73 try: 

74 q.put_nowait(event) 

75 except queue.Full: 

76 # We can't directly handle the disconnection of the client from the SSE 

77 # https://github.com/django/django/pull/17147 

78 # This pull request fixes it, but we can't do anything since we are not using Django 5 

79 # We are kinda forced to wait for the stream to die and then detect that the queue is not being used anymore 

80 # We could fork our version of Django and fix it ourselves but it needs a little work 

81 # This will do for now 

82 queues.remove(q) 

83 logger.debug("A streaming connection got timed out") 

84 

85 if save_last_event: 

86 global last_event 

87 # progress = event.get("progress", 0) 

88 # if progress < 0 or progress >= 100: 

89 # last_event = None 

90 # else: 

91 last_event = event