From ccf496b2d7cf9f42069fb2e855056d115133acc0 Mon Sep 17 00:00:00 2001
From: Jonas Bardino
Date: Mon, 1 Sep 2025 08:09:06 +0200
Subject: [PATCH 01/17] Skeleton grid_janitor service as outlined in milestone
 8.

Ported to `next` branch for further development.

---
 mig/install/MiGserver-template.conf | 2 +
 mig/install/migrid-init.d-deb-template | 63 ++++++++++++++++++-
 mig/install/migrid-init.d-rh-template | 57 ++++++++++++++++-
 mig/shared/configuration.py | 17 +++--
 mig/shared/install.py | 6 +-
 tests/fixture/confs-stdlocal/MiGserver.conf | 2 +
 .../fixture/confs-stdlocal/migrid-init.d-deb | 63 ++++++++++++++++++-
 tests/fixture/confs-stdlocal/migrid-init.d-rh | 57 ++++++++++++++++-
 .../mig_shared_configuration--new.json | 1 +
 9 files changed, 255 insertions(+), 13 deletions(-)

diff --git a/mig/install/MiGserver-template.conf b/mig/install/MiGserver-template.conf
index 452603a31..d8dec4417 100644
--- a/mig/install/MiGserver-template.conf
+++ b/mig/install/MiGserver-template.conf
@@ -646,6 +646,8 @@ enable_notify = __ENABLE_NOTIFY__
 enable_imnotify = __ENABLE_IMNOTIFY__
 # Enable users to schedule tasks with a cron/at-like interface
 enable_crontab = __ENABLE_CRONTAB__
+# Enable janitor service to handle recurring tasks like cleanup and cache updates
+enable_janitor = __ENABLE_JANITOR__
 # Enable 2FA for web access and IO services with any TOTP authenticator client
 # IMPORTANT: Do NOT change this option manually here (requires Apache changes)!
 # use generateconfs.py --enable_twofactor=True|False
diff --git a/mig/install/migrid-init.d-deb-template b/mig/install/migrid-init.d-deb-template
index b8bdb9dab..d6ac02101 100755
--- a/mig/install/migrid-init.d-deb-template
+++ b/mig/install/migrid-init.d-deb-template
@@ -51,6 +51,7 @@ MIG_MONITOR=${MIG_CODE}/server/grid_monitor.py
 MIG_SSHMUX=${MIG_CODE}/server/grid_sshmux.py
 MIG_EVENTS=${MIG_CODE}/server/grid_events.py
 MIG_CRON=${MIG_CODE}/server/grid_cron.py
+MIG_JANITOR=${MIG_CODE}/server/grid_janitor.py
 MIG_TRANSFERS=${MIG_CODE}/server/grid_transfers.py
 MIG_OPENID=${MIG_CODE}/server/grid_openid.py
 MIG_SFTP=${MIG_CODE}/server/grid_sftp.py
@@ -67,7 +68,7 @@ MIG_CHKSIDROOT=${MIG_CODE}/server/chksidroot.py
 show_usage() {
     echo "Usage: migrid {start|stop|status|restart|reload}[daemon DAEMON]"
     echo "where daemon is left out for all or given along with DAEMON as one of the following"
-    echo "(script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)"
+    echo "(script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)"
 }
 
 check_enabled() {
@@ -154,6 +155,19 @@ start_cron() {
         log_end_msg 1 || true
     fi
 }
+start_janitor() {
+    check_enabled "janitor" || return 0
+    DAEMON_PATH=${MIG_JANITOR}
+    SHORT_NAME=$(basename ${DAEMON_PATH})
+    PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
+    # NOTE: janitor tasks can be quite heavy so we lower sched prio a bit
+    log_daemon_msg "Starting MiG janitor daemon" ${SHORT_NAME} || true
+    if start-stop-daemon --start --quiet --oknodo --pidfile ${PID_FILE} --make-pidfile --user ${MIG_USER} --chuid ${MIG_USER} --nicelevel 10 --background --name ${SHORT_NAME} --startas ${DAEMON_PATH} ; then
+        log_end_msg 0 || true
+    else
+        log_end_msg 1 || true
+    fi
+}
 start_transfers() {
     check_enabled "transfers" || return 0
     DAEMON_PATH=${MIG_TRANSFERS}
@@ -267,6 +281,7 @@ start_all() {
     start_sshmux
     start_events
     start_cron
+    start_janitor
     start_transfers
     start_openid
     start_sftp
@@ -374,6 +389,28 @@ stop_cron() {
         log_end_msg 1 || true
     fi
 }
+stop_janitor() {
check_enabled "janitor" || return 0 + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + log_daemon_msg "Stopping MiG janitor" ${SHORT_NAME} || true + # Try graceful shutdown so that state is properly saved + start-stop-daemon --stop --signal INT --quiet --oknodo --pidfile ${PID_FILE} + for delay in 1 2 3; do + status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} || break + sleep $delay + done + # Force kill if still running + status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} && \ + start-stop-daemon --stop --quiet --oknodo --pidfile ${PID_FILE} + if ! status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME}; then + rm -f ${PID_FILE} + log_end_msg 0 || true + else + log_end_msg 1 || true + fi +} stop_transfers() { check_enabled "transfers" || return 0 DAEMON_PATH=${MIG_TRANSFERS} @@ -515,6 +552,7 @@ stop_all() { stop_sshmux stop_events stop_cron + stop_janitor stop_transfers stop_openid stop_sftp @@ -589,6 +627,18 @@ reload_cron() { log_end_msg 1 || true fi } +reload_janitor() { + check_enabled "janitor" || return 0 + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + log_daemon_msg "Reloading MiG janitor" ${SHORT_NAME} || true + if start-stop-daemon --stop --signal HUP --quiet --oknodo --pidfile ${PID_FILE} ; then + log_end_msg 0 || true + else + log_end_msg 1 || true + fi +} reload_transfers() { check_enabled "transfers" || return 0 DAEMON_PATH=${MIG_TRANSFERS} @@ -726,6 +776,7 @@ reload_all() { reload_sshmux reload_events reload_cron + reload_janitor reload_transfers reload_openid reload_sftp @@ -777,6 +828,13 @@ status_cron() { PID_FILE="$PID_DIR/${SHORT_NAME}.pid" status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} } +status_janitor() { + check_enabled "janitor" || return 0 + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} +} status_transfers() { check_enabled "transfers" || return 0 DAEMON_PATH=${MIG_TRANSFERS} @@ -860,6 +918,7 @@ status_all() { status_sshmux status_events status_cron + status_janitor status_transfers status_openid status_sftp @@ -881,7 +940,7 @@ test -f ${MIG_SCRIPT} || exit 0 # Force valid target case "$2" in - script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all) + script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all) TARGET="$2" ;; '') diff --git a/mig/install/migrid-init.d-rh-template b/mig/install/migrid-init.d-rh-template index 5263496de..ab7cf763f 100755 --- a/mig/install/migrid-init.d-rh-template +++ b/mig/install/migrid-init.d-rh-template @@ -26,6 +26,7 @@ # processname: grid_sshmux.py # processname: grid_events.py # processname: grid_cron.py +# processname: grid_janitor.py # processname: grid_transfers.py # processname: grid_openid.py # processname: grid_sftp.py @@ -81,6 +82,7 @@ MIG_MONITOR=${MIG_CODE}/server/grid_monitor.py MIG_SSHMUX=${MIG_CODE}/server/grid_sshmux.py MIG_EVENTS=${MIG_CODE}/server/grid_events.py MIG_CRON=${MIG_CODE}/server/grid_cron.py +MIG_JANITOR=${MIG_CODE}/server/grid_janitor.py MIG_TRANSFERS=${MIG_CODE}/server/grid_transfers.py MIG_OPENID=${MIG_CODE}/server/grid_openid.py MIG_SFTP=${MIG_CODE}/server/grid_sftp.py @@ -97,7 +99,7 @@ MIG_CHKSIDROOT=${MIG_CODE}/server/chksidroot.py show_usage() { echo "Usage: migrid {start|stop|status|restart|reload}[daemon DAEMON]" 
echo "where daemon is left out for all or given along with DAEMON as one of the following" - echo "(script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)" + echo "(script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)" } check_enabled() { @@ -215,6 +217,22 @@ start_cron() { [ $RET2 -ne 0 ] && echo "Warning: cron not started." echo } +start_janitor() { + check_enabled "janitor" || return + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + # NOTE: janitor tasks can be quite heavy so we lower sched prio a bit + echo -n "Starting MiG janitor daemon: $SHORT_NAME" + daemon +10 --user ${MIG_USER} --pidfile ${PID_FILE} \ + "${DAEMON_PATH} >> ${MIG_LOG}/janitor.out 2>&1 &" + fallback_save_pid "$DAEMON_PATH" "$PID_FILE" "$!" + RET2=$? + [ $RET2 ] && success + echo + [ $RET2 ] || echo "Warning: janitor not started." + echo +} start_transfers() { check_enabled "transfers" || return DAEMON_PATH=${MIG_TRANSFERS} @@ -356,6 +374,7 @@ start_all() { start_sshmux start_events start_cron + start_janitor start_transfers start_openid start_sftp @@ -432,6 +451,21 @@ stop_cron() { echo status ${DAEMON_PATH} && killproc ${DAEMON_PATH} } +stop_janitor() { + check_enabled "janitor" || return + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + echo -n "Shutting down MiG janitor: $SHORT_NAME " + # Try graceful shutdown so that state is properly saved + killproc ${DAEMON_PATH} -INT + for delay in 1 2 3; do + status ${DAEMON_PATH} || break + sleep $delay + done + echo + status ${DAEMON_PATH} && killproc ${DAEMON_PATH} +} stop_transfers() { check_enabled "transfers" || return DAEMON_PATH=${MIG_TRANSFERS} @@ -542,6 +576,7 @@ stop_all() { stop_sshmux stop_events stop_cron + stop_janitor stop_transfers stop_openid stop_sftp @@ -600,6 +635,15 @@ reload_cron() { killproc ${DAEMON_PATH} -HUP echo } +reload_janitor() { + check_enabled "janitor" || return + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + echo -n "Reloading MiG janitor: $SHORT_NAME " + killproc ${DAEMON_PATH} -HUP + echo +} reload_transfers() { check_enabled "transfers" || return DAEMON_PATH=${MIG_TRANSFERS} @@ -717,6 +761,7 @@ reload_all() { reload_sshmux reload_events reload_cron + reload_janitor reload_transfers reload_openid reload_sftp @@ -768,6 +813,13 @@ status_cron() { PID_FILE="$PID_DIR/${SHORT_NAME}.pid" status ${DAEMON_PATH} } +status_janitor() { + check_enabled "janitor" || return + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + status ${DAEMON_PATH} +} status_transfers() { check_enabled "transfers" || return DAEMON_PATH=${MIG_TRANSFERS} @@ -852,6 +904,7 @@ status_all() { status_sshmux status_events status_cron + status_janitor status_transfers status_openid status_sftp @@ -873,7 +926,7 @@ test -f ${MIG_SCRIPT} || exit 0 # Force valid target case "$2" in - script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all) + script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all) TARGET="$2" ;; '') diff --git a/mig/shared/configuration.py b/mig/shared/configuration.py index 2b813efaf..62b04c237 100644 --- a/mig/shared/configuration.py +++ b/mig/shared/configuration.py @@ -357,6 +357,7 @@ def 
fix_missing(config_file, verbose=True): 'user_vmproxy_log': 'vmproxy.log', 'user_events_log': 'events.log', 'user_cron_log': 'cron.log', + 'user_janitor_log': 'janitor.log', 'user_transfers_log': 'transfers.log', 'user_notify_log': 'notify.log', 'user_auth_log': 'auth.log', @@ -625,6 +626,7 @@ def get(self, *args, **kwargs): 'user_vmproxy_log': 'vmproxy.log', 'user_events_log': 'events.log', 'user_cron_log': 'cron.log', + 'user_janitor_log': 'janitor.log', 'user_transfers_log': 'transfers.log', 'user_notify_log': 'notify.log', 'user_auth_log': 'auth.log', @@ -1551,6 +1553,13 @@ def reload_config(self, verbose, skip_log=False, disable_auth_log=False, self.site_enable_crontab = False if config.has_option('GLOBAL', 'user_cron_log'): self.user_cron_log = config.get('GLOBAL', 'user_cron_log') + if config.has_option('SITE', 'enable_janitor'): + self.site_enable_janitor = config.getboolean( + 'SITE', 'enable_janitor') + else: + self.site_enable_janitor = False + if config.has_option('GLOBAL', 'user_janitor_log'): + self.user_janitor_log = config.get('GLOBAL', 'user_janitor_log') if config.has_option('SITE', 'enable_notify'): self.site_enable_notify = config.getboolean( 'SITE', 'enable_notify') @@ -2695,10 +2704,10 @@ def reload_config(self, verbose, skip_log=False, disable_auth_log=False, 'user_openid_log', 'user_monitor_log', 'user_sshmux_log', 'user_vmproxy_log', 'user_events_log', 'user_cron_log', - 'user_transfers_log', 'user_notify_log', - 'user_imnotify_log', 'user_auth_log', - 'user_chkuserroot_log', 'user_chksidroot_log', - 'user_quota_log'): + 'user_janitor_log', 'user_transfers_log', + 'user_notify_log', 'user_imnotify_log', + 'user_auth_log', 'user_chkuserroot_log', + 'user_chksidroot_log', 'user_quota_log'): _log_path = getattr(self, _log_var) if not os.path.isabs(_log_path): setattr(self, _log_var, os.path.join(self.log_dir, _log_path)) diff --git a/mig/shared/install.py b/mig/shared/install.py index fdcf40e7e..92a1ab997 100644 --- a/mig/shared/install.py +++ b/mig/shared/install.py @@ -387,6 +387,7 @@ def generate_confs( enable_seafile=False, enable_duplicati=False, enable_crontab=True, + enable_janitor=False, enable_notify=False, enable_imnotify=False, enable_dev_accounts=False, @@ -712,6 +713,7 @@ def _generate_confs_prepare( enable_seafile, enable_duplicati, enable_crontab, + enable_janitor, enable_notify, enable_imnotify, enable_dev_accounts, @@ -967,6 +969,7 @@ def _generate_confs_prepare( user_dict['__ENABLE_SEAFILE__'] = "%s" % enable_seafile user_dict['__ENABLE_DUPLICATI__'] = "%s" % enable_duplicati user_dict['__ENABLE_CRONTAB__'] = "%s" % enable_crontab + user_dict['__ENABLE_JANITOR__'] = "%s" % enable_janitor user_dict['__ENABLE_NOTIFY__'] = "%s" % enable_notify user_dict['__ENABLE_IMNOTIFY__'] = "%s" % enable_imnotify user_dict['__ENABLE_DEV_ACCOUNTS__'] = "%s" % enable_dev_accounts @@ -1761,7 +1764,7 @@ def _generate_confs_prepare( else: user_dict['__APACHE_SUFFIX__'] = "" - # Helpers for the migstatecleanup cron job + # Helpers for the migstatecleanup cron job or janitor service user_dict['__CRON_VERBOSE_CLEANUP__'] = '1' user_dict['__CRON_EVENT_CLEANUP__'] = '1' if 'migoid' in signup_methods or 'migcert' in signup_methods: @@ -2517,6 +2520,7 @@ def _generate_confs_instructions(options, user_dict): chmod 755 %(destination)s/mig{stateclean,errors,sftpmon,importdoi,notifyexpire} sudo cp %(destination)s/mig{stateclean,errors,sftpmon,importdoi,notifyexpire} \\ /etc/cron.daily/ +until the janitor service is ready to take care of those tasks. 
The migcheckssl, migverifyarchives, migstats, migacctexpire and miglustrequota
files are cron scripts to automatically check for LetsEncrypt certificate
diff --git a/tests/fixture/confs-stdlocal/MiGserver.conf b/tests/fixture/confs-stdlocal/MiGserver.conf
index 4670a063a..e1786473c 100644
--- a/tests/fixture/confs-stdlocal/MiGserver.conf
+++ b/tests/fixture/confs-stdlocal/MiGserver.conf
@@ -646,6 +646,8 @@ enable_notify = False
 enable_imnotify = False
 # Enable users to schedule tasks with a cron/at-like interface
 enable_crontab = True
+# Enable janitor service to handle recurring tasks like cleanup and cache updates
+enable_janitor = False
 # Enable 2FA for web access and IO services with any TOTP authenticator client
 # IMPORTANT: Do NOT change this option manually here (requires Apache changes)!
 # use generateconfs.py --enable_twofactor=True|False
diff --git a/tests/fixture/confs-stdlocal/migrid-init.d-deb b/tests/fixture/confs-stdlocal/migrid-init.d-deb
index b8bdb9dab..d6ac02101 100755
--- a/tests/fixture/confs-stdlocal/migrid-init.d-deb
+++ b/tests/fixture/confs-stdlocal/migrid-init.d-deb
@@ -51,6 +51,7 @@ MIG_MONITOR=${MIG_CODE}/server/grid_monitor.py
 MIG_SSHMUX=${MIG_CODE}/server/grid_sshmux.py
 MIG_EVENTS=${MIG_CODE}/server/grid_events.py
 MIG_CRON=${MIG_CODE}/server/grid_cron.py
+MIG_JANITOR=${MIG_CODE}/server/grid_janitor.py
 MIG_TRANSFERS=${MIG_CODE}/server/grid_transfers.py
 MIG_OPENID=${MIG_CODE}/server/grid_openid.py
 MIG_SFTP=${MIG_CODE}/server/grid_sftp.py
@@ -67,7 +68,7 @@ MIG_CHKSIDROOT=${MIG_CODE}/server/chksidroot.py
 show_usage() {
     echo "Usage: migrid {start|stop|status|restart|reload}[daemon DAEMON]"
     echo "where daemon is left out for all or given along with DAEMON as one of the following"
-    echo "(script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)"
+    echo "(script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)"
 }
 
 check_enabled() {
@@ -154,6 +155,19 @@ start_cron() {
         log_end_msg 1 || true
     fi
 }
+start_janitor() {
+    check_enabled "janitor" || return 0
+    DAEMON_PATH=${MIG_JANITOR}
+    SHORT_NAME=$(basename ${DAEMON_PATH})
+    PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
+    # NOTE: janitor tasks can be quite heavy so we lower sched prio a bit
+    log_daemon_msg "Starting MiG janitor daemon" ${SHORT_NAME} || true
+    if start-stop-daemon --start --quiet --oknodo --pidfile ${PID_FILE} --make-pidfile --user ${MIG_USER} --chuid ${MIG_USER} --nicelevel 10 --background --name ${SHORT_NAME} --startas ${DAEMON_PATH} ; then
+        log_end_msg 0 || true
+    else
+        log_end_msg 1 || true
+    fi
+}
 start_transfers() {
     check_enabled "transfers" || return 0
     DAEMON_PATH=${MIG_TRANSFERS}
@@ -267,6 +281,7 @@ start_all() {
     start_sshmux
     start_events
     start_cron
+    start_janitor
     start_transfers
     start_openid
     start_sftp
@@ -374,6 +389,28 @@ stop_cron() {
         log_end_msg 1 || true
     fi
 }
+stop_janitor() {
+    check_enabled "janitor" || return 0
+    DAEMON_PATH=${MIG_JANITOR}
+    SHORT_NAME=$(basename ${DAEMON_PATH})
+    PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
+    log_daemon_msg "Stopping MiG janitor" ${SHORT_NAME} || true
+    # Try graceful shutdown so that state is properly saved
+    start-stop-daemon --stop --signal INT --quiet --oknodo --pidfile ${PID_FILE}
+    for delay in 1 2 3; do
+        status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} || break
+        sleep $delay
+    done
+    # Force kill if still running
+    status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} && \
+        start-stop-daemon --stop --quiet --oknodo --pidfile
${PID_FILE} + if ! status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME}; then + rm -f ${PID_FILE} + log_end_msg 0 || true + else + log_end_msg 1 || true + fi +} stop_transfers() { check_enabled "transfers" || return 0 DAEMON_PATH=${MIG_TRANSFERS} @@ -515,6 +552,7 @@ stop_all() { stop_sshmux stop_events stop_cron + stop_janitor stop_transfers stop_openid stop_sftp @@ -589,6 +627,18 @@ reload_cron() { log_end_msg 1 || true fi } +reload_janitor() { + check_enabled "janitor" || return 0 + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + log_daemon_msg "Reloading MiG janitor" ${SHORT_NAME} || true + if start-stop-daemon --stop --signal HUP --quiet --oknodo --pidfile ${PID_FILE} ; then + log_end_msg 0 || true + else + log_end_msg 1 || true + fi +} reload_transfers() { check_enabled "transfers" || return 0 DAEMON_PATH=${MIG_TRANSFERS} @@ -726,6 +776,7 @@ reload_all() { reload_sshmux reload_events reload_cron + reload_janitor reload_transfers reload_openid reload_sftp @@ -777,6 +828,13 @@ status_cron() { PID_FILE="$PID_DIR/${SHORT_NAME}.pid" status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} } +status_janitor() { + check_enabled "janitor" || return 0 + DAEMON_PATH=${MIG_JANITOR} + SHORT_NAME=$(basename ${DAEMON_PATH}) + PID_FILE="$PID_DIR/${SHORT_NAME}.pid" + status_of_proc -p ${PID_FILE} ${DAEMON_PATH} ${SHORT_NAME} +} status_transfers() { check_enabled "transfers" || return 0 DAEMON_PATH=${MIG_TRANSFERS} @@ -860,6 +918,7 @@ status_all() { status_sshmux status_events status_cron + status_janitor status_transfers status_openid status_sftp @@ -881,7 +940,7 @@ test -f ${MIG_SCRIPT} || exit 0 # Force valid target case "$2" in - script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all) + script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all) TARGET="$2" ;; '') diff --git a/tests/fixture/confs-stdlocal/migrid-init.d-rh b/tests/fixture/confs-stdlocal/migrid-init.d-rh index 5263496de..ab7cf763f 100755 --- a/tests/fixture/confs-stdlocal/migrid-init.d-rh +++ b/tests/fixture/confs-stdlocal/migrid-init.d-rh @@ -26,6 +26,7 @@ # processname: grid_sshmux.py # processname: grid_events.py # processname: grid_cron.py +# processname: grid_janitor.py # processname: grid_transfers.py # processname: grid_openid.py # processname: grid_sftp.py @@ -81,6 +82,7 @@ MIG_MONITOR=${MIG_CODE}/server/grid_monitor.py MIG_SSHMUX=${MIG_CODE}/server/grid_sshmux.py MIG_EVENTS=${MIG_CODE}/server/grid_events.py MIG_CRON=${MIG_CODE}/server/grid_cron.py +MIG_JANITOR=${MIG_CODE}/server/grid_janitor.py MIG_TRANSFERS=${MIG_CODE}/server/grid_transfers.py MIG_OPENID=${MIG_CODE}/server/grid_openid.py MIG_SFTP=${MIG_CODE}/server/grid_sftp.py @@ -97,7 +99,7 @@ MIG_CHKSIDROOT=${MIG_CODE}/server/chksidroot.py show_usage() { echo "Usage: migrid {start|stop|status|restart|reload}[daemon DAEMON]" echo "where daemon is left out for all or given along with DAEMON as one of the following" - echo "(script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)" + echo "(script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)" } check_enabled() { @@ -215,6 +217,22 @@ start_cron() { [ $RET2 -ne 0 ] && echo "Warning: cron not started." 
    echo
 }
+start_janitor() {
+    check_enabled "janitor" || return
+    DAEMON_PATH=${MIG_JANITOR}
+    SHORT_NAME=$(basename ${DAEMON_PATH})
+    PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
+    # NOTE: janitor tasks can be quite heavy so we lower sched prio a bit
+    echo -n "Starting MiG janitor daemon: $SHORT_NAME"
+    daemon +10 --user ${MIG_USER} --pidfile ${PID_FILE} \
+           "${DAEMON_PATH} >> ${MIG_LOG}/janitor.out 2>&1 &"
+    fallback_save_pid "$DAEMON_PATH" "$PID_FILE" "$!"
+    RET2=$?
+    [ $RET2 -eq 0 ] && success
+    echo
+    [ $RET2 -ne 0 ] && echo "Warning: janitor not started."
+    echo
+}
 start_transfers() {
     check_enabled "transfers" || return
     DAEMON_PATH=${MIG_TRANSFERS}
@@ -356,6 +374,7 @@ start_all() {
     start_sshmux
     start_events
     start_cron
+    start_janitor
     start_transfers
     start_openid
     start_sftp
@@ -432,6 +451,21 @@ stop_cron() {
     echo
     status ${DAEMON_PATH} && killproc ${DAEMON_PATH}
 }
+stop_janitor() {
+    check_enabled "janitor" || return
+    DAEMON_PATH=${MIG_JANITOR}
+    SHORT_NAME=$(basename ${DAEMON_PATH})
+    PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
+    echo -n "Shutting down MiG janitor: $SHORT_NAME "
+    # Try graceful shutdown so that state is properly saved
+    killproc ${DAEMON_PATH} -INT
+    for delay in 1 2 3; do
+        status ${DAEMON_PATH} || break
+        sleep $delay
+    done
+    echo
+    status ${DAEMON_PATH} && killproc ${DAEMON_PATH}
+}
 stop_transfers() {
     check_enabled "transfers" || return
     DAEMON_PATH=${MIG_TRANSFERS}
@@ -542,6 +576,7 @@ stop_all() {
     stop_sshmux
     stop_events
     stop_cron
+    stop_janitor
     stop_transfers
     stop_openid
     stop_sftp
@@ -600,6 +635,15 @@ reload_cron() {
     killproc ${DAEMON_PATH} -HUP
     echo
 }
+reload_janitor() {
+    check_enabled "janitor" || return
+    DAEMON_PATH=${MIG_JANITOR}
+    SHORT_NAME=$(basename ${DAEMON_PATH})
+    PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
+    echo -n "Reloading MiG janitor: $SHORT_NAME "
+    killproc ${DAEMON_PATH} -HUP
+    echo
+}
 reload_transfers() {
     check_enabled "transfers" || return
     DAEMON_PATH=${MIG_TRANSFERS}
@@ -717,6 +761,7 @@ reload_all() {
     reload_sshmux
     reload_events
     reload_cron
+    reload_janitor
     reload_transfers
     reload_openid
     reload_sftp
@@ -768,6 +813,13 @@ status_cron() {
     PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
     status ${DAEMON_PATH}
 }
+status_janitor() {
+    check_enabled "janitor" || return
+    DAEMON_PATH=${MIG_JANITOR}
+    SHORT_NAME=$(basename ${DAEMON_PATH})
+    PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
+    status ${DAEMON_PATH}
+}
 status_transfers() {
     check_enabled "transfers" || return
     DAEMON_PATH=${MIG_TRANSFERS}
@@ -852,6 +904,7 @@ status_all() {
     status_sshmux
     status_events
     status_cron
+    status_janitor
     status_transfers
     status_openid
     status_sftp
@@ -873,7 +926,7 @@ test -f ${MIG_SCRIPT} || exit 0
 
 # Force valid target
 case "$2" in
-    script|monitor|sshmux|events|cron|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)
+    script|monitor|sshmux|events|cron|janitor|transfers|openid|sftp|sftpsubsys|webdavs|ftps|notify|imnotify|vmproxy|all)
         TARGET="$2"
         ;;
     '')
diff --git a/tests/fixture/mig_shared_configuration--new.json b/tests/fixture/mig_shared_configuration--new.json
index c1f6069b4..f4d7edd15 100644
--- a/tests/fixture/mig_shared_configuration--new.json
+++ b/tests/fixture/mig_shared_configuration--new.json
@@ -592,6 +592,7 @@
         "V2",
         "V3"
     ],
+    "user_janitor_log": "janitor.log",
     "user_messages": "",
     "user_mig_cert_title": "",
     "user_mig_oid_provider": "",

From 3adb087012565774079fd09a26164a33244f238f Mon Sep 17 00:00:00 2001
From: Jonas Bardino
Date: Mon, 1 Sep 2025 08:19:48 +0200
Subject: [PATCH 02/17] Add missing grid_janitor daemon and fix copyright
 header.
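For reference, the new daemon follows the same overall structure as the other
grid_* services: a module-level multiprocessing.Event that a SIGINT handler
sets, polled from a throttled main loop. Stripped of MiG configuration and
logging, the control flow boils down to this minimal standalone sketch
(illustrative only, not part of the commit itself):

    import multiprocessing
    import signal
    import time

    stop_running = multiprocessing.Event()

    def stop_handler(sig, frame):
        # merely flag the main loop to exit on Ctrl+C (SIGINT)
        stop_running.set()

    signal.signal(signal.SIGINT, stop_handler)
    while not stop_running.is_set():
        # a real daemon would do its periodic work here
        time.sleep(1)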
--- mig/server/grid_janitor.py | 110 +++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100755 mig/server/grid_janitor.py diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py new file mode 100755 index 000000000..13a311b12 --- /dev/null +++ b/mig/server/grid_janitor.py @@ -0,0 +1,110 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# grid_janitor - daemon to handle recurring tasks like clean up and updates +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + +"""Daemon to take care of various recurring tasks like clean up, cache updates +and pruning of pending requests. +""" + +from __future__ import absolute_import, print_function + +import multiprocessing +import os +import signal +import sys +import time + +from mig.shared.conf import get_configuration_object +from mig.shared.logger import daemon_logger, register_hangup_handler + +stop_running = multiprocessing.Event() +(configuration, logger) = (None, None) + + +def stop_handler(sig, frame): + """A simple signal handler to quit on Ctrl+C (SIGINT) in main""" + # Print blank line to avoid mix with Ctrl-C line + print("") + stop_running.set() + + +if __name__ == "__main__": + # Force no log init since we use separate logger + configuration = get_configuration_object(skip_log=True) + + log_level = configuration.loglevel + if sys.argv[1:] and sys.argv[1] in ["debug", "info", "warning", "error"]: + log_level = sys.argv[1] + + # Use separate logger + + logger = daemon_logger("janitor", configuration.user_janitor_log, log_level) + configuration.logger = logger + + # Allow e.g. logrotate to force log re-open after rotates + register_hangup_handler(configuration) + + # Allow clean shutdown on SIGINT only to main process + signal.signal(signal.SIGINT, stop_handler) + + if not configuration.site_enable_janitor: + err_msg = "Janitor support is disabled in configuration!" + logger.error(err_msg) + print(err_msg) + sys.exit(1) + + print( + """This is the MiG janitor daemon which cleans up stale state data, +updates internal caches and prunes pending requests. 
+ +Set the MIG_CONF environment to the server configuration path +unless it is available in mig/server/MiGserver.conf +""" + ) + + main_pid = os.getpid() + print("Starting janitor daemon - Ctrl-C to quit") + logger.info("(%s) Starting Janitor daemon" % main_pid) + + logger.debug("(%s) Starting main loop" % main_pid) + print("%s: Start main loop" % os.getpid()) + while not stop_running.is_set(): + try: + time.sleep(1) + except KeyboardInterrupt: + stop_running.set() + # NOTE: we can't be sure if SIGINT was sent to only main process + # so we make sure to propagate to monitor child + print("Interrupt requested - shutdown") + except Exception as exc: + logger.error( + "(%s) Caught unexpected exception: %s" % (os.getpid(), exc) + ) + + print("Janitor daemon shutting down") + logger.info("(%s) Janitor daemon shutting down" % main_pid) + + sys.exit(0) From 55a82aa8b59d301bb13d5ae79ff2d24f0e0365e9 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Mon, 1 Sep 2025 19:53:19 +0200 Subject: [PATCH 03/17] Really commit the first functional version as stated in previous commit. The previous commit only had the polished skeleton in place. --- mig/server/grid_janitor.py | 240 ++++++++++++++++++++++++++++++++++++- 1 file changed, 239 insertions(+), 1 deletion(-) diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py index 13a311b12..07b923676 100755 --- a/mig/server/grid_janitor.py +++ b/mig/server/grid_janitor.py @@ -31,6 +31,7 @@ from __future__ import absolute_import, print_function +import fnmatch import multiprocessing import os import signal @@ -38,11 +39,31 @@ import time from mig.shared.conf import get_configuration_object +from mig.shared.fileio import listdir, delete_file from mig.shared.logger import daemon_logger, register_hangup_handler +# TODO: adjust short to subsecond and long to e.g a minute for production use +#SHORT_THROTTLE_SECS = 0.5 +#LONG_THROTTLE_SECS = 60.0 +SHORT_THROTTLE_SECS = 5.0 +LONG_THROTTLE_SECS = 30.0 + +REMIND_REQ_DAYS = 5 +EXPIRE_REQ_DAYS = 30 + +EXPIRE_STATE_DAYS = 30 +EXPIRE_DUMMY_JOBS_DAYS = 7 +EXPIRE_TWOFACTOR_DAYS = 1 + +SECS_PER_DAY = 86400 +SECS_PER_HOUR = 3600 +SECS_PER_MINUTE = 60 + stop_running = multiprocessing.Event() (configuration, logger) = (None, None) +task_triggers = {} + def stop_handler(sig, frame): """A simple signal handler to quit on Ctrl+C (SIGINT) in main""" @@ -50,6 +71,219 @@ def stop_handler(sig, frame): print("") stop_running.set() +def _lookup_last_run(configuration, target): + """Check if target task is pending using internal accounting for task. + Returns the timestamp when the task was last run in UN*X epoch. + """ + # Lazy init + last_stamp = task_triggers[target] = task_triggers.get(target, -1) + if last_stamp > 0: + logger.debug("last %s task ran at %d" % (target, last_stamp)) + else: + logger.debug("no last %s task run in history" % target) + return last_stamp + +def _update_last_run(configuration, target, stamp): + """Update target task pending mark using internal accounting and supplied + task timestamp in UN*X epoch. + Returns the same updated timestamp for the task. + """ + # TODO: add a more persistent marker e.g. in mig_system_run or _files to + # remember last status across restarts and reboots? + task_triggers[target] = stamp + return task_triggers[target] + + +def _clean_stale_state_files(configuration, target_dir, filename_patterns, + expire_days, now, include_dotfiles=False): + """Inspect and clean up stale state files matching any of filename_pattern + in target_dir if they are at least expire_days old. 
Where filename_pattern is
+    a list of wildcard strings checked with fnmatch. Dot-files are excluded
+    from matching unless include_dotfiles is set.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    handled = 0
+    logger.debug("clean files matching %r in %r if older than %dd" % \
+                 (filename_patterns, target_dir, expire_days))
+    for filename in listdir(target_dir):
+        if not include_dotfiles and filename.startswith('.'):
+            continue
+        tmp_age = -1
+        for pattern in filename_patterns:
+            tmp_path = os.path.join(target_dir, filename)
+            if fnmatch.fnmatch(filename, pattern):
+                logger.debug("checking if state file %r is stale" % tmp_path)
+                tmp_age = now - os.path.getmtime(tmp_path)
+            else:
+                continue
+            tmp_age_days = tmp_age / SECS_PER_DAY
+            logger.debug("found state file %r of age %ds / %dd" % \
+                         (tmp_path, tmp_age, tmp_age_days))
+            if tmp_age_days > expire_days:
+                logger.info("remove stale tmp file in %r : %dd" % (tmp_path,
+                            tmp_age_days))
+                if not delete_file(tmp_path, logger):
+                    logger.error("failed to remove stale file %r" % tmp_path)
+                handled += 1
+    logger.debug("handled %d stale state file cleanups" % handled)
+    return handled
+
+def clean_mig_system_files(configuration, now=time.time()):
+    """Inspect and clean up stale state files in mig_system_run.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.mig_system_files,
+                                    ['tmp*', 'no_grid_jobs*'],
+                                    EXPIRE_STATE_DAYS, now)
+
+def clean_sessid_to_mrls_link_home(configuration, now=time.time()):
+    """Inspect and clean up stale state files in sessid_to_mrsl_link_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.sessid_to_mrsl_link_home,
+                                    ['*'], EXPIRE_STATE_DAYS, now)
+
+def clean_webserver_home(configuration, now=time.time()):
+    """Inspect and clean up stale state files in webserver_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.webserver_home,
+                                    ['*'], EXPIRE_STATE_DAYS, now)
+
+def clean_no_job_helpers(configuration, now=time.time()):
+    """Inspect and clean up stale state empty job helpers inside user_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    dummy_job_path = os.path.join(configuration.user_home,
+                                  'no_grid_jobs_in_grid_scheduler')
+    return _clean_stale_state_files(configuration,
+                                    dummy_job_path,
+                                    ['*'], EXPIRE_DUMMY_JOBS_DAYS, now)
+
+def clean_twofactor_sessions(configuration, now=time.time()):
+    """Inspect and clean up stale state files in twofactor_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.twofactor_home,
+                                    ['*'], EXPIRE_TWOFACTOR_DAYS, now)
+
+def handle_state_cleanup(configuration, now=time.time()):
+    """Inspect various state dirs to clean up general stale old temporary files.
+    Returns the number of actual actions taken for central throttle handling.
+ """ + handled = 0 + logger.debug("handle pending state cleanups") + handled += clean_mig_system_files(configuration, now) + handled += clean_webserver_home(configuration, now) + if configuration.site_enable_jobs: + handled += clean_no_job_helpers(configuration, now) + # TODO: handle gzip of events files like cronjob + if handled > 0: + logger.info("handled %d pending state cleanup(s)" % handled) + else: + logger.debug("no pending state cleanups") + return handled + +def handle_session_cleanup(configuration, now=time.time()): + """Inspect various state dirs to clean up stale session files specifically. + Returns the number of actual actions taken for central throttle handling. + """ + handled = 0 + logger.debug("handle pending session cleanups") + if configuration.site_enable_jobs: + handled += clean_sessid_to_mrls_link_home(configuration, now) + handled += clean_twofactor_sessions(configuration, now) + # TODO: handle client session tracking cleanup (cleansessions.py) + if handled > 0: + logger.info("handled %d pending session cleanup(s)" % handled) + else: + logger.debug("no pending session cleanups") + return handled + +def remind_and_expire_user_pending(configuration, now=time.time()): + """Inspect user_pending dir and inform about pending but aging account + requests that need operator or user action. + Returns the number of actual actions taken for central throttle handling. + """ + handled = 0 + now = time.time() + for filename in listdir(configuration.user_pending): + if filename.startswith('.'): + continue + req_path = os.path.join(configuration.user_pending, filename) + logger.debug("checking account request in %r" % req_path) + req_age = now - os.path.getmtime(req_path) + req_age_days = req_age / SECS_PER_DAY + if req_age_days > REMIND_REQ_DAYS: + logger.info("found stale account request in %r : %dd" % \ + (req_path, req_age_days)) + # TODO: actually remind operator and user that request is pending + handled += 1 + if req_age_days > EXPIRE_REQ_DAYS: + logger.info("found expired account request in %r : %dd" % \ + (req_path, req_age_days)) + # TODO: actually expire request and inform user + handled += 1 + logger.debug("handled %d user account request action(s)" % handled) + return handled + +def handle_pending_requests(configuration, now=time.time()): + """Inspect various state dirs to remind or clean up stale requests. + Returns the number of actual actions taken for central throttle handling. + """ + handled = 0 + logger.debug("handle pending requests") + handled += remind_and_expire_user_pending(configuration, now) + # TODO: actually handle more requests like resources and peers + if handled > 0: + logger.info("handled %d pending requests" % handled) + else: + logger.debug("no pending state cleanups") + return handled + +def handle_cache_updates(configuration, now=time.time()): + """Inspect internal cache update markers and handle any corresponding cache + updates in one place to avoid thrashing. + Returns the number of actual actions taken for central throttle handling. + """ + handled = 0 + logger.debug("handle pending cache updates") + # TODO: actually handle vgrid/user/resource/... cache updates + if handled > 0: + logger.info("handled %d pending cache updates" % handled) + else: + logger.debug("no pending state cleanups") + return handled + +def handle_janitor_tasks(configuration, now=time.time()): + """A wrapper to take care of all regular janitor tasks like clean up and + cache updates. 
+    Returns the number of actual tasks completed to let the main thread know if
+    it should throttle down or continue next run right away.
+    """
+    tasks_completed = 0
+    logger.info("handle any pending janitor tasks")
+    if _lookup_last_run(configuration, 'state-cleanup') + SECS_PER_DAY < now:
+        tasks_completed += handle_state_cleanup(configuration, now)
+        _update_last_run(configuration, 'state-cleanup', now)
+    if _lookup_last_run(configuration, 'session-cleanup') + SECS_PER_HOUR < now:
+        tasks_completed += handle_session_cleanup(configuration, now)
+        _update_last_run(configuration, 'session-cleanup', now)
+    if _lookup_last_run(configuration, 'pending-requests') + SECS_PER_HOUR < now:
+        tasks_completed += handle_pending_requests(configuration, now)
+        _update_last_run(configuration, 'pending-requests', now)
+    if _lookup_last_run(configuration, 'cache-updates') + SECS_PER_MINUTE < now:
+        tasks_completed += handle_cache_updates(configuration, now)
+        _update_last_run(configuration, 'cache-updates', now)
+    if tasks_completed > 0:
+        logger.info("handled %d janitor task(s)" % tasks_completed)
+    else:
+        logger.info("janitor found no pending tasks")
+    return tasks_completed
+
 if __name__ == "__main__":
     # Force no log init since we use separate logger
@@ -93,7 +327,11 @@ def stop_handler(sig, frame):
     print("%s: Start main loop" % os.getpid())
     while not stop_running.is_set():
         try:
-            time.sleep(1)
+            now = time.time()
+            if handle_janitor_tasks(configuration, now) <= 0:
+                time.sleep(LONG_THROTTLE_SECS)
+            else:
+                time.sleep(SHORT_THROTTLE_SECS)
         except KeyboardInterrupt:
             stop_running.set()
             # NOTE: we can't be sure if SIGINT was sent to only main process

From 6be88092225fb145fa2abbccfe24c611d2e18d0b Mon Sep 17 00:00:00 2001
From: Jonas Bardino
Date: Tue, 2 Sep 2025 15:01:57 +0200
Subject: [PATCH 04/17] Integrated automatic password reset acceptance and
 rejection depending on reset token validity.

Prepared for automatic handling of other trivial requests like renewal with
valid peer or rejection of obviously invalid requests.
Minor adjustment to skip subdirs when cleaning files in state dirs.
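In outline the added triage loads each saved request and dispatches on a few
simple markers, leaving everything non-trivial to the operator. A simplified
sketch of that decision flow follows, with keyword arguments and notification
details trimmed (see the diff below for the exact calls; actual rejection of
requests marked invalid only lands in a later commit):

    req_dict = load(req_path)
    client_id = get_user_id(configuration, req_dict)
    user_dict = load_user_dict(logger, client_id, default_db_path(configuration))
    if req_dict.get('invalid', False):
        pass        # obviously broken request: flag or reject
    elif req_dict.get('reset_token', ''):
        if verify_reset_token(configuration, user_dict, reset_token,
                              reset_auth_type, req_timestamp):
            accept_account_req(req_id, configuration, peer_id, ...)
        else:
            reject_account_req(req_id, configuration, reason, ...)
    elif user_dict:
        pass        # renewal of existing account: left to operator for now
    else:
        pass        # brand new account: operator must decide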
---
 mig/server/grid_janitor.py | 114 +++++++++++++++++++++++++++++++++----
 1 file changed, 104 insertions(+), 10 deletions(-)

diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py
index 07b923676..323c6affe 100755
--- a/mig/server/grid_janitor.py
+++ b/mig/server/grid_janitor.py
@@ -38,9 +38,14 @@
 import sys
 import time
 
+from mig.shared.accountreq import accept_account_req, reject_account_req
+from mig.shared.base import get_user_id
 from mig.shared.conf import get_configuration_object
 from mig.shared.fileio import listdir, delete_file
 from mig.shared.logger import daemon_logger, register_hangup_handler
+from mig.shared.pwcrypto import verify_reset_token
+from mig.shared.serial import load
+from mig.shared.userdb import load_user_dict, default_db_path
 
 # TODO: adjust short to subsecond and long to e.g a minute for production use
 #SHORT_THROTTLE_SECS = 0.5
@@ -50,14 +55,15 @@
 
 REMIND_REQ_DAYS = 5
 EXPIRE_REQ_DAYS = 30
+MANAGE_TRIVIAL_REQ_MINUTES = 5
 
 EXPIRE_STATE_DAYS = 30
 EXPIRE_DUMMY_JOBS_DAYS = 7
 EXPIRE_TWOFACTOR_DAYS = 1
 
-SECS_PER_DAY = 86400
-SECS_PER_HOUR = 3600
 SECS_PER_MINUTE = 60
+SECS_PER_HOUR = 60 * SECS_PER_MINUTE
+SECS_PER_DAY = 24 * SECS_PER_HOUR
 
 stop_running = multiprocessing.Event()
 (configuration, logger) = (None, None)
@@ -65,6 +71,8 @@
 
 task_triggers = {}
 
+# TODO: add a signal handler to force run pending tasks right away
+
 def stop_handler(sig, frame):
     """A simple signal handler to quit on Ctrl+C (SIGINT) in main"""
     # Print blank line to avoid mix with Ctrl-C line
@@ -88,7 +96,7 @@ def _update_last_run(configuration, target, stamp):
     task timestamp in UN*X epoch.
     Returns the same updated timestamp for the task.
     """
-    # TODO: add a more persistent marker e.g. in mig_system_run or _files to
+    # TODO: add a more persistent marker e.g. in mig system run or files to
     # remember last status across restarts and reboots?
     task_triggers[target] = stamp
     return task_triggers[target]
@@ -97,20 +105,22 @@ def _update_last_run(configuration, target, stamp):
 
 def _clean_stale_state_files(configuration, target_dir, filename_patterns,
                              expire_days, now, include_dotfiles=False):
     """Inspect and clean up stale state files matching any of filename_pattern
-    in target_dir if they are at least expire_days old. Where filename_pattern is
-    a list of wildcard strings checked with fnmatch. Dot-files are excluded
-    from matching unless include_dotfiles is set.
+    in target_dir if they are at least expire_days old. Where filename_pattern
+    is a list of wildcard strings checked with fnmatch. Dot-files are excluded
+    from matching unless include_dotfiles is set. Directories are just skipped.
     Returns the number of actual actions taken for central throttle handling.
""" handled = 0 logger.debug("clean files matching %r in %r if older than %dd" % \ (filename_patterns, target_dir, expire_days)) for filename in listdir(target_dir): + tmp_path = os.path.join(target_dir, filename) if not include_dotfiles and filename.startswith('.'): continue + if os.path.isdir(tmp_path): + continue tmp_age = -1 for pattern in filename_patterns: - tmp_path = os.path.join(target_dir, filename) if fnmatch.fnmatch(filename, pattern): logger.debug("checking if state file %r is stale" % tmp_path) tmp_age = now - os.path.getmtime(tmp_path) @@ -204,6 +214,89 @@ def handle_session_cleanup(configuration, now=time.time()): logger.debug("no pending session cleanups") return handled +def manage_trivial_user_requests(configuration, now=time.time()): + """Inspect user_pending dir and take care of any request, which do not + require operator interaction. That is, accept any requests for password + change or renewals with complete peer acceptance and reject any obviously + invalid requests. + Returns the number of actual actions taken for central throttle handling. + """ + handled = 0 + now = time.time() + for filename in listdir(configuration.user_pending): + if filename.startswith('.'): + continue + req_id = filename + req_path = os.path.join(configuration.user_pending, req_id) + logger.debug("checking if account request in %r is trivial" % req_path) + req_timestamp = os.path.getmtime(req_path) + req_age = now - req_timestamp + req_age_minutes = req_age / SECS_PER_MINUTE + if req_age_minutes > MANAGE_TRIVIAL_REQ_MINUTES: + logger.info("found pending account request in %r : %dm" % \ + (req_path, req_age_minutes)) + req_dict = load(req_path) + client_id = get_user_id(configuration, req_dict) + user_dict = load_user_dict(logger, client_id, + default_db_path(configuration)) + # TODO: add simple logic to mark invalid requests during post? + # could e.g. be + # * non-existant, unauthorized or invalid peer + # * unauthorized password change + # * single word in full name + # ... 
+            req_invalid = req_dict.get('invalid', False)
+            reset_token = req_dict.get('reset_token', '')
+            auth_type = reset_auth_type = req_dict.get('auth', ['migoid'])[-1]
+            if req_invalid:
+                logger.info("%r made an invalid account request"% client_id)
+                # TODO: reject invalid req
+            elif reset_token:
+                peer_id = user_dict.get('peers', [None])[0]
+                reason = 'invalid password reset token'
+                user_copy = True
+                admin_copy = True
+                auth_type = auth_type.replace('ext', '').replace('mig', '')
+                default_renew = False
+                valid_reset = verify_reset_token(configuration,
+                                                 user_dict,
+                                                 reset_token,
+                                                 reset_auth_type,
+                                                 req_timestamp)
+                if valid_reset:
+                    logger.info("%r requested and authorized password reset" % \
+                                client_id)
+                    if not accept_account_req(req_id, configuration, peer_id,
+                                              user_copy=user_copy,
+                                              admin_copy=admin_copy,
+                                              auth_type=auth_type,
+                                              default_renew=default_renew):
+                        logger.warning("failed to accept %r password reset" % \
+                                       client_id)
+                    else:
+                        logger.info("accepted %r password reset" % client_id)
+                else:
+                    logger.warning("%r requested password reset with bad token"
+                                   % client_id)
+                    if not reject_account_req(req_id, configuration, reason,
+                                              user_copy=user_copy,
+                                              admin_copy=admin_copy,
+                                              auth_type=auth_type):
+                        logger.warning("failed to reject %r password reset" % \
+                                       client_id)
+                    else:
+                        logger.info("rejected %r password reset" % client_id)
+            elif user_dict:
+                logger.info("%r requested access renewal" % client_id)
+                # TODO: renew if trivial with valid peer
+            else:
+                logger.info("%r requested a new account requiring operator" % \
+                            client_id)
+            # TODO: actually check and accept user request if trivial
+            handled += 1
+    logger.debug("handled %d trivial user account request action(s)" % handled)
+    return handled
+
 def remind_and_expire_user_pending(configuration, now=time.time()):
     """Inspect user_pending dir and inform about pending but aging account
     requests that need operator or user action.
@@ -237,12 +330,13 @@ def remind_and_expire_user_pending(configuration, now=time.time()):
     """
     handled = 0
     logger.debug("handle pending requests")
+    handled += manage_trivial_user_requests(configuration, now)
     handled += remind_and_expire_user_pending(configuration, now)
     # TODO: actually handle more requests like resources and peers
     if handled > 0:
         logger.info("handled %d pending requests" % handled)
     else:
-        logger.debug("no pending state cleanups")
+        logger.debug("no pending requests")
     return handled
@@ -273,9 +367,9 @@ def handle_cache_updates(configuration, now=time.time()):
     if _lookup_last_run(configuration, 'session-cleanup') + SECS_PER_HOUR < now:
         tasks_completed += handle_session_cleanup(configuration, now)
         _update_last_run(configuration, 'session-cleanup', now)
-    if _lookup_last_run(configuration, 'pending-requests') + SECS_PER_HOUR < now:
+    if _lookup_last_run(configuration, 'pending-reqs') + SECS_PER_MINUTE < now:
         tasks_completed += handle_pending_requests(configuration, now)
-        _update_last_run(configuration, 'pending-requests', now)
+        _update_last_run(configuration, 'pending-reqs', now)
     if _lookup_last_run(configuration, 'cache-updates') + SECS_PER_MINUTE < now:
         tasks_completed += handle_cache_updates(configuration, now)
         _update_last_run(configuration, 'cache-updates', now)

From 89110a844875fe0f355a882571aa0611a1f13ec3 Mon Sep 17 00:00:00 2001
From: Jonas Bardino
Date: Tue, 2 Sep 2025 17:15:29 +0200
Subject: [PATCH 05/17] Refactor management of trivial requests into its own
 function. Add rejection of expired account requests.
Elaborate on reject reasons.
Extend infrastructure to allow rejection of obviously invalid requests based
e.g. on detection during submit.
In that way we can check e.g. valid password for expired accounts during
submit but delay actual rejection until janitor picks it up with sufficient
delay to avoid introducing a trivial password cracking mechanism in the
account request forms.
We can similarly test early if a requested peer has an account and reject in
janitor if not.
---
 mig/server/grid_janitor.py | 170 ++++++++++++++++++++++---------------
 1 file changed, 103 insertions(+), 67 deletions(-)

diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py
index 323c6affe..59477bde0 100755
--- a/mig/server/grid_janitor.py
+++ b/mig/server/grid_janitor.py
@@ -214,85 +214,104 @@ def handle_session_cleanup(configuration, now=time.time()):
         logger.debug("no pending session cleanups")
     return handled
 
+def manage_single_req(configuration, req_id, req_path, db_path, now):
+    """Inspect single request in req_path and take care of it if it does not
+    require operator interaction. That is, accept or reject password reset
+    depending on reset token validity, renew account if the complete peer
+    acceptance is in place and reject request if obviously invalid.
+    """
+    req_dict = load(req_path)
+    client_id = get_user_id(configuration, req_dict)
+    # NOTE: use timestamp from saved request file if available
+    req_timestamp = req_dict.get('accepted_terms', now)
+    user_dict = load_user_dict(logger, client_id, db_path)
+    req_invalid = req_dict.get('invalid', None)
+    reset_token = req_dict.get('reset_token', '')
+    req_auth = req_dict.get('auth', ['migoid'])[-1]
+    auth_type = req_auth.lstrip('mig').lstrip('ext')
+    user_copy = True
+    admin_copy = True
+    default_renew = False
+    if req_invalid:
+        logger.info("%r made an invalid account request"% client_id)
+        # NOTE: 'invalid' is a list of validation error strings if set
+        reason = 'invalid request: %s.' % '. 
'.join(req_invalid) + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + logger.warning("failed to reject invalid %r account request" + % client_id) + else: + logger.info("rejected invalid %r account request" % \ + client_id) + elif reset_token: + valid_reset = verify_reset_token(configuration, + user_dict, + reset_token, + req_auth, + req_timestamp) + if valid_reset: + logger.info("%r requested and authorized password reset" % \ + client_id) + peer_id = user_dict.get('peers', [None])[0] + if not accept_account_req(req_id, configuration, peer_id, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + default_renew=default_renew): + logger.warning("failed to accept %r password reset" % \ + client_id) + else: + logger.info("accepted %r password reset" % client_id) + else: + logger.warning("%r requested password reset with bad token" + % client_id) + reason = 'invalid password reset token' + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + logger.warning("failed to reject %r password reset" % \ + client_id) + else: + logger.info("rejected %r password reset" % client_id) + elif user_dict: + logger.info("%r requested access renewal" % client_id) + # TODO: renew if trivial with valid peer + else: + logger.info("%r requested a new account requiring operator" % \ + client_id) + def manage_trivial_user_requests(configuration, now=time.time()): """Inspect user_pending dir and take care of any request, which do not - require operator interaction. That is, accept any requests for password - change or renewals with complete peer acceptance and reject any obviously - invalid requests. + require operator interaction. That is, accept or reject any password reset + requests depending on reset token validity, renew any with complete peer + acceptance and reject any obviously invalid requests. Returns the number of actual actions taken for central throttle handling. """ + # TODO: add simple logic to mark invalid requests already during submit? + # could e.g. be + # * non-existant, unauthorized or invalid peer + # * unauthorized password change + # * single word in full name + # ... + # Then use the invalid marker to reject in manage_single_req handled = 0 now = time.time() + db_path = default_db_path(configuration) for filename in listdir(configuration.user_pending): if filename.startswith('.'): continue req_id = filename req_path = os.path.join(configuration.user_pending, req_id) logger.debug("checking if account request in %r is trivial" % req_path) - req_timestamp = os.path.getmtime(req_path) - req_age = now - req_timestamp + req_age = now - os.path.getmtime(req_path) req_age_minutes = req_age / SECS_PER_MINUTE if req_age_minutes > MANAGE_TRIVIAL_REQ_MINUTES: logger.info("found pending account request in %r : %dm" % \ (req_path, req_age_minutes)) - req_dict = load(req_path) - client_id = get_user_id(configuration, req_dict) - user_dict = load_user_dict(logger, client_id, - default_db_path(configuration)) - # TODO: add simple logic to mark invalid requests during post? - # could e.g. be - # * non-existant, unauthorized or invalid peer - # * unauthorized password change - # * single word in full name - # ... 
- req_invalid = req_dict.get('invalid', False) - reset_token = req_dict.get('reset_token', '') - auth_type = reset_auth_type = req_dict.get('auth', ['migoid'])[-1] - if req_invalid: - logger.info("%r made an invalid account request"% client_id) - # TODO: reject invalid req - elif reset_token: - peer_id = user_dict.get('peers', [None])[0] - reason = 'invalid password reset token' - user_copy = True - admin_copy = True - auth_type = auth_type.replace('ext', '').replace('mig', '') - default_renew = False - valid_reset = verify_reset_token(configuration, - user_dict, - reset_token, - reset_auth_type, - req_timestamp) - if valid_reset: - logger.info("%r requested and authorized password reset" % \ - client_id) - if not accept_account_req(req_id, configuration, peer_id, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type, - default_renew=default_renew): - logger.warning("failed to accept %r password reset" % \ - client_id) - else: - logger.info("accepted %r password reset" % client_id) - else: - logger.warning("%r requested password reset with bad token" - % client_id) - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - logger.warning("failed to reject %r password reset" % \ - client_id) - else: - logger.info("rejected %r password reset" % client_id) - elif user_dict: - logger.info("%r requested access renewal" % client_id) - # TODO: renew if trivial with valid peer - else: - logger.info("%r requested a new account requiring operator" % \ - client_id) - # TODO: actually check and accept user request if trivial + manage_single_req(configuration, req_id, req_path, db_path, now) handled += 1 logger.debug("handled %d trivial user account request action(s)" % handled) return handled @@ -307,19 +326,36 @@ def remind_and_expire_user_pending(configuration, now=time.time()): for filename in listdir(configuration.user_pending): if filename.startswith('.'): continue - req_path = os.path.join(configuration.user_pending, filename) + req_id = filename + req_path = os.path.join(configuration.user_pending, req_id) logger.debug("checking account request in %r" % req_path) req_age = now - os.path.getmtime(req_path) req_age_days = req_age / SECS_PER_DAY + req_dict = load(req_path) + client_id = get_user_id(configuration, req_dict) + req_auth = req_dict.get('auth', ['migoid'])[-1] + auth_type = req_auth.lstrip('mig').lstrip('ext') if req_age_days > REMIND_REQ_DAYS: logger.info("found stale account request in %r : %dd" % \ (req_path, req_age_days)) # TODO: actually remind operator and user that request is pending + # ... possibly with copy to peers if pending acceptance. 
handled += 1 if req_age_days > EXPIRE_REQ_DAYS: - logger.info("found expired account request in %r : %dd" % \ - (req_path, req_age_days)) - # TODO: actually expire request and inform user + logger.info("found expired account request from %r in %s : %dd" % \ + (client_id, req_path, req_age_days)) + reason = 'failed to be verified and accepted within %d day limit' \ + % EXPIRE_REQ_DAYS + user_copy = True + admin_copy = True + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + logger.warning("failed to expire %s request from %r" % \ + (req_id, client_id)) + else: + logger.info("expired %s request from %r" % (req_id, client_id)) handled += 1 logger.debug("handled %d user account request action(s)" % handled) return handled From c349205d0186067e1d01042ace0e4d173329c591 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Thu, 4 Sep 2025 09:44:25 +0200 Subject: [PATCH 06/17] Add more invalid account request handling to automatically also reject: - requests for renew with existing user ID collisions - requests for renew with unauthorized password change - requests for renew of suspended/blocked accounts - requests with insufficient peers info - requests with expire set in the past - requests with invalid full name Polished some style bits and stripped some undesired newlines in log entries. --- mig/server/grid_janitor.py | 45 ++++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py index 59477bde0..e88e5824c 100755 --- a/mig/server/grid_janitor.py +++ b/mig/server/grid_janitor.py @@ -38,14 +38,15 @@ import sys import time -from mig.shared.accountreq import accept_account_req, reject_account_req +from mig.shared.accountreq import accept_account_req, existing_user_collision, \ + reject_account_req from mig.shared.base import get_user_id from mig.shared.conf import get_configuration_object from mig.shared.fileio import listdir, delete_file from mig.shared.logger import daemon_logger, register_hangup_handler from mig.shared.pwcrypto import verify_reset_token from mig.shared.serial import load -from mig.shared.userdb import load_user_dict, default_db_path +from mig.shared.userdb import default_db_path, load_user_dict # TODO: adjust short to subsecond and long to e.g a minute for production use #SHORT_THROTTLE_SECS = 0.5 @@ -224,6 +225,7 @@ def manage_single_req(configuration, req_id, req_path, db_path, now): client_id = get_user_id(configuration, req_dict) # NOTE: use timestamp from saved request file if available req_timestamp = req_dict.get('accepted_terms', now) + req_expire = req_dict.get('expire', now) user_dict = load_user_dict(logger, client_id, db_path) req_invalid = req_dict.get('invalid', None) reset_token = req_dict.get('reset_token', '') @@ -233,7 +235,7 @@ def manage_single_req(configuration, req_id, req_path, db_path, now): admin_copy = True default_renew = False if req_invalid: - logger.info("%r made an invalid account request"% client_id) + logger.info("%r made an invalid account request" % client_id) # NOTE: 'invalid' is a list of validation error strings if set reason = 'invalid request: %s.' % '. 
'.join(req_invalid) if not reject_account_req(req_id, configuration, reason, @@ -276,9 +278,31 @@ def manage_single_req(configuration, req_id, req_path, db_path, now): client_id) else: logger.info("rejected %r password reset" % client_id) + elif req_expire < now: + # NOTE: probably should no longer happen after initial auto clean + logger.warning("%r request is now past expire" % client_id) + reason = 'expired request - please re-request if still relevant' + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + logger.warning("failed to reject expired %r request" % client_id) + else: + logger.info("rejected %r request now past expire" % client_id) + elif existing_user_collision(configuration, req_dict, client_id): + logger.warning('ID collision in request from %r' % client_id) + reason = 'ID collision - please re-request with *existing* ID fields' + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + logger.warning("failed to reject %r request with ID collision" % \ + client_id) + else: + logger.info("rejected %r request with ID collision" % client_id) elif user_dict: logger.info("%r requested access renewal" % client_id) - # TODO: renew if trivial with valid peer + # TODO: renew if trivial with existing valid peer else: logger.info("%r requested a new account requiring operator" % \ client_id) @@ -288,18 +312,15 @@ def manage_trivial_user_requests(configuration, now=time.time()): require operator interaction. That is, accept or reject any password reset requests depending on reset token validity, renew any with complete peer acceptance and reject any obviously invalid requests. + Relies on various checks taking place during account request to detect but + silently mark e.g. invalid password change and invalid peers to avoid + password guessing and information disclosure. Returns the number of actual actions taken for central throttle handling. """ - # TODO: add simple logic to mark invalid requests already during submit? - # could e.g. be - # * non-existant, unauthorized or invalid peer - # * unauthorized password change - # * single word in full name - # ... - # Then use the invalid marker to reject in manage_single_req handled = 0 now = time.time() db_path = default_db_path(configuration) + # TODO: handle duplicate requests here, too? for filename in listdir(configuration.user_pending): if filename.startswith('.'): continue @@ -343,7 +364,7 @@ def remind_and_expire_user_pending(configuration, now=time.time()): handled += 1 if req_age_days > EXPIRE_REQ_DAYS: logger.info("found expired account request from %r in %s : %dd" % \ - (client_id, req_path, req_age_days)) + (client_id, req_path, req_age_days)) reason = 'failed to be verified and accepted within %d day limit' \ % EXPIRE_REQ_DAYS user_copy = True From 681471885a363577c80407d157377fa40705cfdc Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Sun, 7 Sep 2025 12:47:47 +0200 Subject: [PATCH 07/17] Refactor grid_janitor to use new mig/lib/ for all helper functions and make the daemon itself very shallow. 
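With the helpers factored out, the daemon core is expected to reduce to
little more than a throttled main loop. A minimal sketch of that shape,
using only names actually introduced in this patch (constants, logging
setup and the enable check are simplified away here):

    import time

    from mig.lib.daemon import check_stop, register_stop_handler, stop_running
    from mig.lib.janitor import handle_janitor_tasks
    from mig.shared.conf import get_configuration_object

    SHORT_THROTTLE_SECS = 5.0   # brief pause when tasks were just handled
    LONG_THROTTLE_SECS = 30.0   # longer pause when nothing was pending

    if __name__ == "__main__":
        configuration = get_configuration_object(skip_log=True)
        # Allow clean shutdown on SIGINT to the main process
        register_stop_handler(configuration)
        while not check_stop():
            try:
                now = time.time()
                # Throttle down when no janitor tasks were actually pending
                if handle_janitor_tasks(configuration, now) <= 0:
                    time.sleep(LONG_THROTTLE_SECS)
                else:
                    time.sleep(SHORT_THROTTLE_SECS)
            except KeyboardInterrupt:
                stop_running()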
--- mig/lib/daemon.py | 55 +++++ mig/lib/janitor.py | 444 ++++++++++++++++++++++++++++++++++ mig/server/grid_janitor.py | 406 +------------------------------ tests/test_mig_lib_daemon.py | 51 ++++ tests/test_mig_lib_janitor.py | 50 ++++ 5 files changed, 611 insertions(+), 395 deletions(-) create mode 100644 mig/lib/daemon.py create mode 100644 mig/lib/janitor.py create mode 100644 tests/test_mig_lib_daemon.py create mode 100644 tests/test_mig_lib_janitor.py diff --git a/mig/lib/daemon.py b/mig/lib/daemon.py new file mode 100644 index 000000000..5b43684ad --- /dev/null +++ b/mig/lib/daemon.py @@ -0,0 +1,55 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# daemons - helpers to support various service daemons e.g. in signal handling +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + +"""Helpers used by service daemons e.g. to react to signals""" + +import multiprocessing +import signal + +_stop_event = multiprocessing.Event() + + +def stop_running(): + """A simple helper to set stop marker after some signal was received""" + return _stop_event.set() + + +def check_stop(): + """A simple test to see if stop marker was set after some signal was received""" + return _stop_event.is_set() + + +def stop_handler(sig, frame): + """A simple signal handler to help quit on interrupt signal in main""" + # Print blank line to avoid mix with Ctrl-C line + print("") + stop_running() + + +def register_stop_handler(configuration, stop_signal=signal.SIGINT): + """Set up stop handler to react on provided stop_signal""" + signal.signal(stop_signal, stop_handler) diff --git a/mig/lib/janitor.py b/mig/lib/janitor.py new file mode 100644 index 000000000..e8abd7251 --- /dev/null +++ b/mig/lib/janitor.py @@ -0,0 +1,444 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# janitor - janitor helpers to handle recurring clean up and update tasks +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+# +# -- END_HEADER --- +# + +"""Helpers for the janitor service which takes care of various recurring tasks + like clean up, cache updates and pruning of pending requests. +""" + +from __future__ import absolute_import, print_function + +import fnmatch +import os +import time + +from mig.shared.accountreq import accept_account_req, existing_user_collision, \ + reject_account_req +from mig.shared.base import get_user_id +from mig.shared.fileio import listdir, delete_file +from mig.shared.pwcrypto import verify_reset_token +from mig.shared.serial import load +from mig.shared.userdb import default_db_path, load_user_dict + +REMIND_REQ_DAYS = 5 +EXPIRE_REQ_DAYS = 30 +MANAGE_TRIVIAL_REQ_MINUTES = 5 + +EXPIRE_STATE_DAYS = 30 +EXPIRE_DUMMY_JOBS_DAYS = 7 +EXPIRE_TWOFACTOR_DAYS = 1 + +SECS_PER_MINUTE = 60 +SECS_PER_HOUR = 60 * SECS_PER_MINUTE +SECS_PER_DAY = 24 * SECS_PER_HOUR + +task_triggers = {} + + +# TODO: add a signal handler to force run pending tasks right away + +def _lookup_last_run(configuration, target): + """Check if target task is pending using internal accounting for task. + Returns the timestamp when the task was last run in UN*X epoch. + """ + _logger = configuration.logger + # Lazy init + last_stamp = task_triggers[target] = task_triggers.get(target, -1) + if last_stamp > 0: + _logger.debug("last %s task ran at %d" % (target, last_stamp)) + else: + _logger.debug("no last %s task run in history" % target) + return last_stamp + + +def _update_last_run(configuration, target, stamp): + """Update target task pending mark using internal accounting and supplied + task timestamp in UN*X epoch. + Returns the same updated timestamp for the task. + """ + # TODO: add a more persistent marker e.g. in mig system run or files to + # remember last status across restarts and reboots? + task_triggers[target] = stamp + return task_triggers[target] + + +def _clean_stale_state_files(configuration, target_dir, filename_patterns, + expire_days, now, include_dotfiles=False): + """Inspect and clean up stale state files matching any of filename_pattern + in target_dir if they are at least expire_days old. Where filename_pattern + is a list of wildcard strings checked with fnmatch. Dot-files are excluded + from matching unless include_dotfiles is set. Directories are just skipped. + Returns the number of actual actions taken for central throttle handling. + """ + _logger = configuration.logger + handled = 0 + _logger.debug("clean files matching %r in %r if older than %dd" % + (filename_patterns, target_dir, expire_days)) + for filename in listdir(target_dir): + tmp_path = os.path.join(target_dir, filename) + if not include_dotfiles and filename.startswith('.'): + continue + if os.path.isdir(tmp_path): + continue + tmp_age = -1 + for pattern in filename_patterns: + if fnmatch.fnmatch(filename, pattern): + _logger.debug("checking if state file %r is stale" % tmp_path) + tmp_age = now - os.path.getmtime(tmp_path) + else: + continue + tmp_age_days = tmp_age / SECS_PER_DAY + _logger.debug("found state file %r of age %ds / %dd" % + (tmp_path, tmp_age, tmp_age_days)) + if tmp_age_days > expire_days: + _logger.info("remove stale tmp file in %r : %dd" % (tmp_path, + tmp_age_days)) + if not delete_file(tmp_path, _logger): + _logger.error("failed to remove stale file %r" % tmp_path) + handled += 1 + _logger.debug("handled %d stale state file cleanups" % handled) + return handled + + +def clean_mig_system_files(configuration, now=time.time()): + """Inspect and clean up stale state files in mig_system_run. 
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.mig_system_files,
+                                    ['tmp*', 'no_grid_jobs*'],
+                                    EXPIRE_STATE_DAYS, now)
+
+
+def clean_sessid_to_mrls_link_home(configuration, now=time.time()):
+    """Inspect and clean up stale state files in sessid_to_mrsl_link_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.sessid_to_mrsl_link_home,
+                                    ['*'], EXPIRE_STATE_DAYS, now)
+
+
+def clean_webserver_home(configuration, now=time.time()):
+    """Inspect and clean up stale state files in webserver_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.webserver_home,
+                                    ['*'], EXPIRE_STATE_DAYS, now)
+
+
+def clean_no_job_helpers(configuration, now=time.time()):
+    """Inspect and clean up stale state empty job helpers inside user_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    dummy_job_path = os.path.join(configuration.user_home,
+                                  'no_grid_jobs_in_grid_scheduler')
+    return _clean_stale_state_files(configuration,
+                                    dummy_job_path,
+                                    ['*'], EXPIRE_DUMMY_JOBS_DAYS, now)
+
+
+def clean_twofactor_sessions(configuration, now=time.time()):
+    """Inspect and clean up stale state files in twofactor_home.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    return _clean_stale_state_files(configuration,
+                                    configuration.twofactor_home,
+                                    ['*'], EXPIRE_TWOFACTOR_DAYS, now)
+
+
+def handle_state_cleanup(configuration, now=time.time()):
+    """Inspect various state dirs to clean up general stale old temporary files.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    _logger = configuration.logger
+    handled = 0
+    _logger.debug("handle pending state cleanups")
+    handled += clean_mig_system_files(configuration, now)
+    handled += clean_webserver_home(configuration, now)
+    if configuration.site_enable_jobs:
+        handled += clean_no_job_helpers(configuration, now)
+    # TODO: handle gzip of events files like cronjob
+    if handled > 0:
+        _logger.info("handled %d pending state cleanup(s)" % handled)
+    else:
+        _logger.debug("no pending state cleanups")
+    return handled
+
+
+def handle_session_cleanup(configuration, now=time.time()):
+    """Inspect various state dirs to clean up stale session files specifically.
+    Returns the number of actual actions taken for central throttle handling.
+    """
+    _logger = configuration.logger
+    handled = 0
+    _logger.debug("handle pending session cleanups")
+    if configuration.site_enable_jobs:
+        handled += clean_sessid_to_mrls_link_home(configuration, now)
+    handled += clean_twofactor_sessions(configuration, now)
+    # TODO: handle client session tracking cleanup (cleansessions.py)
+    if handled > 0:
+        _logger.info("handled %d pending session cleanup(s)" % handled)
+    else:
+        _logger.debug("no pending session cleanups")
+    return handled
+
+
+def manage_single_req(configuration, req_id, req_path, db_path, now):
+    """Inspect single request in req_path and take care of it if it does not
+    require operator interaction. That is, accept or reject password reset
+    depending on reset token validity, renew account if the complete peer
+    acceptance is in place and reject request if obviously invalid.
+ """ + _logger = configuration.logger + req_dict = load(req_path) + client_id = get_user_id(configuration, req_dict) + # NOTE: use timestamp from saved request file if available + req_timestamp = req_dict.get('accepted_terms', now) + req_expire = req_dict.get('expire', now) + user_dict = load_user_dict(_logger, client_id, db_path) + req_invalid = req_dict.get('invalid', None) + reset_token = req_dict.get('reset_token', '') + req_auth = req_dict.get('auth', ['migoid'])[-1] + auth_type = req_auth.lstrip('mig').lstrip('ext') + user_copy = True + admin_copy = True + default_renew = False + if req_invalid: + _logger.info("%r made an invalid account request" % client_id) + # NOTE: 'invalid' is a list of validation error strings if set + reason = 'invalid request: %s.' % '. '.join(req_invalid) + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + _logger.warning("failed to reject invalid %r account request" + % client_id) + else: + _logger.info("rejected invalid %r account request" % + client_id) + elif reset_token: + valid_reset = verify_reset_token(configuration, + user_dict, + reset_token, + req_auth, + req_timestamp) + if valid_reset: + _logger.info("%r requested and authorized password reset" % + client_id) + peer_id = user_dict.get('peers', [None])[0] + if not accept_account_req(req_id, configuration, peer_id, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + default_renew=default_renew): + _logger.warning("failed to accept %r password reset" % + client_id) + else: + _logger.info("accepted %r password reset" % client_id) + else: + _logger.warning("%r requested password reset with bad token" + % client_id) + reason = 'invalid password reset token' + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + _logger.warning("failed to reject %r password reset" % + client_id) + else: + _logger.info("rejected %r password reset" % client_id) + elif req_expire < now: + # NOTE: probably should no longer happen after initial auto clean + _logger.warning("%r request is now past expire" % client_id) + reason = 'expired request - please re-request if still relevant' + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + _logger.warning("failed to reject expired %r request" % client_id) + else: + _logger.info("rejected %r request now past expire" % client_id) + elif existing_user_collision(configuration, req_dict, client_id): + _logger.warning('ID collision in request from %r' % client_id) + reason = 'ID collision - please re-request with *existing* ID fields' + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + _logger.warning("failed to reject %r request with ID collision" % + client_id) + else: + _logger.info("rejected %r request with ID collision" % client_id) + elif user_dict: + _logger.info("%r requested access renewal" % client_id) + # TODO: renew if trivial with existing valid peer + else: + _logger.info("%r requested a new account requiring operator" % + client_id) + + +def manage_trivial_user_requests(configuration, now=time.time()): + """Inspect user_pending dir and take care of any request, which do not + require operator interaction. 
That is, accept or reject any password reset + requests depending on reset token validity, renew any with complete peer + acceptance and reject any obviously invalid requests. + Relies on various checks taking place during account request to detect but + silently mark e.g. invalid password change and invalid peers to avoid + password guessing and information disclosure. + Returns the number of actual actions taken for central throttle handling. + """ + _logger = configuration.logger + handled = 0 + now = time.time() + db_path = default_db_path(configuration) + # TODO: handle duplicate requests here, too? + for filename in listdir(configuration.user_pending): + if filename.startswith('.'): + continue + req_id = filename + req_path = os.path.join(configuration.user_pending, req_id) + _logger.debug( + "checking if account request in %r is trivial" % req_path) + req_age = now - os.path.getmtime(req_path) + req_age_minutes = req_age / SECS_PER_MINUTE + if req_age_minutes > MANAGE_TRIVIAL_REQ_MINUTES: + _logger.info("found pending account request in %r : %dm" % + (req_path, req_age_minutes)) + manage_single_req(configuration, req_id, req_path, db_path, now) + handled += 1 + _logger.debug( + "handled %d trivial user account request action(s)" % handled) + return handled + + +def remind_and_expire_user_pending(configuration, now=time.time()): + """Inspect user_pending dir and inform about pending but aging account + requests that need operator or user action. + Returns the number of actual actions taken for central throttle handling. + """ + _logger = configuration.logger + handled = 0 + now = time.time() + for filename in listdir(configuration.user_pending): + if filename.startswith('.'): + continue + req_id = filename + req_path = os.path.join(configuration.user_pending, req_id) + _logger.debug("checking account request in %r" % req_path) + req_age = now - os.path.getmtime(req_path) + req_age_days = req_age / SECS_PER_DAY + req_dict = load(req_path) + client_id = get_user_id(configuration, req_dict) + req_auth = req_dict.get('auth', ['migoid'])[-1] + auth_type = req_auth.lstrip('mig').lstrip('ext') + if req_age_days > REMIND_REQ_DAYS: + _logger.info("found stale account request in %r : %dd" % + (req_path, req_age_days)) + # TODO: actually remind operator and user that request is pending + # ... possibly with copy to peers if pending acceptance. + handled += 1 + if req_age_days > EXPIRE_REQ_DAYS: + _logger.info("found expired account request from %r in %s : %dd" % + (client_id, req_path, req_age_days)) + reason = 'failed to be verified and accepted within %d day limit' \ + % EXPIRE_REQ_DAYS + user_copy = True + admin_copy = True + if not reject_account_req(req_id, configuration, reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type): + _logger.warning("failed to expire %s request from %r" % + (req_id, client_id)) + else: + _logger.info("expired %s request from %r" % + (req_id, client_id)) + handled += 1 + _logger.debug("handled %d user account request action(s)" % handled) + return handled + + +def handle_pending_requests(configuration, now=time.time()): + """Inspect various state dirs to remind or clean up stale requests. + Returns the number of actual actions taken for central throttle handling. 
+ """ + _logger = configuration.logger + handled = 0 + _logger.debug("handle pending requests") + handled += manage_trivial_user_requests(configuration, now) + handled += remind_and_expire_user_pending(configuration, now) + # TODO: actually handle more requests like resources and peers + if handled > 0: + _logger.info("handled %d pending requests" % handled) + else: + _logger.debug("no pending requests") + return handled + + +def handle_cache_updates(configuration, now=time.time()): + """Inspect internal cache update markers and handle any corresponding cache + updates in one place to avoid thrashing. + Returns the number of actual actions taken for central throttle handling. + """ + _logger = configuration.logger + handled = 0 + _logger.debug("handle pending cache updates") + # TODO: actually handle vgrid/user/resource/... cache updates + if handled > 0: + _logger.info("handled %d pending cache updates" % handled) + else: + _logger.debug("no pending state cleanups") + return handled + + +def handle_janitor_tasks(configuration, now=time.time()): + """A wrapper to take care of all regular janitor tasks like clean up and + cache updates. + Returns the number of actual tasks completed to let the main thread know if + it should throttle down or continue next run right away. + """ + _logger = configuration.logger + tasks_completed = 0 + _logger.info("handle any pending janitor tasks") + if _lookup_last_run(configuration, 'state-cleanup') + SECS_PER_DAY < now: + tasks_completed += handle_state_cleanup(configuration, now) + _update_last_run(configuration, 'state-cleanup', now) + if _lookup_last_run(configuration, 'session-cleanup') + SECS_PER_HOUR < now: + tasks_completed += handle_session_cleanup(configuration, now) + _update_last_run(configuration, 'state-cleanup', now) + if _lookup_last_run(configuration, 'pending-reqs') + SECS_PER_MINUTE < now: + tasks_completed += handle_pending_requests(configuration, now) + _update_last_run(configuration, 'pending-reqs', now) + if _lookup_last_run(configuration, 'cache-updates') + SECS_PER_MINUTE < now: + tasks_completed += handle_cache_updates(configuration, now) + _update_last_run(configuration, 'cache-updates', now) + if tasks_completed > 0: + _logger.info("handled %d janitor task(s)" % tasks_completed) + else: + _logger.info("janitor found no pending tasks") + return tasks_completed diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py index e88e5824c..eb3bdb2d8 100755 --- a/mig/server/grid_janitor.py +++ b/mig/server/grid_janitor.py @@ -31,410 +31,23 @@ from __future__ import absolute_import, print_function -import fnmatch -import multiprocessing import os -import signal import sys import time -from mig.shared.accountreq import accept_account_req, existing_user_collision, \ - reject_account_req -from mig.shared.base import get_user_id +from mig.lib.daemon import stop_running, check_stop, register_stop_handler +from mig.lib.janitor import handle_janitor_tasks from mig.shared.conf import get_configuration_object -from mig.shared.fileio import listdir, delete_file from mig.shared.logger import daemon_logger, register_hangup_handler -from mig.shared.pwcrypto import verify_reset_token -from mig.shared.serial import load -from mig.shared.userdb import default_db_path, load_user_dict # TODO: adjust short to subsecond and long to e.g a minute for production use -#SHORT_THROTTLE_SECS = 0.5 -#LONG_THROTTLE_SECS = 60.0 +# SHORT_THROTTLE_SECS = 0.5 +# LONG_THROTTLE_SECS = 60.0 SHORT_THROTTLE_SECS = 5.0 LONG_THROTTLE_SECS = 30.0 -REMIND_REQ_DAYS = 5 
-EXPIRE_REQ_DAYS = 30 -MANAGE_TRIVIAL_REQ_MINUTES = 5 - -EXPIRE_STATE_DAYS = 30 -EXPIRE_DUMMY_JOBS_DAYS = 7 -EXPIRE_TWOFACTOR_DAYS = 1 - -SECS_PER_MINUTE = 60 -SECS_PER_HOUR = 60 * SECS_PER_MINUTE -SECS_PER_DAY =24 * SECS_PER_HOUR - -stop_running = multiprocessing.Event() (configuration, logger) = (None, None) -task_triggers = {} - - -# TODO: add a signal handler to force run pending tasks right away - -def stop_handler(sig, frame): - """A simple signal handler to quit on Ctrl+C (SIGINT) in main""" - # Print blank line to avoid mix with Ctrl-C line - print("") - stop_running.set() - -def _lookup_last_run(configuration, target): - """Check if target task is pending using internal accounting for task. - Returns the timestamp when the task was last run in UN*X epoch. - """ - # Lazy init - last_stamp = task_triggers[target] = task_triggers.get(target, -1) - if last_stamp > 0: - logger.debug("last %s task ran at %d" % (target, last_stamp)) - else: - logger.debug("no last %s task run in history" % target) - return last_stamp - -def _update_last_run(configuration, target, stamp): - """Update target task pending mark using internal accounting and supplied - task timestamp in UN*X epoch. - Returns the same updated timestamp for the task. - """ - # TODO: add a more persistent marker e.g. in mig system run or files to - # remember last status across restarts and reboots? - task_triggers[target] = stamp - return task_triggers[target] - - -def _clean_stale_state_files(configuration, target_dir, filename_patterns, - expire_days, now, include_dotfiles=False): - """Inspect and clean up stale state files matching any of filename_pattern - in target_dir if they are at least expire_days old. Where filename_pattern - is a list of wildcard strings checked with fnmatch. Dot-files are excluded - from matching unless include_dotfiles is set. Directories are just skipped. - Returns the number of actual actions taken for central throttle handling. - """ - handled = 0 - logger.debug("clean files matching %r in %r if older than %dd" % \ - (filename_patterns, target_dir, expire_days)) - for filename in listdir(target_dir): - tmp_path = os.path.join(target_dir, filename) - if not include_dotfiles and filename.startswith('.'): - continue - if os.path.isdir(tmp_path): - continue - tmp_age = -1 - for pattern in filename_patterns: - if fnmatch.fnmatch(filename, pattern): - logger.debug("checking if state file %r is stale" % tmp_path) - tmp_age = now - os.path.getmtime(tmp_path) - else: - continue - tmp_age_days = tmp_age / SECS_PER_DAY - logger.debug("found state file %r of age %ds / %dd" % \ - (tmp_path, tmp_age, tmp_age_days)) - if tmp_age_days > expire_days: - logger.info("remove stale tmp file in %r : %dd" % (tmp_path, - tmp_age_days)) - if not delete_file(tmp_path, logger): - logger.error("failed to remove stale file %r" % tmp_path) - handled += 1 - logger.debug("handled %d stale state file cleanups" % handled) - return handled - -def clean_mig_system_files(configuration, now=time.time()): - """Inspect and clean up stale state files in mig_system_run. - Returns the number of actual actions taken for central throttle handling. - """ - return _clean_stale_state_files(configuration, - configuration.mig_system_files, - ['tmp*', 'no_grid_jobs*'], - EXPIRE_STATE_DAYS, now) - -def clean_sessid_to_mrls_link_home(configuration, now=time.time()): - """Inspect and clean up stale state files in sessid_to_mrsl_link_home. - Returns the number of actual actions taken for central throttle handling. 
- """ - return _clean_stale_state_files(configuration, - configuration.sessid_to_mrsl_link_home, - ['*'], EXPIRE_STATE_DAYS, now) - -def clean_webserver_home(configuration, now=time.time()): - """Inspect and clean up stale state files in webserver_home. - Returns the number of actual actions taken for central throttle handling. - """ - return _clean_stale_state_files(configuration, - configuration.webserver_home, - ['*'], EXPIRE_STATE_DAYS, now) - -def clean_no_job_helpers(configuration, now=time.time()): - """Inspect and clean up stale state empty job helpers inside user_home. - Returns the number of actual actions taken for central throttle handling. - """ - dummy_job_path = os.path.join(configuration.user_home, - 'no_grid_jobs_in_grid_scheduler') - return _clean_stale_state_files(configuration, - dummy_job_path, - ['*'], EXPIRE_DUMMY_JOBS_DAYS, now) - -def clean_twofactor_sessions(configuration, now=time.time()): - """Inspect and clean up stale state files in twofactor_home. - Returns the number of actual actions taken for central throttle handling. - """ - return _clean_stale_state_files(configuration, - configuration.twofactor_home, - ['*'], EXPIRE_TWOFACTOR_DAYS, now) - -def handle_state_cleanup(configuration, now=time.time()): - """Inspect various state dirs to clean up general stale old temporay files. - Returns the number of actual actions taken for central throttle handling. - """ - handled = 0 - logger.debug("handle pending state cleanups") - handled += clean_mig_system_files(configuration, now) - handled += clean_webserver_home(configuration, now) - if configuration.site_enable_jobs: - handled += clean_no_job_helpers(configuration, now) - # TODO: handle gzip of events files like cronjob - if handled > 0: - logger.info("handled %d pending state cleanup(s)" % handled) - else: - logger.debug("no pending state cleanups") - return handled - -def handle_session_cleanup(configuration, now=time.time()): - """Inspect various state dirs to clean up stale session files specifically. - Returns the number of actual actions taken for central throttle handling. - """ - handled = 0 - logger.debug("handle pending session cleanups") - if configuration.site_enable_jobs: - handled += clean_sessid_to_mrls_link_home(configuration, now) - handled += clean_twofactor_sessions(configuration, now) - # TODO: handle client session tracking cleanup (cleansessions.py) - if handled > 0: - logger.info("handled %d pending session cleanup(s)" % handled) - else: - logger.debug("no pending session cleanups") - return handled - -def manage_single_req(configuration, req_id, req_path, db_path, now): - """Inspect single request in req_path and take care of it if it does not - require operator interaction. That is, accept or reject password reset - depending on reset token validity, renew account if the complete peer - acceptance is in place and reject request if obviously invalid. 
- """ - req_dict = load(req_path) - client_id = get_user_id(configuration, req_dict) - # NOTE: use timestamp from saved request file if available - req_timestamp = req_dict.get('accepted_terms', now) - req_expire = req_dict.get('expire', now) - user_dict = load_user_dict(logger, client_id, db_path) - req_invalid = req_dict.get('invalid', None) - reset_token = req_dict.get('reset_token', '') - req_auth = req_dict.get('auth', ['migoid'])[-1] - auth_type = req_auth.lstrip('mig').lstrip('ext') - user_copy = True - admin_copy = True - default_renew = False - if req_invalid: - logger.info("%r made an invalid account request" % client_id) - # NOTE: 'invalid' is a list of validation error strings if set - reason = 'invalid request: %s.' % '. '.join(req_invalid) - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - logger.warning("failed to reject invalid %r account request" - % client_id) - else: - logger.info("rejected invalid %r account request" % \ - client_id) - elif reset_token: - valid_reset = verify_reset_token(configuration, - user_dict, - reset_token, - req_auth, - req_timestamp) - if valid_reset: - logger.info("%r requested and authorized password reset" % \ - client_id) - peer_id = user_dict.get('peers', [None])[0] - if not accept_account_req(req_id, configuration, peer_id, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type, - default_renew=default_renew): - logger.warning("failed to accept %r password reset" % \ - client_id) - else: - logger.info("accepted %r password reset" % client_id) - else: - logger.warning("%r requested password reset with bad token" - % client_id) - reason = 'invalid password reset token' - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - logger.warning("failed to reject %r password reset" % \ - client_id) - else: - logger.info("rejected %r password reset" % client_id) - elif req_expire < now: - # NOTE: probably should no longer happen after initial auto clean - logger.warning("%r request is now past expire" % client_id) - reason = 'expired request - please re-request if still relevant' - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - logger.warning("failed to reject expired %r request" % client_id) - else: - logger.info("rejected %r request now past expire" % client_id) - elif existing_user_collision(configuration, req_dict, client_id): - logger.warning('ID collision in request from %r' % client_id) - reason = 'ID collision - please re-request with *existing* ID fields' - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - logger.warning("failed to reject %r request with ID collision" % \ - client_id) - else: - logger.info("rejected %r request with ID collision" % client_id) - elif user_dict: - logger.info("%r requested access renewal" % client_id) - # TODO: renew if trivial with existing valid peer - else: - logger.info("%r requested a new account requiring operator" % \ - client_id) - -def manage_trivial_user_requests(configuration, now=time.time()): - """Inspect user_pending dir and take care of any request, which do not - require operator interaction. 
That is, accept or reject any password reset - requests depending on reset token validity, renew any with complete peer - acceptance and reject any obviously invalid requests. - Relies on various checks taking place during account request to detect but - silently mark e.g. invalid password change and invalid peers to avoid - password guessing and information disclosure. - Returns the number of actual actions taken for central throttle handling. - """ - handled = 0 - now = time.time() - db_path = default_db_path(configuration) - # TODO: handle duplicate requests here, too? - for filename in listdir(configuration.user_pending): - if filename.startswith('.'): - continue - req_id = filename - req_path = os.path.join(configuration.user_pending, req_id) - logger.debug("checking if account request in %r is trivial" % req_path) - req_age = now - os.path.getmtime(req_path) - req_age_minutes = req_age / SECS_PER_MINUTE - if req_age_minutes > MANAGE_TRIVIAL_REQ_MINUTES: - logger.info("found pending account request in %r : %dm" % \ - (req_path, req_age_minutes)) - manage_single_req(configuration, req_id, req_path, db_path, now) - handled += 1 - logger.debug("handled %d trivial user account request action(s)" % handled) - return handled - -def remind_and_expire_user_pending(configuration, now=time.time()): - """Inspect user_pending dir and inform about pending but aging account - requests that need operator or user action. - Returns the number of actual actions taken for central throttle handling. - """ - handled = 0 - now = time.time() - for filename in listdir(configuration.user_pending): - if filename.startswith('.'): - continue - req_id = filename - req_path = os.path.join(configuration.user_pending, req_id) - logger.debug("checking account request in %r" % req_path) - req_age = now - os.path.getmtime(req_path) - req_age_days = req_age / SECS_PER_DAY - req_dict = load(req_path) - client_id = get_user_id(configuration, req_dict) - req_auth = req_dict.get('auth', ['migoid'])[-1] - auth_type = req_auth.lstrip('mig').lstrip('ext') - if req_age_days > REMIND_REQ_DAYS: - logger.info("found stale account request in %r : %dd" % \ - (req_path, req_age_days)) - # TODO: actually remind operator and user that request is pending - # ... possibly with copy to peers if pending acceptance. - handled += 1 - if req_age_days > EXPIRE_REQ_DAYS: - logger.info("found expired account request from %r in %s : %dd" % \ - (client_id, req_path, req_age_days)) - reason = 'failed to be verified and accepted within %d day limit' \ - % EXPIRE_REQ_DAYS - user_copy = True - admin_copy = True - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - logger.warning("failed to expire %s request from %r" % \ - (req_id, client_id)) - else: - logger.info("expired %s request from %r" % (req_id, client_id)) - handled += 1 - logger.debug("handled %d user account request action(s)" % handled) - return handled - -def handle_pending_requests(configuration, now=time.time()): - """Inspect various state dirs to remind or clean up stale requests. - Returns the number of actual actions taken for central throttle handling. 
- """ - handled = 0 - logger.debug("handle pending requests") - handled += manage_trivial_user_requests(configuration, now) - handled += remind_and_expire_user_pending(configuration, now) - # TODO: actually handle more requests like resources and peers - if handled > 0: - logger.info("handled %d pending requests" % handled) - else: - logger.debug("no pending requests") - return handled - -def handle_cache_updates(configuration, now=time.time()): - """Inspect internal cache update markers and handle any corresponding cache - updates in one place to avoid thrashing. - Returns the number of actual actions taken for central throttle handling. - """ - handled = 0 - logger.debug("handle pending cache updates") - # TODO: actually handle vgrid/user/resource/... cache updates - if handled > 0: - logger.info("handled %d pending cache updates" % handled) - else: - logger.debug("no pending state cleanups") - return handled - -def handle_janitor_tasks(configuration, now=time.time()): - """A wrapper to take care of all regular janitor tasks like clean up and - cache updates. - Returns the number of actual tasks completed to let the main thread know if - it should throttle down or continue next run right away. - """ - tasks_completed = 0 - logger.info("handle any pending janitor tasks") - if _lookup_last_run(configuration, 'state-cleanup') + SECS_PER_DAY < now: - tasks_completed += handle_state_cleanup(configuration, now) - _update_last_run(configuration, 'state-cleanup', now) - if _lookup_last_run(configuration, 'session-cleanup') + SECS_PER_HOUR < now: - tasks_completed += handle_session_cleanup(configuration, now) - _update_last_run(configuration, 'state-cleanup', now) - if _lookup_last_run(configuration, 'pending-reqs') + SECS_PER_MINUTE < now: - tasks_completed += handle_pending_requests(configuration, now) - _update_last_run(configuration, 'pending-reqs', now) - if _lookup_last_run(configuration, 'cache-updates') + SECS_PER_MINUTE < now: - tasks_completed += handle_cache_updates(configuration, now) - _update_last_run(configuration, 'cache-updates', now) - if tasks_completed > 0: - logger.info("handled %d janitor task(s)" % tasks_completed) - else: - logger.info("janitor found no pending tasks") - return tasks_completed if __name__ == "__main__": # Force no log init since we use separate logger @@ -446,14 +59,17 @@ def handle_janitor_tasks(configuration, now=time.time()): # Use separate logger - logger = daemon_logger("janitor", configuration.user_janitor_log, log_level) + logger = daemon_logger( + "janitor", configuration.user_janitor_log, log_level) configuration.logger = logger # Allow e.g. logrotate to force log re-open after rotates register_hangup_handler(configuration) # Allow clean shutdown on SIGINT only to main process - signal.signal(signal.SIGINT, stop_handler) + register_stop_handler(configuration) + + # TODO: add a signal handler to force run pending tasks right away if not configuration.site_enable_janitor: err_msg = "Janitor support is disabled in configuration!" 
@@ -476,7 +92,7 @@ def handle_janitor_tasks(configuration, now=time.time()):
     logger.debug("(%s) Starting main loop" % main_pid)
     print("%s: Start main loop" % os.getpid())
 
-    while not stop_running.is_set():
+    while not check_stop():
         try:
             now = time.time()
             if handle_janitor_tasks(configuration, now) <= 0:
@@ -484,7 +100,7 @@
             else:
                 time.sleep(SHORT_THROTTLE_SECS)
         except KeyboardInterrupt:
-            stop_running.set()
+            stop_running()
             # NOTE: we can't be sure if SIGINT was sent to only main process
             # so we make sure to propagate to monitor child
             print("Interrupt requested - shutdown")
diff --git a/tests/test_mig_lib_daemon.py b/tests/test_mig_lib_daemon.py
new file mode 100644
index 000000000..7d9627ff2
--- /dev/null
+++ b/tests/test_mig_lib_daemon.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# --- BEGIN_HEADER ---
+#
+# test_mig_lib_daemon - unit test of the corresponding mig lib module
+# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH
+#
+# This file is part of MiG.
+#
+# MiG is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# MiG is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
+#
+# --- END_HEADER ---
+#
+
+"""Unit tests for the migrid module pointed to in the filename"""
+
+import signal
+import time
+
+from tests.support import MigTestCase
+
+from mig.lib.daemon import check_stop, register_stop_handler, stop_running
+
+
+class MigLibDaemon(MigTestCase):
+    """Unit tests for daemon related helper functions"""
+
+    def test_register_stop_handler(self):
+        """Register a stop handler and verify it can be used to mark stop"""
+
+        # We don't actually need a configuration here so just pass None
+        configuration = None
+        # It's easier to test with alarm than the usual interrupt signal
+        register_stop_handler(configuration, stop_signal=signal.SIGALRM)
+        self.assertFalse(check_stop())
+        signal.alarm(1)
+        time.sleep(1)
+        self.assertTrue(check_stop())
diff --git a/tests/test_mig_lib_janitor.py b/tests/test_mig_lib_janitor.py
new file mode 100644
index 000000000..6f33a1365
--- /dev/null
+++ b/tests/test_mig_lib_janitor.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+#
+# --- BEGIN_HEADER ---
+#
+# test_mig_lib_janitor - unit test of the corresponding mig lib module
+# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH
+#
+# This file is part of MiG.
+#
+# MiG is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# MiG is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. +# +# --- END_HEADER --- +# + +"""Unit tests for the migrid module pointed to in the filename""" + +import time + +from tests.support import MigTestCase, FakeConfiguration + +from mig.lib.janitor import task_triggers, _lookup_last_run, _update_last_run + + +class MigLibJanitor(MigTestCase): + """Unit tests for janitor related helper functions""" + + def test_last_run_bookkeeping(self): + """Register a last run timestamp and check it""" + expect = -1 + stamp = _lookup_last_run(self.configuration, 'janitor_task') + self.assertEqual(stamp, expect) + expect = 42 + stamp = _update_last_run(self.configuration, 'janitor_task', expect) + self.assertEqual(stamp, expect) + expect = time.time() + stamp = _update_last_run(self.configuration, 'janitor_task', expect) + self.assertEqual(stamp, expect) From 727024c100edb4b0269f280f62fb50f70238051c Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Sun, 7 Sep 2025 12:51:33 +0200 Subject: [PATCH 08/17] Move grid_janitor to the new ./sbin location for modern clean services. --- mig/server/grid_janitor.py | 116 +------------------------------------ sbin/grid_janitor.py | 115 ++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 115 deletions(-) mode change 100755 => 120000 mig/server/grid_janitor.py create mode 100755 sbin/grid_janitor.py diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py deleted file mode 100755 index eb3bdb2d8..000000000 --- a/mig/server/grid_janitor.py +++ /dev/null @@ -1,115 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# -# --- BEGIN_HEADER --- -# -# grid_janitor - daemon to handle recurring tasks like clean up and updates -# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH -# -# This file is part of MiG. -# -# MiG is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# MiG is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -# -# -- END_HEADER --- -# - -"""Daemon to take care of various recurring tasks like clean up, cache updates -and pruning of pending requests. 
-""" - -from __future__ import absolute_import, print_function - -import os -import sys -import time - -from mig.lib.daemon import stop_running, check_stop, register_stop_handler -from mig.lib.janitor import handle_janitor_tasks -from mig.shared.conf import get_configuration_object -from mig.shared.logger import daemon_logger, register_hangup_handler - -# TODO: adjust short to subsecond and long to e.g a minute for production use -# SHORT_THROTTLE_SECS = 0.5 -# LONG_THROTTLE_SECS = 60.0 -SHORT_THROTTLE_SECS = 5.0 -LONG_THROTTLE_SECS = 30.0 - -(configuration, logger) = (None, None) - - -if __name__ == "__main__": - # Force no log init since we use separate logger - configuration = get_configuration_object(skip_log=True) - - log_level = configuration.loglevel - if sys.argv[1:] and sys.argv[1] in ["debug", "info", "warning", "error"]: - log_level = sys.argv[1] - - # Use separate logger - - logger = daemon_logger( - "janitor", configuration.user_janitor_log, log_level) - configuration.logger = logger - - # Allow e.g. logrotate to force log re-open after rotates - register_hangup_handler(configuration) - - # Allow clean shutdown on SIGINT only to main process - register_stop_handler(configuration) - - # TODO: add a signal handler to force run pending tasks right away - - if not configuration.site_enable_janitor: - err_msg = "Janitor support is disabled in configuration!" - logger.error(err_msg) - print(err_msg) - sys.exit(1) - - print( - """This is the MiG janitor daemon which cleans up stale state data, -updates internal caches and prunes pending requests. - -Set the MIG_CONF environment to the server configuration path -unless it is available in mig/server/MiGserver.conf -""" - ) - - main_pid = os.getpid() - print("Starting janitor daemon - Ctrl-C to quit") - logger.info("(%s) Starting Janitor daemon" % main_pid) - - logger.debug("(%s) Starting main loop" % main_pid) - print("%s: Start main loop" % os.getpid()) - while not check_stop(): - try: - now = time.time() - if handle_janitor_tasks(configuration, now) <= 0: - time.sleep(LONG_THROTTLE_SECS) - else: - time.sleep(SHORT_THROTTLE_SECS) - except KeyboardInterrupt: - stop_running() - # NOTE: we can't be sure if SIGINT was sent to only main process - # so we make sure to propagate to monitor child - print("Interrupt requested - shutdown") - except Exception as exc: - logger.error( - "(%s) Caught unexpected exception: %s" % (os.getpid(), exc) - ) - - print("Janitor daemon shutting down") - logger.info("(%s) Janitor daemon shutting down" % main_pid) - - sys.exit(0) diff --git a/mig/server/grid_janitor.py b/mig/server/grid_janitor.py new file mode 120000 index 000000000..bbe3cd402 --- /dev/null +++ b/mig/server/grid_janitor.py @@ -0,0 +1 @@ +../../sbin/grid_janitor.py \ No newline at end of file diff --git a/sbin/grid_janitor.py b/sbin/grid_janitor.py new file mode 100755 index 000000000..eb3bdb2d8 --- /dev/null +++ b/sbin/grid_janitor.py @@ -0,0 +1,115 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# grid_janitor - daemon to handle recurring tasks like clean up and updates +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
+# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + +"""Daemon to take care of various recurring tasks like clean up, cache updates +and pruning of pending requests. +""" + +from __future__ import absolute_import, print_function + +import os +import sys +import time + +from mig.lib.daemon import stop_running, check_stop, register_stop_handler +from mig.lib.janitor import handle_janitor_tasks +from mig.shared.conf import get_configuration_object +from mig.shared.logger import daemon_logger, register_hangup_handler + +# TODO: adjust short to subsecond and long to e.g a minute for production use +# SHORT_THROTTLE_SECS = 0.5 +# LONG_THROTTLE_SECS = 60.0 +SHORT_THROTTLE_SECS = 5.0 +LONG_THROTTLE_SECS = 30.0 + +(configuration, logger) = (None, None) + + +if __name__ == "__main__": + # Force no log init since we use separate logger + configuration = get_configuration_object(skip_log=True) + + log_level = configuration.loglevel + if sys.argv[1:] and sys.argv[1] in ["debug", "info", "warning", "error"]: + log_level = sys.argv[1] + + # Use separate logger + + logger = daemon_logger( + "janitor", configuration.user_janitor_log, log_level) + configuration.logger = logger + + # Allow e.g. logrotate to force log re-open after rotates + register_hangup_handler(configuration) + + # Allow clean shutdown on SIGINT only to main process + register_stop_handler(configuration) + + # TODO: add a signal handler to force run pending tasks right away + + if not configuration.site_enable_janitor: + err_msg = "Janitor support is disabled in configuration!" + logger.error(err_msg) + print(err_msg) + sys.exit(1) + + print( + """This is the MiG janitor daemon which cleans up stale state data, +updates internal caches and prunes pending requests. + +Set the MIG_CONF environment to the server configuration path +unless it is available in mig/server/MiGserver.conf +""" + ) + + main_pid = os.getpid() + print("Starting janitor daemon - Ctrl-C to quit") + logger.info("(%s) Starting Janitor daemon" % main_pid) + + logger.debug("(%s) Starting main loop" % main_pid) + print("%s: Start main loop" % os.getpid()) + while not check_stop(): + try: + now = time.time() + if handle_janitor_tasks(configuration, now) <= 0: + time.sleep(LONG_THROTTLE_SECS) + else: + time.sleep(SHORT_THROTTLE_SECS) + except KeyboardInterrupt: + stop_running() + # NOTE: we can't be sure if SIGINT was sent to only main process + # so we make sure to propagate to monitor child + print("Interrupt requested - shutdown") + except Exception as exc: + logger.error( + "(%s) Caught unexpected exception: %s" % (os.getpid(), exc) + ) + + print("Janitor daemon shutting down") + logger.info("(%s) Janitor daemon shutting down" % main_pid) + + sys.exit(0) From 2b35b47535fbd8de20146af812004491d90c7c91 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Sun, 7 Sep 2025 14:28:30 +0200 Subject: [PATCH 09/17] Style and format only changes. Applied `black $TARGET` and `isort --line-length=80 -m=HANGING_INDENT $TARGET` to each file in the new sbin/ and mig/lib/ dirs for cleanliness. 
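For reference, the formatting pass boils down to a shell loop of roughly
this form (the exact invocation is an assumption and differing black/isort
versions may shift the output slightly):

    for TARGET in mig/lib/*.py sbin/*.py; do
        black "$TARGET"
        isort --line-length=80 -m=HANGING_INDENT "$TARGET"
    done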
--- mig/lib/janitor.py | 293 ++++++++++++++++++++++++++----------------- sbin/grid_janitor.py | 5 +- 2 files changed, 180 insertions(+), 118 deletions(-) diff --git a/mig/lib/janitor.py b/mig/lib/janitor.py index e8abd7251..b0ade16af 100644 --- a/mig/lib/janitor.py +++ b/mig/lib/janitor.py @@ -26,7 +26,7 @@ # """Helpers for the janitor service which takes care of various recurring tasks - like clean up, cache updates and pruning of pending requests. +like clean up, cache updates and pruning of pending requests. """ from __future__ import absolute_import, print_function @@ -38,7 +38,7 @@ from mig.shared.accountreq import accept_account_req, existing_user_collision, \ reject_account_req from mig.shared.base import get_user_id -from mig.shared.fileio import listdir, delete_file +from mig.shared.fileio import delete_file, listdir from mig.shared.pwcrypto import verify_reset_token from mig.shared.serial import load from mig.shared.userdb import default_db_path, load_user_dict @@ -60,6 +60,7 @@ # TODO: add a signal handler to force run pending tasks right away + def _lookup_last_run(configuration, target): """Check if target task is pending using internal accounting for task. Returns the timestamp when the task was last run in UN*X epoch. @@ -85,8 +86,14 @@ def _update_last_run(configuration, target, stamp): return task_triggers[target] -def _clean_stale_state_files(configuration, target_dir, filename_patterns, - expire_days, now, include_dotfiles=False): +def _clean_stale_state_files( + configuration, + target_dir, + filename_patterns, + expire_days, + now, + include_dotfiles=False, +): """Inspect and clean up stale state files matching any of filename_pattern in target_dir if they are at least expire_days old. Where filename_pattern is a list of wildcard strings checked with fnmatch. Dot-files are excluded @@ -95,11 +102,13 @@ def _clean_stale_state_files(configuration, target_dir, filename_patterns, """ _logger = configuration.logger handled = 0 - _logger.debug("clean files matching %r in %r if older than %dd" % - (filename_patterns, target_dir, expire_days)) + _logger.debug( + "clean files matching %r in %r if older than %dd" + % (filename_patterns, target_dir, expire_days) + ) for filename in listdir(target_dir): tmp_path = os.path.join(target_dir, filename) - if not include_dotfiles and filename.startswith('.'): + if not include_dotfiles and filename.startswith("."): continue if os.path.isdir(tmp_path): continue @@ -111,11 +120,15 @@ def _clean_stale_state_files(configuration, target_dir, filename_patterns, else: continue tmp_age_days = tmp_age / SECS_PER_DAY - _logger.debug("found state file %r of age %ds / %dd" % - (tmp_path, tmp_age, tmp_age_days)) + _logger.debug( + "found state file %r of age %ds / %dd" + % (tmp_path, tmp_age, tmp_age_days) + ) if tmp_age_days > expire_days: - _logger.info("remove stale tmp file in %r : %dd" % (tmp_path, - tmp_age_days)) + _logger.info( + "remove stale tmp file in %r : %dd" + % (tmp_path, tmp_age_days) + ) if not delete_file(tmp_path, _logger): _logger.error("failed to remove stale file %r" % tmp_path) handled += 1 @@ -127,48 +140,64 @@ def clean_mig_system_files(configuration, now=time.time()): """Inspect and clean up stale state files in mig_system_run. Returns the number of actual actions taken for central throttle handling. 
""" - return _clean_stale_state_files(configuration, - configuration.mig_system_files, - ['tmp*', 'no_grid_jobs*'], - EXPIRE_STATE_DAYS, now) + return _clean_stale_state_files( + configuration, + configuration.mig_system_files, + ["tmp*", "no_grid_jobs*"], + EXPIRE_STATE_DAYS, + now, + ) def clean_sessid_to_mrls_link_home(configuration, now=time.time()): """Inspect and clean up stale state files in sessid_to_mrsl_link_home. Returns the number of actual actions taken for central throttle handling. """ - return _clean_stale_state_files(configuration, - configuration.sessid_to_mrsl_link_home, - ['*'], EXPIRE_STATE_DAYS, now) + return _clean_stale_state_files( + configuration, + configuration.sessid_to_mrsl_link_home, + ["*"], + EXPIRE_STATE_DAYS, + now, + ) def clean_webserver_home(configuration, now=time.time()): """Inspect and clean up stale state files in webserver_home. Returns the number of actual actions taken for central throttle handling. """ - return _clean_stale_state_files(configuration, - configuration.webserver_home, - ['*'], EXPIRE_STATE_DAYS, now) + return _clean_stale_state_files( + configuration, + configuration.webserver_home, + ["*"], + EXPIRE_STATE_DAYS, + now, + ) def clean_no_job_helpers(configuration, now=time.time()): """Inspect and clean up stale state empty job helpers inside user_home. Returns the number of actual actions taken for central throttle handling. """ - dummy_job_path = os.path.join(configuration.user_home, - 'no_grid_jobs_in_grid_scheduler') - return _clean_stale_state_files(configuration, - dummy_job_path, - ['*'], EXPIRE_DUMMY_JOBS_DAYS, now) + dummy_job_path = os.path.join( + configuration.user_home, "no_grid_jobs_in_grid_scheduler" + ) + return _clean_stale_state_files( + configuration, dummy_job_path, ["*"], EXPIRE_DUMMY_JOBS_DAYS, now + ) def clean_twofactor_sessions(configuration, now=time.time()): """Inspect and clean up stale state files in twofactor_home. Returns the number of actual actions taken for central throttle handling. """ - return _clean_stale_state_files(configuration, - configuration.twofactor_home, - ['*'], EXPIRE_TWOFACTOR_DAYS, now) + return _clean_stale_state_files( + configuration, + configuration.twofactor_home, + ["*"], + EXPIRE_TWOFACTOR_DAYS, + now, + ) def handle_state_cleanup(configuration, now=time.time()): @@ -218,88 +247,112 @@ def manage_single_req(configuration, req_id, req_path, db_path, now): req_dict = load(req_path) client_id = get_user_id(configuration, req_dict) # NOTE: use timestamp from saved request file if available - req_timestamp = req_dict.get('accepted_terms', now) - req_expire = req_dict.get('expire', now) + req_timestamp = req_dict.get("accepted_terms", now) + req_expire = req_dict.get("expire", now) user_dict = load_user_dict(_logger, client_id, db_path) - req_invalid = req_dict.get('invalid', None) - reset_token = req_dict.get('reset_token', '') - req_auth = req_dict.get('auth', ['migoid'])[-1] - auth_type = req_auth.lstrip('mig').lstrip('ext') + req_invalid = req_dict.get("invalid", None) + reset_token = req_dict.get("reset_token", "") + req_auth = req_dict.get("auth", ["migoid"])[-1] + auth_type = req_auth.lstrip("mig").lstrip("ext") user_copy = True admin_copy = True default_renew = False if req_invalid: _logger.info("%r made an invalid account request" % client_id) # NOTE: 'invalid' is a list of validation error strings if set - reason = 'invalid request: %s.' % '. 
'.join(req_invalid) - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - _logger.warning("failed to reject invalid %r account request" - % client_id) + reason = "invalid request: %s." % ". ".join(req_invalid) + if not reject_account_req( + req_id, + configuration, + reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + ): + _logger.warning( + "failed to reject invalid %r account request" % client_id + ) else: - _logger.info("rejected invalid %r account request" % - client_id) + _logger.info("rejected invalid %r account request" % client_id) elif reset_token: - valid_reset = verify_reset_token(configuration, - user_dict, - reset_token, - req_auth, - req_timestamp) + valid_reset = verify_reset_token( + configuration, user_dict, reset_token, req_auth, req_timestamp + ) if valid_reset: - _logger.info("%r requested and authorized password reset" % - client_id) - peer_id = user_dict.get('peers', [None])[0] - if not accept_account_req(req_id, configuration, peer_id, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type, - default_renew=default_renew): - _logger.warning("failed to accept %r password reset" % - client_id) + _logger.info( + "%r requested and authorized password reset" % client_id + ) + peer_id = user_dict.get("peers", [None])[0] + if not accept_account_req( + req_id, + configuration, + peer_id, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + default_renew=default_renew, + ): + _logger.warning( + "failed to accept %r password reset" % client_id + ) else: _logger.info("accepted %r password reset" % client_id) else: - _logger.warning("%r requested password reset with bad token" - % client_id) - reason = 'invalid password reset token' - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - _logger.warning("failed to reject %r password reset" % - client_id) + _logger.warning( + "%r requested password reset with bad token" % client_id + ) + reason = "invalid password reset token" + if not reject_account_req( + req_id, + configuration, + reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + ): + _logger.warning( + "failed to reject %r password reset" % client_id + ) else: _logger.info("rejected %r password reset" % client_id) elif req_expire < now: # NOTE: probably should no longer happen after initial auto clean _logger.warning("%r request is now past expire" % client_id) - reason = 'expired request - please re-request if still relevant' - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): + reason = "expired request - please re-request if still relevant" + if not reject_account_req( + req_id, + configuration, + reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + ): _logger.warning("failed to reject expired %r request" % client_id) else: _logger.info("rejected %r request now past expire" % client_id) elif existing_user_collision(configuration, req_dict, client_id): - _logger.warning('ID collision in request from %r' % client_id) - reason = 'ID collision - please re-request with *existing* ID fields' - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - _logger.warning("failed to reject %r request with ID collision" % - client_id) + _logger.warning("ID 
collision in request from %r" % client_id) + reason = "ID collision - please re-request with *existing* ID fields" + if not reject_account_req( + req_id, + configuration, + reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + ): + _logger.warning( + "failed to reject %r request with ID collision" % client_id + ) else: _logger.info("rejected %r request with ID collision" % client_id) elif user_dict: _logger.info("%r requested access renewal" % client_id) # TODO: renew if trivial with existing valid peer else: - _logger.info("%r requested a new account requiring operator" % - client_id) + _logger.info( + "%r requested a new account requiring operator" % client_id + ) def manage_trivial_user_requests(configuration, now=time.time()): @@ -318,21 +371,21 @@ def manage_trivial_user_requests(configuration, now=time.time()): db_path = default_db_path(configuration) # TODO: handle duplicate requests here, too? for filename in listdir(configuration.user_pending): - if filename.startswith('.'): + if filename.startswith("."): continue req_id = filename req_path = os.path.join(configuration.user_pending, req_id) - _logger.debug( - "checking if account request in %r is trivial" % req_path) + _logger.debug("checking if account request in %r is trivial" % req_path) req_age = now - os.path.getmtime(req_path) req_age_minutes = req_age / SECS_PER_MINUTE if req_age_minutes > MANAGE_TRIVIAL_REQ_MINUTES: - _logger.info("found pending account request in %r : %dm" % - (req_path, req_age_minutes)) + _logger.info( + "found pending account request in %r : %dm" + % (req_path, req_age_minutes) + ) manage_single_req(configuration, req_id, req_path, db_path, now) handled += 1 - _logger.debug( - "handled %d trivial user account request action(s)" % handled) + _logger.debug("handled %d trivial user account request action(s)" % handled) return handled @@ -345,7 +398,7 @@ def remind_and_expire_user_pending(configuration, now=time.time()): handled = 0 now = time.time() for filename in listdir(configuration.user_pending): - if filename.startswith('.'): + if filename.startswith("."): continue req_id = filename req_path = os.path.join(configuration.user_pending, req_id) @@ -354,30 +407,40 @@ def remind_and_expire_user_pending(configuration, now=time.time()): req_age_days = req_age / SECS_PER_DAY req_dict = load(req_path) client_id = get_user_id(configuration, req_dict) - req_auth = req_dict.get('auth', ['migoid'])[-1] - auth_type = req_auth.lstrip('mig').lstrip('ext') + req_auth = req_dict.get("auth", ["migoid"])[-1] + auth_type = req_auth.lstrip("mig").lstrip("ext") if req_age_days > REMIND_REQ_DAYS: - _logger.info("found stale account request in %r : %dd" % - (req_path, req_age_days)) + _logger.info( + "found stale account request in %r : %dd" + % (req_path, req_age_days) + ) # TODO: actually remind operator and user that request is pending # ... possibly with copy to peers if pending acceptance. 
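            # A possible shape for that reminder (hypothetical sketch, not
            # part of this patch): throttle per request with the same
            # _lookup_last_run / _update_last_run accounting that
            # handle_janitor_tasks uses, e.g.
            #   if _lookup_last_run(configuration, req_id) + SECS_PER_DAY < now:
            #       send_pending_reminder(configuration, req_id)  # hypothetical helper
            #       _update_last_run(configuration, req_id, now)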
handled += 1 if req_age_days > EXPIRE_REQ_DAYS: - _logger.info("found expired account request from %r in %s : %dd" % - (client_id, req_path, req_age_days)) - reason = 'failed to be verified and accepted within %d day limit' \ - % EXPIRE_REQ_DAYS + _logger.info( + "found expired account request from %r in %s : %dd" + % (client_id, req_path, req_age_days) + ) + reason = ( + "failed to be verified and accepted within %d day limit" + % EXPIRE_REQ_DAYS + ) user_copy = True admin_copy = True - if not reject_account_req(req_id, configuration, reason, - user_copy=user_copy, - admin_copy=admin_copy, - auth_type=auth_type): - _logger.warning("failed to expire %s request from %r" % - (req_id, client_id)) + if not reject_account_req( + req_id, + configuration, + reason, + user_copy=user_copy, + admin_copy=admin_copy, + auth_type=auth_type, + ): + _logger.warning( + "failed to expire %s request from %r" % (req_id, client_id) + ) else: - _logger.info("expired %s request from %r" % - (req_id, client_id)) + _logger.info("expired %s request from %r" % (req_id, client_id)) handled += 1 _logger.debug("handled %d user account request action(s)" % handled) return handled @@ -425,18 +488,18 @@ def handle_janitor_tasks(configuration, now=time.time()): _logger = configuration.logger tasks_completed = 0 _logger.info("handle any pending janitor tasks") - if _lookup_last_run(configuration, 'state-cleanup') + SECS_PER_DAY < now: + if _lookup_last_run(configuration, "state-cleanup") + SECS_PER_DAY < now: tasks_completed += handle_state_cleanup(configuration, now) - _update_last_run(configuration, 'state-cleanup', now) - if _lookup_last_run(configuration, 'session-cleanup') + SECS_PER_HOUR < now: + _update_last_run(configuration, "state-cleanup", now) + if _lookup_last_run(configuration, "session-cleanup") + SECS_PER_HOUR < now: tasks_completed += handle_session_cleanup(configuration, now) - _update_last_run(configuration, 'state-cleanup', now) - if _lookup_last_run(configuration, 'pending-reqs') + SECS_PER_MINUTE < now: + _update_last_run(configuration, "state-cleanup", now) + if _lookup_last_run(configuration, "pending-reqs") + SECS_PER_MINUTE < now: tasks_completed += handle_pending_requests(configuration, now) - _update_last_run(configuration, 'pending-reqs', now) - if _lookup_last_run(configuration, 'cache-updates') + SECS_PER_MINUTE < now: + _update_last_run(configuration, "pending-reqs", now) + if _lookup_last_run(configuration, "cache-updates") + SECS_PER_MINUTE < now: tasks_completed += handle_cache_updates(configuration, now) - _update_last_run(configuration, 'cache-updates', now) + _update_last_run(configuration, "cache-updates", now) if tasks_completed > 0: _logger.info("handled %d janitor task(s)" % tasks_completed) else: diff --git a/sbin/grid_janitor.py b/sbin/grid_janitor.py index eb3bdb2d8..135a21fed 100755 --- a/sbin/grid_janitor.py +++ b/sbin/grid_janitor.py @@ -35,7 +35,7 @@ import sys import time -from mig.lib.daemon import stop_running, check_stop, register_stop_handler +from mig.lib.daemon import check_stop, register_stop_handler, stop_running from mig.lib.janitor import handle_janitor_tasks from mig.shared.conf import get_configuration_object from mig.shared.logger import daemon_logger, register_hangup_handler @@ -59,8 +59,7 @@ # Use separate logger - logger = daemon_logger( - "janitor", configuration.user_janitor_log, log_level) + logger = daemon_logger("janitor", configuration.user_janitor_log, log_level) configuration.logger = logger # Allow e.g. 
logrotate to force log re-open after rotates From eaf98d2d1ad14ccbe5c6146a98f2d9caf266f6c7 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Wed, 24 Sep 2025 17:21:03 +0200 Subject: [PATCH 10/17] Minor polish in preparation for review and merge of the basic janitor functionality to ease support load. --- mig/install/MiGserver-template.conf | 2 +- mig/lib/daemon.py | 7 +++++-- mig/lib/janitor.py | 17 ++++++++++------- tests/fixture/confs-stdlocal/MiGserver.conf | 2 +- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/mig/install/MiGserver-template.conf b/mig/install/MiGserver-template.conf index d8dec4417..3f02388a5 100644 --- a/mig/install/MiGserver-template.conf +++ b/mig/install/MiGserver-template.conf @@ -646,7 +646,7 @@ enable_notify = __ENABLE_NOTIFY__ enable_imnotify = __ENABLE_IMNOTIFY__ # Enable users to schedule tasks with a cron/at-like interface enable_crontab = __ENABLE_CRONTAB__ -# Enable janitor servide to handel recurring tasks like clean up and cache updates +# Enable janitor service to handle recurring tasks like clean up and cache updates enable_janitor = __ENABLE_JANITOR__ # Enable 2FA for web access and IO services with any TOTP authenticator client # IMPORTANT: Do NOT change this option manually here (requires Apache changes)! diff --git a/mig/lib/daemon.py b/mig/lib/daemon.py index 5b43684ad..8b1377316 100644 --- a/mig/lib/daemon.py +++ b/mig/lib/daemon.py @@ -20,7 +20,8 @@ # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. # # -- END_HEADER --- # @@ -39,7 +40,9 @@ def stop_running(): def check_stop(): - """A simple test to see if stop marker was set after some signal was received""" + """A simple test to see if stop marker was set after some signal was + received. + """ return _stop_event.is_set() diff --git a/mig/lib/janitor.py b/mig/lib/janitor.py index b0ade16af..f05e02452 100644 --- a/mig/lib/janitor.py +++ b/mig/lib/janitor.py @@ -20,7 +20,8 @@ # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. 
# # -- END_HEADER --- # @@ -375,7 +376,8 @@ def manage_trivial_user_requests(configuration, now=time.time()): continue req_id = filename req_path = os.path.join(configuration.user_pending, req_id) - _logger.debug("checking if account request in %r is trivial" % req_path) + _logger.debug("checking if account request in %r is trivial" % + req_path) req_age = now - os.path.getmtime(req_path) req_age_minutes = req_age / SECS_PER_MINUTE if req_age_minutes > MANAGE_TRIVIAL_REQ_MINUTES: @@ -385,7 +387,8 @@ def manage_trivial_user_requests(configuration, now=time.time()): ) manage_single_req(configuration, req_id, req_path, db_path, now) handled += 1 - _logger.debug("handled %d trivial user account request action(s)" % handled) + _logger.debug("handled %d trivial user account request action(s)" % + handled) return handled @@ -436,11 +439,11 @@ def remind_and_expire_user_pending(configuration, now=time.time()): admin_copy=admin_copy, auth_type=auth_type, ): - _logger.warning( - "failed to expire %s request from %r" % (req_id, client_id) - ) + _logger.warning("failed to expire %s request from %r" % + (req_id, client_id)) else: - _logger.info("expired %s request from %r" % (req_id, client_id)) + _logger.info("expired %s request from %r" % (req_id, + client_id)) handled += 1 _logger.debug("handled %d user account request action(s)" % handled) return handled diff --git a/tests/fixture/confs-stdlocal/MiGserver.conf b/tests/fixture/confs-stdlocal/MiGserver.conf index e1786473c..9520491ab 100644 --- a/tests/fixture/confs-stdlocal/MiGserver.conf +++ b/tests/fixture/confs-stdlocal/MiGserver.conf @@ -646,7 +646,7 @@ enable_notify = False enable_imnotify = False # Enable users to schedule tasks with a cron/at-like interface enable_crontab = True -# Enable janitor servide to handel recurring tasks like clean up and cache updates +# Enable janitor service to handle recurring tasks like clean up and cache updates enable_janitor = False # Enable 2FA for web access and IO services with any TOTP authenticator client # IMPORTANT: Do NOT change this option manually here (requires Apache changes)! From 3de9b3ad84f03ea151a3ae8ea4073e00d2974260 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Thu, 25 Sep 2025 11:18:45 +0200 Subject: [PATCH 11/17] Polish docstring for `early_validation_checks` and fix an inadvertent tuple init of the `title_entry['container_class']`. --- mig/shared/accountreq.py | 13 +++++++++---- mig/shared/functionality/migadmin.py | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/mig/shared/accountreq.py b/mig/shared/accountreq.py index 5356ccfba..7a24e5dc5 100644 --- a/mig/shared/accountreq.py +++ b/mig/shared/accountreq.py @@ -1196,10 +1196,15 @@ def existing_user_collision(configuration, raw_request, client_id): def early_validation_checks(configuration, raw_request, service, username, password): - """Early validation checks including e.g. password check when change is not - authorized. Useful to allow janitor to request invalid requests with - sufficient delay to render various user enumeration, email and password - guessing scenarios infeasible""" + """Early validation checks including e.g. ID collision and password check + in requests where change is not authorized. Only marks such requests + invalid on disk and leaves the decision for the operator/janitor to reject + clearly invalid requests upon next inspection. This setup is on purpose to + introduce sufficient delay to render various user enumeration, email and + password guessing scenarios infeasible.
+ The Server Admin page highlights invalid requests based on the saved + information to aid operators decide further actions. + """ logger = configuration.logger # NOTE: carefully avoid single quotes in text here to avoid js quote errors illegal_pw_change = """invalid password in renewal request. diff --git a/mig/shared/functionality/migadmin.py b/mig/shared/functionality/migadmin.py index af09f2852..c6658db3c 100755 --- a/mig/shared/functionality/migadmin.py +++ b/mig/shared/functionality/migadmin.py @@ -142,7 +142,7 @@ def main(client_id, user_arguments_dict, environ=None): ''' % configuration.sleep_secs title_entry = find_entry(output_objects, 'title') title_entry['text'] = '%s administration panel' % configuration.short_title - title_entry['container_class'] = 'fillwidth', + title_entry['container_class'] = 'fillwidth' title_entry['meta'] = meta # jquery support for tablesorter and confirmation on "remove" From ca003c5f2695b166e79e6e8370dd5ac111eab631 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Thu, 25 Sep 2025 12:41:02 +0200 Subject: [PATCH 12/17] Add a simple interruptible sleep action for daemons to support the use case of triggering next run in otherwise idle daemons. --- mig/lib/daemon.py | 52 ++++++++++++++++++++++++++- tests/test_mig_lib_daemon.py | 69 ++++++++++++++++++++++++++++++++++-- 2 files changed, 117 insertions(+), 4 deletions(-) diff --git a/mig/lib/daemon.py b/mig/lib/daemon.py index 8b1377316..b69e00cee 100644 --- a/mig/lib/daemon.py +++ b/mig/lib/daemon.py @@ -30,8 +30,57 @@ import multiprocessing import signal +import time -_stop_event = multiprocessing.Event() +_run_event, _stop_event = multiprocessing.Event(), multiprocessing.Event() + + +def _reset_event(evt): + """A simple helper to reset evt marker after it was set""" + return evt.clear() + + +def reset_run(): + """A simple helper to reset run marker after it was set""" + return _reset_event(_run_event) + + +def do_run(): + """A simple helper to set run marker after some signal was received""" + return _run_event.set() + + +def check_run(): + """A simple test to see if run marker was set after some signal was + received. 
+ """ + return _run_event.is_set() + + +def run_handler(sig, frame): + """A simple signal handler to trigger run on continue signal in main""" + do_run() + + +def register_run_handler(configuration, run_signal=signal.SIGCONT): + """Set up run next handler to react on provided run_signal""" + reset_run() + signal.signal(run_signal, run_handler) + + +def interruptible_sleep(configuration, max_secs, break_checks, nap_secs=0.1): + """Idle loop for max_secs or until one or more break_checks succeed""" + assert max_secs >= nap_secs, "Invalid max_secs value smaller than nap_secs" + for _ in range(int(max_secs / nap_secs)): + for check in break_checks: + if check(): + return + time.sleep(nap_secs) + + +def reset_stop(): + """A simple helper to reset stop marker after it was set""" + return _reset_event(_stop_event) def stop_running(): @@ -55,4 +104,5 @@ def stop_handler(sig, frame): def register_stop_handler(configuration, stop_signal=signal.SIGINT): """Set up stop handler to react on provided stop_signal""" + reset_stop() signal.signal(stop_signal, stop_handler) diff --git a/tests/test_mig_lib_daemon.py b/tests/test_mig_lib_daemon.py index 7d9627ff2..120274ed7 100644 --- a/tests/test_mig_lib_daemon.py +++ b/tests/test_mig_lib_daemon.py @@ -32,14 +32,77 @@ from tests.support import MigTestCase -from mig.lib.daemon import check_stop, register_stop_handler, stop_running +from mig.lib.daemon import check_run, check_stop, do_run, interruptible_sleep, \ + register_run_handler, register_stop_handler, stop_running class MigLibDaemon(MigTestCase): """Unit tests for daemon related helper functions""" - def test_register_stop_handler(self): - """Register a stop handler and verify it can be used to mark stop""" + def test_register_run_handler_manual(self): + """Register a run handler and verify it can be manually overriden to + mark early run. + """ + + # We don't actually need a configuration here so just pass None + configuration = None + # It's easier to test with alarm than the usual interrupt signal + register_run_handler(configuration, run_signal=signal.SIGALRM) + self.assertFalse(check_run()) + signal.alarm(3) + time.sleep(1) + do_run() + self.assertTrue(check_run) + + def test_register_run_handler_signal(self): + """Register a run handler and verify it can be used to trigger run""" + + # We don't actually need a configuration here so just pass None + configuration = None + # It's easier to test with alarm than the usual interrupt signal + register_run_handler(configuration, run_signal=signal.SIGALRM) + self.assertFalse(check_run()) + signal.alarm(1) + time.sleep(1) + self.assertTrue(check_run) + + def test_interruptible_sleep(self): + """Register a run handler and verify it can be used for interruptible + sleep to let daemon be responsive when needed. + """ + + # We don't actually need a configuration here so just pass None + configuration = None + # It's easier to test with alarm than the usual interrupt signal + register_run_handler(configuration, run_signal=signal.SIGALRM) + self.assertFalse(check_run()) + max_secs = 4.2 + start = time.time() + signal.alarm(1) + interruptible_sleep(configuration, max_secs, (check_run, )) + self.assertTrue(check_run) + end = time.time() + self.assertTrue(end < start + max_secs) + + def test_register_stop_handler_manual(self): + """Register a stop handler and verify it can be manually overriden to + mark early stop. 
+ """ + + # We don't actually need a configuration here so just pass None + configuration = None + # It's easier to test with alarm than the usual interrupt signal + register_stop_handler(configuration, stop_signal=signal.SIGALRM) + self.assertFalse(check_stop()) + signal.alarm(3) + time.sleep(1) + stop_running() + self.assertTrue(check_stop) + + def test_register_stop_handler_signal(self): + """Register a stop handler and verify it can be used to mark stop upon + receiving the signal registered. + """ # We don't actually need a configuration here so just pass None configuration = None From 879a63328063605b056632983126a7314a94ad4a Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Thu, 25 Sep 2025 12:53:53 +0200 Subject: [PATCH 13/17] Implement run handler and interruptible sleep in grid_janitor with the recently added daemon helpers for the purpose. Should also make the daemon a lot more responsive to the normal SIGINT shutdown signal. --- sbin/grid_janitor.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/sbin/grid_janitor.py b/sbin/grid_janitor.py index 135a21fed..e2d67cd3a 100755 --- a/sbin/grid_janitor.py +++ b/sbin/grid_janitor.py @@ -35,14 +35,13 @@ import sys import time -from mig.lib.daemon import check_stop, register_stop_handler, stop_running +from mig.lib.daemon import check_run, check_stop, interruptible_sleep, \ + register_run_handler, register_stop_handler, reset_run, stop_running from mig.lib.janitor import handle_janitor_tasks from mig.shared.conf import get_configuration_object from mig.shared.logger import daemon_logger, register_hangup_handler -# TODO: adjust short to subsecond and long to e.g a minute for production use -# SHORT_THROTTLE_SECS = 0.5 -# LONG_THROTTLE_SECS = 60.0 +# Delay after handling tasks and when nothing done respectively SHORT_THROTTLE_SECS = 5.0 LONG_THROTTLE_SECS = 30.0 @@ -59,12 +58,16 @@ # Use separate logger - logger = daemon_logger("janitor", configuration.user_janitor_log, log_level) + logger = daemon_logger("janitor", configuration.user_janitor_log, + log_level) configuration.logger = logger # Allow e.g. logrotate to force log re-open after rotates register_hangup_handler(configuration) + # Allow trigger next run on SIGCONT to main process + register_run_handler(configuration) + # Allow clean shutdown on SIGINT only to main process register_stop_handler(configuration) @@ -95,9 +98,12 @@ try: now = time.time() if handle_janitor_tasks(configuration, now) <= 0: - time.sleep(LONG_THROTTLE_SECS) + interruptible_sleep(configuration, LONG_THROTTLE_SECS, + (check_run, check_stop)) else: - time.sleep(SHORT_THROTTLE_SECS) + interruptible_sleep(configuration, SHORT_THROTTLE_SECS, + (check_run, check_stop)) + reset_run() except KeyboardInterrupt: stop_running() # NOTE: we can't be sure if SIGINT was sent to only main process From 6fa47a54ef1f60192572f95d613132966c708904 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Thu, 25 Sep 2025 12:55:55 +0200 Subject: [PATCH 14/17] And remove the TODO comment about the latest commit. --- sbin/grid_janitor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sbin/grid_janitor.py b/sbin/grid_janitor.py index e2d67cd3a..08f0ce40c 100755 --- a/sbin/grid_janitor.py +++ b/sbin/grid_janitor.py @@ -71,8 +71,6 @@ # Allow clean shutdown on SIGINT only to main process register_stop_handler(configuration) - # TODO: add a signal handler to force run pending tasks right away - if not configuration.site_enable_janitor: err_msg = "Janitor support is disabled in configuration!" 
logger.error(err_msg) From 16a48207e78714efdd2c6287f485f4516dfc6b1b Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Thu, 25 Sep 2025 15:22:42 +0200 Subject: [PATCH 15/17] Another minimal comment change. --- mig/lib/janitor.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/mig/lib/janitor.py b/mig/lib/janitor.py index f05e02452..b3dd99bc7 100644 --- a/mig/lib/janitor.py +++ b/mig/lib/janitor.py @@ -59,9 +59,6 @@ task_triggers = {} -# TODO: add a signal handler to force run pending tasks right away - - def _lookup_last_run(configuration, target): """Check if target task is pending using internal accounting for task. Returns the timestamp when the task was last run in UN*X epoch. From 7fa8c3bcc09081717c2f3dcf62259a60d9e9df19 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Fri, 26 Sep 2025 09:04:13 +0200 Subject: [PATCH 16/17] Actually check assertion on function *call* rather than on the always logically True function object itself. Thanks @rasmunk for catching and reporting that important little detail. --- tests/test_mig_lib_daemon.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_mig_lib_daemon.py b/tests/test_mig_lib_daemon.py index 120274ed7..835d0a758 100644 --- a/tests/test_mig_lib_daemon.py +++ b/tests/test_mig_lib_daemon.py @@ -52,7 +52,7 @@ def test_register_run_handler_manual(self): signal.alarm(3) time.sleep(1) do_run() - self.assertTrue(check_run) + self.assertTrue(check_run()) def test_register_run_handler_signal(self): @@ -64,7 +64,7 @@ def test_register_run_handler_signal(self): self.assertFalse(check_run()) signal.alarm(1) time.sleep(1) - self.assertTrue(check_run) + self.assertTrue(check_run()) def test_interruptible_sleep(self): @@ -80,7 +80,7 @@ def test_interruptible_sleep(self): start = time.time() signal.alarm(1) interruptible_sleep(configuration, max_secs, (check_run, )) - self.assertTrue(check_run) + self.assertTrue(check_run()) end = time.time() self.assertTrue(end < start + max_secs) @@ -97,7 +97,7 @@ def test_register_stop_handler_manual(self): signal.alarm(3) time.sleep(1) stop_running() - self.assertTrue(check_stop) + self.assertTrue(check_stop()) def test_register_stop_handler_signal(self): @@ -111,4 +111,4 @@ def test_register_stop_handler_signal(self): self.assertFalse(check_stop()) signal.alarm(1) time.sleep(1) - self.assertTrue(check_stop) + self.assertTrue(check_stop()) From 34b75d8e0e08a27a6d97ad8d9ba79d17836c08a0 Mon Sep 17 00:00:00 2001 From: Jonas Bardino Date: Fri, 26 Sep 2025 11:11:11 +0200 Subject: [PATCH 17/17] Address a couple of important PR322 review comments. Thanks @rasmunk --- mig/install/migrid-init.d-rh-template | 1 + mig/lib/janitor.py | 48 ++++++++++++++----- tests/fixture/confs-stdlocal/migrid-init.d-rh | 1 + 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/mig/install/migrid-init.d-rh-template b/mig/install/migrid-init.d-rh-template index ab7cf763f..c4bfad648 100755 --- a/mig/install/migrid-init.d-rh-template +++ b/mig/install/migrid-init.d-rh-template @@ -388,6 +388,7 @@ start_all() { return 0 } +# TODO: should we use PID_FILE in all stop_X handlers like in deb version?
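# For comparison, the deb template resolves PID_FILE="$PID_DIR/${SHORT_NAME}.pid"
# and hands it to start-stop-daemon with --pidfile; if these rh handlers keep
# using killproc from /etc/init.d/functions, the rough equivalent would
# presumably be killproc -p ${PID_FILE} ${SHORT_NAME}.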
stop_script() { check_enabled "jobs" || return DAEMON_PATH=${MIG_SCRIPT} diff --git a/mig/lib/janitor.py b/mig/lib/janitor.py index b3dd99bc7..f7935fc48 100644 --- a/mig/lib/janitor.py +++ b/mig/lib/janitor.py @@ -134,10 +134,12 @@ def _clean_stale_state_files( return handled -def clean_mig_system_files(configuration, now=time.time()): +def clean_mig_system_files(configuration, now=None): """Inspect and clean up stale state files in mig_system_run. Returns the number of actual actions taken for central throttle handling. """ + if now is None: + now = time.time() return _clean_stale_state_files( configuration, configuration.mig_system_files, @@ -147,10 +149,12 @@ def clean_mig_system_files(configuration, now=time.time()): ) -def clean_sessid_to_mrls_link_home(configuration, now=time.time()): +def clean_sessid_to_mrls_link_home(configuration, now=None): """Inspect and clean up stale state files in sessid_to_mrsl_link_home. Returns the number of actual actions taken for central throttle handling. """ + if now is None: + now = time.time() return _clean_stale_state_files( configuration, configuration.sessid_to_mrsl_link_home, @@ -160,10 +164,12 @@ def clean_sessid_to_mrls_link_home(configuration, now=time.time()): ) -def clean_webserver_home(configuration, now=time.time()): +def clean_webserver_home(configuration, now=None): """Inspect and clean up stale state files in webserver_home. Returns the number of actual actions taken for central throttle handling. """ + if now is None: + now = time.time() return _clean_stale_state_files( configuration, configuration.webserver_home, @@ -173,10 +179,12 @@ def clean_webserver_home(configuration, now=time.time()): ) -def clean_no_job_helpers(configuration, now=time.time()): +def clean_no_job_helpers(configuration, now=None): """Inspect and clean up stale state empty job helpers inside user_home. Returns the number of actual actions taken for central throttle handling. """ + if now is None: + now = time.time() dummy_job_path = os.path.join( configuration.user_home, "no_grid_jobs_in_grid_scheduler" ) @@ -185,10 +193,12 @@ def clean_no_job_helpers(configuration, now=time.time()): ) -def clean_twofactor_sessions(configuration, now=time.time()): +def clean_twofactor_sessions(configuration, now=None): """Inspect and clean up stale state files in twofactor_home. Returns the number of actual actions taken for central throttle handling. """ + if now is None: + now = time.time() return _clean_stale_state_files( configuration, configuration.twofactor_home, @@ -198,11 +208,13 @@ def clean_twofactor_sessions(configuration, now=time.time()): ) -def handle_state_cleanup(configuration, now=time.time()): +def handle_state_cleanup(configuration, now=None): """Inspect various state dirs to clean up general stale old temporay files. Returns the number of actual actions taken for central throttle handling. """ _logger = configuration.logger + if now is None: + now = time.time() handled = 0 _logger.debug("handle pending state cleanups") handled += clean_mig_system_files(configuration, now) @@ -217,11 +229,13 @@ def handle_state_cleanup(configuration, now=time.time()): return handled -def handle_session_cleanup(configuration, now=time.time()): +def handle_session_cleanup(configuration, now=None): """Inspect various state dirs to clean up stale session files specifically. Returns the number of actual actions taken for central throttle handling. 
""" _logger = configuration.logger + if now is None: + now = time.time() handled = 0 _logger.debug("handle pending session cleanups") if configuration.site_enable_jobs: @@ -353,7 +367,7 @@ def manage_single_req(configuration, req_id, req_path, db_path, now): ) -def manage_trivial_user_requests(configuration, now=time.time()): +def manage_trivial_user_requests(configuration, now=None): """Inspect user_pending dir and take care of any request, which do not require operator interaction. That is, accept or reject any password reset requests depending on reset token validity, renew any with complete peer @@ -364,6 +378,8 @@ def manage_trivial_user_requests(configuration, now=time.time()): Returns the number of actual actions taken for central throttle handling. """ _logger = configuration.logger + if now is None: + now = time.time() handled = 0 now = time.time() db_path = default_db_path(configuration) @@ -389,12 +405,14 @@ def manage_trivial_user_requests(configuration, now=time.time()): return handled -def remind_and_expire_user_pending(configuration, now=time.time()): +def remind_and_expire_user_pending(configuration, now=None): """Inspect user_pending dir and inform about pending but aging account requests that need operator or user action. Returns the number of actual actions taken for central throttle handling. """ _logger = configuration.logger + if now is None: + now = time.time() handled = 0 now = time.time() for filename in listdir(configuration.user_pending): @@ -446,11 +464,13 @@ def remind_and_expire_user_pending(configuration, now=time.time()): return handled -def handle_pending_requests(configuration, now=time.time()): +def handle_pending_requests(configuration, now=None): """Inspect various state dirs to remind or clean up stale requests. Returns the number of actual actions taken for central throttle handling. """ _logger = configuration.logger + if now is None: + now = time.time() handled = 0 _logger.debug("handle pending requests") handled += manage_trivial_user_requests(configuration, now) @@ -463,12 +483,14 @@ def handle_pending_requests(configuration, now=time.time()): return handled -def handle_cache_updates(configuration, now=time.time()): +def handle_cache_updates(configuration, now=None): """Inspect internal cache update markers and handle any corresponding cache updates in one place to avoid thrashing. Returns the number of actual actions taken for central throttle handling. """ _logger = configuration.logger + if now is None: + now = time.time() handled = 0 _logger.debug("handle pending cache updates") # TODO: actually handle vgrid/user/resource/... cache updates @@ -479,13 +501,15 @@ def handle_cache_updates(configuration, now=time.time()): return handled -def handle_janitor_tasks(configuration, now=time.time()): +def handle_janitor_tasks(configuration, now=None): """A wrapper to take care of all regular janitor tasks like clean up and cache updates. Returns the number of actual tasks completed to let the main thread know if it should throttle down or continue next run right away. 
""" _logger = configuration.logger + if now is None: + now = time.time() tasks_completed = 0 _logger.info("handle any pending janitor tasks") if _lookup_last_run(configuration, "state-cleanup") + SECS_PER_DAY < now: diff --git a/tests/fixture/confs-stdlocal/migrid-init.d-rh b/tests/fixture/confs-stdlocal/migrid-init.d-rh index ab7cf763f..c4bfad648 100755 --- a/tests/fixture/confs-stdlocal/migrid-init.d-rh +++ b/tests/fixture/confs-stdlocal/migrid-init.d-rh @@ -388,6 +388,7 @@ start_all() { return 0 } +# TODO: should we use PID_FILE in all stop_X handlers like in deb version? stop_script() { check_enabled "jobs" || return DAEMON_PATH=${MIG_SCRIPT}