Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/idsse_common/idsse/common/protocol_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ def get_issues(
if not issue_end:
issue_end = datetime.now(UTC)
if issue_start:
datetimes = list(datetime_gen(issue_end, time_delta, issue_start, num_issues))
datetimes = datetime_gen(issue_end, time_delta, issue_start, num_issues)
else:
# check if time delta is positive, if so make negative
if time_delta > zero_time_delta:
time_delta = timedelta(seconds=-1.0 * time_delta.total_seconds())
datetimes = list(datetime_gen(issue_end, time_delta))
datetimes = datetime_gen(issue_end, time_delta)

# build list of filepaths on the server for each dt (ignoring ones earlier than issue_dt)
issue_filepaths = [
Expand All @@ -138,7 +138,7 @@ def get_issues(
]

issues_with_valid_dts = self._get_unique_issues(issue_filepaths, num_issues, max_workers)
return sorted(list(issues_with_valid_dts))[:num_issues]
return sorted(list(issues_with_valid_dts), reverse=True)[:num_issues]

def get_valids(
self,
Expand Down Expand Up @@ -198,7 +198,7 @@ def get_valids(
def _get_unique_issues(
self,
dir_paths: list[str],
num_issues: int,
num_issues: int | None,
max_workers: int,
) -> list[datetime]:
"""
Expand Down
26 changes: 12 additions & 14 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class Queue(NamedTuple):
"""An internal data class for holding the RabbitMQ queue info"""

name: str
route_key: str
route_key: str | list[str]
durable: bool
exclusive: bool
auto_delete: bool
Expand Down Expand Up @@ -604,22 +604,17 @@ def _setup_exch_and_queue(channel: Channel, exch: Exch, queue: Queue):
logger.debug("Declared queue: %s", queue_name)

if exch.name != "": # if using default exchange, skip binding queues (not allowed by RMQ)
if isinstance(queue.route_key, list):
for route_key in queue.route_key:
channel.queue_bind(queue_name, exchange=exch.name, routing_key=route_key)
logger.debug(
"Bound queue(%s) to exchange(%s) with route_key(%s)",
queue_name,
exch.name,
route_key,
)
else:
channel.queue_bind(queue_name, exchange=exch.name, routing_key=queue.route_key)
# force route key(s) to be list to simplify logic
route_key_list = (
queue.route_key if isinstance(queue.route_key, list) else [queue.route_key]
)
for route_key in route_key_list:
channel.queue_bind(queue_name, exchange=exch.name, routing_key=route_key)
logger.debug(
"Bound queue(%s) to exchange(%s) with route_key(%s)",
queue_name,
exch.name,
queue.route_key,
route_key,
)


Expand Down Expand Up @@ -677,8 +672,11 @@ def _publish(
logger.warning("Exception when removing message from private queue: %s", exc)
except UnroutableError:
logger.warning("Message was not delivered")
except ChannelWrongStateError as exc:
logger.warning("Message not published, cause: %s", str(exc))
success_flag[0] = False
except Exception as exc:
logger.warning("Message not published, cause: %s", exc)
logger.warning("Message not published, cause: %s", str(exc))
raise exc
finally:
if done_event:
Expand Down
4 changes: 2 additions & 2 deletions python/idsse_common/test/test_aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ def test_check_for_does_not_find_valid(aws_utils: AwsUtils, mock_exec_cmd):
def test_get_issues(aws_utils: AwsUtils, mock_exec_cmd):
result = aws_utils.get_issues(issue_start=EXAMPLE_ISSUE, issue_end=EXAMPLE_VALID, num_issues=2)
assert len(result) == 2
assert result[0] == EXAMPLE_VALID - timedelta(hours=1)
assert result[1] == EXAMPLE_VALID
assert result[0] == EXAMPLE_VALID
assert result[1] == EXAMPLE_VALID - timedelta(hours=1)


def test_get_issues_with_same_start_stop(aws_utils: AwsUtils, mock_exec_cmd):
Expand Down
Loading