feat(clp-mcp-server): Implement MCP API Tool Calls: get_nth_page and get_instructions #1401
base: main
Conversation
Walkthrough
Adds server constants, implements a session-based paginated cache with TTL and background cleanup, and integrates session handling into the server tools.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Server
    participant SessionMgr as SessionManager
    note over Server,SessionMgr: Retrieve system instructions and mark session
    Client->>Server: get_instructions(ctx)
    Server->>SessionMgr: get_or_create_session(ctx.session_id)
    SessionMgr-->>Server: SessionState (access updated)
    Server->>SessionMgr: mark instructions retrieved
    Server-->>Client: constants.SYSTEM_PROMPT
```
```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Server
    participant SessionMgr as SessionManager
    note over Client,Server: Request paginated results
    Client->>Server: get_nth_page(page_index, ctx)
    Server->>SessionMgr: get_nth_page(ctx.session_id, page_index)
    alt page available
        SessionMgr-->>Server: { items, total_pages, total_items, items_per_page, has_next, has_previous }
        Server-->>Client: Page data
    else error / no cache / out-of-range / instructions missing
        SessionMgr-->>Server: { error, message }
        Server-->>Client: Error payload
    end
```
```mermaid
sequenceDiagram
    autonumber
    participant Producer
    participant SessionMgr as SessionManager
    participant Session as SessionState
    note over Producer,SessionMgr: Cache results and background cleanup
    Producer->>SessionMgr: cache_query_result(session_id, results)
    SessionMgr->>Session: cache_query_result(results)
    Session-->>SessionMgr: first page payload
    par Background cleanup
        SessionMgr->>SessionMgr: periodic cleanup_expired_sessions()
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 7
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
- components/clp-mcp-server/uv.lock is excluded by `!**/*.lock`
📒 Files selected for processing (6)
- components/clp-mcp-server/clp_mcp_server/server/constants.py (1 hunks)
- components/clp-mcp-server/clp_mcp_server/server/server.py (3 hunks)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
- components/clp-mcp-server/pyproject.toml (3 hunks)
- components/clp-mcp-server/tests/__init__.py (1 hunks)
- components/clp-mcp-server/tests/test_session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (2)
- components/clp-mcp-server/clp_mcp_server/server/constants.py (1): CLPMcpConstants (4-27)
- components/clp-mcp-server/clp_mcp_server/server/server.py (1): get_nth_page (37-46)
components/clp-mcp-server/tests/test_session_manager.py (2)
- components/clp-mcp-server/clp_mcp_server/server/constants.py (1): CLPMcpConstants (4-27)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (11): QueryResult (15-52), SessionManager (113-206), SessionState (56-110), get_page (38-52), get_page_data (79-101), cache_query_result (66-77), cache_query_result (168-187), is_expired (103-106), get_or_create_session (147-166), get_nth_page (189-206), cleanup_expired_sessions (137-145)
components/clp-mcp-server/clp_mcp_server/server/server.py (2)
- components/clp-mcp-server/clp_mcp_server/server/constants.py (1): CLPMcpConstants (4-27)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (3): SessionManager (113-206), get_or_create_session (147-166), get_nth_page (189-206)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (macos-15)
- GitHub Check: rust-checks (ubuntu-22.04)
🔇 Additional comments (16)
components/clp-mcp-server/tests/__init__.py (1)
1-1: OK to add a package marker. A docstring-only module is fine.
components/clp-mcp-server/pyproject.toml (1)
74-78: Per-file ignores for tests are appropriate. Allowing S101 and TC003 in tests is reasonable.
components/clp-mcp-server/clp_mcp_server/server/constants.py (1)
7-11: Constants look sane. Values for cleanup cadence, TTL, paging, and cache limits are reasonable defaults.
components/clp-mcp-server/clp_mcp_server/server/server.py (4)
19-22: Server name + SessionManager init LGTM. Using CLPMcpConstants and instantiating SessionManager with the TTL is correct.
23-35: get_instructions sets the session flag and returns the prompt — good. Meets the "must call first" design.
48-60: hello_world update to constants — good. Simple, safe change.
36-47: Verify `@mcp.tool` usage and `ctx` argument position.
- Confirm whether the FastMCP API requires `@mcp.tool()` instead of `@mcp.tool`.
- Ensure `get_nth_page` declares `ctx: Context` as its first parameter if context injection assumes a leading `ctx` argument.
components/clp-mcp-server/tests/test_session_manager.py (3)
31-40: Validation of the MAX_CACHED_RESULTS bound is good. Covers the over-limit case.
77-127: Strong coverage for the get_page_data paths. Covers empty-cache, first/middle/last pages, and OOB.
185-205: Good: get_nth_page happy path and error cases. Covers an OOB index and missing cached results.
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (6)
1-11: LGTM! The imports are well-organized and appropriate for the session management functionality.
14-52: LGTM! The QueryResult implementation is solid:
- Proper validation of cached results against MAX_CACHED_RESULTS
- Correct use of `object.__setattr__` for setting a computed field in a frozen dataclass
- Accurate ceiling division for the total-pages calculation
- Appropriate bounds checking for 1-based page indexing
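The ceiling division and 1-based bounds checks praised above can be illustrated with a small standalone sketch (the page size of 10 is an assumed value, not taken from the PR):

```python
import math

ITEMS_PER_PAGE = 10  # assumed page size


def total_pages(num_items: int) -> int:
    # Ceiling division: 25 items at 10 per page yields 3 pages.
    return math.ceil(num_items / ITEMS_PER_PAGE)


def is_valid_page_number(page_number: int, num_items: int) -> bool:
    # 1-based page numbers: the valid range is [1, total_pages].
    return 1 <= page_number <= total_pages(num_items)
```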
55-110: LGTM! The SessionState implementation handles all state management correctly:
- Proper use of timezone-aware datetime for tracking access time
- Clear error messages for missing cached results and out-of-bounds pages
- Correct TTL expiration logic using timedelta comparison
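The timedelta-based TTL check noted above can be sketched as follows (a standalone illustration with hypothetical names, not the file's exact code):

```python
from datetime import datetime, timedelta, timezone


def is_expired(last_accessed: datetime, ttl_minutes: int) -> bool:
    # Timezone-aware comparison: the session is expired once the idle
    # time since last access exceeds the TTL.
    return datetime.now(timezone.utc) - last_accessed > timedelta(minutes=ttl_minutes)
```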
113-145: LGTM! The thread-safe session management implementation is well-designed:
- Appropriate use of `threading.Lock` to protect the shared sessions dictionary
- A background daemon thread for periodic cleanup is a good approach
- The cleanup logic correctly identifies and removes expired sessions under lock protection
- Clear comments explain the threading model
147-166: LGTM! The session lifecycle management is correct:
- Properly handles expired-session cleanup before reuse
- Creates new sessions with appropriate defaults from CLPMcpConstants
- Updates access time on every retrieval to maintain an accurate TTL
- Lock protection ensures thread-safe session dictionary operations
189-206: LGTM! The pagination interface correctly:
- Enforces the get_instructions prerequisite check
- Converts the zero-based page_index (external API) to a one-based page_number (internal representation)
- Delegates to session.get_page_data for the actual page retrieval
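The zero-based to one-based conversion mentioned above amounts to a pair of one-liners (standalone sketch; the ITEMS_PER_PAGE value is assumed):

```python
ITEMS_PER_PAGE = 10  # assumed page size


def to_page_number(page_index: int) -> int:
    # The external API is zero-based; the internal representation is one-based.
    return page_index + 1


def page_slice(page_index: int) -> tuple[int, int]:
    # Start/end offsets into the cached list for a zero-based page index.
    start = page_index * ITEMS_PER_PAGE
    return start, start + ITEMS_PER_PAGE
```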
```python
    @pytest.mark.repeat(10)
    def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
        """Validates thread safety of cleanup_expired_sessions and get_or_create_session."""
        manager = SessionManager(session_ttl_minutes=10)

        def cleanup_task() -> None:
            """Continuously expires some sessions and cleans up expired sessions."""
            for _ in range(10000):
                for i in range(50):
                    session = manager.get_or_create_session(f"session_{i}")
                    if i < TestConstants.SAMPLE_RESULTS_COUNT_25:
                        session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=20)
                manager.cleanup_expired_sessions()

        def access_task() -> None:
            """Continuously creates and accesses sessions."""
            for i in range(10000):
                session_id = f"session_{i % 50}"
                manager.get_or_create_session(session_id)

        # Run cleanup and access operations concurrently
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            futures.append(executor.submit(cleanup_task))
            futures.append(executor.submit(access_task))

            for future in futures:
                future.result()  # ensure thread completion with no runtime exceptions
```
Stress test is extremely heavy; likely to be slow/flaky in CI.
- repeat(10) × 10,000-iteration loops × threads is excessive.
- Reduce iterations and/or remove repeat, or gate behind an env var/marker.
Suggested lighter variant:

```diff
-    @pytest.mark.repeat(10)
+    # Consider gating with: @pytest.mark.slow or an env flag
     def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
@@
-        with ThreadPoolExecutor(max_workers=10) as executor:
+        with ThreadPoolExecutor(max_workers=4) as executor:
@@
-            for _ in range(10000):
+            for _ in range(500):
                 for i in range(50):
                     session = manager.get_or_create_session(f"session_{i}")
                     if i < TestConstants.SAMPLE_RESULTS_COUNT_25:
                         session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=20)
                 manager.cleanup_expired_sessions()
@@
-            for i in range(10000):
+            for i in range(1000):
                 session_id = f"session_{i % 50}"
                 manager.get_or_create_session(session_id)
```
If you prefer to keep repeat, drop it to 2–3.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    # Consider gating with: @pytest.mark.slow or an env flag
    def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
        """Validates thread safety of cleanup_expired_sessions and get_or_create_session."""
        manager = SessionManager(session_ttl_minutes=10)

        def cleanup_task() -> None:
            """Continuously expires some sessions and cleans up expired sessions."""
            for _ in range(500):
                for i in range(50):
                    session = manager.get_or_create_session(f"session_{i}")
                    if i < TestConstants.SAMPLE_RESULTS_COUNT_25:
                        session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=20)
                manager.cleanup_expired_sessions()

        def access_task() -> None:
            """Continuously creates and accesses sessions."""
            for i in range(1000):
                session_id = f"session_{i % 50}"
                manager.get_or_create_session(session_id)

        # Run cleanup and access operations concurrently
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = []
            futures.append(executor.submit(cleanup_task))
            futures.append(executor.submit(access_task))

            for future in futures:
                future.result()  # ensure thread completion with no runtime exceptions
```
🤖 Prompt for AI Agents
In components/clp-mcp-server/tests/test_session_manager.py around lines 235 to
263, the stress test is too heavy for CI (repeat(10) plus 10,000-iteration loops
across threads) and should be reduced to avoid flakiness; change
pytest.mark.repeat(10) to repeat(2) or remove it, and reduce the large loop
counts (e.g., change 10,000 iterations to 1,000 or 2,000 and the inner cleanup
loop from 10,000 to 1,000) to keep concurrency coverage but cut runtime, or
alternatively mark the test with a slow/long-running pytest marker or gate it
behind an environment variable so it only runs in explicit stress test runs.
Actionable comments posted: 1
♻️ Duplicate comments (11)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (5)
58-59: Empty result sets still fail the bounds check. When `_num_pages` is 0 (empty cached_response), requesting page_index=0 converts to page_number=1, and the condition `self._num_pages < page_number` (0 < 1) becomes true, returning None. This causes `get_page_data` to return "Page index is out of bounds." even for valid empty-result queries. Apply this diff to allow page 1 for empty results:

```diff
 def get_page(self, page_index: int) -> Page | None:
     """
     Returns a page from the cached query results.

     :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
     :return: A `Page` object for the specified page.
     :return: None if `page_index` is out of bounds.
     """
     page_number = page_index + 1  # Convert zero-based to one-based
+    # Special case: allow page 1 for empty results
+    if self._num_pages == 0 and page_number == 1:
+        return Page(
+            self.cached_response,
+            page=1,
+            items_per_page=self.num_items_per_page,
+        )
     if page_number <= 0 or self._num_pages < page_number:
         return None
     return Page(
         self.cached_response,
         page=page_number,
         items_per_page=self.num_items_per_page,
     )
```
138-145: Move the concurrency model documentation to the class-level docstring. The multi-line docstring explaining the concurrency model is embedded within `__init__`, making it hard to discover. Move it to the class-level docstring for better visibility. Apply this diff:

```diff
 class SessionManager:
-    """Session manager for concurrent user sessions."""
+    """
+    Session manager for concurrent user sessions.
+
+    MCP Server Concurrency Model:
+    The server supports multiple concurrent clients, where each client only makes synchronous
+    API calls. This assumption leads to the following design decisions:
+    - sessions is a shared variable as there may be multiple sessions attached to the MCP server
+    - session state is NOT a shared variable because each session is accessed by only one
+      connection at a time because API calls for a single session are synchronous.
+    """

     def __init__(self, session_ttl_minutes: int) -> None:
         """
         Initializes the SessionManager and starts background cleanup thread.

         :param session_ttl_minutes: Session time-to-live in minutes.
         """
         self.session_ttl_minutes = session_ttl_minutes
-        """
-        MCP Server Concurrency Model:
-        The server supports multiple concurrent clients, where each client only makes synchronous
-        API calls. This assumptions leads to the following design decisions:
-        sessions is a shared variable as there may be multiple session attached to the MCP server
-        session state is NOT a shared variable because each session is accessed by only one
-        connection at a time because API calls for a single session are synchronous.
-        """
         self._sessions_lock = threading.Lock()
```
148-155: Make the cleanup thread optional and stoppable for testability. The daemon thread always starts and runs an infinite loop, complicating tests and preventing clean shutdown. Add a flag to disable the thread and an Event-based stop mechanism. Apply this diff:

```diff
-    def __init__(self, session_ttl_minutes: int) -> None:
+    def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
         """
         Initializes the SessionManager and starts background cleanup thread.

         :param session_ttl_minutes: Session time-to-live in minutes.
+        :param start_cleanup_thread: Whether to start the background cleanup thread.
         """
         self.session_ttl_minutes = session_ttl_minutes
         self._sessions_lock = threading.Lock()
         self.sessions: dict[str, SessionState] = {}
-        self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
-        self._cleanup_thread.start()
+        self._stop_event = threading.Event()
+        self._cleanup_thread: threading.Thread | None = None
+        if start_cleanup_thread:
+            self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+            self._cleanup_thread.start()

     def _cleanup_loop(self) -> None:
         """Cleans up all expired sessions periodically in a separate cleanup thread."""
-        while True:
-            time.sleep(constants.EXPEIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+        while not self._stop_event.is_set():
+            self._stop_event.wait(constants.EXPEIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+            if self._stop_event.is_set():
+                break
             self.cleanup_expired_sessions()
+
+    def stop(self) -> None:
+        """Stops the background cleanup thread (primarily for tests)."""
+        if hasattr(self, "_stop_event"):
+            self._stop_event.set()
+        t = getattr(self, "_cleanup_thread", None)
+        if t is not None and t.is_alive():
+            t.join(timeout=1)
```
201-204: Extract the duplicated error message to a module constant. The identical error dictionary appears in both `cache_query_result` and `get_nth_page`. Extract it to reduce duplication. Apply this diff:

```diff
+# Error returned when get_instructions has not been called
+_INSTRUCTIONS_REQUIRED_ERROR = {
+    "Error": "Please call get_instructions() first "
+    "to understand how to use this MCP server."
+}
+
+
 @dataclass(frozen=True)
 class QueryResult:
     ...

 def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]:
     ...
     session = self.get_or_create_session(session_id)
     if session.is_instructions_retrieved is False:
-        return {
-            "Error": "Please call get_instructions() first "
-            "to understand how to use this MCP server."
-        }
+        return _INSTRUCTIONS_REQUIRED_ERROR.copy()
     session.cache_query_result(results=query_results)
     return session.get_page_data(0)

 def get_nth_page(self, session_id: str, page_index: int) -> dict[str, Any]:
     ...
     session = self.get_or_create_session(session_id)
     if session.is_instructions_retrieved is False:
-        return {
-            "Error": "Please call get_instructions() first "
-            "to understand how to use this MCP server."
-        }
+        return _INSTRUCTIONS_REQUIRED_ERROR.copy()
     return session.get_page_data(page_index)
```

Also applies to: 221-224
206-208: Missing error handling for oversized query results. Line 206 calls `session.cache_query_result(results=query_results)`, which creates a `QueryResult` that raises `ValueError` if the results exceed `MAX_CACHED_RESULTS`. The PR objectives state the server "caches up to 1,000 log entries," implying graceful handling rather than tool failure. Apply this diff to cap results before caching:

```diff
 def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]:
     """
     Caches query results for a session and returns the first page and the paging metadata.

     :param session_id: Unique identifier for the session.
     :param query_results: Complete log entries from previous query for caching.
     :return: Dictionary containing the first page log entries and the paging metadata if the
         first page can be retrieved.
     :return: Dictionary with ``{"Error": "error message describing the failure"}`` if fails
         to retrieve the first page.
     """
     session = self.get_or_create_session(session_id)
     if session.is_instructions_retrieved is False:
         return {
             "Error": "Please call get_instructions() first "
             "to understand how to use this MCP server."
         }
-    session.cache_query_result(results=query_results)
+    # Cap results to MAX_CACHED_RESULTS to prevent ValueError
+    max_results = constants.MAX_CACHED_RESULTS
+    capped_results = (
+        query_results[:max_results] if len(query_results) > max_results else query_results
+    )
+    try:
+        session.cache_query_result(results=capped_results)
+    except ValueError as e:
+        # Defensive: return structured error if validation fails
+        return {"Error": str(e)}
     return session.get_page_data(0)
```

components/clp-mcp-server/clp_mcp_server/server/constants.py (1)
9-23: Improve SYSTEM_PROMPT formatting for readability. The bullet points run together without proper separation, making the prompt harder to read. The text "large result sets. - Use logical operators" lacks proper spacing. Apply this diff to improve formatting:

```diff
 SYSTEM_PROMPT = (
     "You are an AI assistant that helps users query a log database using KQL "
     "(Kibana Query Language). When given a user query, you should generate a KQL "
     "query that accurately captures the user's intent. The KQL query should be as "
     "specific as possible to minimize the number of log messages returned. "
-    "You should also consider the following guidelines when generating KQL queries: "
-    "- Use specific field names and values to narrow down the search. "
+    "You should also consider the following guidelines when generating KQL queries:\n"
+    "- Use specific field names and values to narrow down the search.\n"
     "- Avoid using wildcards (*) unless absolutely necessary, as they can lead to "
-    "large result sets. - Use logical operators (AND, OR, NOT) to combine multiple "
-    "conditions. - Consider the time range of the logs you are searching. If the "
+    "large result sets.\n"
+    "- Use logical operators (AND, OR, NOT) to combine multiple conditions.\n"
+    "- Consider the time range of the logs you are searching. If the "
     "user specifies a time range, include it in the KQL query. - If the user query "
-    "is ambiguous or lacks detail, ask clarifying questions to better understand "
-    "their intent before generating the KQL query. - Always ensure that the "
+    "is ambiguous or lacks detail, ask clarifying questions to better understand\n"
+    "their intent before generating the KQL query.\n"
+    "- Always ensure that the "
     "generated KQL query is syntactically correct and can be executed without errors. "
 )
```

components/clp-mcp-server/tests/test_session_manager.py (4)
52-52: Replace magic numbers with TestConstants.ITEMS_PER_PAGE. Hardcoded ranges (10, 20) should derive from `TestConstants.ITEMS_PER_PAGE` to adapt if the pagination size changes. Apply this diff:

```diff
 # Test first page
 page1 = query_result.get_page(0)
 assert page1 is not None
-assert list(page1) == [f"log_{i}" for i in range(10)]
+assert list(page1) == [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE)]
 assert page1.page == 1
 assert page1.page_count == TestConstants.EXPECTED_PAGES_25_ITEMS

 # Test second page
 page2 = query_result.get_page(1)
 assert page2 is not None
-assert list(page2) == [f"log_{i}" for i in range(10, 20)]
+assert list(page2) == [
+    f"log_{i}" for i in range(
+        TestConstants.ITEMS_PER_PAGE,
+        2 * TestConstants.ITEMS_PER_PAGE
+    )
+]

 # Test last page
 page3 = query_result.get_page(2)
 assert page3 is not None
 assert (
     list(page3) ==
-    [f"log_{i}" for i in range(20, 25)]
+    [f"log_{i}" for i in range(
+        2 * TestConstants.ITEMS_PER_PAGE,
+        TestConstants.SAMPLE_RESULTS_COUNT_25
+    )]
 )
```

Also applies to: 59-59, 65-67
109-111: Derive the second-page upper bound from TestConstants.ITEMS_PER_PAGE. The hardcoded 20 should be computed from `TestConstants.ITEMS_PER_PAGE` for maintainability. Apply this diff:

```diff
 # Test second page
 page_data = session.get_page_data(1)
 assert page_data is not None
 assert (
     page_data["items"] ==
-    [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+    [f"log_{i}" for i in range(
+        TestConstants.ITEMS_PER_PAGE,
+        2 * TestConstants.ITEMS_PER_PAGE
+    )]
 )
```
191-193: Compute the second-page upper bound from TestConstants.ITEMS_PER_PAGE. Same issue: the hardcoded 20 should derive from the constant. Apply this diff:

```diff
 # Get second page (index 1)
 page_data = manager.get_nth_page("test_session", 1)
 assert "Error" not in page_data
 assert (
     page_data["items"] ==
-    [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+    [f"log_{i}" for i in range(
+        TestConstants.ITEMS_PER_PAGE,
+        2 * TestConstants.ITEMS_PER_PAGE
+    )]
 )
```
235-262: Reduce stress test intensity to avoid CI flakiness. The combination of `@pytest.mark.repeat(10)` with 10,000-iteration loops across 10 worker threads is excessive for CI. Reduce iterations or gate behind a marker. Apply this diff to lighten the test:

```diff
-    @pytest.mark.repeat(10)
+    @pytest.mark.repeat(2)  # Or remove entirely and rely on fewer iterations
     def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
         """Validates thread safety of cleanup_expired_sessions and get_or_create_session."""
         manager = SessionManager(session_ttl_minutes=10)

         def cleanup_task() -> None:
             """Continuously expires some sessions and cleans up expired sessions."""
-            for _ in range(10000):
+            for _ in range(1000):
                 for i in range(50):
                     session = manager.get_or_create_session(f"session_{i}")
                     if i < TestConstants.SAMPLE_RESULTS_COUNT_25:
                         session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=20)
                 manager.cleanup_expired_sessions()

         def access_task() -> None:
             """Continuously creates and accesses sessions."""
-            for i in range(10000):
+            for i in range(1000):
                 session_id = f"session_{i % 50}"
                 manager.get_or_create_session(session_id)

         # Run cleanup and access operations concurrently
-        with ThreadPoolExecutor(max_workers=10) as executor:
+        with ThreadPoolExecutor(max_workers=4) as executor:
             futures = []
             futures.append(executor.submit(cleanup_task))
             futures.append(executor.submit(access_task))

             for future in futures:
                 future.result()  # ensure thread completion with no run time exceptions
```

components/clp-mcp-server/clp_mcp_server/server/server.py (1)
37-48: Document the precondition: get_instructions() must be called first. The SessionManager enforces that `get_instructions()` must be invoked before `get_nth_page()`, but the tool docstring doesn't mention this requirement. Apply this diff:

```diff
 @mcp.tool
 def get_nth_page(page_index: int, ctx: Context) -> dict[str, Any]:
     """
     Retrieves the n-th page of a paginated response with the paging metadata from the
     previous query.

     :param page_index: Zero-based index, e.g., 0 for the first page.
     :param ctx: The `FastMCP` context containing the metadata of the underlying MCP session.
     :return: Dictionary containing the first page log entries and the paging metadata if the
         first page can be retrieved.
     :return: Dictionary with ``{"Error": "error message describing the failure"}`` if fails to
-        retrieve the first page."}``.
+        retrieve the first page.
+
+    Note: Requires get_instructions() to have been called for this session;
+    otherwise an error is returned.
     """
     return session_manager.get_nth_page(ctx.session_id, page_index)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
- components/clp-mcp-server/clp_mcp_server/server/constants.py (1 hunks)
- components/clp-mcp-server/clp_mcp_server/server/server.py (3 hunks)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
- components/clp-mcp-server/tests/test_session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1)
- components/clp-mcp-server/clp_mcp_server/server/server.py (1): get_nth_page (37-49)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (3): SessionManager (128-226), get_or_create_session (167-186), get_nth_page (210-226)
components/clp-mcp-server/tests/test_session_manager.py (1)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (11): QueryResult (15-65), SessionManager (128-226), SessionState (69-125), get_page (49-65), get_page_data (92-116), cache_query_result (79-90), cache_query_result (188-208), is_expired (118-121), get_or_create_session (167-186), get_nth_page (210-226), cleanup_expired_sessions (157-165)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
Actionable comments posted: 4
♻️ Duplicate comments (8)
components/clp-mcp-server/clp_mcp_server/server/constants.py (1)
15-22: Fix bullet formatting: insert a newline after "large result sets." Currently the next bullet runs together; the newline keeps the bullets on separate lines. Behaviour unchanged.

```diff
-    "- Avoid using wildcards (*) unless absolutely necessary, as they can lead to "
-    "large result sets. - Use logical operators (AND, OR, NOT) to combine multiple "
+    "- Avoid using wildcards (*) unless absolutely necessary, as they can lead to "
+    "large result sets.\n"
+    "- Use logical operators (AND, OR, NOT) to combine multiple "
```

components/clp-mcp-server/tests/test_session_manager.py (4)
49-67: Avoid magic numbers; derive expectations from ITEMS_PER_PAGE. Use TestConstants.ITEMS_PER_PAGE to keep the tests resilient to pagination changes.

```diff
-assert list(page1) == [f"log_{i}" for i in range(10)]
+assert list(page1) == [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE)]
@@
-assert list(page2) == [f"log_{i}" for i in range(10, 20)]
+assert list(page2) == [
+    f"log_{i}" for i in range(
+        TestConstants.ITEMS_PER_PAGE,
+        2 * TestConstants.ITEMS_PER_PAGE
+    )
+]
@@
-assert (
-    list(page3) ==
-    [f"log_{i}" for i in range(20, 25)]
-)
+assert list(page3) == [
+    f"log_{i}" for i in range(
+        2 * TestConstants.ITEMS_PER_PAGE,
+        TestConstants.SAMPLE_RESULTS_COUNT_25
+    )
+]
```
105-112: Same: replace the hard-coded 20 with a computed upper bound.

```diff
-    page_data["items"] ==
-    [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+    page_data["items"] == [
+        f"log_{i}" for i in range(
+            TestConstants.ITEMS_PER_PAGE,
+            2 * TestConstants.ITEMS_PER_PAGE
+        )
+    ]
```
187-191: Same here: compute the second-page slice from ITEMS_PER_PAGE.

```diff
-    page_data["items"] ==
-    [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+    page_data["items"] == [
+        f"log_{i}" for i in range(
+            TestConstants.ITEMS_PER_PAGE,
+            2 * TestConstants.ITEMS_PER_PAGE
+        )
+    ]
```
233-260: Stress test is too heavy; it will be slow/flaky in CI. Drop repeat or gate the test as slow; reduce iteration counts to keep the signal without timeouts.

```diff
-    @pytest.mark.repeat(10)
+    # Consider gating: @pytest.mark.slow
     def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
@@
         manager = SessionManager(session_ttl_minutes=10)
@@
-            for _ in range(10000):
+            for _ in range(500):
                 for i in range(50):
                     session = manager.get_or_create_session(f"session_{i}")
                     if i < TestConstants.SAMPLE_RESULTS_COUNT_25:
                         session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=20)
                 manager.cleanup_expired_sessions()
@@
-            for i in range(10000):
+            for i in range(1000):
                 session_id = f"session_{i % 50}"
                 manager.get_or_create_session(session_id)
@@
-        with ThreadPoolExecutor(max_workers=10) as executor:
+        with ThreadPoolExecutor(max_workers=4) as executor:
```

components/clp-mcp-server/clp_mcp_server/server/session_manager.py (3)
185-205
: Cap oversized results or handle ValueError to avoid tool failure. Exceeding MAX_CACHED_RESULTS raises ValueError in PaginatedQueryResult, bubbling out and breaking the tool. The PR states "caches up to 1,000"; cap or catch.
  session = self.get_or_create_session(session_id)
  if session.is_instructions_retrieved is False:
      return {
          "Error": "Please call get_instructions() first "
          "to understand how to use this MCP server."
      }
- session.cache_query_result(results=query_results)
+ max_n = constants.MAX_CACHED_RESULTS
+ capped = query_results[:max_n] if len(query_results) > max_n else query_results
+ try:
+     session.cache_query_result(results=capped)
+ except ValueError as e:
+     return {"Error": str(e)}
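The cap-then-cache pattern suggested above is easy to verify in isolation. The sketch below is illustrative only: MAX_CACHED_RESULTS and cap_results are stand-in names, not the server's actual API.

```python
# Hypothetical stand-alone sketch of capping results before caching.
# MAX_CACHED_RESULTS mirrors the PR's "up to 1,000 entries" assumption.
MAX_CACHED_RESULTS = 1000

def cap_results(query_results: list) -> list:
    """Return at most MAX_CACHED_RESULTS items, dropping any excess."""
    if len(query_results) > MAX_CACHED_RESULTS:
        return query_results[:MAX_CACHED_RESULTS]
    return query_results

capped = cap_results([f"log_{i}" for i in range(1500)])
print(len(capped))  # 1000
```

Slicing never raises, so the downstream PaginatedQueryResult constructor would no longer see an oversized list; whether to silently truncate or also surface a warning is a design choice the review leaves open.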
124-151
: Make cleanup thread optional and stoppable (testability, clean shutdown). Always starting an infinite daemon loop complicates tests and shutdown. Add a start flag and a stop() using an Event.
-class SessionManager:
+class SessionManager:
@@
-    def __init__(self, session_ttl_minutes: int) -> None:
+    def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
@@
-        self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
-        self._cleanup_thread.start()
+        self._stop_event = threading.Event()
+        self._cleanup_thread: threading.Thread | None = None
+        if start_cleanup_thread:
+            self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+            self._cleanup_thread.start()
@@
-    def _cleanup_loop(self) -> None:
-        """Cleans up all expired sessions periodically in a separate cleanup thread."""
-        while True:
-            time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
-            self.cleanup_expired_sessions()
+    def _cleanup_loop(self) -> None:
+        """Cleans up all expired sessions periodically in a separate cleanup thread."""
+        while not self._stop_event.is_set():
+            self._stop_event.wait(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+            if self._stop_event.is_set():
+                break
+            self.cleanup_expired_sessions()
+
+    def stop(self) -> None:
+        """Stops the background cleanup thread."""
+        if hasattr(self, "_stop_event"):
+            self._stop_event.set()
+        t = getattr(self, "_cleanup_thread", None)
+        if t is not None and t.is_alive():
+            t.join(timeout=1)
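The Event-based shutdown suggested above can be exercised on its own. This is a minimal sketch; names like CleanupWorker are illustrative and not part of the PR:

```python
import threading
import time

class CleanupWorker:
    """Illustrative stoppable periodic worker built on threading.Event."""

    def __init__(self, interval_seconds: float, callback) -> None:
        self._interval = interval_seconds
        self._callback = callback
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self) -> None:
        # Event.wait() returns True as soon as stop() sets the event,
        # so shutdown does not block for a full sleep interval.
        while not self._stop_event.wait(self._interval):
            self._callback()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join(timeout=1)

calls = []
worker = CleanupWorker(0.01, lambda: calls.append(1))
time.sleep(0.1)
worker.stop()
```

Using Event.wait() as the sleep primitive is what makes stop() prompt: a plain time.sleep() would delay shutdown by up to one full sweep interval.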
52-60
: Empty results should return an empty first page, not out-of-bounds. When there are 0 items, page 0 should map to page_number 1 and return an empty Page. This matches typical UX and avoids spurious errors.
- page_number = page_index + 1  # Convert zero-based to one-based
- if page_number <= 0 or self._num_pages < page_number:
+ page_number = page_index + 1  # zero-based -> one-based
+ if self._num_pages == 0:
+     if page_number != 1:
+         return None
+     return Page(self.result_log_entries, page=1, items_per_page=self.num_items_per_page)
+ if page_number <= 0 or page_number > self._num_pages:
      return None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
components/clp-mcp-server/clp_mcp_server/server/constants.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
components/clp-mcp-server/tests/test_session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (11)
PaginatedQueryResult (14-60)
SessionManager (124-223)
SessionState (64-121)
get_page (44-60)
get_page_data (88-112)
cache_query_result (75-86)
cache_query_result (185-205)
is_expired (114-117)
get_or_create_session (164-183)
get_nth_page (207-223)
cleanup_expired_sessions (154-162)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks (macos-15)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: rust-checks (ubuntu-24.04)
Actionable comments posted: 2
📒 Files selected for processing (1)
components/clp-mcp-server/clp_mcp_server/server/constants.py (1 hunks)
Actionable comments posted: 1
♻️ Duplicate comments (3)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (3)
136-152
: Make cleanup thread optional and stoppable for testability. The cleanup thread always starts and runs an infinite loop with no clean shutdown mechanism. Past reviews suggested adding a flag to disable thread start and a stop() method for controlled exit, improving test stability and resource cleanup. Apply this diff:
- def __init__(self, session_ttl_minutes: int) -> None:
+ def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
      """
      Initializes the SessionManager and starts background cleanup thread.
      :param session_ttl_minutes: Session time-to-live in minutes.
+     :param start_cleanup_thread: Whether to start the background cleanup thread.
      """
      self.sessions: dict[str, SessionState] = {}
      self._session_ttl_minutes = session_ttl_minutes
      self._sessions_lock = threading.Lock()
-     self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
-     self._cleanup_thread.start()
+     self._stop_event = threading.Event()
+     self._cleanup_thread: threading.Thread | None = None
+     if start_cleanup_thread:
+         self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+         self._cleanup_thread.start()

  def _cleanup_loop(self) -> None:
      """Cleans up all expired sessions periodically in a separate cleanup thread."""
-     while True:
-         time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+     while not self._stop_event.is_set():
+         if self._stop_event.wait(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS):
+             break
          self.cleanup_expired_sessions()
+
+ def stop(self) -> None:
+     """Stops the background cleanup thread (primarily for tests)."""
+     if hasattr(self, "_stop_event"):
+         self._stop_event.set()
+     if self._cleanup_thread is not None and self._cleanup_thread.is_alive():
+         self._cleanup_thread.join(timeout=1)
44-60
: Handle empty result sets gracefully. When result_log_entries is empty, _num_pages becomes 0, causing get_page(0) to return None even though an empty page 1 is semantically valid. This prevents callers from distinguishing between "no results" and "invalid page index." Past reviews flagged this issue. Consider special-casing the empty-results scenario.
Apply this diff to allow page 1 for empty results:
  def get_page(self, page_index: int) -> Page | None:
      """
      Returns a page from the cached query results.
      :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
      :return: A `Page` object for the specified page.
      :return: None if `page_index` is out of bounds.
      """
      page_number = page_index + 1  # Convert zero-based to one-based
+     # Special case: allow page 1 for empty results
+     if self._num_pages == 0:
+         if page_number == 1:
+             return Page([], page=1, items_per_page=self.num_items_per_page)
+         return None
+
      if page_number <= 0 or self._num_pages < page_number:
          return None
      return Page(
          self.result_log_entries,
          page=page_number,
          items_per_page=self.num_items_per_page,
      )
203-203
: Add error handling for oversized query results. Line 203 calls session.cache_query_result(), which internally creates a PaginatedQueryResult that raises ValueError if results exceed MAX_CACHED_RESULTS (lines 25-30). The PR objectives state the server "caches up to 1,000 log entries," implying graceful handling rather than tool failure. Without try/except here, large result sets crash the tool. Past reviews flagged this multiple times.
Apply this diff to cap results before caching:
  def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]:
      """
      Caches query results for a session and returns the first page and the paging metadata.
      :param session_id: Unique identifier for the session.
      :param query_results: Complete log entries from previous query for caching.
      :return: Dictionary containing the first page log entries and the paging metadata if
          the first page can be retrieved.
      :return: Dictionary with ``{"Error": "error message describing the failure"}`` if
          fails to retrieve the first page.
      """
      session = self.get_or_create_session(session_id)
      if session.is_instructions_retrieved is False:
          return {
              "Error": "Please call get_instructions() first "
              "to understand how to use this MCP server."
          }
-     session.cache_query_result(results=query_results)
+     # Cap results to MAX_CACHED_RESULTS to prevent ValueError
+     max_results = constants.MAX_CACHED_RESULTS
+     capped = query_results[:max_results] if len(query_results) > max_results else query_results
+     try:
+         session.cache_query_result(results=capped)
+     except ValueError as e:
+         return {"Error": str(e)}
      return session.get_page_data(0)
📒 Files selected for processing (1)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
Actionable comments posted: 5
♻️ Duplicate comments (8)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (3)
140-151
: Make cleanup thread optional and stoppable to improve testability and shutdown. Always starting a daemon thread with an infinite loop complicates tests and clean shutdown. Add a flag and a stop() with Event. Based on past review comments.
- def __init__(self, session_ttl_minutes: int) -> None:
+ def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
@@
- self._sessions_lock = threading.Lock()
- self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
- self._cleanup_thread.start()
+ self._sessions_lock = threading.Lock()
+ self._stop_event = threading.Event()
+ self._cleanup_thread: threading.Thread | None = None
+ if start_cleanup_thread:
+     self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+     self._cleanup_thread.start()
@@
- def _cleanup_loop(self) -> None:
-     """Cleans up all expired sessions periodically in a separate cleanup thread."""
-     while True:
-         time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
-         self.cleanup_expired_sessions()
+ def _cleanup_loop(self) -> None:
+     """Cleans up all expired sessions periodically in a separate cleanup thread."""
+     while not self._stop_event.is_set():
+         # Wait allows prompt exit when stop() is called
+         self._stop_event.wait(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+         if self._stop_event.is_set():
+             break
+         self.cleanup_expired_sessions()
+
+ def stop(self) -> None:
+     """Stops the background cleanup thread (primarily for tests)."""
+     if hasattr(self, "_stop_event"):
+         self._stop_event.set()
+     t = getattr(self, "_cleanup_thread", None)
+     if t is not None and t.is_alive():
+         t.join(timeout=1)

Also applies to: 152-157
44-60
: Allow empty result sets to return an empty first page instead of an error. When there are 0 results, _num_pages == 0 makes page index 0 unreachable. Return an empty Page for index 0 to signal a successful (but empty) result. Based on past review comments.
  def get_page(self, page_index: int) -> Page | None:
@@
- page_number = page_index + 1  # Convert zero-based to one-based
- if page_number <= 0 or self._num_pages < page_number:
-     return None
+ page_number = page_index + 1  # Convert zero-based to one-based
+ # Special-case empty results: allow requesting the first page (index 0)
+ if self._num_pages == 0:
+     if page_index == 0:
+         return Page(
+             self.result_log_entries,
+             page=1,
+             items_per_page=self.num_items_per_page,
+         )
+     return None
+ if page_number <= 0 or self._num_pages < page_number:
+     return None
189-206
: Prevent oversized results from crashing the tool; cap and handle gracefully. session.cache_query_result can raise ValueError when query_results exceed MAX_CACHED_RESULTS, which will bubble out and break the tool flow. The PR objective says "cache up to 1,000 log entries." Cap results (and defensively catch) here.
  def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]:
@@
  session = self.get_or_create_session(session_id)
  if session.is_instructions_retrieved is False:
      return self._GET_INSTRUCTIONS_NOT_RUN_ERROR
-
- session.cache_query_result(results=query_results)
+ # Cap results to MAX_CACHED_RESULTS to avoid overflow/ValueError
+ max_n = constants.MAX_CACHED_RESULTS
+ capped = query_results[:max_n] if len(query_results) > max_n else query_results
+ try:
+     session.cache_query_result(results=capped)
+ except ValueError as e:
+     # Defensive: return structured error if validation fails
+     return {"Error": str(e)}
  return session.get_page_data(0)

components/clp-mcp-server/tests/test_session_manager.py (5)
227-254
: Stress test is too heavy for CI; reduce iterations and/or drop repeat. repeat(10) plus 10,000-iteration loops across threads risks timeouts/flakiness. Trim to keep coverage while controlling runtime. Based on past review comments.
- @pytest.mark.repeat(10)
+ # Consider gating with: @pytest.mark.slow or an env flag (optional)
  def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
@@
- with ThreadPoolExecutor(max_workers=10) as executor:
+ with ThreadPoolExecutor(max_workers=4) as executor:
@@
- for _ in range(10000):
+ for _ in range(500):
      for i in range(50):
          session = manager.get_or_create_session(f"session_{i}")
          if i < TestConstants.SAMPLE_RESULTS_COUNT_25:
              session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=20)
      manager.cleanup_expired_sessions()
@@
- for i in range(10000):
+ for i in range(1000):
      session_id = f"session_{i % 50}"
      manager.get_or_create_session(session_id)
53-67
: Avoid magic numbers; derive expectations from ITEMS_PER_PAGE. Keep tests resilient to configuration changes.
- assert list(page1) == [f"log_{i}" for i in range(10)]
+ assert list(page1) == [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE)]
@@
- assert list(page2) == [f"log_{i}" for i in range(10, 20)]
+ assert list(page2) == [
+     f"log_{i}" for i in range(
+         TestConstants.ITEMS_PER_PAGE, 2 * TestConstants.ITEMS_PER_PAGE
+     )
+ ]
@@
- assert (
-     list(page3) ==
-     [f"log_{i}" for i in range(20, 25)]
- )
+ assert list(page3) == [
+     f"log_{i}" for i in range(2 * TestConstants.ITEMS_PER_PAGE, 25)
+ ]
109-112
: Same: compute second-page upper bound from ITEMS_PER_PAGE.
- page_data["items"] ==
-     [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+ page_data["items"] == [
+     f"log_{i}" for i in range(
+         TestConstants.ITEMS_PER_PAGE, 2 * TestConstants.ITEMS_PER_PAGE
+     )
+ ]
181-185
: Same: use ITEMS_PER_PAGE for second page expectation.
- page_data["items"] ==
-     [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+ page_data["items"] == [
+     f"log_{i}" for i in range(
+         TestConstants.ITEMS_PER_PAGE, 2 * TestConstants.ITEMS_PER_PAGE
+     )
+ ]
139-141
: Replace expiry magic numbers with test constants. Use the constants you defined to avoid brittle tests.
- session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=61)
+ session.last_accessed = datetime.now(timezone.utc) - timedelta(
+     minutes=TestConstants.EXPIRED_SESSION_TTL_MINUTES
+ )

And later:
- session1.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=61)
- session2.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=65)
+ session1.last_accessed = datetime.now(timezone.utc) - timedelta(
+     minutes=TestConstants.EXPIRED_SESSION_TTL_MINUTES
+ )
+ session2.last_accessed = datetime.now(timezone.utc) - timedelta(
+     minutes=TestConstants.EXPIRED_SESSION_TTL_MINUTES + 4
+ )

Also applies to: 217-219
📒 Files selected for processing (3)
components/clp-mcp-server/clp_mcp_server/server/constants.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
components/clp-mcp-server/tests/test_session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
components/clp-mcp-server/tests/test_session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (11)
PaginatedQueryResult (14-60)
SessionManager (124-221)
SessionState (64-121)
get_page (44-60)
get_page_data (88-112)
cache_query_result (75-86)
cache_query_result (189-206)
is_expired (114-117)
get_or_create_session (168-187)
get_nth_page (208-221)
cleanup_expired_sessions (158-166)
🪛 Ruff (0.14.0)
components/clp-mcp-server/tests/test_session_manager.py
148-148: Missing docstring in public method
(D102)
def __init__(self, result_log_entries: list[str], num_items_per_page: int) -> None:
    """
    Initializes the PaginatedQueryResult.
    :param result_log_entries: List of cached log entries to paginate.
    :param num_items_per_page:
    :raise: ValueError if the number of cached results or num_items_per_page is invalid.
    """
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Complete the param description and tighten error docstring.
Minor clarity nits.
def __init__(self, result_log_entries: list[str], num_items_per_page: int) -> None:
@@
- :param num_items_per_page:
- :raise: ValueError if the number of cached results or num_items_per_page is invalid.
+ :param num_items_per_page: Number of items per page (must be > 0).
+ :raise ValueError: If cached results exceed MAX_CACHED_RESULTS or num_items_per_page <= 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (6)
components/clp-mcp-server/tests/test_session_manager.py (6)
55-63
: Avoid magic numbers; derive from ITEMS_PER_PAGE. Use TestConstants to keep tests robust if pagination changes.
- assert list(page1) == [f"log_{i}" for i in range(10)]
+ assert list(page1) == [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE)]
@@
- assert list(page2) == [f"log_{i}" for i in range(10, 20)]
+ assert list(page2) == [
+     f"log_{i}" for i in range(
+         TestConstants.ITEMS_PER_PAGE,
+         2 * TestConstants.ITEMS_PER_PAGE
+     )
+ ]
@@
- assert (
-     list(page3) ==
-     [f"log_{i}" for i in range(20, 25)]
- )
+ assert (
+     list(page3) ==
+     [f"log_{i}" for i in range(
+         2 * TestConstants.ITEMS_PER_PAGE,
+         TestConstants.SAMPLE_RESULTS_COUNT_25
+     )]
+ )

Also applies to: 67-70
112-114
: Same: compute page bounds from ITEMS_PER_PAGE. Replace 20 with 2 * TestConstants.ITEMS_PER_PAGE for stability.
- [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+ [f"log_{i}" for i in range(
+     TestConstants.ITEMS_PER_PAGE,
+     2 * TestConstants.ITEMS_PER_PAGE
+ )]
@@
- [f"log_{i}" for i in range(20, TestConstants.SAMPLE_RESULTS_COUNT_25)]
+ [f"log_{i}" for i in range(
+     2 * TestConstants.ITEMS_PER_PAGE,
+     TestConstants.SAMPLE_RESULTS_COUNT_25
+ )]

Based on static analysis hints
Also applies to: 121-123
142-143
: Replace literal 61 with test constant. Use EXPIRED_SESSION_TTL_MINUTES to avoid a magic number and align with earlier change.
- session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=61)
+ session.last_accessed = (
+     datetime.now(timezone.utc) -
+     timedelta(minutes=TestConstants.EXPIRED_SESSION_TTL_MINUTES)
+ )
149-154
: Add fixture docstring (Ruff D102). Brief docstring improves lint signal and clarity.
  @pytest.fixture
  def active_session_manager(self) -> SessionManager:
+     """Create a SessionManager with an active session where instructions have been retrieved."""
      manager = SessionManager(session_ttl_minutes=TestConstants.SESSION_TTL_MINUTES)
      session = manager.get_or_create_session("test_session")
      session.is_instructions_retrieved = True  # Simulate get_instructions was run
      return manager
Based on static analysis hints
239-266
: Stress test is too heavy; likely slow/flaky in CI. Drop repeat, cut iterations, reduce threads, and remove magic values (PLR2004).
- @pytest.mark.repeat(10)
+ # Stress test kept light to avoid CI flakiness
  def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
      """Validates thread safety of cleanup_expired_sessions and get_or_create_session."""
      manager = SessionManager(session_ttl_minutes=TestConstants.SESSION_TTL_MINUTES)
+     # Tunables for lighter, still-meaningful concurrency
+     NUM_SESSIONS = 50
+     HALF = NUM_SESSIONS // 2
+     CLEANUP_ITERATIONS = 500
+     ACCESS_ITERATIONS = 1000
+     WORKERS = 4

      def cleanup_task() -> None:
          """Continuously expires some sessions and cleans up expired sessions."""
-         for _ in range(10000):
-             for i in range(50):  # mark half of them expired
+         for _ in range(CLEANUP_ITERATIONS):
+             for i in range(NUM_SESSIONS):  # mark half expired
                  session = manager.get_or_create_session(f"session_{i}")
-                 if i < 25:
+                 if i < HALF:
                      session.last_accessed = (
                          datetime.now(timezone.utc)
                          - timedelta(minutes=TestConstants.EXPIRED_SESSION_TTL_MINUTES)
                      )
              manager.cleanup_expired_sessions()

      def access_task() -> None:
          """Continuously creates and accesses sessions."""
-         for i in range(10000):
-             session_id = f"session_{i % 50}"
+         for i in range(ACCESS_ITERATIONS):
+             session_id = f"session_{i % NUM_SESSIONS}"
              manager.get_or_create_session(session_id)

      # Run cleanup and access operations concurrently
-     with ThreadPoolExecutor(max_workers=10) as executor:
+     with ThreadPoolExecutor(max_workers=WORKERS) as executor:
          futures = []
          futures.append(executor.submit(cleanup_task))
          futures.append(executor.submit(access_task))
          for future in futures:
              future.result()  # ensure thread completion with no run time exceptions

Optional: mark as slow to gate in CI, or read from env (e.g., if os.getenv("RUN_STRESS_TEST") == "1"). Based on static analysis hints
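The create/expire/sweep contention this test targets can be reproduced with a much smaller stand-alone sketch. All names and counts here are illustrative, not taken from the PR:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Toy session store guarded by a lock, mimicking the pattern under test:
# one worker sweeps "expired" entries while another keeps creating them.
sessions: dict[str, int] = {}
lock = threading.Lock()

def cleanup_task() -> None:
    for _ in range(200):
        with lock:
            # Treat odd-valued entries as "expired" and sweep them.
            expired = [sid for sid, v in sessions.items() if v % 2 == 1]
            for sid in expired:
                del sessions[sid]

def access_task() -> None:
    for i in range(1000):
        with lock:
            sessions.setdefault(f"session_{i % 50}", i)

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cleanup_task), executor.submit(access_task)]
    for future in futures:
        future.result()  # re-raises any exception from a worker
```

Calling future.result() is the important part: without it, an exception inside a worker (for example a RuntimeError from mutating a dict during iteration) would pass silently and the test would not actually validate thread safety.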
187-189
: Avoid magic number 20 in get_nth_page expected items. Use ITEMS_PER_PAGE-derived bound.
- [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+ [f"log_{i}" for i in range(
+     TestConstants.ITEMS_PER_PAGE,
+     2 * TestConstants.ITEMS_PER_PAGE
+ )]
📒 Files selected for processing (1)
components/clp-mcp-server/tests/test_session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-mcp-server/tests/test_session_manager.py (2)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (11)
PaginatedQueryResult (14-60)
SessionManager (124-221)
SessionState (64-121)
get_page (44-60)
get_page_data (88-112)
cache_query_result (75-86)
cache_query_result (189-206)
is_expired (114-117)
get_or_create_session (168-187)
get_nth_page (208-221)
cleanup_expired_sessions (158-166)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
🪛 Ruff (0.14.0)
components/clp-mcp-server/tests/test_session_manager.py
33-35: One-line docstring should fit on one line
Reformat to one line
(D200)
150-150: Missing docstring in public method
(D102)
249-249: Magic value used in comparison, consider replacing 25 with a constant variable
(PLR2004)
Actionable comments posted: 2
♻️ Duplicate comments (14)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (8)
56-60
: Empty results: consider allowing page_index 0 to return an empty page. Currently, empty results yield "Page index is out of bounds." even for the first page. Consider special-casing empty result sets so page_index 0 returns items=[], with consistent metadata, per earlier feedback.
Would you prefer handling this in PaginatedQueryResult.get_page or in SessionState.get_page_data? I can draft a concrete patch once we decide the desired metadata (e.g., total_pages=1 vs 0).
Also applies to: 98-112
140-156
: Make cleanup thread optional and stoppable (improves testability and shutdown). Always starting a daemon thread with an infinite loop complicates tests and clean shutdown. Add a start flag and stop() with an Event. Based on past review comments.
- def __init__(self, session_ttl_minutes: int) -> None:
+ def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
@@
- self._sessions_lock = threading.Lock()
- self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
- self._cleanup_thread.start()
+ self._sessions_lock = threading.Lock()
+ self._stop_event = threading.Event()
+ self._cleanup_thread: threading.Thread | None = None
+ if start_cleanup_thread:
+     self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+     self._cleanup_thread.start()
@@
- def _cleanup_loop(self) -> None:
-     """Cleans up all expired sessions periodically in a separate cleanup thread."""
-     while True:
-         time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
-         self.cleanup_expired_sessions()
+ def _cleanup_loop(self) -> None:
+     """Cleans up all expired sessions periodically in a separate cleanup thread."""
+     while not self._stop_event.is_set():
+         # Wait allows prompt exit when stop() is called
+         self._stop_event.wait(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+         if self._stop_event.is_set():
+             break
+         self.cleanup_expired_sessions()
+
+ def stop(self) -> None:
+     """Stops the background cleanup thread (primarily for tests)."""
+     if hasattr(self, "_stop_event"):
+         self._stop_event.set()
+     t = getattr(self, "_cleanup_thread", None)
+     if t is not None and t.is_alive():
+         t.join(timeout=1)

Also applies to: 152-157
17-24
: Tighten constructor docstring: complete param and Raises section. Fill in num_items_per_page and use "Raises" style.
  """
  Initializes the PaginatedQueryResult.
  :param result_log_entries: List of cached log entries to paginate.
- :param num_items_per_page:
- :raise: ValueError if the number of cached results or num_items_per_page is invalid.
+ :param num_items_per_page: Number of items per page (must be > 0).
+ :raises ValueError: If results exceed MAX_CACHED_RESULTS or num_items_per_page <= 0.
  """
39-43
: Optional: make cached results immutable to avoid accidental mutation. Store a tuple internally to uphold the "cached" contract.
- self.result_log_entries = result_log_entries
+ self.result_log_entries = tuple(result_log_entries)
  self.num_items_per_page = num_items_per_page
  self._num_pages = (len(result_log_entries) + num_items_per_page - 1) // num_items_per_page
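The ceil-division in the last line of that constructor is worth a quick stand-alone check:

```python
# Ceil division without floats: (n + k - 1) // k rounds up.
def num_pages(total_items: int, per_page: int) -> int:
    return (total_items + per_page - 1) // per_page

print(num_pages(25, 10))  # 3 (two full pages plus a partial one)
print(num_pages(20, 10))  # 2
print(num_pages(0, 10))   # 0 (why the empty-results special case matters)
```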
48-50
: Docstring: clarify "page index (zero-based)" phrasing. Use consistent wording.
- :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).
92-96
: Docstring: clarify "page index (zero-based)" phrasing. Same nit as above for consistency.
- :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).
213-216
: Docstring: clarify "page index (zero-based)" phrasing. Aligns with other methods.
- :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).
189-206
: Prevent tool crash on oversized results; cap or handle ValueError. If query_results > MAX_CACHED_RESULTS, PaginatedQueryResult raises ValueError, bubbling up and failing the tool. The PR objective says "cache up to 1,000 log entries"; we should cap or return a structured error instead of crashing.
Apply this diff to cap and defensively catch errors:
@@
- session.cache_query_result(results=query_results)
+ # Cap results to MAX_CACHED_RESULTS and guard against unexpected errors
+ max_results = constants.MAX_CACHED_RESULTS
+ capped = query_results[:max_results] if len(query_results) > max_results else query_results
+ try:
+     session.cache_query_result(results=capped)
+ except ValueError as e:
+     # Defensive fallback: return structured error
+     return {"Error": str(e)}
  return session.get_page_data(0)

components/clp-mcp-server/clp_mcp_server/server/constants.py (1)
9-26
: Simplify SYSTEM_PROMPT formatting; remove explicit "\n" and stray leading spaces. Triple-quoted strings don't need embedded "\n"; current text has odd indentation and extra newlines.
-SYSTEM_PROMPT = """ -You are an AI assistant that helps users query a log database using KQL (Kibana Query Language). - When given a user query, you should generate a KQL query that accurately captures the user's - intent. The KQL query should be as specific as possible to minimize the number of log messages - returned.\n -\n -You should also consider the following guidelines when generating KQL queries:\n -- Use specific field names and values to narrow down the search.\n -- Avoid using wildcards (*) unless absolutely necessary, as they can lead to large result - sets.\n -- Use logical operators (AND, OR, NOT) to combine multiple conditions.\n -- Consider the time range of the logs you are searching. If the user specifies a time range, - include it in the KQL query.\n -- If the user query is ambiguous or lacks detail, ask clarifying questions to better - understand their intent before generating the KQL query.\n -- Always ensure that the generated KQL query is syntactically correct and can be executed - without errors.\n -""" +SYSTEM_PROMPT = """ +You are an AI assistant that helps users query a log database using KQL (Kibana Query Language). +When given a user query, you should generate a KQL query that accurately captures the user's +intent. The KQL query should be as specific as possible to minimise the number of log messages +returned. + +You should also consider the following guidelines when generating KQL queries: +- Use specific field names and values to narrow down the search. +- Avoid using wildcards (*) unless absolutely necessary, as they can lead to large result sets. +- Use logical operators (AND, OR, NOT) to combine multiple conditions. +- Consider the time range of the logs you are searching. If the user specifies a time range, + include it in the KQL query. +- If the user query is ambiguous or lacks detail, ask clarifying questions to better + understand their intent before generating the KQL query. 
+- Always ensure that the generated KQL query is syntactically correct and can be executed + without errors. +"""components/clp-mcp-server/tests/test_session_manager.py (5)
33-35: Compress to one-line docstring (Ruff D200)

Keep the summary on a single line.

- """
- Validates PaginatedQueryResult raises ValueError when results exceed MAX_CACHED_RESULTS.
- """
+ """Validates PaginatedQueryResult raises ValueError when results exceed MAX_CACHED_RESULTS."""
55-56: Derive expected slices from ITEMS_PER_PAGE (remove magic 10/20)

Avoid hard-coding 10/20; compute from TestConstants.ITEMS_PER_PAGE.

- assert list(page1) == [f"log_{i}" for i in range(10)]
+ assert list(page1) == [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE)]
@@
- assert list(page2) == [f"log_{i}" for i in range(10, 20)]
+ assert list(page2) == [
+     f"log_{i}" for i in range(
+         TestConstants.ITEMS_PER_PAGE,
+         2 * TestConstants.ITEMS_PER_PAGE
+     )
+ ]
@@
- assert (
-     list(page3) ==
-     [f"log_{i}" for i in range(20, 25)]
- )
+ assert list(page3) == [
+     f"log_{i}" for i in range(
+         2 * TestConstants.ITEMS_PER_PAGE,
+         TestConstants.SAMPLE_RESULTS_COUNT_25
+     )
+ ]

Also applies to: 62-63, 67-70
111-114: Same here: compute second-page range upper bound from ITEMS_PER_PAGE

Keep tests resilient to config changes.

- [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+ [f"log_{i}" for i in range(
+     TestConstants.ITEMS_PER_PAGE,
+     2 * TestConstants.ITEMS_PER_PAGE
+ )]
187-190: Same: derive second-page range from ITEMS_PER_PAGE

Stay consistent across tests.

- [f"log_{i}" for i in range(TestConstants.ITEMS_PER_PAGE, 20)]
+ [f"log_{i}" for i in range(
+     TestConstants.ITEMS_PER_PAGE,
+     2 * TestConstants.ITEMS_PER_PAGE
+ )]
240-263: Stress test is too heavy; likely slow/flaky in CI

repeat(10) with 10,000-iteration loops across threads is excessive. Reduce counts or mark as slow.

- @pytest.mark.repeat(10)
+ # Consider gating with: @pytest.mark.slow or an env flag
  def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
@@
- for _ in range(10000):
+ for _ in range(500):
@@
- if i < 25:
+ if i < 25:  # keep half expired
@@
- for i in range(10000):
+ for i in range(1000):
@@
- with ThreadPoolExecutor(max_workers=10) as executor:
+ with ThreadPoolExecutor(max_workers=4) as executor:

If you want to keep repeat, drop it to 2–3 and/or add @pytest.mark.slow.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
- components/clp-mcp-server/clp_mcp_server/server/constants.py (1 hunks)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
- components/clp-mcp-server/tests/test_session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
components/clp-mcp-server/tests/test_session_manager.py (2)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (11)
PaginatedQueryResult (14-60), SessionManager (124-221), SessionState (64-121), get_page (44-60), get_page_data (88-112), cache_query_result (75-86), cache_query_result (189-206), is_expired (114-117), get_or_create_session (168-187), get_nth_page (208-221), cleanup_expired_sessions (158-166)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
🪛 Ruff (0.14.0)
components/clp-mcp-server/tests/test_session_manager.py
33-35: One-line docstring should fit on one line
Reformat to one line
(D200)
250-250: Magic value used in comparison, consider replacing 25 with a constant variable
(PLR2004)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: rust-checks (ubuntu-24.04)
Actionable comments posted: 2
♻️ Duplicate comments (4)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (4)
17-24: Complete the docstring for `num_items_per_page`.

The parameter description is missing. Apply this diff:
  :param result_log_entries: List of cached log entries to paginate.
- :param num_items_per_page:
+ :param num_items_per_page: Number of items per page (must be > 0).
  :raise: ValueError if the number of cached results or num_items_per_page is invalid.

Based on past review comments.
178-194: Consider making the cleanup thread stoppable for better testability.

The cleanup thread runs an infinite loop and cannot be stopped, complicating testing and clean shutdown. Past reviews suggested adding a stop mechanism with a threading.Event.
If improved testability is desired, consider applying this pattern:
- def __init__(self, session_ttl_minutes: int) -> None:
+ def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
      """
      Initializes the SessionManager and starts background cleanup thread.

      :param session_ttl_minutes: Session time-to-live in minutes.
+     :param start_cleanup_thread: Whether to start the background cleanup thread.
      """
      self.sessions: dict[str, SessionState] = {}
      self._session_ttl_minutes = session_ttl_minutes
      self._sessions_lock = threading.Lock()
-     self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
-     self._cleanup_thread.start()
+     self._stop_event = threading.Event()
+     self._cleanup_thread: threading.Thread | None = None
+     if start_cleanup_thread:
+         self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+         self._cleanup_thread.start()

  def _cleanup_loop(self) -> None:
      """Cleans up all expired sessions periodically in a separate cleanup thread."""
-     while True:
-         time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+     while not self._stop_event.is_set():
+         self._stop_event.wait(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+         if self._stop_event.is_set():
+             break
          self.cleanup_expired_sessions()
+
+ def stop(self) -> None:
+     """Stops the background cleanup thread (primarily for tests)."""
+     if hasattr(self, "_stop_event"):
+         self._stop_event.set()
+     if self._cleanup_thread is not None and self._cleanup_thread.is_alive():
+         self._cleanup_thread.join(timeout=1)

This was suggested in past reviews but not implemented. Evaluate whether this complexity is worth the testability improvement.
Based on past review comments.
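As an aside, the Event-based stop pattern in the suggested diff can be demonstrated with a minimal standalone class; the class name and the short interval below are illustrative only, not code from the PR:

```python
import threading
import time


class PeriodicCleaner:
    """Runs a periodic task until stop() is called."""

    def __init__(self, interval_seconds: float) -> None:
        self._interval = interval_seconds
        self._stop_event = threading.Event()
        self.runs = 0
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self) -> None:
        # wait() returns True as soon as stop() sets the event,
        # so shutdown is prompt instead of sleeping out the full interval
        while not self._stop_event.wait(self._interval):
            self.runs += 1

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join(timeout=1)


cleaner = PeriodicCleaner(interval_seconds=0.01)
time.sleep(0.1)
cleaner.stop()
print(cleaner.runs > 0)  # True: the periodic task ran at least once before stop()
```

Using `Event.wait(timeout)` instead of `time.sleep()` is what makes the thread joinable quickly, which is the testability benefit the review is after.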
44-60: Empty result sets return an error instead of an empty page.

When `result_log_entries` is empty, `_num_pages` becomes 0. The bounds check at Line 53 rejects page index 0 (page_number=1) because `0 < 1` is true, returning `None` and causing "Page index is out of bounds" even though the query succeeded with zero results. A valid query returning no results should return an empty page (with proper metadata) rather than an error.
Apply this diff to special-case empty results:
  def get_page(self, page_index: int) -> Page | None:
      """
      Returns a page from the cached query results.

      :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
      :return: A `Page` object for the specified page.
      :return: None if `page_index` is out of bounds.
      """
      page_number = page_index + 1  # Convert zero-based to one-based
+
+     # Special case: empty results should return an empty page for page 0
+     if self._num_pages == 0 and page_number == 1:
+         return Page(
+             self.result_log_entries,
+             page=1,
+             items_per_page=self.num_items_per_page,
+         )
+
      if page_number <= 0 or self._num_pages < page_number:
          return None
      return Page(
          self.result_log_entries,
          page=page_number,
          items_per_page=self.num_items_per_page,
      )

Based on past review comments.
227-244
: Add error handling for oversized query results.Line 242 calls
session.cache_query_result()
, which creates aPaginatedQueryResult
that raisesValueError
if results exceedMAX_CACHED_RESULTS
. This crashes the tool instead of returning a structured error to the client.The PR objectives state the server "caches up to 1,000 log entries," implying graceful handling rather than tool failure.
Apply this diff to cap results before caching:
def cache_query_result(self, session_id: str, query_results: list[str]) -> dict[str, Any]: """ Caches query results for a session and returns the first page and the paging metadata. :param session_id: Unique identifier for the session. :param query_results: Complete log entries from previous query for caching. :return: Dictionary containing the first page log entries and the paging metadata if the first page can be retrieved. :return: Dictionary with ``{"Error": "error message describing the failure"}`` if fails to retrieve the first page. """ session = self.get_or_create_session(session_id) if session.is_instructions_retrieved is False: return self._GET_INSTRUCTIONS_NOT_RUN_ERROR.copy() - session.cache_query_result(results=query_results) + # Cap results to MAX_CACHED_RESULTS to prevent ValueError + max_results = constants.MAX_CACHED_RESULTS + if len(query_results) > max_results: + capped_results = query_results[:max_results] + else: + capped_results = query_results + + try: + session.cache_query_result(results=capped_results) + except ValueError as e: + # Defensive: return structured error if validation fails + return {"Error": str(e)} return session.get_page_data(0)Based on past review comments.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
        :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
        :return: A `Page` object for the specified page.
        :return: None if `page_index` is out of bounds.
🧹 Nitpick | 🔵 Trivial
Improve docstring consistency for the `page_index` parameter.
The phrase "The number of page to retrieve" is grammatically awkward. Use consistent wording across all three methods.
Apply this diff to Lines 48-50:
- :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).
Apply similar changes to Lines 129 and 252 in `SessionState.get_page_data` and `SessionManager.get_nth_page`.
Based on past review comments.
Also applies to: 125-134, 246-254
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/server/session_manager.py around
lines 48-50 (and similarly update lines 125-134 and 246-254), the docstring
phrase "The number of page to retrieve" is awkward and inconsistent; change the
`page_index` parameter description to read "The page number to retrieve
(zero-based index; 0 is the first page)." for all three methods
(SessionManager.get_page, SessionState.get_page_data,
SessionManager.get_nth_page) so the wording is consistent and grammatically
correct, and ensure the rest of each docstring still notes the return semantics
("A `Page` object for the specified page." and "None if `page_index` is out of
bounds.") without altering meaning.
Actionable comments posted: 4
♻️ Duplicate comments (5)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (2)
21-24: Docstring nits: complete param description and standardize wording

Tighten param/raises and use "page index" wording consistently.
  Initializes the PaginatedQueryResult.

- :param result_log_entries: List of cached log entries to paginate.
- :param num_items_per_page:
- :raise: ValueError if the number of cached results or num_items_per_page is invalid.
+ :param result_log_entries: List of cached log entries to paginate.
+ :param num_items_per_page: Number of items per page (must be > 0).
+ :raise ValueError: If cached results exceed MAX_CACHED_RESULTS or num_items_per_page <= 0.
  """
@@
- :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).
  :return: A `Page` object for the specified page.
  :return: None if `page_index` is out of bounds.
@@
- :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
- :return: Dictionary containing paged log entries and the paging metadata if the
-     page `page_index` can be retrieved.
- :return: Dictionary with ``{"Error": "error message describing the failure"}`` if fails to
-     retrieve page `page_index`.
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).
+ :return: Dictionary containing paged log entries and the paging metadata if the page can be
+     retrieved; otherwise ``{"Error": "..."}``.
@@
- :param page_index: The number of page to retrieve (zero-based index; 0 is the first page).
- :return: Forwards `SessionState.get_page_data`'s return values.
+ :param page_index: The page index to retrieve (zero-based; 0 is the first page).
+ :return: Forwards SessionState.get_page_data's return values.

Also applies to: 48-50, 129-134, 251-254
242-244
: Prevent tool failure on oversized result sets; cap or handle ValueError

session.cache_query_result can raise ValueError when results exceed MAX_CACHED_RESULTS, bubbling out and breaking the tool. Cap inputs and add defensive handling.
      if session.is_instructions_retrieved is False:
          return self._GET_INSTRUCTIONS_NOT_RUN_ERROR.copy()
-     session.cache_query_result(results=query_results)
+     # Cap to MAX_CACHED_RESULTS to avoid overflow/ValueError
+     max_results = constants.MAX_CACHED_RESULTS
+     capped = query_results[:max_results] if len(query_results) > max_results else query_results
+     try:
+         session.cache_query_result(results=capped)
+     except ValueError as e:
+         # Defensive: return structured error if validation still fails
+         return {"Error": str(e)}
      return session.get_page_data(0)

components/clp-mcp-server/tests/test_session_manager.py (3)
32-35: Compress one-line docstring (Ruff D200)

Keep the summary on a single line.

- """
- Validates PaginatedQueryResult raises ValueError when results exceed MAX_CACHED_RESULTS.
- """
+ """Validates PaginatedQueryResult raises ValueError when results exceed MAX_CACHED_RESULTS."""
149-151: Avoid magic number 61; use the test constant

Use EXPIRED_SESSION_TTL_MINUTES for clarity and robustness.

- session.last_accessed = datetime.now(timezone.utc) - timedelta(minutes=61)
+ session.last_accessed = datetime.now(timezone.utc) - timedelta(
+     minutes=TestConstants.EXPIRED_SESSION_TTL_MINUTES
+ )
250-280: Stress test is excessively heavy; reduce or gate to avoid CI flakiness

repeat(10) with 10k-iteration loops and 10 workers is slow and flaky.

- @pytest.mark.repeat(10)
+ # Consider gating with: @pytest.mark.slow
  def test_thread_safety_cleanup_and_get_or_create_session(self) -> None:
@@
+ CONCURRENCY_SESSION_COUNT = 50
+
  def cleanup_task() -> None:
      """Continuously expires some sessions and cleans up expired sessions."""
-     for _ in range(10000):
-         for i in range(50):  # mark half of them expired
+     for _ in range(500):
+         for i in range(CONCURRENCY_SESSION_COUNT):  # mark half of them expired
              session = manager.get_or_create_session(f"session_{i}")
-             if i < 25:
+             if i < (CONCURRENCY_SESSION_COUNT // 2):
                  session.last_accessed = (
                      datetime.now(timezone.utc)
                      - timedelta(minutes=TestConstants.EXPIRED_SESSION_TTL_MINUTES)
                  )
          manager.cleanup_expired_sessions()
@@
  def access_task() -> None:
      """Continuously creates and accesses sessions."""
-     for i in range(10000):
-         session_id = f"session_{i % 50}"
+     for i in range(1000):
+         session_id = f"session_{i % CONCURRENCY_SESSION_COUNT}"
          manager.get_or_create_session(session_id)
@@
- with ThreadPoolExecutor(max_workers=10) as executor:
+ with ThreadPoolExecutor(max_workers=4) as executor:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
- components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1 hunks)
- components/clp-mcp-server/tests/test_session_manager.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/server.py (1)
get_nth_page (37-49)
components/clp-mcp-server/tests/test_session_manager.py (1)
components/clp-mcp-server/clp_mcp_server/server/session_manager.py (15)
PaginatedQueryResult (14-60), SessionManager (164-259), SessionState (64-161), get_page (44-60), get_page_data (125-150), cache_query_result (111-123), cache_query_result (227-244), is_expired (152-156), last_accessed (101-104), last_accessed (107-109), get_or_create_session (206-225), is_instructions_retrieved (90-93), is_instructions_retrieved (96-98), get_nth_page (246-259), cleanup_expired_sessions (196-204)
🪛 Ruff (0.14.0)
components/clp-mcp-server/tests/test_session_manager.py
33-35: One-line docstring should fit on one line
Reformat to one line
(D200)
260-260: Magic value used in comparison, consider replacing 25 with a constant variable
(PLR2004)
        self.result_log_entries = result_log_entries
        self.num_items_per_page = num_items_per_page
🧹 Nitpick | 🔵 Trivial
Optional: freeze cached results to avoid accidental mutation
Store cached entries as a tuple to uphold the “cached” contract. Page works with sequences.
- self.result_log_entries = result_log_entries
+ # Ensure immutability of cached results
+ self.result_log_entries = tuple(result_log_entries)
Also applies to: 56-60
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/server/session_manager.py around
lines 39-41 (and similarly at lines 56-60), the cached result entries are
assigned as mutable lists which can be accidentally mutated; change those
assignments to store an immutable tuple (e.g., wrap the incoming sequence with
tuple(...)) so the cached contract is preserved while pages can still operate on
sequences; update both places where result_log_entries (and any other cached
collections) are stored to use tuple(...) and ensure any subsequent code treats
them as sequences, not mutable lists.
        with self._lock:
            if self._cached_query_result is None:
                return {"Error": "No previous paginated response in this session."}

            page = self._cached_query_result.get_page(page_index)
            if page is None:
                return {"Error": "Page index is out of bounds."}
Allow empty cached results to return page 0 as an empty page
Currently, 0 items yields “Page index is out of bounds.” for index 0. Returning an empty items page with correct metadata is friendlier and aligns with typical pagination semantics.
with self._lock:
if self._cached_query_result is None:
return {"Error": "No previous paginated response in this session."}
+ # Special-case empty cached results: allow index 0 to return an empty page
+ if len(self._cached_query_result.result_log_entries) == 0:
+ if page_index == 0:
+ return {
+ "items": [],
+ "total_pages": 0,
+ "total_items": 0,
+ "num_items_per_page": self._cached_query_result.num_items_per_page,
+ "has_next": False,
+ "has_previous": False,
+ }
+ return {"Error": "Page index is out of bounds."}
+
page = self._cached_query_result.get_page(page_index)
if page is None:
return {"Error": "Page index is out of bounds."}
If your spec intentionally treats 0 results as 0 pages, please confirm and consider documenting this explicitly.
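For reference, the empty-page payload proposed above can be factored into a tiny helper; this is a sketch whose keys mirror the suggestion, not the merged server code:

```python
def empty_page(items_per_page: int) -> dict:
    """Pagination metadata for a query that matched zero log entries."""
    return {
        "items": [],
        "total_pages": 0,
        "total_items": 0,
        "num_items_per_page": items_per_page,
        "has_next": False,
        "has_previous": False,
    }

print(empty_page(10)["total_pages"])  # 0
```

Returning this shape for index 0 keeps the client contract uniform: callers always get the same keys whether the result set is empty or not.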
    def __init__(self, session_ttl_minutes: int) -> None:
        """
        Initializes the SessionManager and starts background cleanup thread.

        :param session_ttl_minutes: Session time-to-live in minutes.
        """
        self.sessions: dict[str, SessionState] = {}
        self._session_ttl_minutes = session_ttl_minutes
        self._sessions_lock = threading.Lock()
        self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self._cleanup_thread.start()
🧹 Nitpick | 🔵 Trivial
Make cleanup thread optional and stoppable (testability, clean shutdown)
Always starting a daemon thread complicates tests and shutdown. Add a flag and stop() with an Event.
- def __init__(self, session_ttl_minutes: int) -> None:
+ def __init__(self, session_ttl_minutes: int, start_cleanup_thread: bool = True) -> None:
"""
Initializes the SessionManager and starts background cleanup thread.
:param session_ttl_minutes: Session time-to-live in minutes.
"""
self.sessions: dict[str, SessionState] = {}
self._session_ttl_minutes = session_ttl_minutes
self._sessions_lock = threading.Lock()
- self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
- self._cleanup_thread.start()
+ self._stop_event = threading.Event()
+ self._cleanup_thread: threading.Thread | None = None
+ if start_cleanup_thread:
+ self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
+ self._cleanup_thread.start()
def _cleanup_loop(self) -> None:
"""Cleans up all expired sessions periodically in a separate cleanup thread."""
- while True:
- time.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
- self.cleanup_expired_sessions()
+ while not self._stop_event.is_set():
+ # Wait allows prompt exit when stop() is called
+ self._stop_event.wait(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
+ if self._stop_event.is_set():
+ break
+ self.cleanup_expired_sessions()
+
+ def stop(self) -> None:
+ """Stops the background cleanup thread (primarily for tests)."""
+ if hasattr(self, "_stop_event"):
+ self._stop_event.set()
+ t = getattr(self, "_cleanup_thread", None)
+ if t is not None and t.is_alive():
+ t.join(timeout=1)
Also applies to: 190-195
    @pytest.fixture
    def active_session_manager(self) -> SessionManager:
        """Create a SessionManager with a test session where get_instructions() has run."""
        manager = SessionManager(session_ttl_minutes=TestConstants.SESSION_TTL_MINUTES)
        session = manager.get_or_create_session("test_session")
        session.is_instructions_retrieved = True  # Simulate get_instructions was run
        return manager
🧹 Nitpick | 🔵 Trivial
Fixture: avoid starting background thread in unit tests
If you adopt start_cleanup_thread/stop(), initialise without the thread to reduce test noise.
- manager = SessionManager(session_ttl_minutes=TestConstants.SESSION_TTL_MINUTES)
+ manager = SessionManager(
+ session_ttl_minutes=TestConstants.SESSION_TTL_MINUTES,
+ start_cleanup_thread=False
+ )
session = manager.get_or_create_session("test_session")
session.is_instructions_retrieved = True # Simulate get_instructions was run
return manager
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In components/clp-mcp-server/tests/test_session_manager.py around lines 157-163,
the active_session_manager fixture currently creates a SessionManager that
starts the background cleanup thread, causing test noise; change the fixture to
initialize the SessionManager without starting the thread (e.g., pass a new
start_cleanup_thread=False or equivalent flag when constructing SessionManager)
so the cleanup thread is not started during tests, then set
session.is_instructions_retrieved=True and return the manager (alternatively,
call manager.stop() immediately after creation if the constructor always starts
the thread).
    Session manager for concurrent user sessions.

    `SessionManager` respects `FastMCP` asynchronous concurrency model that is built on Python's
    asyncio runtime:
    Asyncio achieves concurrency on a single thread by allowing tasks to yield control at `await`
The explanation of the async concurrency within a single thread is duplicated in both SessionManager and SessionState. This does not seem like good practice. Maybe we can use a reference instead — I wonder what the best practice is? Should we move this to server.py where the MCP server is created?
Let me know if you plan to rewrite the garbage collector using async.
You are an AI assistant that helps users query a log database using KQL (Kibana Query Language).
When given a user query, you should generate a KQL query that accurately captures the user's
intent. The KQL query should be as specific as possible to minimize the number of log messages
returned.

You should also consider the following guidelines when generating KQL queries:
- Use specific field names and values to narrow down the search.
- Avoid using wildcards (*) unless absolutely necessary, as they can lead to large result
  sets.
- Use logical operators (AND, OR, NOT) to combine multiple conditions.
- Consider the time range of the logs you are searching. If the user specifies a time range,
  include it in the KQL query.
- If the user query is ambiguous or lacks detail, ask clarifying questions to better
  understand their intent before generating the KQL query.
- Always ensure that the generated KQL query is syntactically correct and can be executed
  without errors.
Are you sure this would work? The raw string will treat a newline as a line break, so a prompt sentence will be split into multiple lines.
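A quick check confirms the concern: triple-quoted strings preserve literal line breaks, whereas adjacent string literals (implicit concatenation) are joined into a single line. The snippets below are illustrative, not the PR's actual prompt text:

```python
# Triple-quoted string: the line break is part of the value
triple = """first part
second part"""
print("\n" in triple)  # True

# Implicit concatenation of adjacent literals: no line break
joined = (
    "first part "
    "second part"
)
print(joined)  # first part second part
```

So if the SYSTEM_PROMPT should read as continuous sentences, implicit concatenation (or joining the lines before use) avoids the mid-sentence breaks the reviewer is pointing out.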
- :return: The server's information with a list of capabilities.
+ :param ctx: The `FastMCP` context containing the metadata of the underlying MCP session.
+ :return: A string of “system prompt”.
When you copy-paste, pay attention to `"`, as the source may use `“`.
        :return: Dictionary containing paged log entries and the paging metadata if the
        page `page_index` can be retrieved.
        :return: Dictionary with ``{"Error": "error message describing the failure"}`` if fails to
        retrieve page `page_index`.
I will rewrite some of the docstrings. However, you should indent multi-line return/param statements:
- retrieve page `page_index`.
+     retrieve page `page_index`.
                f"PaginatedQueryResult exceeds maximum allowed cached results:"
                f" {len(result_log_entries)} > {constants.MAX_CACHED_RESULTS}."
- f"PaginatedQueryResult exceeds maximum allowed cached results:"
+ "PaginatedQueryResult exceeds maximum allowed cached results:"
  f" {len(result_log_entries)} > {constants.MAX_CACHED_RESULTS}."
                f"Invalid num_items_per_page: {num_items_per_page}, "
                "it must be a positive integer. "
- f"Invalid num_items_per_page: {num_items_per_page}, "
- "it must be a positive integer. "
+ f"Invalid num_items_per_page: {num_items_per_page}, it must be a positive integer."
Pay attention to these details:
- When wrapping to a new line, the space should go at the start of the next line.
- Trailing spaces should be removed.
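For illustration, adjacent string literals concatenate at compile time, so only the fragment containing a placeholder needs the f prefix (values here are made up for the demo):

```python
n = 5
msg = (
    "Invalid num_items_per_page:"  # no placeholder, so no f prefix needed
    f" {n}, it must be a positive integer."
)
print(msg)  # Invalid num_items_per_page: 5, it must be a positive integer.
```

This also shows the wrapping convention from the review: the leading space lives at the start of the continuation fragment, and no fragment ends with a trailing space.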
        :return: A `Page` object for the specified page.
        :return: None if `page_index` is out of bounds.
        """
        page_number = page_index + 1  # Convert zero-based to one-based
Suggested change:
# Convert zero-based to one-based
page_number = page_index + 1
We prefer to put inline comments on a separate line.
Description
In our open-source release of the clp-mcp-server, we support 4 APIs: `search_kql_query`, `search_kql_query_with_timestamp`, `get_nth_page`, and `get_instructions`. In this PR, we implement the utility APIs `get_nth_page` and `get_instructions`. For an overview of all four APIs, please refer to the documentation of the first MCP server release. The documentation of `get_nth_page` and `get_instructions` is listed below:

get_nth_page(unsigned: page-idx)
Description:
Retrieves the n-th page of a paginated response with the paging metadata from the previous query. This tool is used when a query’s response is too long and the LLM only wishes to examine a specific subset of the results.
Inputs:
page-idx: the index of the page the LLM wants to look into
Outputs:
The set of log messages corresponding to the requested page index.
Errors:
get_instructions()
Description:
Returns a pre-defined “system prompt” that guides the LLM’s behavior.
The instruction covers:
This function must be invoked before any other tool.
Inputs:
empty
Outputs:
The “system prompt” mentioned in Description.
Implementation Detail
get_nth_page:
get_instructions:
If the AI has not yet called `get_instructions`, then we return an error to the AI saying that it should call `get_instructions` first.

Checklist
breaking change.
Validation performed
- `uv run --group dev pytest tests/test_session_manager.py -v`.
- `uv run ruff check clp/components/clp-mcp-server` && `task lint:check-py`.
- `task package` && `python3 -m clp_mcp_server.clp_mcp_server --host 127.0.0.1 --port 8000`. Connected a Claude Desktop agent to the running MCP server and issued a `get_instructions` query, successfully receiving the predefined system prompt.

Note: manual integration testing for `get_nth_page` was skipped due to the missing substring_search implementation; its correctness is verified through unit tests instead.
Future work
To keep this PR focused, we are intentionally deferring the following non-trivial tasks:
Summary by CodeRabbit
New Features
Improvements
Tests
Chores