Coverage for src/history/models.py: 89%
38 statements
Report generated by coverage.py v7.9.0, created at 2025-11-25 13:05 +0000
1from django.contrib.auth.models import User
2from django.db import models
3from django.db.models import CharField, FloatField, ForeignKey, Manager, TextField
4from django.utils import timezone
5from ptf.models import Collection, Resource
7from history.model_data import HistoryEventStatus
class HistoryEventQuerySet(models.QuerySet):
    """QuerySet for history events, with a helper to find superseded events."""

    def get_stale_events(self):
        """
        Return the events that are not the newest of their (pid, type) group.

        Within this queryset, events are grouped by ``pid`` and ``type``; the
        event with the greatest ``created_on`` in each group is kept and all
        the others are returned.

        The selection is done by the database with one correlated subquery,
        instead of issuing one ``latest()`` query per group, and it does not
        rely on ``distinct(*fields)``, which only PostgreSQL supports.

        Returns:
            A lazy queryset of the stale events (possibly empty).
        """
        # For the outer row's (pid, type), pick the primary key of the newest
        # event. "-pk" is a tie-breaker so that exactly one, deterministic row
        # survives when several events share the same timestamp.
        newest_in_group = (
            self.filter(pid=models.OuterRef("pid"), type=models.OuterRef("type"))
            .order_by("-created_on", "-pk")
            .values("pk")[:1]
        )
        # Compare with "=" rather than "__in": the subquery yields a single
        # row, and some backends reject LIMIT inside an IN (...) subquery.
        return self.exclude(pk=models.Subquery(newest_in_group))
# Django TextChoices built with the Enum functional API: one member per
# HistoryEventStatus member, reusing its name and its value.
EventStatusChoices = models.TextChoices(
    "EventStatusChoices",
    {member.name: member.value for member in HistoryEventStatus},
)
class HistoryEvent(models.Model):
    """
    A top-level task.

    Its subtasks are HistoryChildOperation rows, reachable through the
    ``children`` reverse relation.
    """

    created_on = models.DateTimeField(default=timezone.now, db_index=True)
    type = models.CharField(db_index=True, max_length=200)
    pid = models.CharField(db_index=True, max_length=1024, blank=False)
    col = models.ForeignKey(Collection, null=True, on_delete=models.CASCADE)
    source = models.CharField(
        max_length=1024,
        db_index=True,
        default="",
        null=True,
        blank=True,
    )
    status = models.CharField(
        choices=EventStatusChoices.choices,
        default=EventStatusChoices.OK,
        max_length=10,
        db_index=True,
    )
    title = models.CharField(db_index=True, default="", max_length=1024)
    type_error = models.CharField(db_index=True, default="", max_length=1024)
    # The event is kept (with user set to NULL) when the user is deleted.
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    message = models.TextField(null=True)
    unique_id = models.CharField(
        verbose_name="Reference unique id parent",
        max_length=64,
        null=True,
        blank=True,
    )
    # NOTE(review): the reverse accessor is named "ancestors", yet on a given
    # event it returns the events whose ``parent`` is that event.
    parent = models.ForeignKey(
        "self",
        related_name="ancestors",
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )

    objects = HistoryEventQuerySet.as_manager()

    # Typing aid for the reverse relation declared by HistoryChildOperation.event.
    children: Manager["HistoryChildOperation"]

    def __str__(self):
        """Return the ``pid:type - created_on`` label of the event."""
        return f"{self.pid}:{self.type} - {self.created_on}"
class HistoryChildOperation(models.Model):
    """
    A subtask of a HistoryEvent.

    Each row links one Resource to its parent event; rows are deleted together
    with the event or the resource they point to (CASCADE).
    """

    event = models.ForeignKey(
        HistoryEvent,
        on_delete=models.CASCADE,
        related_name="children",
    )
    resource = models.ForeignKey(Resource, on_delete=models.CASCADE)
    type = models.CharField(max_length=50, blank=False, null=True)
    # null/blank are left at their defaults (both False) for the status.
    status = models.CharField(
        choices=EventStatusChoices.choices,
        default=EventStatusChoices.OK,
        max_length=10,
    )
    status_message = models.CharField(max_length=50, blank=False, null=True)
    message = models.TextField(blank=False, null=True)
    score = models.FloatField(null=True)
    url = models.CharField(max_length=250, blank=False, null=True)