From 49d9331322f4c386fe33965e2d9ee566d1ec7575 Mon Sep 17 00:00:00 2001 From: Diego Castro Date: Mon, 11 Apr 2022 10:35:52 +0200 Subject: [PATCH 01/11] Support for abstract base models improved. (#516) Model classes moved into a `models.py` directory, which exports by default only the default `django-celery-beat` model classes now from `models.generic.py`. `managers.py` moved into `models.py` directory, because it's something that is needed only by models. --- django_celery_beat/models/__init__.py | 19 +++++ .../{models.py => models/abstract.py} | 74 +++++++--------- django_celery_beat/models/generic.py | 85 +++++++++++++++++++ django_celery_beat/{ => models}/managers.py | 0 4 files changed, 135 insertions(+), 43 deletions(-) create mode 100644 django_celery_beat/models/__init__.py rename django_celery_beat/{models.py => models/abstract.py} (91%) create mode 100644 django_celery_beat/models/generic.py rename django_celery_beat/{ => models}/managers.py (100%) diff --git a/django_celery_beat/models/__init__.py b/django_celery_beat/models/__init__.py new file mode 100644 index 00000000..d23507e7 --- /dev/null +++ b/django_celery_beat/models/__init__.py @@ -0,0 +1,19 @@ +from .generic import ( + ClockedSchedule, + ClockScheduler, + CrontabSchedule, + IntervalSchedule, + PeriodicTask, + PeriodicTasks, + SolarSchedule +) + +__ALL__ = [ + "ClockedSchedule", + "ClockScheduler", + "CrontabSchedule", + "IntervalSchedule", + "PeriodicTask", + "PeriodicTasks", + "SolarSchedule" +] \ No newline at end of file diff --git a/django_celery_beat/models.py b/django_celery_beat/models/abstract.py similarity index 91% rename from django_celery_beat/models.py rename to django_celery_beat/models/abstract.py index 1f87bbc5..a55808e4 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models/abstract.py @@ -7,13 +7,12 @@ from django.core.exceptions import MultipleObjectsReturned, ValidationError from django.core.validators import MaxValueValidator, MinValueValidator from django.db import models -from django.db.models import signals from django.utils.translation import gettext_lazy as _ from . import managers, validators -from .tzcrontab import TzAwareCrontab -from .utils import make_aware, now -from .clockedschedule import clocked +from ..tzcrontab import TzAwareCrontab +from ..utils import make_aware, now +from ..clockedschedule import clocked DAYS = 'days' @@ -72,8 +71,8 @@ def crontab_schedule_celery_timezone(): ] else 'UTC' -class SolarSchedule(models.Model): - """Schedule following astronomical patterns. +class AbstractSolarSchedule(models.Model): + """Abstract schedule following astronomical patterns. Example: to run every sunrise in New York City: @@ -101,6 +100,7 @@ class SolarSchedule(models.Model): class Meta: """Table information.""" + abstract = True verbose_name = _('solar event') verbose_name_plural = _('solar events') ordering = ('event', 'latitude', 'longitude') @@ -133,9 +133,8 @@ def __str__(self): self.longitude ) - -class IntervalSchedule(models.Model): - """Schedule executing on a regular interval. +class AbstractIntervalSchedule(models.Model): + """Abstract schedule executing on a regular interval. Example: execute every 2 days: @@ -166,6 +165,7 @@ class IntervalSchedule(models.Model): class Meta: """Table information.""" + abstract = True verbose_name = _('interval') verbose_name_plural = _('intervals') ordering = ['period', 'every'] @@ -206,8 +206,8 @@ def period_singular(self): return self.period[:-1] -class ClockedSchedule(models.Model): - """clocked schedule.""" +class AbstractClockedSchedule(models.Model): + """Abstract clocked schedule.""" clocked_time = models.DateTimeField( verbose_name=_('Clock Time'), @@ -217,6 +217,7 @@ class ClockedSchedule(models.Model): class Meta: """Table information.""" + abstract = True verbose_name = _('clocked') verbose_name_plural = _('clocked') ordering = ['clocked_time'] @@ -240,8 +241,8 @@ def from_schedule(cls, schedule): return cls.objects.filter(**spec).first() -class CrontabSchedule(models.Model): - """Timezone Aware Crontab-like schedule. +class AbstractCrontabSchedule(models.Model): + """Abstract timezone Aware Crontab-like schedule. Example: Run every hour at 0 minutes for days of month 10-15: @@ -305,6 +306,7 @@ class CrontabSchedule(models.Model): class Meta: """Table information.""" + abstract = True verbose_name = _('crontab') verbose_name_plural = _('crontabs') ordering = ['month_of_year', 'day_of_month', @@ -354,8 +356,8 @@ def from_schedule(cls, schedule): return cls.objects.filter(**spec).first() -class PeriodicTasks(models.Model): - """Helper table for tracking updates to periodic tasks. +class AbstractPeriodicTasks(models.Model): + """Abstract helper table for tracking updates to periodic tasks. This stores a single row with ``ident=1``. ``last_update`` is updated via django signals whenever anything is changed in the :class:`~.PeriodicTask` model. @@ -368,6 +370,12 @@ class PeriodicTasks(models.Model): objects = managers.ExtendedManager() + class Meta: + """Table information.""" + + abstract = True + + @classmethod def changed(cls, instance, **kwargs): if not instance.no_changes: @@ -384,9 +392,8 @@ def last_change(cls): except cls.DoesNotExist: pass - -class PeriodicTask(models.Model): - """Model representing a periodic task.""" +class AbstractPeriodicTask(models.Model): + """Abstract model representing a periodic task.""" name = models.CharField( max_length=200, unique=True, @@ -403,25 +410,25 @@ class PeriodicTask(models.Model): # You can only set ONE of the following schedule FK's # TODO: Redo this as a GenericForeignKey interval = models.ForeignKey( - IntervalSchedule, on_delete=models.CASCADE, + "IntervalSchedule", on_delete=models.CASCADE, null=True, blank=True, verbose_name=_('Interval Schedule'), help_text=_('Interval Schedule to run the task on. ' 'Set only one schedule type, leave the others null.'), ) crontab = models.ForeignKey( - CrontabSchedule, on_delete=models.CASCADE, null=True, blank=True, + "CrontabSchedule", on_delete=models.CASCADE, null=True, blank=True, verbose_name=_('Crontab Schedule'), help_text=_('Crontab Schedule to run the task on. ' 'Set only one schedule type, leave the others null.'), ) solar = models.ForeignKey( - SolarSchedule, on_delete=models.CASCADE, null=True, blank=True, + "SolarSchedule", on_delete=models.CASCADE, null=True, blank=True, verbose_name=_('Solar Schedule'), help_text=_('Solar Schedule to run the task on. ' 'Set only one schedule type, leave the others null.'), ) clocked = models.ForeignKey( - ClockedSchedule, on_delete=models.CASCADE, null=True, blank=True, + "ClockedSchedule", on_delete=models.CASCADE, null=True, blank=True, verbose_name=_('Clocked Schedule'), help_text=_('Clocked Schedule to run the task on. ' 'Set only one schedule type, leave the others null.'), @@ -543,6 +550,7 @@ class PeriodicTask(models.Model): class Meta: """Table information.""" + abstract = True verbose_name = _('periodic task') verbose_name_plural = _('periodic tasks') @@ -614,24 +622,4 @@ def schedule(self): if self.solar: return self.solar.schedule if self.clocked: - return self.clocked.schedule - - -signals.pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask) -signals.pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask) -signals.pre_delete.connect( - PeriodicTasks.update_changed, sender=IntervalSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=IntervalSchedule) -signals.post_delete.connect( - PeriodicTasks.update_changed, sender=CrontabSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=CrontabSchedule) -signals.post_delete.connect( - PeriodicTasks.update_changed, sender=SolarSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=SolarSchedule) -signals.post_delete.connect( - PeriodicTasks.update_changed, sender=ClockedSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=ClockedSchedule) + return self.clocked.schedule \ No newline at end of file diff --git a/django_celery_beat/models/generic.py b/django_celery_beat/models/generic.py new file mode 100644 index 00000000..be443541 --- /dev/null +++ b/django_celery_beat/models/generic.py @@ -0,0 +1,85 @@ +from django.db.models import signals + +from .abstract import ( + AbstractClockedSchedule, + AbstractCrontabSchedule, + AbstractIntervalSchedule, + AbstractPeriodicTask, + AbstractPeriodicTasks, + AbstractSolarSchedule +) + +class SolarSchedule(AbstractSolarSchedule): + """Schedule following astronomical patterns.""" + + class Meta(AbstractSolarSchedule.Meta): + """Table information.""" + + abstract = False + +class IntervalSchedule(AbstractIntervalSchedule): + """Schedule with a fixed interval.""" + + class Meta(AbstractIntervalSchedule.Meta): + """Table information.""" + + abstract = False + + +class ClockScheduler(AbstractClockedSchedule): + """Schedule with a fixed interval.""" + + class Meta(AbstractClockedSchedule.Meta): + """Table information.""" + + abstract = False + +class ClockedSchedule(AbstractClockedSchedule): + """Schedule with a fixed interval.""" + + class Meta(AbstractClockedSchedule.Meta): + """Table information.""" + + abstract = False + +class CrontabSchedule(AbstractCrontabSchedule): + """Schedule with cron-style syntax.""" + + class Meta(AbstractCrontabSchedule.Meta): + """Table information.""" + + abstract = False + +class PeriodicTask(AbstractPeriodicTask): + """Interal task scheduling class.""" + + class Meta(AbstractPeriodicTask.Meta): + """Table information.""" + + abstract = False + +class PeriodicTasks(AbstractPeriodicTasks): + """Helper table for tracking updates to periodic tasks.""" + + class Meta(AbstractPeriodicTasks.Meta): + abstract = False + + +signals.pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask) +signals.pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask) +signals.pre_delete.connect( + PeriodicTasks.update_changed, sender=IntervalSchedule) +signals.post_save.connect( + PeriodicTasks.update_changed, sender=IntervalSchedule) +signals.post_delete.connect( + PeriodicTasks.update_changed, sender=CrontabSchedule) +signals.post_save.connect( + PeriodicTasks.update_changed, sender=CrontabSchedule) +signals.post_delete.connect( + PeriodicTasks.update_changed, sender=SolarSchedule) +signals.post_save.connect( + PeriodicTasks.update_changed, sender=SolarSchedule) +signals.post_delete.connect( + PeriodicTasks.update_changed, sender=ClockedSchedule) +signals.post_save.connect( + PeriodicTasks.update_changed, sender=ClockedSchedule) \ No newline at end of file diff --git a/django_celery_beat/managers.py b/django_celery_beat/models/managers.py similarity index 100% rename from django_celery_beat/managers.py rename to django_celery_beat/models/managers.py From b12052bc988af5df1b8036c20c5678e41c2094e8 Mon Sep 17 00:00:00 2001 From: Diego Castro Date: Tue, 10 May 2022 15:56:30 +0200 Subject: [PATCH 02/11] Fixed a wrong import for validators --- django_celery_beat/models/abstract.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/django_celery_beat/models/abstract.py b/django_celery_beat/models/abstract.py index a55808e4..de05b1f4 100644 --- a/django_celery_beat/models/abstract.py +++ b/django_celery_beat/models/abstract.py @@ -9,10 +9,11 @@ from django.db import models from django.utils.translation import gettext_lazy as _ -from . import managers, validators +from . import managers +from .. import validators +from ..clockedschedule import clocked from ..tzcrontab import TzAwareCrontab from ..utils import make_aware, now -from ..clockedschedule import clocked DAYS = 'days' From e1b41aaa4271d7aa504eff8299d53e1d4cae8417 Mon Sep 17 00:00:00 2001 From: Diego Castro Date: Tue, 10 May 2022 17:01:20 +0200 Subject: [PATCH 03/11] Exporting 'crontab_schedule_celery_timezone' from 'models.abstract' `crontab_schedule_celery_timezone` wasn't exported properly and raised an `AttributeError` when applying migrations. --- django_celery_beat/models/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/django_celery_beat/models/__init__.py b/django_celery_beat/models/__init__.py index d23507e7..b5211b76 100644 --- a/django_celery_beat/models/__init__.py +++ b/django_celery_beat/models/__init__.py @@ -5,7 +5,8 @@ IntervalSchedule, PeriodicTask, PeriodicTasks, - SolarSchedule + SolarSchedule, + crontab_schedule_celery_timezone, ) __ALL__ = [ @@ -15,5 +16,6 @@ "IntervalSchedule", "PeriodicTask", "PeriodicTasks", - "SolarSchedule" + "SolarSchedule", + "crontab_schedule_celery_timezone", ] \ No newline at end of file From a0a7b1394774e4644d7f7929c37f032e94d5fc26 Mon Sep 17 00:00:00 2001 From: Diego Castro Date: Tue, 10 May 2022 17:25:23 +0200 Subject: [PATCH 04/11] Fixed a wrong export introduced in prev commit --- django_celery_beat/models/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_celery_beat/models/__init__.py b/django_celery_beat/models/__init__.py index b5211b76..02fa2943 100644 --- a/django_celery_beat/models/__init__.py +++ b/django_celery_beat/models/__init__.py @@ -1,3 +1,4 @@ +from .abstract import crontab_schedule_celery_timezone from .generic import ( ClockedSchedule, ClockScheduler, @@ -6,7 +7,6 @@ PeriodicTask, PeriodicTasks, SolarSchedule, - crontab_schedule_celery_timezone, ) __ALL__ = [ From 742ee5472e5b8020662a71664f43e78524d88192 Mon Sep 17 00:00:00 2001 From: Diego Castro Date: Fri, 3 Jun 2022 11:08:30 +0200 Subject: [PATCH 05/11] Bypass de default models used by `django_celery_beat.scheduler` (#516) when `CELERY_BEAT_(?:PERIODICTASKS?|(?:CRONTAB|INTERVAL|SOLAR|CLOCKED)SCHEDULE)_MODEL` constants are defined in django `settings`. Providing the `app_label.model_name` of your own models as value for the constants `CELERY_BEAT_(?:PERIODICTASKS?|(?:CRONTAB|INTERVAL|SOLAR|CLOCKED)SCHEDULE)_MODEL` will let `django_celery_beat.scheduler` use the custom models instead of the default ones (aka generic models, in this context/pull-request): ```python CELERY_BEAT_PERIODICTASK_MODEL = "app_label.model_name" CELERY_BEAT_PERIODICTASKS_MODEL = "app_label.model_name" CELERY_BEAT_CRONTABSCHEDULE_MODEL = "app_label.model_name" CELERY_BEAT_INTERVALSCHEDULE_MODEL = "app_label.model_name" CELERY_BEAT_SOLARSCHEDULE_MODEL = "app_label.model_name" CELERY_BEAT_CLOCKEDSCHEDULE_MODEL = "app_label.model_name" ``` Doing this we add support to automatically bypass the default `django_celery_beat` models without forcing developers to overwrite the whole `django_celery_beat.scheduler` in projects where the default models doesn't fit the requirements I updated the `README.rst` with a small explanation about how this work Additonal information: * related issue: #516 * pull-request: #534 --- .gitignore | 1 + README.rst | 48 +++++++++++ django_celery_beat/helpers.py | 133 +++++++++++++++++++++++++++++++ django_celery_beat/schedulers.py | 20 +++-- 4 files changed, 197 insertions(+), 5 deletions(-) create mode 100644 django_celery_beat/helpers.py diff --git a/.gitignore b/.gitignore index b9774de3..7dcba8dd 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ coverage.xml .python-version venv .env +.vscode/ diff --git a/README.rst b/README.rst index dbe7517a..c03f918c 100644 --- a/README.rst +++ b/README.rst @@ -86,6 +86,54 @@ manually: >>> from django_celery_beat.models import PeriodicTasks >>> PeriodicTasks.update_changed() +Custom Models +============= + +It's possible to use your own models instead of the default ones provided by ``django_celery_beat``, to do that just define your models inheriting from the right one from ``django_celery_beat.models.abstract``: + +.. code-block:: Python + + # custom_app.models.py + from django_celery_beat.models.abstract import ( + AbstractClockedSchedule, + AbstractCrontabSchedule, + AbstractIntervalSchedule, + AbstractPeriodicTask, + AbstractPeriodicTasks, + AbstractSolarSchedule, + ) + + class CustomPeriodicTask(AbstractPeriodicTask): + ... + + class CustomPeriodicTasks(AbstractPeriodicTasks): + ... + + class CustomCrontabSchedule(AbstractCrontabSchedule): + ... + + class CustomIntervalSchedule(AbstractIntervalSchedule): + ... + + class CustomSolarSchedule(AbstractSolarSchedule): + ... + + class CustomClockedSchedule(AbstractClockedSchedule): + ... + +To let ``django_celery_beat.scheduler`` make use of your own modules, you must provide the ``app_name.model_name`` of your own custom models as values to the next constants in your settings: + +.. code-block:: Python + + # settings.py + # CELERY_BEAT_(?:PERIODICTASKS?|(?:CRONTAB|INTERVAL|SOLAR|CLOCKED)SCHEDULE)_MODEL = "app_label.model_name" + CELERY_BEAT_PERIODICTASK_MODEL = "custom_app.CustomPeriodicTask" + CELERY_BEAT_PERIODICTASKS_MODEL = "custom_app.CustomPeriodicTasks" + CELERY_BEAT_CRONTABSCHEDULE_MODEL = "custom_app.CustomCrontabSchedule" + CELERY_BEAT_INTERVALSCHEDULE_MODEL = "custom_app.CustomIntervalSchedule" + CELERY_BEAT_SOLARSCHEDULE_MODEL = "custom_app.CustomSolarSchedule" + CELERY_BEAT_CLOCKEDSCHEDULE_MODEL = "custom_app.CustomClockedSchedule" + Example creating interval-based periodic task --------------------------------------------- diff --git a/django_celery_beat/helpers.py b/django_celery_beat/helpers.py new file mode 100644 index 00000000..4f468150 --- /dev/null +++ b/django_celery_beat/helpers.py @@ -0,0 +1,133 @@ +from django.apps import apps +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured + +from .models import ( + PeriodicTask, PeriodicTasks, + CrontabSchedule, IntervalSchedule, + SolarSchedule, ClockedSchedule +) + +def crontabschedule_model(): + """Return the CrontabSchedule model that is active in this project.""" + if not hasattr(settings, 'CELERY_BEAT_CRONTABSCHEDULE_MODEL'): + return CrontabSchedule + + try: + return apps.get_model( + settings.CELERY_BEAT_CRONTABSCHEDULE_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_BEAT_CRONTABSCHEDULE_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_BEAT_CRONTABSCHEDULE_MODEL refers to model " + f"'{settings.CELERY_BEAT_CRONTABSCHEDULE_MODEL}' that has not " + "been installed" + ) + +def intervalschedule_model(): + """Return the IntervalSchedule model that is active in this project.""" + if not hasattr(settings, 'CELERY_BEAT_INTERVALSCHEDULE_MODEL'): + return IntervalSchedule + + try: + return apps.get_model( + settings.CELERY_BEAT_INTERVALSCHEDULE_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_BEAT_INTERVALSCHEDULE_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_BEAT_INTERVALSCHEDULE_MODEL refers to model " + f"'{settings.CELERY_BEAT_INTERVALSCHEDULE_MODEL}' that has not " + "been installed" + ) + +def periodictask_model(): + """Return the PeriodicTask model that is active in this project.""" + if not hasattr(settings, 'CELERY_BEAT_PERIODICTASK_MODEL'): + return PeriodicTask + + try: + return apps.get_model(settings.CELERY_BEAT_PERIODICTASK_MODEL) + except ValueError: + raise ImproperlyConfigured( + "CELERY_BEAT_PERIODICTASK_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_BEAT_PERIODICTASK_MODEL refers to model " + f"'{settings.CELERY_BEAT_PERIODICTASK_MODEL}' that has not been " + "installed" + ) + +def periodictasks_model(): + """Return the PeriodicTasks model that is active in this project.""" + if not hasattr(settings, 'CELERY_BEAT_PERIODICTASKS_MODEL'): + return PeriodicTasks + + try: + return apps.get_model( + settings.CELERY_BEAT_PERIODICTASKS_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_BEAT_PERIODICTASKS_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_BEAT_PERIODICTASKS_MODEL refers to model " + f"'{settings.CELERY_BEAT_PERIODICTASKS_MODEL}' that has not been " + "installed" + ) + +def solarschedule_model(): + """Return the SolarSchedule model that is active in this project.""" + if not hasattr(settings, 'CELERY_BEAT_SOLARSCHEDULE_MODEL'): + return SolarSchedule + + try: + return apps.get_model( + settings.CELERY_BEAT_SOLARSCHEDULE_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_BEAT_SOLARSCHEDULE_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_BEAT_SOLARSCHEDULE_MODEL refers to model " + f"'{settings.CELERY_BEAT_SOLARSCHEDULE_MODEL}' that has not been " + "installed" + ) + +def clockedschedule_model(): + """Return the ClockedSchedule model that is active in this project.""" + if not hasattr(settings, 'CELERY_BEAT_CLOCKEDSCHEDULE_MODEL'): + return ClockedSchedule + + try: + return apps.get_model( + settings.CELERY_BEAT_CLOCKEDSCHEDULE_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_BEAT_CLOCKEDSCHEDULE_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_BEAT_CLOCKEDSCHEDULE_MODEL refers to model " + f"'{settings.CELERY_BEAT_CLOCKEDSCHEDULE_MODEL}' that has not " + "been installed" + ) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 1a509e0d..391ac2bb 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -19,12 +19,15 @@ from django.db.utils import DatabaseError, InterfaceError from django.core.exceptions import ObjectDoesNotExist -from .models import ( - PeriodicTask, PeriodicTasks, - CrontabSchedule, IntervalSchedule, - SolarSchedule, ClockedSchedule -) from .clockedschedule import clocked +from .helpers import ( + clockedschedule_model, + crontabschedule_model, + intervalschedule_model, + periodictask_model, + periodictasks_model, + solarschedule_model, +) from .utils import NEVER_CHECK_TIMEOUT # This scheduler must wake up more frequently than the @@ -39,6 +42,13 @@ logger = get_logger(__name__) debug, info, warning = logger.debug, logger.info, logger.warning +ClockedSchedule = clockedschedule_model() +CrontabSchedule = crontabschedule_model() +IntervalSchedule = intervalschedule_model() +PeriodicTask = periodictask_model() +PeriodicTasks = periodictasks_model() +SolarSchedule = solarschedule_model() + class ModelEntry(ScheduleEntry): """Scheduler entry taken from database row.""" From bf9b4081d3acad5d5bbb6e3f53fb714dfc01988b Mon Sep 17 00:00:00 2001 From: Brett Buford Date: Tue, 24 Jun 2025 15:59:09 -0700 Subject: [PATCH 06/11] Make UTS pass --- django_celery_beat/helpers.py | 66 +++++----- django_celery_beat/models/__init__.py | 14 +- django_celery_beat/models/abstract.py | 17 +-- django_celery_beat/models/generic.py | 42 +++--- django_celery_beat/models/managers.py | 30 ----- django_celery_beat/schedulers.py | 181 +++++++++++++------------- 6 files changed, 161 insertions(+), 189 deletions(-) delete mode 100644 django_celery_beat/models/managers.py diff --git a/django_celery_beat/helpers.py b/django_celery_beat/helpers.py index 4f468150..99a623c2 100644 --- a/django_celery_beat/helpers.py +++ b/django_celery_beat/helpers.py @@ -2,21 +2,16 @@ from django.conf import settings from django.core.exceptions import ImproperlyConfigured -from .models import ( - PeriodicTask, PeriodicTasks, - CrontabSchedule, IntervalSchedule, - SolarSchedule, ClockedSchedule -) def crontabschedule_model(): """Return the CrontabSchedule model that is active in this project.""" - if not hasattr(settings, 'CELERY_BEAT_CRONTABSCHEDULE_MODEL'): + if not hasattr(settings, "CELERY_BEAT_CRONTABSCHEDULE_MODEL"): + from .models.generic import CrontabSchedule + return CrontabSchedule - + try: - return apps.get_model( - settings.CELERY_BEAT_CRONTABSCHEDULE_MODEL - ) + return apps.get_model(settings.CELERY_BEAT_CRONTABSCHEDULE_MODEL) except ValueError: raise ImproperlyConfigured( "CELERY_BEAT_CRONTABSCHEDULE_MODEL must be of the form " @@ -29,15 +24,16 @@ def crontabschedule_model(): "been installed" ) + def intervalschedule_model(): """Return the IntervalSchedule model that is active in this project.""" - if not hasattr(settings, 'CELERY_BEAT_INTERVALSCHEDULE_MODEL'): + if not hasattr(settings, "CELERY_BEAT_INTERVALSCHEDULE_MODEL"): + from .models.generic import IntervalSchedule + return IntervalSchedule - + try: - return apps.get_model( - settings.CELERY_BEAT_INTERVALSCHEDULE_MODEL - ) + return apps.get_model(settings.CELERY_BEAT_INTERVALSCHEDULE_MODEL) except ValueError: raise ImproperlyConfigured( "CELERY_BEAT_INTERVALSCHEDULE_MODEL must be of the form " @@ -50,11 +46,14 @@ def intervalschedule_model(): "been installed" ) + def periodictask_model(): """Return the PeriodicTask model that is active in this project.""" - if not hasattr(settings, 'CELERY_BEAT_PERIODICTASK_MODEL'): + if not hasattr(settings, "CELERY_BEAT_PERIODICTASK_MODEL"): + from .models.generic import PeriodicTask + return PeriodicTask - + try: return apps.get_model(settings.CELERY_BEAT_PERIODICTASK_MODEL) except ValueError: @@ -69,15 +68,16 @@ def periodictask_model(): "installed" ) + def periodictasks_model(): """Return the PeriodicTasks model that is active in this project.""" - if not hasattr(settings, 'CELERY_BEAT_PERIODICTASKS_MODEL'): + if not hasattr(settings, "CELERY_BEAT_PERIODICTASKS_MODEL"): + from .models.generic import PeriodicTasks + return PeriodicTasks - + try: - return apps.get_model( - settings.CELERY_BEAT_PERIODICTASKS_MODEL - ) + return apps.get_model(settings.CELERY_BEAT_PERIODICTASKS_MODEL) except ValueError: raise ImproperlyConfigured( "CELERY_BEAT_PERIODICTASKS_MODEL must be of the form " @@ -90,15 +90,16 @@ def periodictasks_model(): "installed" ) + def solarschedule_model(): """Return the SolarSchedule model that is active in this project.""" - if not hasattr(settings, 'CELERY_BEAT_SOLARSCHEDULE_MODEL'): + if not hasattr(settings, "CELERY_BEAT_SOLARSCHEDULE_MODEL"): + from .models.generic import SolarSchedule + return SolarSchedule - + try: - return apps.get_model( - settings.CELERY_BEAT_SOLARSCHEDULE_MODEL - ) + return apps.get_model(settings.CELERY_BEAT_SOLARSCHEDULE_MODEL) except ValueError: raise ImproperlyConfigured( "CELERY_BEAT_SOLARSCHEDULE_MODEL must be of the form " @@ -111,15 +112,16 @@ def solarschedule_model(): "installed" ) + def clockedschedule_model(): """Return the ClockedSchedule model that is active in this project.""" - if not hasattr(settings, 'CELERY_BEAT_CLOCKEDSCHEDULE_MODEL'): + if not hasattr(settings, "CELERY_BEAT_CLOCKEDSCHEDULE_MODEL"): + from .models.generic import ClockedSchedule + return ClockedSchedule - + try: - return apps.get_model( - settings.CELERY_BEAT_CLOCKEDSCHEDULE_MODEL - ) + return apps.get_model(settings.CELERY_BEAT_CLOCKEDSCHEDULE_MODEL) except ValueError: raise ImproperlyConfigured( "CELERY_BEAT_CLOCKEDSCHEDULE_MODEL must be of the form " diff --git a/django_celery_beat/models/__init__.py b/django_celery_beat/models/__init__.py index 02fa2943..184e66fc 100644 --- a/django_celery_beat/models/__init__.py +++ b/django_celery_beat/models/__init__.py @@ -1,21 +1,27 @@ -from .abstract import crontab_schedule_celery_timezone +from .abstract import crontab_schedule_celery_timezone, DAYS from .generic import ( ClockedSchedule, - ClockScheduler, CrontabSchedule, IntervalSchedule, PeriodicTask, PeriodicTasks, SolarSchedule, ) +from ..helpers import ( + crontabschedule_model, + intervalschedule_model, + solarschedule_model, + periodictask_model, + periodictasks_model, +) __ALL__ = [ "ClockedSchedule", - "ClockScheduler", "CrontabSchedule", "IntervalSchedule", "PeriodicTask", "PeriodicTasks", "SolarSchedule", "crontab_schedule_celery_timezone", -] \ No newline at end of file + "DAYS", +] diff --git a/django_celery_beat/models/abstract.py b/django_celery_beat/models/abstract.py index 63509730..f6497250 100644 --- a/django_celery_beat/models/abstract.py +++ b/django_celery_beat/models/abstract.py @@ -21,10 +21,11 @@ from django.db import models from django.utils.translation import gettext_lazy as _ -from . import querysets, validators -from .clockedschedule import clocked -from .tzcrontab import TzAwareCrontab -from .utils import make_aware, now +from .. import validators +from ..clockedschedule import clocked +from ..tzcrontab import TzAwareCrontab +from ..utils import make_aware, now +from ..helpers import periodictasks_model DAYS = "days" HOURS = "hours" @@ -433,7 +434,8 @@ class AbstractPeriodicTasks(models.Model): class Meta: """Table information.""" - + verbose_name = _('periodic task track') + verbose_name_plural = _('periodic task tracks') abstract = True @classmethod @@ -648,7 +650,6 @@ class AbstractPeriodicTask(models.Model): help_text=_("Detailed description about the details of this Periodic Task"), ) - objects = querysets.PeriodicTaskQuerySet.as_manager() no_changes = False class Meta: @@ -691,11 +692,11 @@ def save(self, *args, **kwargs): self._clean_expires() self.validate_unique() super().save(*args, **kwargs) - PeriodicTasks.changed(self) + periodictasks_model().changed(self) def delete(self, *args, **kwargs): super().delete(*args, **kwargs) - PeriodicTasks.changed(self) + periodictasks_model().changed(self) def _clean_expires(self): if self.expire_seconds is not None and self.expires: diff --git a/django_celery_beat/models/generic.py b/django_celery_beat/models/generic.py index be443541..5e55274f 100644 --- a/django_celery_beat/models/generic.py +++ b/django_celery_beat/models/generic.py @@ -6,8 +6,10 @@ AbstractIntervalSchedule, AbstractPeriodicTask, AbstractPeriodicTasks, - AbstractSolarSchedule + AbstractSolarSchedule, ) +from ..querysets import PeriodicTaskQuerySet + class SolarSchedule(AbstractSolarSchedule): """Schedule following astronomical patterns.""" @@ -17,6 +19,7 @@ class Meta(AbstractSolarSchedule.Meta): abstract = False + class IntervalSchedule(AbstractIntervalSchedule): """Schedule with a fixed interval.""" @@ -26,14 +29,6 @@ class Meta(AbstractIntervalSchedule.Meta): abstract = False -class ClockScheduler(AbstractClockedSchedule): - """Schedule with a fixed interval.""" - - class Meta(AbstractClockedSchedule.Meta): - """Table information.""" - - abstract = False - class ClockedSchedule(AbstractClockedSchedule): """Schedule with a fixed interval.""" @@ -42,6 +37,7 @@ class Meta(AbstractClockedSchedule.Meta): abstract = False + class CrontabSchedule(AbstractCrontabSchedule): """Schedule with cron-style syntax.""" @@ -50,14 +46,18 @@ class Meta(AbstractCrontabSchedule.Meta): abstract = False + class PeriodicTask(AbstractPeriodicTask): """Interal task scheduling class.""" + objects = PeriodicTaskQuerySet.as_manager() + class Meta(AbstractPeriodicTask.Meta): """Table information.""" abstract = False + class PeriodicTasks(AbstractPeriodicTasks): """Helper table for tracking updates to periodic tasks.""" @@ -67,19 +67,11 @@ class Meta(AbstractPeriodicTasks.Meta): signals.pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask) signals.pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask) -signals.pre_delete.connect( - PeriodicTasks.update_changed, sender=IntervalSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=IntervalSchedule) -signals.post_delete.connect( - PeriodicTasks.update_changed, sender=CrontabSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=CrontabSchedule) -signals.post_delete.connect( - PeriodicTasks.update_changed, sender=SolarSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=SolarSchedule) -signals.post_delete.connect( - PeriodicTasks.update_changed, sender=ClockedSchedule) -signals.post_save.connect( - PeriodicTasks.update_changed, sender=ClockedSchedule) \ No newline at end of file +signals.pre_delete.connect(PeriodicTasks.update_changed, sender=IntervalSchedule) +signals.post_save.connect(PeriodicTasks.update_changed, sender=IntervalSchedule) +signals.post_delete.connect(PeriodicTasks.update_changed, sender=CrontabSchedule) +signals.post_save.connect(PeriodicTasks.update_changed, sender=CrontabSchedule) +signals.post_delete.connect(PeriodicTasks.update_changed, sender=SolarSchedule) +signals.post_save.connect(PeriodicTasks.update_changed, sender=SolarSchedule) +signals.post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule) +signals.post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule) diff --git a/django_celery_beat/models/managers.py b/django_celery_beat/models/managers.py deleted file mode 100644 index fe3b9176..00000000 --- a/django_celery_beat/models/managers.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Model managers.""" -from django.db import models -from django.db.models.query import QuerySet - - -class ExtendedQuerySet(QuerySet): - """Base class for query sets.""" - - def update_or_create(self, defaults=None, **kwargs): - obj, created = self.get_or_create(defaults=defaults, **kwargs) - if not created: - self._update_model_with_dict(obj, dict(defaults or {}, **kwargs)) - return obj - - def _update_model_with_dict(self, obj, fields): - [setattr(obj, attr_name, attr_value) - for attr_name, attr_value in fields.items()] - obj.save() - return obj - - -class ExtendedManager(models.Manager.from_queryset(ExtendedQuerySet)): - """Manager with common utilities.""" - - -class PeriodicTaskManager(ExtendedManager): - """Manager for PeriodicTask model.""" - - def enabled(self): - return self.filter(enabled=True) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 295acdb2..3a47d5c1 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -1,4 +1,5 @@ """Beat Scheduler Implementation.""" + import datetime import logging import math @@ -31,7 +32,7 @@ periodictasks_model, solarschedule_model, ) -from .utils import NEVER_CHECK_TIMEOUT +from .utils import NEVER_CHECK_TIMEOUT, aware_now, now # This scheduler must wake up more frequently than the # regular of 5 minutes because it needs to take external @@ -58,12 +59,12 @@ class ModelEntry(ScheduleEntry): """Scheduler entry taken from database row.""" model_schedules = ( - (schedules.crontab, CrontabSchedule, 'crontab'), - (schedules.schedule, IntervalSchedule, 'interval'), - (schedules.solar, SolarSchedule, 'solar'), - (clocked, ClockedSchedule, 'clocked') + (schedules.crontab, CrontabSchedule, "crontab"), + (schedules.schedule, IntervalSchedule, "interval"), + (schedules.solar, SolarSchedule, "solar"), + (clocked, ClockedSchedule, "clocked"), ) - save_fields = ['last_run_at', 'total_run_count', 'no_changes'] + save_fields = ["last_run_at", "total_run_count", "no_changes"] def __init__(self, model, app=None): """Initialize the model entry.""" @@ -74,33 +75,34 @@ def __init__(self, model, app=None): self.schedule = model.schedule except model.DoesNotExist: logger.error( - 'Disabling schedule %s that was removed from database', + "Disabling schedule %s that was removed from database", self.name, ) self._disable(model) try: - self.args = loads(model.args or '[]') - self.kwargs = loads(model.kwargs or '{}') + self.args = loads(model.args or "[]") + self.kwargs = loads(model.kwargs or "{}") except ValueError as exc: logger.exception( - 'Removing schedule %s for argument deseralization error: %r', - self.name, exc, + "Removing schedule %s for argument deseralization error: %r", + self.name, + exc, ) self._disable(model) self.options = {} - for option in ['queue', 'exchange', 'routing_key', 'priority']: + for option in ["queue", "exchange", "routing_key", "priority"]: value = getattr(model, option) if value is None: continue self.options[option] = value - if getattr(model, 'expires_', None): - self.options['expires'] = getattr(model, 'expires_') + if getattr(model, "expires_", None): + self.options["expires"] = getattr(model, "expires_") - headers = loads(model.headers or '{}') - headers['periodic_task_name'] = model.name - self.options['headers'] = headers + headers = loads(model.headers or "{}") + headers["periodic_task_name"] = model.name + self.options["headers"] = headers self.total_run_count = model.total_run_count self.model = model @@ -112,8 +114,9 @@ def __init__(self, model, app=None): # This will trigger the job to run at start_time # and avoid the heap block. if self.model.start_time: - model.last_run_at = model.last_run_at \ - - datetime.timedelta(days=365 * 30) + model.last_run_at = model.last_run_at - datetime.timedelta( + days=365 * 30 + ) self.last_run_at = model.last_run_at @@ -130,7 +133,7 @@ def is_due(self): # START DATE: only run after the `start_time`, if one exists. if self.model.start_time is not None: now = self._default_now() - if getattr(settings, 'DJANGO_CELERY_BEAT_TZ_AWARE', True): + if getattr(settings, "DJANGO_CELERY_BEAT_TZ_AWARE", True): now = maybe_make_aware(self._default_now()) if now < self.model.start_time: # The datetime is before the start date - don't run. @@ -151,8 +154,7 @@ def is_due(self): return schedules.schedstate(False, NEVER_CHECK_TIMEOUT) # ONE OFF TASK: Disable one off tasks after they've ran once - if self.model.one_off and self.model.enabled \ - and self.model.total_run_count > 0: + if self.model.one_off and self.model.enabled and self.model.total_run_count > 0: self.model.enabled = False self.model.total_run_count = 0 # Reset self.model.no_changes = False # Mark the model entry as changed @@ -167,7 +169,7 @@ def is_due(self): return self.schedule.is_due(last_run_at_in_tz) def _default_now(self): - if getattr(settings, 'DJANGO_CELERY_BEAT_TZ_AWARE', True): + if getattr(settings, "DJANGO_CELERY_BEAT_TZ_AWARE", True): now = datetime.datetime.now(self.app.timezone) else: # this ends up getting passed to maybe_make_aware, which expects @@ -180,6 +182,7 @@ def __next__(self): self.model.total_run_count += 1 self.model.no_changes = True return self.__class__(self.model) + next = __next__ # for 2to3 def save(self): @@ -199,20 +202,20 @@ def to_model_schedule(cls, schedule): model_schedule = model_type.from_schedule(schedule) model_schedule.save() return model_schedule, model_field - raise ValueError( - f'Cannot convert schedule type {schedule!r} to model') + raise ValueError(f"Cannot convert schedule type {schedule!r} to model") @classmethod def from_entry(cls, name, app=None, **entry): obj, created = PeriodicTask._default_manager.update_or_create( - name=name, defaults=cls._unpack_fields(**entry), + name=name, + defaults=cls._unpack_fields(**entry), ) return cls(obj, app=app) @classmethod - def _unpack_fields(cls, schedule, - args=None, kwargs=None, relative=None, options=None, - **entry): + def _unpack_fields( + cls, schedule, args=None, kwargs=None, relative=None, options=None, **entry + ): entry_schedules = { model_field: None for _, _, model_field in cls.model_schedules } @@ -222,27 +225,37 @@ def _unpack_fields(cls, schedule, entry_schedules, args=dumps(args or []), kwargs=dumps(kwargs or {}), - **cls._unpack_options(**options or {}) + **cls._unpack_options(**options or {}), ) return entry @classmethod - def _unpack_options(cls, queue=None, exchange=None, routing_key=None, - priority=None, headers=None, expire_seconds=None, - **kwargs): + def _unpack_options( + cls, + queue=None, + exchange=None, + routing_key=None, + priority=None, + headers=None, + expire_seconds=None, + **kwargs, + ): return { - 'queue': queue, - 'exchange': exchange, - 'routing_key': routing_key, - 'priority': priority, - 'headers': dumps(headers or {}), - 'expire_seconds': expire_seconds, + "queue": queue, + "exchange": exchange, + "routing_key": routing_key, + "priority": priority, + "headers": dumps(headers or {}), + "expire_seconds": expire_seconds, } def __repr__(self): - return ''.format( - safe_str(self.name), self.task, safe_repr(self.args), - safe_repr(self.kwargs), self.schedule, + return "".format( + safe_str(self.name), + self.task, + safe_repr(self.args), + safe_repr(self.kwargs), + self.schedule, ) @@ -265,16 +278,17 @@ def __init__(self, *args, **kwargs): Scheduler.__init__(self, *args, **kwargs) self._finalize = Finalize(self, self.sync, exitpriority=5) self.max_interval = ( - kwargs.get('max_interval') + kwargs.get("max_interval") or self.app.conf.beat_max_loop_interval - or DEFAULT_MAX_INTERVAL) + or DEFAULT_MAX_INTERVAL + ) def setup_schedule(self): self.install_default_entries(self.schedule) self.update_from_dict(self.app.conf.beat_schedule) def all_as_schedule(self): - debug('DatabaseScheduler: Fetching database schedule') + debug("DatabaseScheduler: Fetching database schedule") s = {} for model in self.enabled_models(): try: @@ -296,8 +310,7 @@ def enabled_models_qs(self): seconds=SCHEDULE_SYNC_MAX_INTERVAL ) exclude_clock_tasks_query = Q( - clocked__isnull=False, - clocked__clocked_time__gt=next_schedule_sync + clocked__isnull=False, clocked__clocked_time__gt=next_schedule_sync ) exclude_cron_tasks_query = self._get_crontab_exclude_query() @@ -322,9 +335,7 @@ def _get_crontab_exclude_query(self): server_hour = server_time.hour # Window of +/- 2 hours around the current hour in server tz. - hours_to_include = [ - (server_hour + offset) % 24 for offset in range(-2, 3) - ] + hours_to_include = [(server_hour + offset) % 24 for offset in range(-2, 3)] hours_to_include += [4] # celery's default cleanup task # Get all tasks with a simple numeric hour value @@ -336,8 +347,7 @@ def _get_crontab_exclude_query(self): # Annotate these tasks with their server-hour equivalent annotated_tasks = numeric_hour_tasks.annotate( # Cast hour string to integer - hour_int=Cast('hour', IntegerField()), - + hour_int=Cast("hour", IntegerField()), # Calculate server-hour based on timezone offset server_hour=Case( # Handle each timezone specifically @@ -345,21 +355,22 @@ def _get_crontab_exclude_query(self): When( timezone=timezone_name, then=( - F('hour_int') + F("hour_int") + self._get_timezone_offset(timezone_name) + 24 - ) % 24 + ) + % 24, ) for timezone_name in self._get_unique_timezone_names() ], # Default case - use hour as is - default=F('hour_int') - ) + default=F("hour_int"), + ), ) excluded_hour_task_ids = annotated_tasks.exclude( server_hour__in=hours_to_include - ).values_list('id', flat=True) + ).values_list("id", flat=True) # Build the final exclude query: # Exclude crontab tasks that are not in our include list @@ -374,15 +385,11 @@ def _get_valid_hour_formats(self): Return a list of all valid hour values (0-23). Both zero-padded ("00"–"09") and non-padded ("0"–"23") """ - return [str(hour) for hour in range(24)] + [ - f"{hour:02d}" for hour in range(10) - ] + return [str(hour) for hour in range(24)] + [f"{hour:02d}" for hour in range(10)] def _get_unique_timezone_names(self): """Get a list of all unique timezone names used in CrontabSchedule""" - return CrontabSchedule.objects.values_list( - 'timezone', flat=True - ).distinct() + return CrontabSchedule.objects.values_list("timezone", flat=True).distinct() def _get_timezone_offset(self, timezone_name): """ @@ -435,12 +442,12 @@ def schedule_changed(self): last, ts = self._last_timestamp, self.Changes.last_change() except DatabaseError as exc: - logger.exception('Database gave error: %r', exc) + logger.exception("Database gave error: %r", exc) return False except InterfaceError: warning( - 'DatabaseScheduler: InterfaceError in schedule_changed(), ' - 'waiting to retry in next call...' + "DatabaseScheduler: InterfaceError in schedule_changed(), " + "waiting to retry in next call..." ) return False @@ -460,7 +467,7 @@ def reserve(self, entry): def sync(self): if logger.isEnabledFor(logging.DEBUG): - debug('Writing entries...') + debug("Writing entries...") _tried = set() _failed = set() try: @@ -474,11 +481,11 @@ def sync(self): except (KeyError, TypeError, ObjectDoesNotExist): _failed.add(name) except DatabaseError as exc: - logger.exception('Database error while sync: %r', exc) + logger.exception("Database error while sync: %r", exc) except InterfaceError: warning( - 'DatabaseScheduler: InterfaceError in sync(), ' - 'waiting to retry in next call...' + "DatabaseScheduler: InterfaceError in sync(), " + "waiting to retry in next call..." ) finally: # retry later, only for the failed ones @@ -488,9 +495,7 @@ def update_from_dict(self, mapping): s = {} for name, entry_fields in mapping.items(): try: - entry = self.Entry.from_entry(name, - app=self.app, - **entry_fields) + entry = self.Entry.from_entry(name, app=self.app, **entry_fields) if entry.model.enabled: s[name] = entry @@ -502,10 +507,11 @@ def install_default_entries(self, data): entries = {} if self.app.conf.result_expires: entries.setdefault( - 'celery.backend_cleanup', { - 'task': 'celery.backend_cleanup', - 'schedule': schedules.crontab('0', '4', '*'), - 'options': {'expire_seconds': 12 * 3600}, + "celery.backend_cleanup", + { + "task": "celery.backend_cleanup", + "schedule": schedules.crontab("0", "4", "*"), + "options": {"expire_seconds": 12 * 3600}, }, ) self.update_from_dict(entries) @@ -522,26 +528,20 @@ def schedule(self): current_time = datetime.datetime.now() if self._initial_read: - debug('DatabaseScheduler: initial read') + debug("DatabaseScheduler: initial read") initial = update = True self._initial_read = False self._last_full_sync = current_time elif self.schedule_changed(): - info('DatabaseScheduler: Schedule changed.') + info("DatabaseScheduler: Schedule changed.") update = True self._last_full_sync = current_time # Force update the schedule if it's been more than 5 minutes if not update: - time_since_last_sync = ( - current_time - self._last_full_sync - ).total_seconds() - if ( - time_since_last_sync >= SCHEDULE_SYNC_MAX_INTERVAL - ): - debug( - 'DatabaseScheduler: Forcing full sync after 5 minutes' - ) + time_since_last_sync = (current_time - self._last_full_sync).total_seconds() + if time_since_last_sync >= SCHEDULE_SYNC_MAX_INTERVAL: + debug("DatabaseScheduler: Forcing full sync after 5 minutes") update = True self._last_full_sync = current_time @@ -553,7 +553,8 @@ def schedule(self): self._heap = [] self._heap_invalidated = True if logger.isEnabledFor(logging.DEBUG): - debug('Current schedule:\n%s', '\n'.join( - repr(entry) for entry in self._schedule.values()), + debug( + "Current schedule:\n%s", + "\n".join(repr(entry) for entry in self._schedule.values()), ) return self._schedule From b25089fd080b05191231b58e027e760e9d3c9816 Mon Sep 17 00:00:00 2001 From: Brett Buford Date: Tue, 24 Jun 2025 18:16:13 -0700 Subject: [PATCH 07/11] namespace the lazy FKs --- django_celery_beat/models/abstract.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/django_celery_beat/models/abstract.py b/django_celery_beat/models/abstract.py index f6497250..25d3fb63 100644 --- a/django_celery_beat/models/abstract.py +++ b/django_celery_beat/models/abstract.py @@ -476,7 +476,7 @@ class AbstractPeriodicTask(models.Model): # You can only set ONE of the following schedule FK's # TODO: Redo this as a GenericForeignKey interval = models.ForeignKey( - "IntervalSchedule", + "django_celery_beat.IntervalSchedule", on_delete=models.CASCADE, null=True, blank=True, @@ -487,7 +487,7 @@ class AbstractPeriodicTask(models.Model): ), ) crontab = models.ForeignKey( - "CrontabSchedule", + "django_celery_beat.CrontabSchedule", on_delete=models.CASCADE, null=True, blank=True, @@ -498,7 +498,7 @@ class AbstractPeriodicTask(models.Model): ), ) solar = models.ForeignKey( - "SolarSchedule", + "django_celery_beat.SolarSchedule", on_delete=models.CASCADE, null=True, blank=True, @@ -509,7 +509,7 @@ class AbstractPeriodicTask(models.Model): ), ) clocked = models.ForeignKey( - "ClockedSchedule", + "django_celery_beat.ClockedSchedule", on_delete=models.CASCADE, null=True, blank=True, From 1b4b65232f4066faac86ffb022dc5a78db82ac88 Mon Sep 17 00:00:00 2001 From: Brett Buford Date: Tue, 24 Jun 2025 18:55:56 -0700 Subject: [PATCH 08/11] precommit --- django_celery_beat/models/__init__.py | 21 +++------------- django_celery_beat/models/abstract.py | 36 ++++++++++++--------------- django_celery_beat/models/generic.py | 16 ++++++------ django_celery_beat/schedulers.py | 11 +++----- 4 files changed, 30 insertions(+), 54 deletions(-) diff --git a/django_celery_beat/models/__init__.py b/django_celery_beat/models/__init__.py index 184e66fc..6ece9b0c 100644 --- a/django_celery_beat/models/__init__.py +++ b/django_celery_beat/models/__init__.py @@ -1,21 +1,8 @@ -from .abstract import crontab_schedule_celery_timezone, DAYS -from .generic import ( - ClockedSchedule, - CrontabSchedule, - IntervalSchedule, - PeriodicTask, - PeriodicTasks, - SolarSchedule, -) -from ..helpers import ( - crontabschedule_model, - intervalschedule_model, - solarschedule_model, - periodictask_model, - periodictasks_model, -) +from .abstract import DAYS, crontab_schedule_celery_timezone +from .generic import (ClockedSchedule, CrontabSchedule, IntervalSchedule, + PeriodicTask, PeriodicTasks, SolarSchedule) -__ALL__ = [ +__all__ = [ "ClockedSchedule", "CrontabSchedule", "IntervalSchedule", diff --git a/django_celery_beat/models/abstract.py b/django_celery_beat/models/abstract.py index 25d3fb63..d368443e 100644 --- a/django_celery_beat/models/abstract.py +++ b/django_celery_beat/models/abstract.py @@ -9,23 +9,19 @@ import timezone_field from celery import current_app, schedules -from cron_descriptor import ( - FormatException, - MissingFieldException, - WrongArgumentException, - get_description, -) +from cron_descriptor import (FormatException, MissingFieldException, + WrongArgumentException, get_description) from django.conf import settings from django.core.exceptions import MultipleObjectsReturned, ValidationError from django.core.validators import MaxValueValidator, MinValueValidator from django.db import models from django.utils.translation import gettext_lazy as _ -from .. import validators -from ..clockedschedule import clocked -from ..tzcrontab import TzAwareCrontab -from ..utils import make_aware, now -from ..helpers import periodictasks_model +from django_celery_beat import validators +from django_celery_beat.clockedschedule import clocked +from django_celery_beat.helpers import periodictasks_model +from django_celery_beat.tzcrontab import TzAwareCrontab +from django_celery_beat.utils import make_aware, now DAYS = "days" HOURS = "hours" @@ -166,7 +162,7 @@ class AbstractIntervalSchedule(models.Model): null=False, verbose_name=_("Number of Periods"), help_text=_( - "Number of interval periods to wait before " "running the task again" + "Number of interval periods to wait before running the task again" ), validators=[MinValueValidator(1)], ) @@ -290,7 +286,7 @@ class AbstractCrontabSchedule(models.Model): default="*", verbose_name=_("Day(s) Of The Month"), help_text=_( - 'Cron Days Of The Month to Run. Use "*" for "all". ' '(Example: "1,15")' + 'Cron Days Of The Month to Run. Use "*" for "all". (Example: "1,15")' ), validators=[validators.day_of_month_validator], ) @@ -524,14 +520,14 @@ class AbstractPeriodicTask(models.Model): blank=True, default="[]", verbose_name=_("Positional Arguments"), - help_text=_("JSON encoded positional arguments " '(Example: ["arg1", "arg2"])'), + help_text=_('JSON encoded positional arguments (Example: ["arg1", "arg2"])'), ) kwargs = models.TextField( blank=True, default="{}", verbose_name=_("Keyword Arguments"), help_text=_( - "JSON encoded keyword arguments " '(Example: {"argument": "value"})' + 'JSON encoded keyword arguments (Example: {"argument": "value"})' ), ) @@ -542,7 +538,7 @@ class AbstractPeriodicTask(models.Model): default=None, verbose_name=_("Queue Override"), help_text=_( - "Queue defined in CELERY_TASK_QUEUES. " "Leave None for default queuing." + "Queue defined in CELERY_TASK_QUEUES. Leave None for default queuing." ), ) @@ -611,7 +607,7 @@ class AbstractPeriodicTask(models.Model): null=True, verbose_name=_("Start Datetime"), help_text=_( - "Datetime when the schedule should begin " "triggering the task to run" + "Datetime when the schedule should begin triggering the task to run" ), ) enabled = models.BooleanField( @@ -636,7 +632,7 @@ class AbstractPeriodicTask(models.Model): editable=False, verbose_name=_("Total Run Count"), help_text=_( - "Running count of how many times the schedule " "has triggered the task" + "Running count of how many times the schedule has triggered the task" ), ) date_changed = models.DateTimeField( @@ -667,10 +663,10 @@ def validate_unique(self, *args, **kwargs): if len(selected_schedule_types) == 0: raise ValidationError( - "One of clocked, interval, crontab, or solar " "must be set." + "One of clocked, interval, crontab, or solar must be set." ) - err_msg = "Only one of clocked, interval, crontab, " "or solar must be set" + err_msg = "Only one of clocked, interval, crontab, or solar must be set" if len(selected_schedule_types) > 1: error_info = {} for selected_schedule_type in selected_schedule_types: diff --git a/django_celery_beat/models/generic.py b/django_celery_beat/models/generic.py index 5e55274f..64a38051 100644 --- a/django_celery_beat/models/generic.py +++ b/django_celery_beat/models/generic.py @@ -1,14 +1,12 @@ from django.db.models import signals -from .abstract import ( - AbstractClockedSchedule, - AbstractCrontabSchedule, - AbstractIntervalSchedule, - AbstractPeriodicTask, - AbstractPeriodicTasks, - AbstractSolarSchedule, -) -from ..querysets import PeriodicTaskQuerySet +from django_celery_beat.models.abstract import (AbstractClockedSchedule, + AbstractCrontabSchedule, + AbstractIntervalSchedule, + AbstractPeriodicTask, + AbstractPeriodicTasks, + AbstractSolarSchedule) +from django_celery_beat.querysets import PeriodicTaskQuerySet class SolarSchedule(AbstractSolarSchedule): diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 3a47d5c1..95662dae 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -24,14 +24,9 @@ from kombu.utils.json import dumps, loads from .clockedschedule import clocked -from .helpers import ( - clockedschedule_model, - crontabschedule_model, - intervalschedule_model, - periodictask_model, - periodictasks_model, - solarschedule_model, -) +from .helpers import (clockedschedule_model, crontabschedule_model, + intervalschedule_model, periodictask_model, + periodictasks_model, solarschedule_model) from .utils import NEVER_CHECK_TIMEOUT, aware_now, now # This scheduler must wake up more frequently than the From a936824058913ee246beac5f0250ff98f7c18fec Mon Sep 17 00:00:00 2001 From: Brett Buford Date: Tue, 24 Jun 2025 19:22:10 -0700 Subject: [PATCH 09/11] Cut down on unnecessary diff --- django_celery_beat/models/abstract.py | 310 +++++++++++++------------- 1 file changed, 153 insertions(+), 157 deletions(-) diff --git a/django_celery_beat/models/abstract.py b/django_celery_beat/models/abstract.py index d368443e..06f37f9a 100644 --- a/django_celery_beat/models/abstract.py +++ b/django_celery_beat/models/abstract.py @@ -23,26 +23,26 @@ from django_celery_beat.tzcrontab import TzAwareCrontab from django_celery_beat.utils import make_aware, now -DAYS = "days" -HOURS = "hours" -MINUTES = "minutes" -SECONDS = "seconds" -MICROSECONDS = "microseconds" +DAYS = 'days' +HOURS = 'hours' +MINUTES = 'minutes' +SECONDS = 'seconds' +MICROSECONDS = 'microseconds' PERIOD_CHOICES = ( - (DAYS, _("Days")), - (HOURS, _("Hours")), - (MINUTES, _("Minutes")), - (SECONDS, _("Seconds")), - (MICROSECONDS, _("Microseconds")), + (DAYS, _('Days')), + (HOURS, _('Hours')), + (MINUTES, _('Minutes')), + (SECONDS, _('Seconds')), + (MICROSECONDS, _('Microseconds')), ) SINGULAR_PERIODS = ( - (DAYS, _("Day")), - (HOURS, _("Hour")), - (MINUTES, _("Minute")), - (SECONDS, _("Second")), - (MICROSECONDS, _("Microsecond")), + (DAYS, _('Day')), + (HOURS, _('Hour')), + (MINUTES, _('Minute')), + (SECONDS, _('Second')), + (MICROSECONDS, _('Microsecond')), ) SOLAR_SCHEDULES = [ @@ -60,7 +60,7 @@ def cronexp(field): """Representation of cron expression.""" - return field and str(field).replace(" ", "") or "*" + return field and str(field).replace(' ', '') or '*' def crontab_schedule_celery_timezone(): @@ -69,12 +69,12 @@ def crontab_schedule_celery_timezone(): If is not defined or is not a valid timezone, return ``"UTC"`` instead. """ try: - CELERY_TIMEZONE = getattr(settings, "%s_TIMEZONE" % current_app.namespace) + CELERY_TIMEZONE = getattr(settings, '%s_TIMEZONE' % current_app.namespace) except AttributeError: - return "UTC" + return 'UTC' if CELERY_TIMEZONE in available_timezones(): return CELERY_TIMEZONE - return "UTC" + return 'UTC' class AbstractSolarSchedule(models.Model): @@ -88,21 +88,21 @@ class AbstractSolarSchedule(models.Model): event = models.CharField( max_length=24, choices=SOLAR_SCHEDULES, - verbose_name=_("Solar Event"), - help_text=_("The type of solar event when the job should run"), + verbose_name=_('Solar Event'), + help_text=_('The type of solar event when the job should run'), ) latitude = models.DecimalField( max_digits=9, decimal_places=6, - verbose_name=_("Latitude"), - help_text=_("Run the task when the event happens at this latitude"), + verbose_name=_('Latitude'), + help_text=_('Run the task when the event happens at this latitude'), validators=[MinValueValidator(-90), MaxValueValidator(90)], ) longitude = models.DecimalField( max_digits=9, decimal_places=6, - verbose_name=_("Longitude"), - help_text=_("Run the task when the event happens at this longitude"), + verbose_name=_('Longitude'), + help_text=_('Run the task when the event happens at this longitude'), validators=[MinValueValidator(-180), MaxValueValidator(180)], ) @@ -110,10 +110,10 @@ class Meta: """Table information.""" abstract = True - verbose_name = _("solar event") - verbose_name_plural = _("solar events") - ordering = ("event", "latitude", "longitude") - unique_together = ("event", "latitude", "longitude") + verbose_name = _('solar event') + verbose_name_plural = _('solar events') + ordering = ('event', 'latitude', 'longitude') + unique_together = ('event', 'latitude', 'longitude') @property def schedule(self): @@ -124,9 +124,9 @@ def schedule(self): @classmethod def from_schedule(cls, schedule): spec = { - "event": schedule.event, - "latitude": schedule.lat, - "longitude": schedule.lon, + 'event': schedule.event, + 'latitude': schedule.lat, + 'longitude': schedule.lon, } # we do not check for MultipleObjectsReturned exception here because @@ -137,7 +137,7 @@ def from_schedule(cls, schedule): return cls(**spec) def __str__(self): - return "{} ({}, {})".format( + return '{} ({}, {})'.format( self.get_event_display(), self.latitude, self.longitude ) @@ -160,26 +160,24 @@ class AbstractIntervalSchedule(models.Model): every = models.IntegerField( null=False, - verbose_name=_("Number of Periods"), - help_text=_( - "Number of interval periods to wait before running the task again" - ), + verbose_name=_('Number of Periods'), + help_text=_('Number of interval periods to wait before running the task again'), validators=[MinValueValidator(1)], ) period = models.CharField( max_length=24, choices=PERIOD_CHOICES, - verbose_name=_("Interval Period"), - help_text=_("The type of period between task runs (Example: days)"), + verbose_name=_('Interval Period'), + help_text=_('The type of period between task runs (Example: days)'), ) class Meta: """Table information.""" abstract = True - verbose_name = _("interval") - verbose_name_plural = _("intervals") - ordering = ["period", "every"] + verbose_name = _('interval') + verbose_name_plural = _('intervals') + ordering = ['period', 'every'] @property def schedule(self): @@ -204,12 +202,12 @@ def __str__(self): if period == self.period: readable_period = _readable_period.lower() break - return _("every {}").format(readable_period) + return _('every {}').format(readable_period) for period, _readable_period in PERIOD_CHOICES: if period == self.period: readable_period = _readable_period.lower() break - return _("every {} {}").format(self.every, readable_period) + return _('every {} {}').format(self.every, readable_period) @property def period_singular(self): @@ -220,20 +218,20 @@ class AbstractClockedSchedule(models.Model): """Abstract clocked schedule.""" clocked_time = models.DateTimeField( - verbose_name=_("Clock Time"), - help_text=_("Run the task at clocked time"), + verbose_name=_('Clock Time'), + help_text=_('Run the task at clocked time'), ) class Meta: """Table information.""" abstract = True - verbose_name = _("clocked") - verbose_name_plural = _("clocked") - ordering = ["clocked_time"] + verbose_name = _('clocked') + verbose_name_plural = _('clocked') + ordering = ['clocked_time'] def __str__(self): - return f"{make_aware(self.clocked_time)}" + return f'{make_aware(self.clocked_time)}' @property def schedule(self): @@ -242,7 +240,7 @@ def schedule(self): @classmethod def from_schedule(cls, schedule): - spec = {"clocked_time": schedule.clocked_time} + spec = {'clocked_time': schedule.clocked_time} try: return cls.objects.get(**spec) except cls.DoesNotExist: @@ -269,22 +267,22 @@ class AbstractCrontabSchedule(models.Model): # minute = models.CharField( max_length=60 * 4, - default="*", - verbose_name=_("Minute(s)"), + default='*', + verbose_name=_('Minute(s)'), help_text=_('Cron Minutes to Run. Use "*" for "all". (Example: "0,30")'), validators=[validators.minute_validator], ) hour = models.CharField( max_length=24 * 4, - default="*", - verbose_name=_("Hour(s)"), + default='*', + verbose_name=_('Hour(s)'), help_text=_('Cron Hours to Run. Use "*" for "all". (Example: "8,20")'), validators=[validators.hour_validator], ) day_of_month = models.CharField( max_length=31 * 4, - default="*", - verbose_name=_("Day(s) Of The Month"), + default='*', + verbose_name=_('Day(s) Of The Month'), help_text=_( 'Cron Days Of The Month to Run. Use "*" for "all". (Example: "1,15")' ), @@ -292,8 +290,8 @@ class AbstractCrontabSchedule(models.Model): ) month_of_year = models.CharField( max_length=64, - default="*", - verbose_name=_("Month(s) Of The Year"), + default='*', + verbose_name=_('Month(s) Of The Year'), help_text=_( 'Cron Months (1-12) Of The Year to Run. Use "*" for "all". ' '(Example: "1,12")' @@ -302,8 +300,8 @@ class AbstractCrontabSchedule(models.Model): ) day_of_week = models.CharField( max_length=64, - default="*", - verbose_name=_("Day(s) Of The Week"), + default='*', + verbose_name=_('Day(s) Of The Week'), help_text=_( 'Cron Days Of The Week to Run. Use "*" for "all", Sunday ' 'is 0 or 7, Monday is 1. (Example: "0,5")' @@ -314,23 +312,23 @@ class AbstractCrontabSchedule(models.Model): timezone = timezone_field.TimeZoneField( default=crontab_schedule_celery_timezone, use_pytz=False, - verbose_name=_("Cron Timezone"), - help_text=_("Timezone to Run the Cron Schedule on. Default is UTC."), + verbose_name=_('Cron Timezone'), + help_text=_('Timezone to Run the Cron Schedule on. Default is UTC.'), ) class Meta: """Table information.""" abstract = True - verbose_name = _("crontab") - verbose_name_plural = _("crontabs") + verbose_name = _('crontab') + verbose_name_plural = _('crontabs') ordering = [ - "month_of_year", - "day_of_month", - "day_of_week", - "hour", - "minute", - "timezone", + 'month_of_year', + 'day_of_month', + 'day_of_week', + 'hour', + 'minute', + 'timezone', ] @property @@ -344,13 +342,13 @@ def human_readable(self): month_of_year=self.month_of_year, ) if c.day_of_week and set(c.day_of_week) == set(range(7)): - day_of_week = "*" + day_of_week = '*' else: - day_of_week = cronexp(",".join(map(str, c.day_of_week))) + day_of_week = cronexp(','.join(map(str, c.day_of_week))) except ValueError: day_of_week = cronexp(self.day_of_week) - cron_expression = "{} {} {} {} {}".format( + cron_expression = '{} {} {} {} {}'.format( cronexp(self.minute), cronexp(self.hour), cronexp(self.day_of_month), @@ -360,11 +358,11 @@ def human_readable(self): try: human_readable = get_description(cron_expression) except (MissingFieldException, FormatException, WrongArgumentException): - return f"{cron_expression} {str(self.timezone)}" - return f"{human_readable} {str(self.timezone)}" + return f'{cron_expression} {str(self.timezone)}' + return f'{human_readable} {str(self.timezone)}' def __str__(self): - return "{} {} {} {} {} (m/h/dM/MY/d) {}".format( + return '{} {} {} {} {} (m/h/dM/MY/d) {}'.format( cronexp(self.minute), cronexp(self.hour), cronexp(self.day_of_month), @@ -382,7 +380,7 @@ def schedule(self): day_of_month=self.day_of_month, month_of_year=self.month_of_year, ) - if getattr(settings, "DJANGO_CELERY_BEAT_TZ_AWARE", True): + if getattr(settings, 'DJANGO_CELERY_BEAT_TZ_AWARE', True): crontab = TzAwareCrontab( minute=self.minute, hour=self.hour, @@ -396,12 +394,12 @@ def schedule(self): @classmethod def from_schedule(cls, schedule): spec = { - "minute": schedule._orig_minute, - "hour": schedule._orig_hour, - "day_of_week": schedule._orig_day_of_week, - "day_of_month": schedule._orig_day_of_month, - "month_of_year": schedule._orig_month_of_year, - "timezone": schedule.tz, + 'minute': schedule._orig_minute, + 'hour': schedule._orig_hour, + 'day_of_week': schedule._orig_day_of_week, + 'day_of_month': schedule._orig_day_of_month, + 'month_of_year': schedule._orig_month_of_year, + 'timezone': schedule.tz, } try: return cls.objects.get(**spec) @@ -430,6 +428,7 @@ class AbstractPeriodicTasks(models.Model): class Meta: """Table information.""" + verbose_name = _('periodic task track') verbose_name_plural = _('periodic task tracks') abstract = True @@ -441,7 +440,7 @@ def changed(cls, instance, **kwargs): @classmethod def update_changed(cls, **kwargs): - cls.objects.update_or_create(ident=1, defaults={"last_update": now()}) + cls.objects.update_or_create(ident=1, defaults={'last_update': now()}) @classmethod def last_change(cls): @@ -457,14 +456,14 @@ class AbstractPeriodicTask(models.Model): name = models.CharField( max_length=200, unique=True, - verbose_name=_("Name"), - help_text=_("Short Description For This Task"), + verbose_name=_('Name'), + help_text=_('Short Description For This Task'), ) task = models.CharField( max_length=200, - verbose_name="Task Name", + verbose_name='Task Name', help_text=_( - "The Name of the Celery Task that Should be Run. " + 'The Name of the Celery Task that Should be Run. ' '(Example: "proj.tasks.import_contacts")' ), ) @@ -472,63 +471,61 @@ class AbstractPeriodicTask(models.Model): # You can only set ONE of the following schedule FK's # TODO: Redo this as a GenericForeignKey interval = models.ForeignKey( - "django_celery_beat.IntervalSchedule", + 'django_celery_beat.IntervalSchedule', on_delete=models.CASCADE, null=True, blank=True, - verbose_name=_("Interval Schedule"), + verbose_name=_('Interval Schedule'), help_text=_( - "Interval Schedule to run the task on. " - "Set only one schedule type, leave the others null." + 'Interval Schedule to run the task on. ' + 'Set only one schedule type, leave the others null.' ), ) crontab = models.ForeignKey( - "django_celery_beat.CrontabSchedule", + 'django_celery_beat.CrontabSchedule', on_delete=models.CASCADE, null=True, blank=True, - verbose_name=_("Crontab Schedule"), + verbose_name=_('Crontab Schedule'), help_text=_( - "Crontab Schedule to run the task on. " - "Set only one schedule type, leave the others null." + 'Crontab Schedule to run the task on. ' + 'Set only one schedule type, leave the others null.' ), ) solar = models.ForeignKey( - "django_celery_beat.SolarSchedule", + 'django_celery_beat.SolarSchedule', on_delete=models.CASCADE, null=True, blank=True, - verbose_name=_("Solar Schedule"), + verbose_name=_('Solar Schedule'), help_text=_( - "Solar Schedule to run the task on. " - "Set only one schedule type, leave the others null." + 'Solar Schedule to run the task on. ' + 'Set only one schedule type, leave the others null.' ), ) clocked = models.ForeignKey( - "django_celery_beat.ClockedSchedule", + 'django_celery_beat.ClockedSchedule', on_delete=models.CASCADE, null=True, blank=True, - verbose_name=_("Clocked Schedule"), + verbose_name=_('Clocked Schedule'), help_text=_( - "Clocked Schedule to run the task on. " - "Set only one schedule type, leave the others null." + 'Clocked Schedule to run the task on. ' + 'Set only one schedule type, leave the others null.' ), ) # TODO: use django's JsonField args = models.TextField( blank=True, - default="[]", - verbose_name=_("Positional Arguments"), + default='[]', + verbose_name=_('Positional Arguments'), help_text=_('JSON encoded positional arguments (Example: ["arg1", "arg2"])'), ) kwargs = models.TextField( blank=True, - default="{}", - verbose_name=_("Keyword Arguments"), - help_text=_( - 'JSON encoded keyword arguments (Example: {"argument": "value"})' - ), + default='{}', + verbose_name=_('Keyword Arguments'), + help_text=_('JSON encoded keyword arguments (Example: {"argument": "value"})'), ) queue = models.CharField( @@ -536,9 +533,9 @@ class AbstractPeriodicTask(models.Model): blank=True, null=True, default=None, - verbose_name=_("Queue Override"), + verbose_name=_('Queue Override'), help_text=_( - "Queue defined in CELERY_TASK_QUEUES. Leave None for default queuing." + 'Queue defined in CELERY_TASK_QUEUES. Leave None for default queuing.' ), ) @@ -550,22 +547,22 @@ class AbstractPeriodicTask(models.Model): blank=True, null=True, default=None, - verbose_name=_("Exchange"), - help_text=_("Override Exchange for low-level AMQP routing"), + verbose_name=_('Exchange'), + help_text=_('Override Exchange for low-level AMQP routing'), ) routing_key = models.CharField( max_length=200, blank=True, null=True, default=None, - verbose_name=_("Routing Key"), - help_text=_("Override Routing Key for low-level AMQP routing"), + verbose_name=_('Routing Key'), + help_text=_('Override Routing Key for low-level AMQP routing'), ) headers = models.TextField( blank=True, - default="{}", - verbose_name=_("AMQP Message Headers"), - help_text=_("JSON encoded message headers for the AMQP message."), + default='{}', + verbose_name=_('AMQP Message Headers'), + help_text=_('JSON encoded message headers for the AMQP message.'), ) priority = models.PositiveIntegerField( @@ -573,47 +570,46 @@ class AbstractPeriodicTask(models.Model): validators=[MaxValueValidator(255)], blank=True, null=True, - verbose_name=_("Priority"), + verbose_name=_('Priority'), help_text=_( - "Priority Number between 0 and 255. " - "Supported by: RabbitMQ, Redis (priority reversed, 0 is highest)." + 'Priority Number between 0 and 255. ' + 'Supported by: RabbitMQ, Redis (priority reversed, 0 is highest).' ), ) expires = models.DateTimeField( blank=True, null=True, - verbose_name=_("Expires Datetime"), + verbose_name=_('Expires Datetime'), help_text=_( - "Datetime after which the schedule will no longer " - "trigger the task to run" + 'Datetime after which the schedule will no longer trigger the task to run' ), ) expire_seconds = models.PositiveIntegerField( blank=True, null=True, - verbose_name=_("Expires timedelta with seconds"), + verbose_name=_('Expires timedelta with seconds'), help_text=_( - "Timedelta with seconds which the schedule will no longer " - "trigger the task to run" + 'Timedelta with seconds which the schedule will no longer ' + 'trigger the task to run' ), ) one_off = models.BooleanField( default=False, - verbose_name=_("One-off Task"), - help_text=_("If True, the schedule will only run the task a single time"), + verbose_name=_('One-off Task'), + help_text=_('If True, the schedule will only run the task a single time'), ) start_time = models.DateTimeField( blank=True, null=True, - verbose_name=_("Start Datetime"), + verbose_name=_('Start Datetime'), help_text=_( - "Datetime when the schedule should begin triggering the task to run" + 'Datetime when the schedule should begin triggering the task to run' ), ) enabled = models.BooleanField( default=True, - verbose_name=_("Enabled"), - help_text=_("Set to False to disable the schedule"), + verbose_name=_('Enabled'), + help_text=_('Set to False to disable the schedule'), ) last_run_at = models.DateTimeField( auto_now=False, @@ -621,29 +617,29 @@ class AbstractPeriodicTask(models.Model): editable=False, blank=True, null=True, - verbose_name=_("Last Run Datetime"), + verbose_name=_('Last Run Datetime'), help_text=_( - "Datetime that the schedule last triggered the task to run. " - "Reset to None if enabled is set to False." + 'Datetime that the schedule last triggered the task to run. ' + 'Reset to None if enabled is set to False.' ), ) total_run_count = models.PositiveIntegerField( default=0, editable=False, - verbose_name=_("Total Run Count"), + verbose_name=_('Total Run Count'), help_text=_( - "Running count of how many times the schedule has triggered the task" + 'Running count of how many times the schedule has triggered the task' ), ) date_changed = models.DateTimeField( auto_now=True, - verbose_name=_("Last Modified"), - help_text=_("Datetime that this PeriodicTask was last modified"), + verbose_name=_('Last Modified'), + help_text=_('Datetime that this PeriodicTask was last modified'), ) description = models.TextField( blank=True, - verbose_name=_("Description"), - help_text=_("Detailed description about the details of this Periodic Task"), + verbose_name=_('Description'), + help_text=_('Detailed description about the details of this Periodic Task'), ) no_changes = False @@ -652,21 +648,21 @@ class Meta: """Table information.""" abstract = True - verbose_name = _("periodic task") - verbose_name_plural = _("periodic tasks") + verbose_name = _('periodic task') + verbose_name_plural = _('periodic tasks') def validate_unique(self, *args, **kwargs): super().validate_unique(*args, **kwargs) - schedule_types = ["interval", "crontab", "solar", "clocked"] + schedule_types = ['interval', 'crontab', 'solar', 'clocked'] selected_schedule_types = [s for s in schedule_types if getattr(self, s)] if len(selected_schedule_types) == 0: raise ValidationError( - "One of clocked, interval, crontab, or solar must be set." + 'One of clocked, interval, crontab, or solar must be set.' ) - err_msg = "Only one of clocked, interval, crontab, or solar must be set" + err_msg = 'Only one of clocked, interval, crontab, or solar must be set' if len(selected_schedule_types) > 1: error_info = {} for selected_schedule_type in selected_schedule_types: @@ -675,7 +671,7 @@ def validate_unique(self, *args, **kwargs): # clocked must be one off task if self.clocked and not self.one_off: - err_msg = "clocked must be one off, one_off must set True" + err_msg = 'clocked must be one off, one_off must set True' raise ValidationError(err_msg) def save(self, *args, **kwargs): @@ -697,7 +693,7 @@ def delete(self, *args, **kwargs): def _clean_expires(self): if self.expire_seconds is not None and self.expires: raise ValidationError( - _("Only one can be set, in expires and expire_seconds") + _('Only one can be set, in expires and expire_seconds') ) @property @@ -705,15 +701,15 @@ def expires_(self): return self.expires or self.expire_seconds def __str__(self): - fmt = "{0.name}: {{no schedule}}" + fmt = '{0.name}: {{no schedule}}' if self.interval: - fmt = "{0.name}: {0.interval}" + fmt = '{0.name}: {0.interval}' if self.crontab: - fmt = "{0.name}: {0.crontab}" + fmt = '{0.name}: {0.crontab}' if self.solar: - fmt = "{0.name}: {0.solar}" + fmt = '{0.name}: {0.solar}' if self.clocked: - fmt = "{0.name}: {0.clocked}" + fmt = '{0.name}: {0.clocked}' return fmt.format(self) @property From 245e32441e7c637cd756a958b7e3d2cfa4b5076c Mon Sep 17 00:00:00 2001 From: Brett Buford Date: Wed, 25 Jun 2025 09:23:35 -0700 Subject: [PATCH 10/11] test_helpers for code cov --- t/unit/test_helpers.py | 91 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 t/unit/test_helpers.py diff --git a/t/unit/test_helpers.py b/t/unit/test_helpers.py new file mode 100644 index 00000000..b03e9aec --- /dev/null +++ b/t/unit/test_helpers.py @@ -0,0 +1,91 @@ +import pytest +from django.core.exceptions import ImproperlyConfigured +from django.test import override_settings + +from django_celery_beat.helpers import (clockedschedule_model, + crontabschedule_model, + intervalschedule_model, + periodictask_model, + periodictasks_model, + solarschedule_model) +from django_celery_beat.models.abstract import (AbstractClockedSchedule, + AbstractCrontabSchedule, + AbstractIntervalSchedule, + AbstractPeriodicTask, + AbstractPeriodicTasks, + AbstractSolarSchedule) + +fetch_models = [ + (AbstractCrontabSchedule, crontabschedule_model), + (AbstractIntervalSchedule, intervalschedule_model), + (AbstractPeriodicTask, periodictask_model), + (AbstractSolarSchedule, solarschedule_model), + (AbstractPeriodicTasks, periodictasks_model), + (AbstractClockedSchedule, clockedschedule_model), +] + + +@pytest.mark.django_db +@pytest.mark.parametrize(("abstract_model", "fetch_func"), fetch_models) +def test_fetching_model_works_with_no_setting(abstract_model, fetch_func): + """ + Test fetching models when you're using the generic Django Celery Beat models. + """ + result = fetch_func() + assert issubclass( + result, abstract_model + ), f"Expected {abstract_model}, got {type(result)}" + + +@pytest.mark.django_db +@pytest.mark.parametrize(("abstract_model", "fetch_func"), fetch_models) +def test_fetching_model_works_with_correct_setting(abstract_model, fetch_func): + """ + Test fetching models when you're using custom Django Celery Beat models. + """ + model_name = abstract_model.__name__.replace("Abstract", "") + model_setting_name = f"CELERY_BEAT_{model_name.upper()}_MODEL" + app_label_and_generic_name = f"django_celery_beat.{model_name.upper()}" + + with override_settings(**{model_setting_name: app_label_and_generic_name}): + result = fetch_func() + assert issubclass( + result, abstract_model + ), f"Expected {abstract_model}, got {type(result)}" + + +@pytest.mark.django_db +@pytest.mark.parametrize(("abstract_model", "fetch_func"), fetch_models) +def test_fetching_model_works_with_invalid_setting(abstract_model, fetch_func): + """ + Test fetching models when you're using custom Django Celery Beat models, + but your model name is not of the form `app_label.model_name`. + """ + model_name = abstract_model.__name__.replace("Abstract", "") + model_setting_name = f"CELERY_BEAT_{model_name.upper()}_MODEL" + + with override_settings(**{model_setting_name: f"{model_name}"}), pytest.raises( + ImproperlyConfigured, + match=f"{model_setting_name} must be of the form 'app_label.model_name'", + ): + fetch_func() + + +@pytest.mark.django_db +@pytest.mark.parametrize(("abstract_model", "fetch_func"), fetch_models) +def test_fetching_model_works_with_improper_setting(abstract_model, fetch_func): + """ + Test fetching models when you're using custom Django Celery Beat models, + but your model cannot be found. + """ + model_name = abstract_model.__name__.replace("Abstract", "") + model_setting_name = f"CELERY_BEAT_{model_name.upper()}_MODEL" + + with override_settings( + **{model_setting_name: f"improper.{model_name}"} + ), pytest.raises( + ImproperlyConfigured, + match=f"{model_setting_name} refers to model 'improper.{model_name}' " + "that has not been installed", + ): + fetch_func() From 2bf1a848cfc62cc40b048cd923c9c9eeb2427ed8 Mon Sep 17 00:00:00 2001 From: Brett Buford Date: Wed, 25 Jun 2025 09:40:05 -0700 Subject: [PATCH 11/11] reduce schedulers.py diff --- django_celery_beat/schedulers.py | 178 +++++++++++++++---------------- 1 file changed, 89 insertions(+), 89 deletions(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 95662dae..1b51f41a 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -54,12 +54,12 @@ class ModelEntry(ScheduleEntry): """Scheduler entry taken from database row.""" model_schedules = ( - (schedules.crontab, CrontabSchedule, "crontab"), - (schedules.schedule, IntervalSchedule, "interval"), - (schedules.solar, SolarSchedule, "solar"), - (clocked, ClockedSchedule, "clocked"), + (schedules.crontab, CrontabSchedule, 'crontab'), + (schedules.schedule, IntervalSchedule, 'interval'), + (schedules.solar, SolarSchedule, 'solar'), + (clocked, ClockedSchedule, 'clocked') ) - save_fields = ["last_run_at", "total_run_count", "no_changes"] + save_fields = ['last_run_at', 'total_run_count', 'no_changes'] def __init__(self, model, app=None): """Initialize the model entry.""" @@ -70,34 +70,33 @@ def __init__(self, model, app=None): self.schedule = model.schedule except model.DoesNotExist: logger.error( - "Disabling schedule %s that was removed from database", + 'Disabling schedule %s that was removed from database', self.name, ) self._disable(model) try: - self.args = loads(model.args or "[]") - self.kwargs = loads(model.kwargs or "{}") + self.args = loads(model.args or '[]') + self.kwargs = loads(model.kwargs or '{}') except ValueError as exc: logger.exception( - "Removing schedule %s for argument deseralization error: %r", - self.name, - exc, + 'Removing schedule %s for argument deseralization error: %r', + self.name, exc, ) self._disable(model) self.options = {} - for option in ["queue", "exchange", "routing_key", "priority"]: + for option in ['queue', 'exchange', 'routing_key', 'priority']: value = getattr(model, option) if value is None: continue self.options[option] = value - if getattr(model, "expires_", None): - self.options["expires"] = getattr(model, "expires_") + if getattr(model, 'expires_', None): + self.options['expires'] = getattr(model, 'expires_') - headers = loads(model.headers or "{}") - headers["periodic_task_name"] = model.name - self.options["headers"] = headers + headers = loads(model.headers or '{}') + headers['periodic_task_name'] = model.name + self.options['headers'] = headers self.total_run_count = model.total_run_count self.model = model @@ -109,9 +108,8 @@ def __init__(self, model, app=None): # This will trigger the job to run at start_time # and avoid the heap block. if self.model.start_time: - model.last_run_at = model.last_run_at - datetime.timedelta( - days=365 * 30 - ) + model.last_run_at = model.last_run_at \ + - datetime.timedelta(days=365 * 30) self.last_run_at = model.last_run_at @@ -128,7 +126,7 @@ def is_due(self): # START DATE: only run after the `start_time`, if one exists. if self.model.start_time is not None: now = self._default_now() - if getattr(settings, "DJANGO_CELERY_BEAT_TZ_AWARE", True): + if getattr(settings, 'DJANGO_CELERY_BEAT_TZ_AWARE', True): now = maybe_make_aware(self._default_now()) if now < self.model.start_time: # The datetime is before the start date - don't run. @@ -149,7 +147,8 @@ def is_due(self): return schedules.schedstate(False, NEVER_CHECK_TIMEOUT) # ONE OFF TASK: Disable one off tasks after they've ran once - if self.model.one_off and self.model.enabled and self.model.total_run_count > 0: + if self.model.one_off and self.model.enabled \ + and self.model.total_run_count > 0: self.model.enabled = False self.model.total_run_count = 0 # Reset self.model.no_changes = False # Mark the model entry as changed @@ -164,7 +163,7 @@ def is_due(self): return self.schedule.is_due(last_run_at_in_tz) def _default_now(self): - if getattr(settings, "DJANGO_CELERY_BEAT_TZ_AWARE", True): + if getattr(settings, 'DJANGO_CELERY_BEAT_TZ_AWARE', True): now = datetime.datetime.now(self.app.timezone) else: # this ends up getting passed to maybe_make_aware, which expects @@ -177,7 +176,6 @@ def __next__(self): self.model.total_run_count += 1 self.model.no_changes = True return self.__class__(self.model) - next = __next__ # for 2to3 def save(self): @@ -197,20 +195,20 @@ def to_model_schedule(cls, schedule): model_schedule = model_type.from_schedule(schedule) model_schedule.save() return model_schedule, model_field - raise ValueError(f"Cannot convert schedule type {schedule!r} to model") + raise ValueError( + f'Cannot convert schedule type {schedule!r} to model') @classmethod def from_entry(cls, name, app=None, **entry): obj, created = PeriodicTask._default_manager.update_or_create( - name=name, - defaults=cls._unpack_fields(**entry), + name=name, defaults=cls._unpack_fields(**entry), ) return cls(obj, app=app) @classmethod - def _unpack_fields( - cls, schedule, args=None, kwargs=None, relative=None, options=None, **entry - ): + def _unpack_fields(cls, schedule, + args=None, kwargs=None, relative=None, options=None, + **entry): entry_schedules = { model_field: None for _, _, model_field in cls.model_schedules } @@ -220,37 +218,27 @@ def _unpack_fields( entry_schedules, args=dumps(args or []), kwargs=dumps(kwargs or {}), - **cls._unpack_options(**options or {}), + **cls._unpack_options(**options or {}) ) return entry @classmethod - def _unpack_options( - cls, - queue=None, - exchange=None, - routing_key=None, - priority=None, - headers=None, - expire_seconds=None, - **kwargs, - ): + def _unpack_options(cls, queue=None, exchange=None, routing_key=None, + priority=None, headers=None, expire_seconds=None, + **kwargs): return { - "queue": queue, - "exchange": exchange, - "routing_key": routing_key, - "priority": priority, - "headers": dumps(headers or {}), - "expire_seconds": expire_seconds, + 'queue': queue, + 'exchange': exchange, + 'routing_key': routing_key, + 'priority': priority, + 'headers': dumps(headers or {}), + 'expire_seconds': expire_seconds, } def __repr__(self): - return "".format( - safe_str(self.name), - self.task, - safe_repr(self.args), - safe_repr(self.kwargs), - self.schedule, + return ''.format( + safe_str(self.name), self.task, safe_repr(self.args), + safe_repr(self.kwargs), self.schedule, ) @@ -273,17 +261,16 @@ def __init__(self, *args, **kwargs): Scheduler.__init__(self, *args, **kwargs) self._finalize = Finalize(self, self.sync, exitpriority=5) self.max_interval = ( - kwargs.get("max_interval") + kwargs.get('max_interval') or self.app.conf.beat_max_loop_interval - or DEFAULT_MAX_INTERVAL - ) + or DEFAULT_MAX_INTERVAL) def setup_schedule(self): self.install_default_entries(self.schedule) self.update_from_dict(self.app.conf.beat_schedule) def all_as_schedule(self): - debug("DatabaseScheduler: Fetching database schedule") + debug('DatabaseScheduler: Fetching database schedule') s = {} for model in self.enabled_models(): try: @@ -305,7 +292,8 @@ def enabled_models_qs(self): seconds=SCHEDULE_SYNC_MAX_INTERVAL ) exclude_clock_tasks_query = Q( - clocked__isnull=False, clocked__clocked_time__gt=next_schedule_sync + clocked__isnull=False, + clocked__clocked_time__gt=next_schedule_sync ) exclude_cron_tasks_query = self._get_crontab_exclude_query() @@ -330,7 +318,9 @@ def _get_crontab_exclude_query(self): server_hour = server_time.hour # Window of +/- 2 hours around the current hour in server tz. - hours_to_include = [(server_hour + offset) % 24 for offset in range(-2, 3)] + hours_to_include = [ + (server_hour + offset) % 24 for offset in range(-2, 3) + ] hours_to_include += [4] # celery's default cleanup task # Get all tasks with a simple numeric hour value @@ -342,7 +332,8 @@ def _get_crontab_exclude_query(self): # Annotate these tasks with their server-hour equivalent annotated_tasks = numeric_hour_tasks.annotate( # Cast hour string to integer - hour_int=Cast("hour", IntegerField()), + hour_int=Cast('hour', IntegerField()), + # Calculate server-hour based on timezone offset server_hour=Case( # Handle each timezone specifically @@ -350,22 +341,21 @@ def _get_crontab_exclude_query(self): When( timezone=timezone_name, then=( - F("hour_int") + F('hour_int') + self._get_timezone_offset(timezone_name) + 24 - ) - % 24, + ) % 24 ) for timezone_name in self._get_unique_timezone_names() ], # Default case - use hour as is - default=F("hour_int"), - ), + default=F('hour_int') + ) ) excluded_hour_task_ids = annotated_tasks.exclude( server_hour__in=hours_to_include - ).values_list("id", flat=True) + ).values_list('id', flat=True) # Build the final exclude query: # Exclude crontab tasks that are not in our include list @@ -380,11 +370,15 @@ def _get_valid_hour_formats(self): Return a list of all valid hour values (0-23). Both zero-padded ("00"–"09") and non-padded ("0"–"23") """ - return [str(hour) for hour in range(24)] + [f"{hour:02d}" for hour in range(10)] + return [str(hour) for hour in range(24)] + [ + f"{hour:02d}" for hour in range(10) + ] def _get_unique_timezone_names(self): """Get a list of all unique timezone names used in CrontabSchedule""" - return CrontabSchedule.objects.values_list("timezone", flat=True).distinct() + return CrontabSchedule.objects.values_list( + 'timezone', flat=True + ).distinct() def _get_timezone_offset(self, timezone_name): """ @@ -437,12 +431,12 @@ def schedule_changed(self): last, ts = self._last_timestamp, self.Changes.last_change() except DatabaseError as exc: - logger.exception("Database gave error: %r", exc) + logger.exception('Database gave error: %r', exc) return False except InterfaceError: warning( - "DatabaseScheduler: InterfaceError in schedule_changed(), " - "waiting to retry in next call..." + 'DatabaseScheduler: InterfaceError in schedule_changed(), ' + 'waiting to retry in next call...' ) return False @@ -462,7 +456,7 @@ def reserve(self, entry): def sync(self): if logger.isEnabledFor(logging.DEBUG): - debug("Writing entries...") + debug('Writing entries...') _tried = set() _failed = set() try: @@ -476,11 +470,11 @@ def sync(self): except (KeyError, TypeError, ObjectDoesNotExist): _failed.add(name) except DatabaseError as exc: - logger.exception("Database error while sync: %r", exc) + logger.exception('Database error while sync: %r', exc) except InterfaceError: warning( - "DatabaseScheduler: InterfaceError in sync(), " - "waiting to retry in next call..." + 'DatabaseScheduler: InterfaceError in sync(), ' + 'waiting to retry in next call...' ) finally: # retry later, only for the failed ones @@ -490,7 +484,9 @@ def update_from_dict(self, mapping): s = {} for name, entry_fields in mapping.items(): try: - entry = self.Entry.from_entry(name, app=self.app, **entry_fields) + entry = self.Entry.from_entry(name, + app=self.app, + **entry_fields) if entry.model.enabled: s[name] = entry @@ -502,11 +498,10 @@ def install_default_entries(self, data): entries = {} if self.app.conf.result_expires: entries.setdefault( - "celery.backend_cleanup", - { - "task": "celery.backend_cleanup", - "schedule": schedules.crontab("0", "4", "*"), - "options": {"expire_seconds": 12 * 3600}, + 'celery.backend_cleanup', { + 'task': 'celery.backend_cleanup', + 'schedule': schedules.crontab('0', '4', '*'), + 'options': {'expire_seconds': 12 * 3600}, }, ) self.update_from_dict(entries) @@ -523,20 +518,26 @@ def schedule(self): current_time = datetime.datetime.now() if self._initial_read: - debug("DatabaseScheduler: initial read") + debug('DatabaseScheduler: initial read') initial = update = True self._initial_read = False self._last_full_sync = current_time elif self.schedule_changed(): - info("DatabaseScheduler: Schedule changed.") + info('DatabaseScheduler: Schedule changed.') update = True self._last_full_sync = current_time # Force update the schedule if it's been more than 5 minutes if not update: - time_since_last_sync = (current_time - self._last_full_sync).total_seconds() - if time_since_last_sync >= SCHEDULE_SYNC_MAX_INTERVAL: - debug("DatabaseScheduler: Forcing full sync after 5 minutes") + time_since_last_sync = ( + current_time - self._last_full_sync + ).total_seconds() + if ( + time_since_last_sync >= SCHEDULE_SYNC_MAX_INTERVAL + ): + debug( + 'DatabaseScheduler: Forcing full sync after 5 minutes' + ) update = True self._last_full_sync = current_time @@ -548,8 +549,7 @@ def schedule(self): self._heap = [] self._heap_invalidated = True if logger.isEnabledFor(logging.DEBUG): - debug( - "Current schedule:\n%s", - "\n".join(repr(entry) for entry in self._schedule.values()), + debug('Current schedule:\n%s', '\n'.join( + repr(entry) for entry in self._schedule.values()), ) return self._schedule