Coverage for src/history/models.py: 89%

38 statements  

« prev     ^ index     » next       coverage.py v7.9.0, created at 2025-11-25 13:05 +0000

1from django.contrib.auth.models import User 

2from django.db import models 

3from django.db.models import CharField, FloatField, ForeignKey, Manager, TextField 

4from django.utils import timezone 

5from ptf.models import Collection, Resource 

6 

7from history.model_data import HistoryEventStatus 

8 

9 

class HistoryEventQuerySet(models.QuerySet):
    """QuerySet exposing helpers to locate superseded history events."""

    def get_stale_events(self):
        """Return the events that are not the latest of their ``(pid, type)`` group.

        Within this queryset, events are grouped by ``pid`` and ``type``; the event
        with the most recent ``created_on`` of each group is kept out of the result,
        every other event is returned.

        Returns:
            A queryset derived from ``self`` that excludes one event (the most
            recent one) per ``(pid, type)`` pair.
        """
        # A single DISTINCT ON query: ordering by "-created_on" inside each
        # (pid, type) group makes the database keep the newest row of the group.
        # This avoids issuing one ``latest("created_on")`` query per group.
        # NOTE: ``distinct()`` with field names is only supported on PostgreSQL.
        latest_pks = list(
            self.order_by("pid", "type", "-created_on")
            .distinct("pid", "type")
            .values_list("pk", flat=True)
        )
        return self.exclude(pk__in=latest_pks)

18 

19 

# Django ``TextChoices`` enumeration built through the functional API: one member
# per ``HistoryEventStatus`` member, reusing that member's name and value.
EventStatusChoices = models.TextChoices(
    "EventStatusChoices",
    names=[(status.name, status.value) for status in HistoryEventStatus],
)

23 

24 

class HistoryEvent(models.Model):
    """
    A top-level task.

    Subtasks are stored as ``HistoryChildOperation`` rows and are reachable
    through the ``children`` reverse accessor.
    """

    # Defaults to the current time when no value is supplied.
    created_on = models.DateTimeField(default=timezone.now, db_index=True)
    type = models.CharField(db_index=True, max_length=200)
    pid = models.CharField(blank=False, db_index=True, max_length=1024)
    # Optional collection; deleting the collection deletes the event.
    col = models.ForeignKey(Collection, null=True, on_delete=models.CASCADE)
    source = models.CharField(
        max_length=1024,
        default="",
        blank=True,
        null=True,
        db_index=True,
    )
    # One of the EventStatusChoices values.
    status = models.CharField(
        choices=EventStatusChoices.choices,
        default=EventStatusChoices.OK,
        max_length=10,
        db_index=True,
    )
    title = models.CharField(default="", db_index=True, max_length=1024)
    type_error = models.CharField(default="", db_index=True, max_length=1024)
    # Optional user; the reference is set to NULL when the user is deleted.
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    message = models.TextField(null=True)
    # Default manager built from the custom queryset (provides get_stale_events).
    objects = HistoryEventQuerySet.as_manager()
    unique_id = models.CharField(
        verbose_name="Reference unique id parent",
        max_length=64,
        blank=True,
        null=True,
    )
    # Optional self-reference; the reverse accessor on the parent is "ancestors".
    parent = models.ForeignKey(
        "self",
        related_name="ancestors",
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )

    # Typing-only declaration of the reverse accessor created by
    # HistoryChildOperation.event (related_name="children").
    children: Manager["HistoryChildOperation"]

    def __str__(self):
        """Return ``"<pid>:<type> - <created_on>"``."""
        return f"{self.pid}:{self.type} - {self.created_on}"

57 

58 

class HistoryChildOperation(models.Model):
    """
    A subtask of a HistoryEvent.

    Each row belongs to exactly one event and one resource; deleting either of
    them deletes the row.
    """

    # Owning event; exposed on HistoryEvent as ``children``.
    event = models.ForeignKey(
        HistoryEvent,
        on_delete=models.CASCADE,
        related_name="children",
    )
    resource = models.ForeignKey(Resource, on_delete=models.CASCADE)
    type = models.CharField(blank=False, null=True, max_length=50)
    # One of the EventStatusChoices values.
    status = models.CharField(
        choices=EventStatusChoices.choices,
        default=EventStatusChoices.OK,
        max_length=10,
        blank=False,
        null=False,
    )
    status_message = models.CharField(blank=False, null=True, max_length=50)

    message = models.TextField(blank=False, null=True)
    score = models.FloatField(null=True)
    url = models.CharField(blank=False, null=True, max_length=250)
77 score = FloatField(null=True) 

78 url = CharField(max_length=250, null=True, blank=False)