Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mcp_as_a_judge/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
# Database Configuration
DATABASE_URL = "sqlite://:memory:"
MAX_SESSION_RECORDS = 20 # Maximum records to keep per session (FIFO)
RECORD_RETENTION_DAYS = 1
MAX_TOTAL_SESSIONS = 50 # Maximum total sessions to keep (LRU cleanup)
164 changes: 129 additions & 35 deletions src/mcp_as_a_judge/db/cleanup_service.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
"""
Database cleanup service for conversation history records.

This service handles time-based cleanup operations for conversation history records,
removing records older than the retention period (default: 1 day).
This service handles LRU-based cleanup operations for conversation history records,
removing least recently used sessions when session limits are exceeded.
"""

from datetime import datetime, timedelta

from sqlalchemy import Engine
from sqlalchemy import Engine, func
from sqlmodel import Session, select

from mcp_as_a_judge.constants import RECORD_RETENTION_DAYS
from mcp_as_a_judge.constants import MAX_TOTAL_SESSIONS
from mcp_as_a_judge.db.interface import ConversationRecord
from mcp_as_a_judge.logging_config import get_logger

Expand All @@ -20,10 +18,20 @@

class ConversationCleanupService:
"""
Service for cleaning up old conversation history records.
Service for cleaning up conversation history records.

Implements session-based LRU cleanup strategy:
- Maintains session limit by removing least recently used sessions
- Runs immediately when new sessions are created and limit is exceeded

LRU vs FIFO for Better User Experience:
- LRU (Least Recently Used): Keeps sessions that users are actively using,
even if they're old
- FIFO (First In, First Out): Would remove oldest sessions regardless of
recent activity
- LRU provides better UX because active conversations are preserved longer

Handles time-based cleanup: Removes records older than retention period.
Note: LRU cleanup is handled by the SQLite provider during save operations.
Note: Per-session FIFO cleanup (max 20 records) is handled by the SQLite provider.
"""

def __init__(self, engine: Engine) -> None:
Expand All @@ -34,48 +42,134 @@ def __init__(self, engine: Engine) -> None:
engine: SQLAlchemy engine for database operations
"""
self.engine = engine
self.retention_days = RECORD_RETENTION_DAYS
self.last_cleanup_time = datetime.utcnow()
self.max_total_sessions = MAX_TOTAL_SESSIONS

def cleanup_old_records(self) -> int:
def get_session_count(self) -> int:
"""
Remove records older than retention_days.
This runs once per day to avoid excessive cleanup operations.
Get the total number of unique sessions in the database.

Returns:
Number of records deleted
Number of unique sessions
"""
# Only run cleanup once per day
if (datetime.utcnow() - self.last_cleanup_time).days < 1:
return 0
with Session(self.engine) as session:
# Count distinct session_ids
count_stmt = select(
func.count(func.distinct(ConversationRecord.session_id))
)
result = session.exec(count_stmt).first()
return result or 0

def get_least_recently_used_sessions(self, limit: int) -> list[str]:
"""
Get session IDs of the least recently used sessions.

cutoff_date = datetime.utcnow() - timedelta(days=self.retention_days)
Uses LRU strategy: finds sessions with the oldest "last activity" timestamp.
Last activity = MAX(timestamp) for each session (most recent record in session).

Args:
limit: Number of session IDs to return

Returns:
List of session IDs ordered by last activity (oldest first)
"""
with Session(self.engine) as session:
# Count old records
old_count_stmt = select(ConversationRecord).where(
ConversationRecord.timestamp < cutoff_date
# Find sessions with oldest last activity (LRU)
# GROUP BY session_id, ORDER BY MAX(timestamp) ASC to get least
# recently used
lru_stmt = (
select(
ConversationRecord.session_id,
func.max(ConversationRecord.timestamp).label("last_activity"),
)
.group_by(ConversationRecord.session_id)
.order_by(func.max(ConversationRecord.timestamp).asc())
.limit(limit)
)
old_records = session.exec(old_count_stmt).all()
old_count = len(old_records)

if old_count == 0:
logger.info(
f"🧹 Daily cleanup: No records older than {self.retention_days} days"
results = session.exec(lru_stmt).all()
return [result[0] for result in results]

def delete_sessions(self, session_ids: list[str]) -> int:
"""
Bulk delete all records for the given session IDs.

Args:
session_ids: List of session IDs to delete

Returns:
Number of records deleted
"""
if not session_ids:
return 0

with Session(self.engine) as session:
# Count records before deletion for logging
count_stmt = select(ConversationRecord).where(
ConversationRecord.session_id.in_( # type: ignore[attr-defined]
session_ids
)
self.last_cleanup_time = datetime.utcnow()
return 0
)
records_to_delete = session.exec(count_stmt).all()
delete_count = len(records_to_delete)

# Delete old records
for record in old_records:
# Bulk delete all records for these sessions
for record in records_to_delete:
session.delete(record)

session.commit()

# Reset cleanup tracking
self.last_cleanup_time = datetime.utcnow()
logger.info(
f"🗑️ Deleted {delete_count} records from {len(session_ids)} sessions: "
f"{', '.join(session_ids[:3])}{'...' if len(session_ids) > 3 else ''}"
)

return delete_count

def cleanup_excess_sessions(self) -> int:
"""
Remove least recently used sessions when total sessions exceed
MAX_TOTAL_SESSIONS.

This implements LRU (Least Recently Used) cleanup strategy:
- Keeps sessions that users are actively using (better UX than FIFO)
- Runs immediately when session limit is exceeded (no daily restriction)
- Removes entire sessions (all records for those session_ids)
- Called every time a new session is created to maintain session limit

Returns:
Number of records deleted
"""
current_session_count = self.get_session_count()

if current_session_count <= self.max_total_sessions:
logger.info(
f"🧹 Daily cleanup: Deleted {old_count} records older than {self.retention_days} days"
f"🧹 Session LRU cleanup: {current_session_count} sessions "
f"(max: {self.max_total_sessions}) - no cleanup needed"
)
return old_count
return 0

# Calculate how many sessions to remove
sessions_to_remove = current_session_count - self.max_total_sessions

logger.info(
f"🧹 Session LRU cleanup: {current_session_count} sessions exceeds limit "
f"({self.max_total_sessions}), removing {sessions_to_remove} "
f"least recently used sessions"
)

# Get least recently used sessions
lru_session_ids = self.get_least_recently_used_sessions(sessions_to_remove)

if not lru_session_ids:
logger.warning("🧹 No sessions found for LRU cleanup")
return 0

# Delete all records for these sessions
deleted_count = self.delete_sessions(lru_session_ids)

logger.info(
f"✅ Session LRU cleanup completed: removed {sessions_to_remove} sessions, "
f"deleted {deleted_count} records"
)

return deleted_count
4 changes: 2 additions & 2 deletions src/mcp_as_a_judge/db/db_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from mcp_as_a_judge.constants import (
DATABASE_URL,
MAX_SESSION_RECORDS,
RECORD_RETENTION_DAYS,
MAX_TOTAL_SESSIONS,
)


Expand Down Expand Up @@ -61,7 +61,7 @@ class DatabaseConfig:
def __init__(self) -> None:
self.url = DATABASE_URL
self.max_session_records = MAX_SESSION_RECORDS
self.record_retention_days = RECORD_RETENTION_DAYS
self.max_total_sessions = MAX_TOTAL_SESSIONS


class Config:
Expand Down
54 changes: 38 additions & 16 deletions src/mcp_as_a_judge/db/providers/sqlite_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

import uuid
from datetime import datetime
from datetime import UTC, datetime

from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel, asc, desc, select
Expand All @@ -29,8 +29,10 @@ class SQLiteProvider(ConversationHistoryDB):
Features:
- SQLModel with SQLAlchemy for type safety
- In-memory or file-based SQLite storage
- LRU cleanup per session
- Time-based cleanup (configurable retention)
- Two-level cleanup strategy:
1. Session-based LRU cleanup (runs when new sessions are created,
removes least recently used)
2. Per-session FIFO cleanup (max 20 records per session, runs on every save)
- Session-based conversation retrieval
"""

Expand All @@ -50,15 +52,16 @@ def __init__(self, max_session_records: int = 20, url: str = "") -> None:

self._max_session_records = max_session_records

# Initialize cleanup service for time-based cleanup
# Initialize cleanup service for LRU session cleanup
self._cleanup_service = ConversationCleanupService(engine=self.engine)

# Create tables
self._create_tables()

logger.info(
f"🗄️ SQLModel SQLite provider initialized: {connection_string}, "
f"max_records={max_session_records}, retention_days={self._cleanup_service.retention_days}"
f"max_records_per_session={max_session_records}, "
f"max_total_sessions={self._cleanup_service.max_total_sessions}"
)

def _parse_sqlite_url(self, url: str) -> str:
Expand All @@ -79,12 +82,14 @@ def _create_tables(self) -> None:
SQLModel.metadata.create_all(self.engine)
logger.info("📋 Created conversation_history table with SQLModel")

def _cleanup_old_records(self) -> int:
def _cleanup_excess_sessions(self) -> int:
"""
Remove records older than retention_days using the cleanup service.
This runs once per day to avoid excessive cleanup operations.
Remove least recently used sessions when total sessions exceed limit.
This implements LRU cleanup to maintain session limit for better memory
management.
Runs immediately when new sessions are created and limit is exceeded.
"""
return self._cleanup_service.cleanup_old_records()
return self._cleanup_service.cleanup_excess_sessions()

def _cleanup_old_messages(self, session_id: str) -> int:
"""
Expand All @@ -100,8 +105,8 @@ def _cleanup_old_messages(self, session_id: str) -> int:
current_count = len(current_records)

logger.info(
f"🧹 FIFO cleanup check for session {session_id}: {current_count} records "
f"(max: {self._max_session_records})"
f"🧹 FIFO cleanup check for session {session_id}: "
f"{current_count} records (max: {self._max_session_records})"
)

if current_count <= self._max_session_records:
Expand Down Expand Up @@ -132,22 +137,36 @@ def _cleanup_old_messages(self, session_id: str) -> int:
session.commit()

logger.info(
f"✅ LRU cleanup completed: removed {len(old_records)} records from session {session_id}"
f"✅ LRU cleanup completed: removed {len(old_records)} records "
f"from session {session_id}"
)
return len(old_records)

def _is_new_session(self, session_id: str) -> bool:
"""Check if this is a new session (no existing records)."""
with Session(self.engine) as session:
existing_record = session.exec(
select(ConversationRecord)
.where(ConversationRecord.session_id == session_id)
.limit(1)
).first()
return existing_record is None

async def save_conversation(
self, session_id: str, source: str, input_data: str, output: str
) -> str:
"""Save a conversation record to SQLite database with LRU cleanup."""
record_id = str(uuid.uuid4())
timestamp = datetime.utcnow()
timestamp = datetime.now(UTC)

logger.info(
f"💾 Saving conversation to SQLModel SQLite DB: record {record_id} "
f"for session {session_id}, source {source} at {timestamp}"
)

# Check if this is a new session before saving
is_new_session = self._is_new_session(session_id)

# Create new record
record = ConversationRecord(
id=record_id,
Expand All @@ -164,10 +183,13 @@ async def save_conversation(

logger.info("✅ Successfully inserted record into conversation_history table")

# Daily cleanup: run once per day to remove old records
self._cleanup_old_records()
# Session LRU cleanup: only run when a new session is created
if is_new_session:
logger.info(f"🆕 New session detected: {session_id}, running LRU cleanup")
self._cleanup_excess_sessions()

# Always perform LRU cleanup for this session (lightweight)
# Per-session FIFO cleanup: maintain max 20 records per session
# (runs on every save)
self._cleanup_old_messages(session_id)

return record_id
Expand Down
Loading