From 23f35207c511a5fb3066db354eaa85c55ffe8b37 Mon Sep 17 00:00:00 2001 From: Jessica Singh Date: Tue, 16 Sep 2025 13:00:11 -0700 Subject: [PATCH 1/2] slack bug --- backend/onyx/connectors/slack/connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/onyx/connectors/slack/connector.py b/backend/onyx/connectors/slack/connector.py index c0cd232fc52..82229a9dbe5 100644 --- a/backend/onyx/connectors/slack/connector.py +++ b/backend/onyx/connectors/slack/connector.py @@ -837,7 +837,7 @@ def _load_from_checkpoint( channel_message_ts = checkpoint.channel_completion_map.get(channel_id) if channel_message_ts: - latest = channel_message_ts + oldest = channel_message_ts logger.debug( f"Getting messages for channel {channel} within range {oldest} - {latest}" @@ -855,7 +855,7 @@ def _load_from_checkpoint( f"{latest=}" ) - new_latest = message_batch[-1]["ts"] if message_batch else latest + new_latest = message_batch[0]["ts"] if message_batch else latest num_threads_start = len(seen_thread_ts) From 80fd930694ae93082c8178a90dd69f9dd914ffc0 Mon Sep 17 00:00:00 2001 From: Jessica Singh Date: Tue, 16 Sep 2025 16:40:09 -0700 Subject: [PATCH 2/2] rename to new_oldest and change calc --- backend/onyx/connectors/slack/connector.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/backend/onyx/connectors/slack/connector.py b/backend/onyx/connectors/slack/connector.py index 82229a9dbe5..ac5700acd62 100644 --- a/backend/onyx/connectors/slack/connector.py +++ b/backend/onyx/connectors/slack/connector.py @@ -761,7 +761,7 @@ def _load_from_checkpoint( Step 2: Loop through each channel. For each channel: Step 2.1: Get messages within the time range. Step 2.2: Process messages in parallel, yield back docs. - Step 2.3: Update checkpoint with new_latest, seen_thread_ts, and current_channel. + Step 2.3: Update checkpoint with new_oldest, seen_thread_ts, and current_channel. 
Slack returns messages from newest to oldest, so we need to keep track of the latest message we've seen in each channel. Step 2.4: If there are no more messages in the channel, switch the current @@ -837,6 +837,7 @@ def _load_from_checkpoint( channel_message_ts = checkpoint.channel_completion_map.get(channel_id) if channel_message_ts: + # Set oldest to the checkpoint timestamp to resume from where we left off oldest = channel_message_ts logger.debug( @@ -855,7 +856,8 @@ def _load_from_checkpoint( f"{latest=}" ) - new_latest = message_batch[0]["ts"] if message_batch else latest + # message_batch[0] is the newest message (Slack returns newest to oldest) + new_oldest = message_batch[0]["ts"] if message_batch else latest num_threads_start = len(seen_thread_ts) @@ -906,15 +908,14 @@ def _load_from_checkpoint( num_threads_processed = len(seen_thread_ts) - num_threads_start # calculate a percentage progress for the current channel by determining - # our viable range start and end, and the latest timestamp we are querying - # up to - new_latest_seconds_epoch = SecondsSinceUnixEpoch(new_latest) - if new_latest_seconds_epoch > end: + # how much of the time range we've processed so far + new_oldest_seconds_epoch = SecondsSinceUnixEpoch(new_oldest) + range_start = start if start else max(0, channel_created) + if new_oldest_seconds_epoch < range_start: range_complete = 0.0 else: - range_complete = end - new_latest_seconds_epoch + range_complete = new_oldest_seconds_epoch - range_start - range_start = max(0, channel_created) range_total = end - range_start if range_total <= 0: range_total = 1 @@ -935,7 +936,7 @@ def _load_from_checkpoint( ) checkpoint.seen_thread_ts = list(seen_thread_ts) - checkpoint.channel_completion_map[channel["id"]] = new_latest + checkpoint.channel_completion_map[channel["id"]] = new_oldest # bypass channels where the first set of messages seen are all bots # check at least MIN_BOT_MESSAGE_THRESHOLD messages are in the batch