diff --git a/python/idsse_common/idsse/common/protocol_utils.py b/python/idsse_common/idsse/common/protocol_utils.py index a0367e0..bc3d2c7 100644 --- a/python/idsse_common/idsse/common/protocol_utils.py +++ b/python/idsse_common/idsse/common/protocol_utils.py @@ -19,6 +19,7 @@ from .path_builder import PathBuilder from .utils import TimeDelta, datetime_gen + class ProtocolUtils(ABC): """Base Class - interface for DAS data discovery""" @@ -53,27 +54,27 @@ def cp(self, path: str, dest: str) -> bool: bool: Returns True if copy is successful """ - - def get_path(self, issue: datetime, valid: datetime) -> str: + def get_path(self, issue: datetime, valid: datetime, **kwargs) -> str: """Delegates to instant PathBuilder to get full path given issue and valid Args: issue (datetime): Issue date time valid (datetime): Valid date time + kwargs: Additional arguments, e.g. region Returns: str: Absolute path to file or object """ lead = TimeDelta(valid-issue) - return self.path_builder.build_path(issue=issue, valid=valid, lead=lead) + return self.path_builder.build_path(issue=issue, valid=valid, lead=lead, **kwargs) - - def check_for(self, issue: datetime, valid: datetime) -> tuple[datetime, str] | None: + def check_for(self, issue: datetime, valid: datetime, **kwargs) -> tuple[datetime, str] | None: """Checks if an object passed issue/valid exists Args: issue (datetime): The issue date/time used to format the path to the object's location valid (datetime): The valid date/time used to format the path to the object's location + kwargs: Additional arguments, e.g. region Returns: [tuple[datetime, str] | None]: A tuple of the valid date/time (indicated by object's @@ -84,7 +85,7 @@ def check_for(self, issue: datetime, valid: datetime) -> tuple[datetime, str] | file_path = self.get_path(issue, valid) dir_path = os.path.dirname(file_path) filenames = self.ls(dir_path, prepend_path=False) - filename = self.path_builder.build_filename(issue=issue, valid=valid, lead=lead) + filename = self.path_builder.build_filename(issue=issue, valid=valid, lead=lead, **kwargs) for fname in filenames: # Support wildcard matches - used for '?' as a single wildcard character in @@ -97,7 +98,8 @@ def get_issues(self, num_issues: int = 1, issue_start: datetime | None = None, issue_end: datetime = datetime.now(UTC), - time_delta: timedelta = timedelta(hours=1) + time_delta: timedelta = timedelta(hours=1), + **kwargs ) -> Sequence[datetime]: """Determine the available issue date/times @@ -106,6 +108,7 @@ def get_issues(self, issue_start (datetime, optional): The oldest date/time to look for. Defaults to None. issue_end (datetime): The newest date/time to look for. Defaults to now (UTC). time_delta (timedelta): The time step size. Defaults to 1 hour. + kwargs: Additional arguments, e.g. region Returns: Sequence[datetime]: A sequence of issue date/times @@ -126,7 +129,7 @@ def get_issues(self, if issue_start and issue_dt < issue_start: break try: - dir_path = self.path_builder.build_dir(issue=issue_dt) + dir_path = self.path_builder.build_dir(issue=issue_dt, **kwargs) issues_set.update(self._get_issues(dir_path, num_issues)) if num_issues and len(issues_set) >= num_issues: break @@ -139,7 +142,9 @@ def get_issues(self, def get_valids(self, issue: datetime, valid_start: datetime | None = None, - valid_end: datetime | None = None) -> Sequence[tuple[datetime, str]]: + valid_end: datetime | None = None, + **kwargs + ) -> Sequence[tuple[datetime, str]]: """Get all objects consistent with the passed issue date/time and filter by valid range Args: @@ -148,6 +153,7 @@ def get_valids(self, valids >= valid_start. Defaults to None. valid_end (datetime | None, optional): All returned objects will be for valids <= valid_end. Defaults to None. + kwargs: Additional arguments, e.g. region Returns: Sequence[tuple[datetime, str]]: A sequence of tuples with valid date/time (indicated by @@ -158,14 +164,14 @@ def get_valids(self, valids_and_filenames = self.check_for(issue, valid_start) return [valids_and_filenames] if valids_and_filenames is not None else [] - dir_path = self.path_builder.build_dir(issue=issue) - valid_and_file =[] + dir_path = self.path_builder.build_dir(issue=issue, **kwargs) + valid_and_file = [] for file_path in self.ls(dir_path): if file_path.endswith(self.path_builder.file_ext): try: if issue == self.path_builder.get_issue(file_path): valid_and_file.append((self.path_builder.get_valid(file_path), file_path)) - except ValueError: # Ignore invalid filepaths... + except ValueError: # Ignore invalid filepaths... pass valid_and_file = [(dt, path) for (dt, path) in valid_and_file if dt is not None] # Remove any tuple that has "None" as the valid time @@ -209,4 +215,4 @@ def _get_issues(self, break except ValueError: # Ignore invalid filepaths... pass - return issues_set + return issues_set diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 07e236d..4fb6b0c 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -273,7 +273,7 @@ def blocking_publish(self, route_key (str): Optional route key, overriding key provided during initialization Returns: - bool: Returns True if no errors ocurred during publication. If this + bool: Returns True if no errors occurred during publication. If this publisher is configured to confirm delivery will return False if failed to confirm. """ diff --git a/python/idsse_common/test/test_rabbitmq_utils.py b/python/idsse_common/test/test_rabbitmq_utils.py index 3bd04db..b80a1b1 100644 --- a/python/idsse_common/test/test_rabbitmq_utils.py +++ b/python/idsse_common/test/test_rabbitmq_utils.py @@ -57,6 +57,7 @@ def mock_channel() -> Mock: """Mock pika.adapters.blocking_connection.BlockingChannel object""" def mock_queue_declare(queue: str, **_kwargs) -> Method: return Frame(Method(queue=queue)) # create a usable (mock) Frame using queue name passed + def mock_exch_declare(exchange: str, **_kwargs) -> Method: return Frame(Method(exchange=exchange)) @@ -328,8 +329,8 @@ def test_send_requests_returns_none_on_error(rpc_thread: Rpc, mock_connection: Mock, monkeypatch: MonkeyPatch): # pylint: disable=too-many-arguments - def mock_blocking_publish(channel, exch, message_params, queue = None, success_flag = None, - done_event = None): + def mock_blocking_publish(channel, exch, message_params, queue=None, success_flag=None, + done_event=None): # cause exception for pending request Future rpc_thread._pending_requests[EXAMPLE_UUID].set_exception(RuntimeError('Something broke')) @@ -383,7 +384,6 @@ def mock_rmq_params_and_callback(): return RabbitMqParamsAndCallback(params=params, callback=callback) - @patch('idsse.common.rabbitmq_utils.BlockingConnection') @patch('idsse.common.rabbitmq_utils.ThreadPoolExecutor') def test_consumer_initialization(mock_executor, mock_blocking_connection, mock_conn_params, @@ -406,8 +406,6 @@ def test_consumer_start(mock_executor, mock_blocking_connection, mock_conn_param start_consuming.assert_called_once() - - @patch('idsse.common.rabbitmq_utils.BlockingConnection') @patch('idsse.common.rabbitmq_utils.ThreadPoolExecutor') def test_on_message(mock_executor, mock_blocking_connection, mock_conn_params, @@ -424,6 +422,7 @@ def test_on_message(mock_executor, mock_blocking_connection, mock_conn_params, ANY, b"Test Message") + @fixture def mock_message(): return MagicMock(name='RabbitMqMessage', spec=dict)