Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions backend/onyx/connectors/slack/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ def _load_from_checkpoint(
Step 2: Loop through each channel. For each channel:
Step 2.1: Get messages within the time range.
Step 2.2: Process messages in parallel, yield back docs.
Step 2.3: Update checkpoint with new_latest, seen_thread_ts, and current_channel.
Step 2.3: Update checkpoint with new_oldest, seen_thread_ts, and current_channel.
Slack returns messages from newest to oldest, so we need to keep track of
the latest message we've seen in each channel.
Step 2.4: If there are no more messages in the channel, switch the current
Expand Down Expand Up @@ -837,7 +837,8 @@ def _load_from_checkpoint(

channel_message_ts = checkpoint.channel_completion_map.get(channel_id)
if channel_message_ts:
latest = channel_message_ts
# Set oldest to the checkpoint timestamp to resume from where we left off
oldest = channel_message_ts

logger.debug(
f"Getting messages for channel {channel} within range {oldest} - {latest}"
Expand All @@ -855,7 +856,8 @@ def _load_from_checkpoint(
f"{latest=}"
)

new_latest = message_batch[-1]["ts"] if message_batch else latest
# message_batch[0] is the newest message (Slack returns newest to oldest)
new_oldest = message_batch[0]["ts"] if message_batch else latest

num_threads_start = len(seen_thread_ts)

Expand Down Expand Up @@ -906,15 +908,14 @@ def _load_from_checkpoint(
num_threads_processed = len(seen_thread_ts) - num_threads_start

# calculate a percentage progress for the current channel by determining
# our viable range start and end, and the latest timestamp we are querying
# up to
new_latest_seconds_epoch = SecondsSinceUnixEpoch(new_latest)
if new_latest_seconds_epoch > end:
# how much of the time range we've processed so far
new_oldest_seconds_epoch = SecondsSinceUnixEpoch(new_oldest)
range_start = start if start else max(0, channel_created)
if new_oldest_seconds_epoch < range_start:
range_complete = 0.0
else:
range_complete = end - new_latest_seconds_epoch
range_complete = new_oldest_seconds_epoch - range_start

range_start = max(0, channel_created)
range_total = end - range_start
if range_total <= 0:
range_total = 1
Expand All @@ -935,7 +936,7 @@ def _load_from_checkpoint(
)

checkpoint.seen_thread_ts = list(seen_thread_ts)
checkpoint.channel_completion_map[channel["id"]] = new_latest
checkpoint.channel_completion_map[channel["id"]] = new_oldest

# bypass channels where the first set of messages seen are all bots
# check at least MIN_BOT_MESSAGE_THRESHOLD messages are in the batch
Expand Down
Loading