Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 178 additions & 2 deletions endpoints/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError


# ref: https://github.com/fla9ua/markdown_to_mrkdwn
class SlackMarkdownConverter:
"""
Expand Down Expand Up @@ -193,7 +192,121 @@ def _convert_line(self, line: str) -> str:

class SlackEndpoint(Endpoint):
CACHE_PREFIX = "thread-cache"
CACHE_DURATION = 60 * 60 * 24 # 1 day
CACHE_DURATION = 60 * 60 * 24 # 24 hour
CLEANUP_INTERVAL = 60 * 60 * 24 # 24 hours

def _add_key_to_registry(self, key: str):
"""Add key to registry"""
if key == "__keys__": # Prevent infinite loop
return

try:
# Get current registry
registry = self._get_key_registry()

# Add key (update time if existing)
registry[key] = int(time.time()) # last_update time
registry["__last_updated"] = int(time.time())

# Save registry
self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8"))

except Exception as e:
print(f"Error adding key to registry: {e}")

def _remove_key_from_registry(self, key: str):
"""Remove key from registry"""
if key == "__keys__":
return

try:
registry = self._get_key_registry()

if key in registry:
del registry[key]
registry["__last_updated"] = int(time.time())
self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8"))

except Exception as e:
print(f"Error removing key from registry: {e}")

def _get_key_registry(self) -> dict:
"""Get key registry"""
try:
raw = self.session.storage.get("__keys__")
if raw:
return json.loads(raw.decode("utf-8"))
else:
return {"__last_updated": int(time.time())}
except Exception:
return {"__last_updated": int(time.time())}

def _get_all_keys(self) -> List[str]:
"""Get all keys (except __last_updated)"""
try:
registry = self._get_key_registry()
return [k for k in registry.keys() if k != "__last_updated"]
except Exception:
return []

def _cleanup_storage(self, cleanup_percentage: float = 0.5):
"""Clean up storage (delete old keys)"""
try:
registry = self._get_key_registry()

# Get keys other than __last_updated
keys_with_time = [(k, v) for k, v in registry.items() if k != "__last_updated"]

if not keys_with_time:
return

# Sort by time in ascending order (from oldest)
keys_with_time.sort(key=lambda x: x[1])

# Calculate number of keys to delete
total_keys = len(keys_with_time)
keys_to_delete = int(total_keys * cleanup_percentage)

if keys_to_delete > 0:
# Delete from oldest keys
for i in range(keys_to_delete):
key_to_delete = keys_with_time[i][0]
try:
self.session.storage.delete(key_to_delete)
self._remove_key_from_registry(key_to_delete)
except Exception as e:
print(f"Error deleting key {key_to_delete}: {e}")

print(f"Storage cleanup completed: deleted {keys_to_delete} keys out of {total_keys}")

except Exception as e:
print(f"Error during storage cleanup: {e}")

def _should_cleanup_storage(self) -> bool:
"""Check if storage cleanup is needed at 24-hour intervals"""
try:
registry = self._get_key_registry()
last_cleanup = registry.get("__last_cleanup", 0)
current_time = int(time.time())
return (current_time - last_cleanup) >= self.CLEANUP_INTERVAL
except Exception:
return False

def _periodic_cleanup_if_needed(self):
"""Perform periodic cleanup as needed (30% deletion)"""
if self._should_cleanup_storage():
try:
# Light cleanup (30% deletion)
self._cleanup_storage(cleanup_percentage=0.3)

# Record cleanup time
registry = self._get_key_registry()
registry["__last_cleanup"] = int(time.time())
self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8"))

print("Periodic storage cleanup completed")
except Exception as e:
print(f"Error during periodic cleanup: {e}")

def _load_cached_history(self, channel: str, thread_ts: str):
key = f"{self.CACHE_PREFIX}-{channel}-{thread_ts}"
Expand All @@ -212,6 +325,7 @@ def _load_cached_history(self, channel: str, thread_ts: str):
data["messages"] = messages
data["last_cleanup"] = now
try:
self._add_key_to_registry(key)
self.session.storage.set(key, json.dumps(data).encode("utf-8"))
except Exception:
pass
Expand All @@ -235,6 +349,7 @@ def _append_thread_message(self, channel: str, thread_ts: str, message: Mapping)
data["messages"].append(msg)
data["last_cleanup"] = now
try:
self._add_key_to_registry(key)
self.session.storage.set(key, json.dumps(data).encode("utf-8"))
except Exception:
pass
Expand All @@ -243,6 +358,9 @@ def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
"""
Invokes the endpoint with the given request.
"""
# Periodic cleanup check
self._periodic_cleanup_if_needed()

# Check if this is a retry and if we should ignore it
retry_num = r.headers.get("X-Slack-Retry-Num")
if not settings.get("allow_retry") and (
Expand Down Expand Up @@ -286,6 +404,41 @@ def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
token = settings.get("bot_token")
client = WebClient(token=token)

# Check if this is a cache deletion request
if message.strip().lower() == "delcache":
# Delete thread cache
cache_key = f"{self.CACHE_PREFIX}-{channel}-{thread_ts}"
conversation_key = f"slack-{channel}-{thread_ts}"

try:
# Delete both cache and conversation storage
self._remove_key_from_registry(cache_key)
self.session.storage.delete(cache_key)
self._remove_key_from_registry(conversation_key)
self.session.storage.delete(conversation_key)

# Send confirmation message
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text=f"✅ Thread cache has been successfully deleted. \n{cache_key} \n{conversation_key}"
)
except Exception as e:
print(f"Error deleting cache: {e}")
# Send error message
try:
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text=f"❌ Error deleting cache: {str(e)}"
)
except SlackApiError:
pass

return Response(
status=200, response="ok", content_type="text/plain"
)

# store the incoming app mention message
self._append_thread_message(
channel,
Expand Down Expand Up @@ -543,6 +696,7 @@ def replace_id_with_name(match):
answer = response.get("answer")
conversation_id = response.get("conversation_id")
if conversation_id:
self._add_key_to_registry(key_to_check)
self.session.storage.set(
key_to_check, conversation_id.encode("utf-8")
)
Expand Down Expand Up @@ -668,9 +822,31 @@ def replace_id_with_name(match):
thread_ts=thread_ts,
text=f"Sorry, I'm having trouble processing your request. Please try again later. Error: {err_msg}",
)

except SlackApiError:
# Failed to send error message
pass

# Check if storage size limit exceeded and cleanup if needed
if "allocated size is greater than max storage size" in err_msg.lower():
try:
self._cleanup_storage(cleanup_percentage=0.5)
# Send cleanup notification
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text="🧹 Storage cleanup completed. Please try your request again.",
)
except Exception as cleanup_error:
print(f"Error during storage cleanup: {cleanup_error}")
try:
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text="⚠️ Storage cleanup failed. Please contact administrator.",
)
except SlackApiError:
pass

return Response(
status=200,
Expand Down
2 changes: 1 addition & 1 deletion manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ resource:
enabled: true
storage:
enabled: true
size: 1048576
size: 10485760
plugins:
endpoints:
- group/slack.yaml
Expand Down