Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 178 additions & 2 deletions endpoints/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError


# ref: https://github.com/fla9ua/markdown_to_mrkdwn
class SlackMarkdownConverter:
"""
Expand Down Expand Up @@ -193,7 +192,121 @@ def _convert_line(self, line: str) -> str:

class SlackEndpoint(Endpoint):
CACHE_PREFIX = "thread-cache"
CACHE_DURATION = 60 * 60 * 24 # 1 day
CACHE_DURATION = 60 * 60 * 24 # 24 hour
CLEANUP_INTERVAL = 60 * 60 * 24 # 24 hours

def _add_key_to_registry(self, key: str):
"""キーをレジストリに追加"""
if key == "__keys__": # 無限ループ防止
return

try:
# 現在のレジストリを取得
registry = self._get_key_registry()

# キーを追加(既存の場合は時間を更新)
registry[key] = int(time.time()) # last_update時刻
registry["__last_updated"] = int(time.time())

# レジストリを保存
self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8"))

except Exception as e:
print(f"Error adding key to registry: {e}")

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

レジストリへのキー追加にスレッドセーフティを考慮してください

複数のスレッドが同時にレジストリにアクセスする可能性があるため、競合状態が発生する可能性があります。また、エラーハンドリングがコンソール出力のみなので、本番環境では不十分かもしれません。

スレッドロックの追加を検討してください:

import threading

class SlackEndpoint(Endpoint):
    def __init__(self):
        self._registry_lock = threading.Lock()
    
    def _add_key_to_registry(self, key: str):
        """キーをレジストリに追加"""
        if key == "__keys__":
            return
        
        with self._registry_lock:
            try:
                # 現在のレジストリを取得
                registry = self._get_key_registry()
                # ... rest of the code
🤖 Prompt for AI Agents
In endpoints/slack.py around lines 198 to 216, the _add_key_to_registry method
is not thread-safe and only prints errors to the console. To fix this, add a
threading.Lock instance (e.g., self._registry_lock) in the class constructor to
synchronize access to the registry. Then, wrap the registry access and update
code inside a with self._registry_lock block to prevent race conditions. Also,
replace the print statement with proper logging or error handling suitable for
production environments.

def _remove_key_from_registry(self, key: str):
"""キーをレジストリから削除"""
if key == "__keys__":
return

try:
registry = self._get_key_registry()

if key in registry:
del registry[key]
registry["__last_updated"] = int(time.time())
self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8"))

except Exception as e:
print(f"Error removing key from registry: {e}")

def _get_key_registry(self) -> dict:
"""キーレジストリを取得"""
try:
raw = self.session.storage.get("__keys__")
if raw:
return json.loads(raw.decode("utf-8"))
else:
return {"__last_updated": int(time.time())}
except Exception:
return {"__last_updated": int(time.time())}

def _get_all_keys(self) -> List[str]:
"""全てのキーを取得(__last_updated以外)"""
try:
registry = self._get_key_registry()
return [k for k in registry.keys() if k != "__last_updated"]
except Exception:
return []

def _cleanup_storage(self, cleanup_percentage: float = 0.5):
"""storageをクリーンアップ(古いキーから削除)"""
try:
registry = self._get_key_registry()

# __last_updated以外のキーを取得
keys_with_time = [(k, v) for k, v in registry.items() if k != "__last_updated"]

if not keys_with_time:
return

# 時刻で昇順ソート(古いものから)
keys_with_time.sort(key=lambda x: x[1])

# 削除するキーの数を計算
total_keys = len(keys_with_time)
keys_to_delete = int(total_keys * cleanup_percentage)

if keys_to_delete > 0:
# 古いキーから削除
for i in range(keys_to_delete):
key_to_delete = keys_with_time[i][0]
try:
self.session.storage.delete(key_to_delete)
self._remove_key_from_registry(key_to_delete)
except Exception as e:
print(f"Error deleting key {key_to_delete}: {e}")

print(f"Storage cleanup completed: deleted {keys_to_delete} keys out of {total_keys}")

except Exception as e:
print(f"Error during storage cleanup: {e}")

def _should_cleanup_storage(self) -> bool:
"""24時間間隔でストレージクリーンアップが必要かチェック"""
try:
registry = self._get_key_registry()
last_cleanup = registry.get("__last_cleanup", 0)
current_time = int(time.time())
return (current_time - last_cleanup) >= self.CLEANUP_INTERVAL
except Exception:
return False

def _periodic_cleanup_if_needed(self):
"""必要に応じて定期クリーンアップを実行(30%削除)"""
if self._should_cleanup_storage():
try:
# 軽めのクリーンアップ(30%削除)
self._cleanup_storage(cleanup_percentage=0.3)

# クリーンアップ時刻を記録
registry = self._get_key_registry()
registry["__last_cleanup"] = int(time.time())
self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8"))

print("Periodic storage cleanup completed")
except Exception as e:
print(f"Error during periodic cleanup: {e}")

def _load_cached_history(self, channel: str, thread_ts: str):
key = f"{self.CACHE_PREFIX}-{channel}-{thread_ts}"
Expand All @@ -212,6 +325,7 @@ def _load_cached_history(self, channel: str, thread_ts: str):
data["messages"] = messages
data["last_cleanup"] = now
try:
self._add_key_to_registry(key)
self.session.storage.set(key, json.dumps(data).encode("utf-8"))
except Exception:
pass
Expand All @@ -235,6 +349,7 @@ def _append_thread_message(self, channel: str, thread_ts: str, message: Mapping)
data["messages"].append(msg)
data["last_cleanup"] = now
try:
self._add_key_to_registry(key)
self.session.storage.set(key, json.dumps(data).encode("utf-8"))
except Exception:
pass
Expand All @@ -243,6 +358,9 @@ def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
"""
Invokes the endpoint with the given request.
"""
# 定期クリーンアップチェック
self._periodic_cleanup_if_needed()

# Check if this is a retry and if we should ignore it
retry_num = r.headers.get("X-Slack-Retry-Num")
if not settings.get("allow_retry") and (
Expand Down Expand Up @@ -286,6 +404,41 @@ def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:
token = settings.get("bot_token")
client = WebClient(token=token)

# Check if this is a cache deletion request
if message.strip().lower() == "delcache":
# Delete thread cache
cache_key = f"{self.CACHE_PREFIX}-{channel}-{thread_ts}"
conversation_key = f"slack-{channel}-{thread_ts}"

try:
# Delete both cache and conversation storage
self._remove_key_from_registry(cache_key)
self.session.storage.delete(cache_key)
self._remove_key_from_registry(conversation_key)
self.session.storage.delete(conversation_key)

# Send confirmation message
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text=f"✅ Thread cache has been successfully deleted. \n{cache_key} \n{conversation_key}"
)
except Exception as e:
print(f"Error deleting cache: {e}")
# Send error message
try:
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text=f"❌ Error deleting cache: {str(e)}"
)
except SlackApiError:
pass

return Response(
status=200, response="ok", content_type="text/plain"
)

# store the incoming app mention message
self._append_thread_message(
channel,
Expand Down Expand Up @@ -543,6 +696,7 @@ def replace_id_with_name(match):
answer = response.get("answer")
conversation_id = response.get("conversation_id")
if conversation_id:
self._add_key_to_registry(key_to_check)
self.session.storage.set(
key_to_check, conversation_id.encode("utf-8")
)
Expand Down Expand Up @@ -668,9 +822,31 @@ def replace_id_with_name(match):
thread_ts=thread_ts,
text=f"Sorry, I'm having trouble processing your request. Please try again later. Error: {err_msg}",
)

except SlackApiError:
# Failed to send error message
pass

# Check if storage size limit exceeded and cleanup if needed
if "allocated size is greater than max storage size" in err_msg.lower():
try:
self._cleanup_storage(cleanup_percentage=0.5)
# Send cleanup notification
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text="🧹 Storage cleanup completed. Please try your request again.",
)
except Exception as cleanup_error:
print(f"Error during storage cleanup: {cleanup_error}")
try:
client.chat_postMessage(
channel=channel,
thread_ts=thread_ts,
text="⚠️ Storage cleanup failed. Please contact administrator.",
)
except SlackApiError:
pass

return Response(
status=200,
Expand Down
2 changes: 1 addition & 1 deletion manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ resource:
enabled: true
storage:
enabled: true
size: 1048576
size: 10485760
plugins:
endpoints:
- group/slack.yaml
Expand Down