-
Notifications
You must be signed in to change notification settings - Fork 82
feat(clp-mcp-server): Implement MCP API Tool Calls: get_nth_page and get_instructions #1401
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 37 commits
a76ff24
697f366
70dff32
0d8b06a
c5ac54e
d326f76
37f2c41
47d0301
2201266
aa37f1a
a3076b6
d3d7830
ec0f2bc
21e0386
edf68e4
3d2be57
e1c1b3b
7844653
97e7f23
d3ccf3e
15315ec
8ce8a38
6faeba9
466e242
cccd8ec
b7c3b87
9c6b881
184c94e
07aa3f8
1c2d810
02c706f
da0b2e3
142573a
6f426ca
c8816a5
702837c
e267ec4
ed7765b
9e73062
01c6a05
67d183e
ce744d7
4fc5cc3
23797fd
0644fb7
662b47d
1a53ad3
862aa42
b79f681
134591c
50e5db3
1cad6b0
2683bcf
22de748
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| """Constants for CLP MCP server.""" | ||
|
|
||
| EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS = 600 # 10 minutes | ||
| ITEM_PER_PAGE = 10 | ||
| MAX_CACHED_RESULTS = 1000 | ||
| SESSION_TTL_MINUTES = 60 | ||
|
|
||
| SERVER_NAME = "clp-mcp-server" | ||
| SYSTEM_PROMPT = ( | ||
| "You are an AI assistant that helps users query a log database using KQL " | ||
| "(Kibana Query Language). When given a user query, you should generate a KQL " | ||
| "query that accurately captures the user's intent. The KQL query should be as " | ||
| "specific as possible to minimize the number of log messages returned. " | ||
| "You should also consider the following guidelines when generating KQL queries: " | ||
| "- Use specific field names and values to narrow down the search. " | ||
| "- Avoid using wildcards (*) unless absolutely necessary, as they can lead to " | ||
| "large result sets. - Use logical operators (AND, OR, NOT) to combine multiple " | ||
| "conditions. - Consider the time range of the logs you are searching. If the " | ||
| "user specifies a time range, include it in the KQL query. - If the user query " | ||
| "is ambiguous or lacks detail, ask clarifying questions to better understand " | ||
| "their intent before generating the KQL query. - Always ensure that the " | ||
| "generated KQL query is syntactically correct and can be executed without errors. " | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,223 @@ | ||
| """Session management for CLP MCP Server.""" | ||
|
|
||
| import threading | ||
| import time | ||
| from dataclasses import dataclass, field | ||
| from datetime import datetime, timedelta, timezone | ||
| from typing import Any | ||
|
|
||
| from paginate import Page | ||
|
|
||
| from .server import constants | ||
20001020ycx marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class PaginatedQueryResult: | ||
| """Paginates the cached log entries returned from a query's response.""" | ||
|
|
||
| def __init__(self, result_log_entries: list[str], num_items_per_page: int) -> None: | ||
| """ | ||
| Initializes the QueryResultPaginator. | ||
| :param result_log_entries: List of cached log entries to paginate. | ||
| :param num_items_per_page: | ||
| :raise: ValueError if the number of cached results or num_items_per_page is invalid. | ||
| """ | ||
| if len(result_log_entries) > constants.MAX_CACHED_RESULTS: | ||
| err_msg = ( | ||
| f"QueryResultPaginator exceeds maximum allowed cached results:" | ||
| f" {len(result_log_entries)} > {constants.MAX_CACHED_RESULTS}." | ||
| ) | ||
| raise ValueError(err_msg) | ||
|
|
||
20001020ycx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if num_items_per_page <= 0: | ||
| err_msg = ( | ||
| f"Invalid num_items_per_page: {num_items_per_page}, " | ||
| "it must be a positive integer. " | ||
20001020ycx marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ) | ||
| raise ValueError(err_msg) | ||
|
|
||
| self.result_log_entries = result_log_entries | ||
| self.num_items_per_page = num_items_per_page | ||
|
|
||
|
Comment on lines
+38
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Optional: freeze cached results to avoid accidental mutation Store cached entries as a tuple to uphold the “cached” contract. Page works with sequences. - self.result_log_entries = result_log_entries
+ # Ensure immutability of cached results
+ self.result_log_entries = tuple(result_log_entries)Also applies to: 56-60 🤖 Prompt for AI Agents |
||
| self._num_pages = (len(result_log_entries) + num_items_per_page - 1) // num_items_per_page | ||
|
|
||
| def get_page(self, page_index: int) -> Page | None: | ||
| """ | ||
| Returns a page from the cached query results. | ||
| :param page_index: The number of page to retrieve (zero-based index; 0 is the first page). | ||
| :return: A `Page` object for the specified page. | ||
| :return: None if `page_index` is out of bounds. | ||
|
Comment on lines
+47
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring phrasing: use “page index” consistently. Tighten wording and keep zero-based note consistent across methods. - :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).Apply the same change in SessionState.get_page_data and SessionManager.get_nth_page. Also applies to: 102-107, 224-226 🤖 Prompt for AI Agents |
||
| """ | ||
| page_number = page_index + 1 # Convert zero-based to one-based | ||
20001020ycx marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if page_number <= 0 or self._num_pages < page_number: | ||
| return None | ||
|
|
||
| return Page( | ||
| self.result_log_entries, | ||
| page=page_number, | ||
| items_per_page=self.num_items_per_page, | ||
| ) | ||
|
|
||
|
|
||
| @dataclass | ||
| class SessionState: | ||
| """Represents the state of a user session.""" | ||
|
|
||
| session_id: str | ||
| _num_items_per_page: int | ||
| _session_ttl_minutes: int | ||
|
|
||
| is_instructions_retrieved: bool = False | ||
| last_accessed: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) | ||
| _cached_query_result: PaginatedQueryResult | None = None | ||
|
|
||
| def cache_query_result( | ||
| self, | ||
| results: list[str], | ||
| ) -> None: | ||
| """ | ||
| Caches the latest query result of the session. | ||
| :param results: Complete log entries from previous query for caching. | ||
| """ | ||
| self._cached_query_result = PaginatedQueryResult( | ||
20001020ycx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| result_log_entries=results, num_items_per_page=self._num_items_per_page | ||
| ) | ||
20001020ycx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def get_page_data(self, page_index: int) -> dict[str, Any]: | ||
| """ | ||
| Gets page data and its metadata in a dictionary format. | ||
| :param page_index: The number of page to retrieve (zero-based index; 0 is the first page). | ||
| :return: Dictionary containing paged log entries and the paging metadata if the | ||
| page `page_index` can be retrieved. | ||
| :return: Dictionary with ``{"Error": "error message describing the failure"}`` if fails to | ||
| retrieve page `page_index`. | ||
| """ | ||
| if self._cached_query_result is None: | ||
| return {"Error": "No previous paginated response in this session."} | ||
|
|
||
| page = self._cached_query_result.get_page(page_index) | ||
| if page is None: | ||
| return {"Error": "Page index is out of bounds."} | ||
20001020ycx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
Comment on lines
+108
to
+114
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Optional: friendlier empty-results behaviour. Consider allowing index 0 to return an empty page (with correct metadata) when no results are cached; current behaviour returns “out of bounds.” |
||
| return { | ||
| "items": list(page), | ||
| "total_pages": page.page_count, | ||
| "total_items": page.item_count, | ||
| "num_items_per_page": page.items_per_page, | ||
| "has_next": page.next_page is not None, | ||
| "has_previous": page.previous_page is not None, | ||
| } | ||
|
|
||
| def is_expired(self) -> bool: | ||
| """:return: Whether the session has expired.""" | ||
| time_diff = datetime.now(timezone.utc) - self.last_accessed | ||
| return time_diff > timedelta(minutes=self._session_ttl_minutes) | ||
|
|
||
| def update_access_time(self) -> None: | ||
| """Updates the last accessed timestamp.""" | ||
| self.last_accessed = datetime.now(timezone.utc) | ||
|
|
||
|
|
||
| class SessionManager: | ||
| """ | ||
| Session manager for concurrent user sessions. | ||
| `SessionManger` respects MCP Server Concurrency Model, that is: | ||
| The server supports multiple concurrent clients, where each client only makes synchronous | ||
| API calls. | ||
| This model leads to the following design decisions: | ||
| sessions is a shared variable as there may be multiple session attached to the MCP server | ||
| session state is NOT a shared variable because each session is accessed by only one | ||
| connection at a time because API calls for a single session are synchronous. | ||
| """ | ||
20001020ycx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def __init__(self, session_ttl_minutes: int) -> None: | ||
| """ | ||
| Initializes the SessionManager and starts background cleanup thread. | ||
| :param session_ttl_minutes: Session time-to-live in minutes. | ||
| """ | ||
| self.sessions: dict[str, SessionState] = {} | ||
| self._session_ttl_minutes = session_ttl_minutes | ||
| self._sessions_lock = threading.Lock() | ||
| self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) | ||
| self._cleanup_thread.start() | ||
|
|
||
|
Comment on lines
+151
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Make cleanup thread optional and stoppable (testability, clean shutdown) Always starting a daemon thread complicates tests and shutdown. Add a flag and stop() with an Event. - def __init__(self, session_ttl_minutes: int) -> None:
+ def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
"""
Initializes the SessionManager and starts background cleanup thread.
:param session_ttl_minutes: Session time-to-live in minutes.
"""
self.sessions: dict[str, SessionState] = {}
self._session_ttl_minutes = session_ttl_minutes
self._sessions_lock = threading.Lock()
- self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
- self._cleanup_thread.start()
+ self._stop_event = threading.Event()
+ self._cleanup_thread: threading.Thread | None = None
+ if start_cleanup_thread:
+ self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+ self._cleanup_thread.start()
def _cleanup_loop(self) -> None:
"""Cleans up all expired sessions periodically in a separate cleanup thread."""
- while True:
- time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
- self.cleanup_expired_sessions()
+ while not self._stop_event.is_set():
+ # Wait allows prompt exit when stop() is called
+ self._stop_event.wait(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+ if self._stop_event.is_set():
+ break
+ self.cleanup_expired_sessions()
+
+ def stop(self) -> None:
+ """Stops the background cleanup thread (primarily for tests)."""
+ if hasattr(self, "_stop_event"):
+ self._stop_event.set()
+ t = getattr(self, "_cleanup_thread", None)
+ if t is not None and t.is_alive():
+ t.join(timeout=1)Also applies to: 190-195 |
||
| def _cleanup_loop(self) -> None: | ||
| """Cleans up all expired sessions periodically in a separate cleanup thread.""" | ||
| while True: | ||
| time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS) | ||
| self.cleanup_expired_sessions() | ||
|
|
||
| def cleanup_expired_sessions(self) -> None: | ||
| """Cleans up all expired sessions.""" | ||
| with self._sessions_lock: | ||
| expired_sessions = [ | ||
| sid for sid, session in self.sessions.items() if session.is_expired() | ||
| ] | ||
|
|
||
| for sid in expired_sessions: | ||
| del self.sessions[sid] | ||
|
|
||
| def get_or_create_session(self, session_id: str) -> SessionState: | ||
| """ | ||
| Gets an existing session or creates a new one. | ||
| :param session_id: Unique identifier for the session. | ||
| :return: The SessionState object for the given session_id. | ||
| """ | ||
| with self._sessions_lock: | ||
| if session_id in self.sessions and self.sessions[session_id].is_expired(): | ||
| del self.sessions[session_id] | ||
|
|
||
| if session_id not in self.sessions: | ||
| self.sessions[session_id] = SessionState( | ||
| session_id, constants.ITEM_PER_PAGE, self._session_ttl_minutes | ||
| ) | ||
|
|
||
| session = self.sessions[session_id] | ||
|
|
||
| session.update_access_time() | ||
| return session | ||
|
|
||
| def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]: | ||
| """ | ||
| Caches query results for a session and returns the first page and the paging metadata. | ||
| :param session_id: Unique identifier for the session. | ||
| :param query_results: Complete log entries from previous query for caching. | ||
| :return: Dictionary containing the first page log entries and the paging metadata if the | ||
| first page can be retrieved. | ||
| :return: Dictionary with ``{"Error": "error message describing the failure"}`` if fails to | ||
| retrieve the first page. | ||
| """ | ||
| session = self.get_or_create_session(session_id) | ||
| if session.is_instructions_retrieved is False: | ||
| return { | ||
| "Error": "Please call get_instructions() first " | ||
| "to understand how to use this MCP server." | ||
| } | ||
20001020ycx marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| session.cache_query_result(results=query_results) | ||
|
|
||
| return session.get_page_data(0) | ||
|
|
||
| def get_nth_page(self, session_id: str, page_index: int) -> dict[str, Any]: | ||
| """ | ||
| Retrieves the n-th page of a paginated response with the paging metadata from the previous | ||
| query. | ||
| :param session_id: Unique identifier for the session. | ||
| :param page_index: The number of page to retrieve (zero-based index; 0 is the first page). | ||
| :return: Forwards `SessionState.get_page_data`'s return values. | ||
| """ | ||
| session = self.get_or_create_session(session_id) | ||
| if session.is_instructions_retrieved is False: | ||
| return { | ||
| "Error": "Please call get_instructions() first " | ||
| "to understand how to use this MCP server." | ||
| } | ||
|
|
||
| return session.get_page_data(page_index) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Test Package for CLP MCP Server.""" |
Uh oh!
There was an error while loading. Please reload this page.