Skip to content

Commit 80fd930

Browse files
committed
rename to new_oldest and change calc
1 parent 23f3520 commit 80fd930

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

backend/onyx/connectors/slack/connector.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ def _load_from_checkpoint(
761761
Step 2: Loop through each channel. For each channel:
762762
Step 2.1: Get messages within the time range.
763763
Step 2.2: Process messages in parallel, yield back docs.
764-
Step 2.3: Update checkpoint with new_latest, seen_thread_ts, and current_channel.
764+
Step 2.3: Update checkpoint with new_oldest, seen_thread_ts, and current_channel.
765765
Slack returns messages from newest to oldest, so we need to keep track of
766766
the latest message we've seen in each channel.
767767
Step 2.4: If there are no more messages in the channel, switch the current
@@ -837,6 +837,7 @@ def _load_from_checkpoint(
837837

838838
channel_message_ts = checkpoint.channel_completion_map.get(channel_id)
839839
if channel_message_ts:
840+
# Set oldest to the checkpoint timestamp to resume from where we left off
840841
oldest = channel_message_ts
841842

842843
logger.debug(
@@ -855,7 +856,8 @@ def _load_from_checkpoint(
855856
f"{latest=}"
856857
)
857858

858-
new_latest = message_batch[0]["ts"] if message_batch else latest
859+
# message_batch[0] is the newest message (Slack returns newest to oldest), so its ts is saved as the `oldest` bound to resume from
860+
new_oldest = message_batch[0]["ts"] if message_batch else latest
859861

860862
num_threads_start = len(seen_thread_ts)
861863

@@ -906,15 +908,14 @@ def _load_from_checkpoint(
906908
num_threads_processed = len(seen_thread_ts) - num_threads_start
907909

908910
# calculate a percentage progress for the current channel by determining
909-
# our viable range start and end, and the latest timestamp we are querying
910-
# up to
911-
new_latest_seconds_epoch = SecondsSinceUnixEpoch(new_latest)
912-
if new_latest_seconds_epoch > end:
911+
# how much of the time range we've processed so far
912+
new_oldest_seconds_epoch = SecondsSinceUnixEpoch(new_oldest)
913+
range_start = start if start else max(0, channel_created)
914+
if new_oldest_seconds_epoch < range_start:
913915
range_complete = 0.0
914916
else:
915-
range_complete = end - new_latest_seconds_epoch
917+
range_complete = new_oldest_seconds_epoch - range_start
916918

917-
range_start = max(0, channel_created)
918919
range_total = end - range_start
919920
if range_total <= 0:
920921
range_total = 1
@@ -935,7 +936,7 @@ def _load_from_checkpoint(
935936
)
936937

937938
checkpoint.seen_thread_ts = list(seen_thread_ts)
938-
checkpoint.channel_completion_map[channel["id"]] = new_latest
939+
checkpoint.channel_completion_map[channel["id"]] = new_oldest
939940

940941
# bypass channels where the first set of messages seen are all bots
941942
# check at least MIN_BOT_MESSAGE_THRESHOLD messages are in the batch

0 commit comments

Comments
 (0)