Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions python/idsse_common/idsse/common/sci/netcdf_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ def write_netcdf(attrs: dict,
Returns:
str: The location that data was written to
"""
_make_dirs(filepath)
logger.debug('Writing data to: %s', filepath)
# ensure parent directories exist
dirname = os.path.dirname(os.path.abspath(filepath))
os.makedirs(dirname, exist_ok=True)

logger.debug('Writing data to: %s', filepath)
if use_h5_lib:
with h5nc.File(filepath, 'w') as file:
y_dimensions, x_dimensions = grid.shape
Expand All @@ -112,25 +114,21 @@ def write_netcdf(attrs: dict,
for key, value in attrs.items():
file.attrs[key] = value

else:
# otherwise, write file using netCDF4 library (default)
with Dataset(filepath, 'w', format='NETCDF4') as dataset:
y_dimensions, x_dimensions = grid.shape
dataset.createDimension('x', x_dimensions)
dataset.createDimension('y', y_dimensions)

grid_var = dataset.createVariable('grid', 'f4', ('y', 'x'))
grid_var[:] = grid
return filepath

for key, value in attrs.items():
setattr(dataset, key, str(value))
# otherwise, write file using netCDF4 library (default)
with Dataset(filepath, 'w', format='NETCDF4') as dataset:
y_dimensions, x_dimensions = grid.shape
dataset.createDimension('x', x_dimensions)
dataset.createDimension('y', y_dimensions)

return filepath
grid_var = dataset.createVariable('grid', 'f4', ('y', 'x'))
grid_var[:] = grid

for key, value in attrs.items():
setattr(dataset, key, str(value))

def _make_dirs(filename: str):
dirname = os.path.dirname(os.path.abspath(filename))
os.makedirs(dirname, exist_ok=True)
return filepath


def _read_attrs(has_nc_attr: HasNcAttr) -> dict:
Expand Down
111 changes: 109 additions & 2 deletions python/idsse_common/idsse/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import copy
import logging
import math
import os
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta, UTC
from enum import Enum
from subprocess import PIPE, Popen, TimeoutExpired
from time import sleep
from typing import Any, Generator
from uuid import UUID

Expand Down Expand Up @@ -96,6 +98,111 @@ def __delitem__(self, key):
del self.__dict__[key]


class FileBasedLock():
"""
Ensure atomic read/write of a given file using only the filesystem; behavior is the same
whether workers accessing the file are distributed across Python threads, subprocesses,
Docker containers, VMs, etc.

Note: this thread must have permissions to WRITE as well as read on the filesystem where
this file is stored.

Example usage:
```
file_of_interest = './foo.txt'
with FileBasedLock(file_of_interest):
# now guaranteed that no other process on any machine is accessing this file
with open(file_of_interest, 'a') as f:
f.write('hello world')
# lock is now released for other threads/processes
```
"""
def __init__(self, filepath: str, max_age: float):
"""
Args:
filepath (str): The file on which the caller wants to do atomic I/O (read/write)
max_age (float): The maximum time (seconds) after which a `.lock` file will be treated
as `expired` or "orphaned" by a process/thread that was unexpectedly exited.
FileBasedLocks are auto-released after this duration and the original locker
loses all guarantees of atomicity. Recommended to keep this short (10 minutes?),
based on how long a single thread could reasonably being expected to read/write
for this file type and usage.
"""
self.filepath = filepath
self._lock_path = f'{self.filepath}.lock'
self._max_age = max_age

def __enter__(self):
self.acquire()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
return False

@property
def locked(self) -> bool:
"""True if some thread has already locked this file resource"""
return os.path.exists(self._lock_path)

@property
def expired(self) -> bool:
"""True if lock is older than `_max_age` seconds and should be considered orphaned"""
if not self.locked:
return False # lock cannot be expired if it isn't locked

try:
creation_time = os.stat(self._lock_path).st_birthtime
except AttributeError:
# Linux (and maybe Windows) don't support birthtime
creation_time = os.stat(self._lock_path).st_ctime
except FileNotFoundError:
# lock file disappeared since start of function call?? *shrug* treat it as unexpired
creation_time = datetime.now(UTC).timestamp()
return (datetime.now(UTC).timestamp() - creation_time) >= self._max_age

def acquire(self, timeout=300.0) -> bool:
"""Block until the desired file (declared in FileLock __init__) is free to read/write.
Does this by creating a `.lock` file that communicates to other `FileBasedLock` instances
that this file is in use.

If `_max_age` has passed, lock will be forcefully released before acquiring for this caller.

Args:
timeout (float, optional): Number of seconds until TimeoutError will be raised.
Defaults to 300.

Raises:
TimeoutError: if timeout was exceeded waiting for lock to be released
"""

wait_ms = 0
while self.locked and not self.expired and wait_ms / 1000 < timeout:
sleep(0.01)
wait_ms += 10

if wait_ms / 1000 >= timeout:
raise TimeoutError

if self.expired:
self.release() # _max_age has passed, consider this lock abandoned and delete it

self._create_lockfile() # this actually acquires the lock
return True

def release(self) -> bool:
"""Release the lock so other processes/threads can do I/O"""
if not self.locked:
return False
os.remove(self._lock_path)
return True

def _create_lockfile(self):
"""The actual functionality triggered by `acquire()` (after lock is confirmed free)"""
with open(self._lock_path, 'w', encoding='utf-8') as file:
file.write('')


def exec_cmd(commands: Sequence[str], timeout: int | None = None) -> Sequence[str]:
"""Execute the passed commands via a Popen call

Expand Down Expand Up @@ -130,7 +237,7 @@ def to_iso(date_time: datetime) -> str:
"""Format a datetime instance to an ISO string"""
return (f'{date_time.strftime("%Y-%m-%dT%H:%M")}:'
f'{(date_time.second + date_time.microsecond / 1e6):06.3f}'
'Z' if date_time.tzname() in [None, str(timezone.utc)]
'Z' if date_time.tzname() in [None, str(UTC)]
else date_time.strftime("%Z")[3:])


Expand Down
52 changes: 50 additions & 2 deletions python/idsse_common/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
# Geary J Layne
#
# --------------------------------------------------------------------------------
# pylint: disable=missing-function-docstring,disable=invalid-name
# pylint: disable=missing-function-docstring,invalid-name,redefined-outer-name

import os
from copy import deepcopy
from datetime import datetime, timedelta
from math import pi
from os import path
from time import sleep
from uuid import uuid4

import pytest

from idsse.common.utils import TimeDelta, Map
from idsse.common.utils import TimeDelta, Map, FileBasedLock
from idsse.common.utils import (
datetime_gen,
dict_copy_with,
Expand Down Expand Up @@ -205,3 +208,48 @@ def test_is_valid_uuid_success():
def test_is_valid_uuid_failure():
assert not is_valid_uuid('abc-def-ghi-jlk') # badly-formed UUID
assert not is_valid_uuid('1d10609e-ba56-11ee-af51-fa605d1346b6', version=7) # invalid version


@pytest.fixture
def example_file():
    """Unique throwaway filename per test so concurrent runs never collide"""
    return '.tmp-' + str(uuid4())


@pytest.fixture(autouse=True)
def auto_cleanup(example_file: str):
    """Delete the example file (if present) both before and after every test"""
    def _remove_if_present(filename: str):
        if os.path.exists(filename):
            os.remove(filename)

    _remove_if_present(example_file)  # clear stale files left by earlier runs
    yield
    _remove_if_present(example_file)  # clean up whatever the test created


def test_lock_acquire(example_file):
    """locked state should toggle on through acquire() and off through release()"""
    file_lock = FileBasedLock(example_file, max_age=600)
    assert file_lock.locked is False

    file_lock.acquire()
    assert file_lock.locked is True

    file_lock.release()
    assert file_lock.locked is False


def test_lock_expired(example_file):
    """A lock held longer than max_age should report itself as expired"""
    short_lived = FileBasedLock(example_file, max_age=0.001)
    short_lived.acquire()

    sleep(0.002)  # wait past max_age so the lock goes stale
    assert short_lived.expired

    short_lived.release()


def test_lock_timeout(example_file):
    """Re-acquiring a held, unexpired lock should raise TimeoutError once timeout elapses"""
    lock = FileBasedLock(example_file, max_age=60)
    lock.acquire()

    # pytest.raises itself fails the test if TimeoutError is not raised; the previous
    # `assert exc is not None` was vacuous (ExceptionInfo is never None)
    with pytest.raises(TimeoutError):
        lock.acquire(0.1)

    lock.release()  # cleanup lock
Loading