Stateful multi-agent gift concierge for the Kapruka domain, built to showcase context engineering, retrieval engineering, orchestration, and production-style AI system design.
This repository is not a single-prompt demo. It combines:
- LangGraph state orchestration
- structured routing and multi-route fan-out
- short-term and long-term memory
- RAG, CAG, and CRAG
- relational CRM and logistics reasoning
- Qdrant vector retrieval
- Supabase + pgvector memory storage
- FastAPI serving
- Langfuse tracing and prompt management
The core idea is simple: keep deterministic business data in SQL, keep fuzzy semantic knowledge in vector stores, and make the control flow explicit enough to inspect, test, and extend.
- `memory_context` is built from recent conversation turns before routing or synthesis.
- `semantic_facts` carries structured long-term memory facts into the graph as `list[dict]`, not as one flattened string.
- specialist agents receive different prompt frames and different tool outputs.
- compound requests are decomposed into multiple routes, executed in parallel, then merged into one user-facing answer.
- prompt templates are externalized through Langfuse prompt management with local fallbacks in code.
- product knowledge retrieval is separated from CRM/logistics retrieval.
- Qdrant is used for the Kapruka product corpus and for the semantic CAG cache.
- parent-child chunking is the current default ingestion strategy.
- CRAG expands retrieval only when confidence is low.
- a dedicated semantic cache short-circuits repeated and paraphrased questions.
- FastAPI lifespan builds the agent once at startup.
- async endpoints use `ainvoke()` / `astream()` so the event loop stays non-blocking.
- state transitions are explicit in a LangGraph `StateGraph`.
- traces, token usage, latency, and prompt versions are observable through Langfuse.
- storage is externalized: Supabase for memory and CRM, Qdrant for vectors, Tavily for time-sensitive web search.
- the repo models real Kapruka concerns: catalog retrieval, delivery feasibility, courier availability, slot capacity, product delivery rules, and customer memory.
- structured logistics data is normalized instead of buried inside prompts.
- the router includes post-processing heuristics to recover common delivery/logistics misroutes.
The orchestrator in src/agents/orchestrator.py compiles this graph:
- `recall`: loads short-term turns from `st_turns` and long-term facts from `mem_facts`.
- `supervisor`: calls the LLM router, serializes route decisions into graph state, and decides whether to fan out.
- `profile_agent`: handles CRM and logistics requests through `CRMTool`.
- `catalog_agent`: handles product/catalog/internal FAQ retrieval through `RAGTool`.
- `concierge_agent`: handles direct concierge turns and web search turns.
- `merge_responses`: merges parallel specialist outputs when the router emitted multiple routes.
- `save_memory`: stores the conversation pair in short-term memory and optionally distills durable facts into long-term memory.
| Route | Node | Purpose |
|---|---|---|
| `crm` | `profile_agent` | customer profile lookups and structured logistics checks |
| `rag` | `catalog_agent` | Kapruka product retrieval, recommendations, internal FAQ |
| `web_search` | `concierge_agent` | live external information such as weather or disruptions |
| `direct` | `concierge_agent` | greetings, memory-only turns, general concierge replies |
The shared AgentState in src/agents/state.py is a meaningful part of the system design:
- `messages`: LangGraph message list with `add_messages` reducer.
- `user_id`, `session_id`: stable identifiers passed through every node.
- `memory_context`: formatted short-term context.
- `semantic_facts`: structured long-term facts for specialist prompts.
- `route_decision`, `route_decisions`: backward-compatible single route plus full multi-route list.
- `tool_output`, `final_answer`: raw tool output and synthesized answer.
- `agent_outputs`: reducer-backed collector used to merge parallel branch outputs.
- `should_distill`: write-path signal from the memory node.
That state model is what turns the graph from "tool calling" into explicit context engineering.
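As a rough illustration of the reducer-backed `agent_outputs` field, the sketch below mirrors a subset of the fields listed above using only the standard library. `AgentStateSketch` and `apply_update` are hypothetical names, not the contents of `src/agents/state.py`; the `messages` field is omitted because its `add_messages` reducer comes from LangGraph.

```python
import operator
from typing import Annotated, TypedDict


class AgentStateSketch(TypedDict, total=False):
    """Hypothetical, trimmed-down mirror of the state fields listed above."""
    user_id: str
    session_id: str
    memory_context: str             # formatted short-term context
    semantic_facts: list[dict]      # structured long-term facts
    route_decisions: list[dict]     # full multi-route list
    # Reducer annotation: parallel branches append instead of overwriting.
    agent_outputs: Annotated[list[dict], operator.add]
    final_answer: str


def apply_update(state: dict, update: dict) -> dict:
    """Toy stand-in for how a reducer merges one branch update into state."""
    merged = dict(state)
    for key, value in update.items():
        if key == "agent_outputs":
            merged[key] = operator.add(merged.get(key, []), value)
        else:
            merged[key] = value
    return merged
```

The point of the reducer is that two branches writing `agent_outputs` in the same step both survive, while plain fields such as `final_answer` keep last-write-wins semantics.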
The router in src/agents/router.py does more than intent classification.
- asks the LLM for strict JSON output
- supports up to 3 routes for one user message
- validates routes and CRM actions
- deduplicates repeated routes
- extracts parameters for CRM/RAG/web actions
- repairs common delivery-feasibility misroutes after parsing
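The validation steps above could look roughly like the following. This is a minimal sketch, not the code in `src/agents/router.py`: the JSON shape (`{"routes": [{"route": ..., "params": ...}]}`), the `parse_routes` name, and the fallback to `direct` on bad output are all assumptions made for illustration.

```python
import json

VALID_ROUTES = {"crm", "rag", "web_search", "direct"}
MAX_ROUTES = 3  # the router supports up to 3 routes per message

# Assumed fallback when the LLM output cannot be used.
FALLBACK = [{"route": "direct", "params": {}}]


def parse_routes(raw: str) -> list[dict]:
    """Parse router JSON, drop invalid or repeated routes, cap at MAX_ROUTES."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return list(FALLBACK)
    if not isinstance(payload, dict):
        return list(FALLBACK)

    seen: set[str] = set()
    routes: list[dict] = []
    for item in payload.get("routes", []):
        name = item.get("route") if isinstance(item, dict) else None
        if name not in VALID_ROUTES or name in seen:
            continue  # invalid route name or duplicate
        seen.add(name)
        routes.append({"route": name, "params": item.get("params", {})})
    return routes[:MAX_ROUTES] or list(FALLBACK)
```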
Real user messages are often compound:
- "Find a birthday cake under Rs. 5000 and check if same-day delivery is available in Kandy."
- "Update my phone number and recommend chocolates."
Instead of forcing one brittle prompt to solve everything, the router emits structured work items and the graph fans out.
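The fan-out idea can be shown without LangGraph. The sketch below uses plain `asyncio.gather` as a stand-in for the graph's parallel branches; the handler bodies and the `fan_out` helper are hypothetical and only borrow the node names from this README.

```python
import asyncio


async def catalog_agent(query: str) -> dict:
    # Placeholder for the RAG-backed specialist.
    return {"route": "rag", "text": f"catalog results for: {query}"}


async def profile_agent(query: str) -> dict:
    # Placeholder for the CRM/logistics specialist.
    return {"route": "crm", "text": f"logistics check for: {query}"}


HANDLERS = {"rag": catalog_agent, "crm": profile_agent}


async def fan_out(query: str, routes: list[str]) -> str:
    """Run one handler per route concurrently, then merge in route order."""
    outputs = await asyncio.gather(*(HANDLERS[r](query) for r in routes))
    return "\n".join(f"[{o['route']}] {o['text']}" for o in outputs)
```

`asyncio.gather` returns results in the order the awaitables were passed, so the merged answer is deterministic even when branches finish at different times.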
One of the more practical engineering decisions in this repo is the router post-processor:
- delivery-feasibility questions that an LLM may label as `web_search` or `direct` are corrected toward CRM/logistics actions when the query looks like a structured district/slot/product coverage request.
- live disruption queries still stay on the web-search path.
The regression tests in tests/test_logistics_flow.py exist specifically to protect that behavior.
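A post-processor of this kind might be sketched as follows. The keyword patterns and the `repair_route` function are illustrative assumptions; the heuristics actually used by the router are not shown in this README.

```python
import re

# Hypothetical hint patterns, chosen only for this example.
DELIVERY_HINTS = re.compile(r"same-day|delivery slot|coverage|deliver to", re.IGNORECASE)
LIVE_HINTS = re.compile(r"weather|flood|strike|disruption|road closure", re.IGNORECASE)


def repair_route(query: str, route: dict) -> dict:
    """Redirect structured delivery questions that were misrouted to web/direct."""
    if route["route"] not in {"web_search", "direct"}:
        return route  # only these two labels are candidates for repair
    if LIVE_HINTS.search(query):
        return route  # live disruption queries stay on the web-search path
    if DELIVERY_HINTS.search(query):
        return {"route": "crm", "params": {"action": "check_delivery_coverage"}}
    return route
```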
The memory subsystem lives in src/memory/.
Short-term memory:
- store: `st_turns`
- implementation: `src/memory/st_store.py`
- backend: Supabase PostgreSQL
- behavior:
  - TTL-backed conversation storage
  - ring-buffer trimming
  - session-scoped recent-turn recall
- current defaults from `src/infrastructure/config.py`:
  - max turns: `30`
  - TTL: `24h`
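To make ring-buffer trimming and TTL concrete, here is an in-memory sketch using the defaults quoted above. It is not the Supabase-backed store in `src/memory/st_store.py`; the `ShortTermBuffer` class is hypothetical, and treating "max turns" as a count of individual messages is an assumption.

```python
from collections import deque
from dataclasses import dataclass, field

MAX_TURNS = 30
TTL_SECONDS = 24 * 60 * 60


@dataclass
class ShortTermBuffer:
    # deque(maxlen=...) silently evicts the oldest entry when full.
    turns: deque = field(default_factory=lambda: deque(maxlen=MAX_TURNS))

    def add(self, role: str, text: str, now: float) -> None:
        self.turns.append((now, role, text))

    def recent(self, now: float) -> list[tuple[str, str]]:
        """Return (role, text) pairs that have not yet expired."""
        return [(r, t) for ts, r, t in self.turns if now - ts < TTL_SECONDS]
```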
Long-term memory:
- store: `mem_facts`
- implementation: `src/memory/lt_store.py`
- backend: Supabase PostgreSQL + pgvector
- behavior:
  - semantic retrieval by embedding similarity
  - score decay
  - soft deletion
  - cross-run semantic deduplication
- current defaults:
  - top-k: `5`
  - similarity threshold: `0.30`
  - TTL horizon: `90 days`
  - half-life: `30 days`
  - cross-run dedup similarity: `0.92`
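One plausible reading of "score decay" with a 30-day half-life is exponential decay of the similarity score by fact age. The sketch below assumes that formula and assumes the similarity threshold is applied before decay; neither detail is confirmed by this README, and `rank_facts` is a hypothetical helper.

```python
HALF_LIFE_DAYS = 30.0
TTL_HORIZON_DAYS = 90.0
SIMILARITY_THRESHOLD = 0.30
TOP_K = 5


def decayed_score(similarity: float, age_days: float) -> float:
    """Halve the score every HALF_LIFE_DAYS (assumed exponential decay)."""
    return similarity * 0.5 ** (age_days / HALF_LIFE_DAYS)


def rank_facts(candidates: list[dict]) -> list[dict]:
    """Filter by TTL horizon and raw similarity, then rank by decayed score."""
    live = [
        c for c in candidates
        if c["age_days"] <= TTL_HORIZON_DAYS
        and c["similarity"] >= SIMILARITY_THRESHOLD
    ]
    live.sort(key=lambda c: decayed_score(c["similarity"], c["age_days"]), reverse=True)
    return live[:TOP_K]
```

Under these assumptions a fresh fact with similarity 0.5 outranks a 60-day-old fact with similarity 0.9, because the older score decays to 0.225.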
`src/memory/memory_ops.py` contains:
- `MemoryDistiller`
  - triggers when the conversation is long enough or contains memory-like phrases such as `remember`, `always`, or `never`
  - uses an LLM to extract durable facts
  - scores facts
  - deduplicates within-batch
  - upserts to long-term memory
- `MemoryRecaller`
  - retrieves ST + LT memory
  - applies a token budget
  - currently uses a 60/40 short-term vs long-term budget split within a 500-token recall window
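The 60/40 split within a 500-token window works out to 300 tokens for short-term turns and 200 for long-term facts. A minimal sketch of such budgeting follows; the four-characters-per-token estimate and the helper names are assumptions, not the logic in `MemoryRecaller`.

```python
RECALL_BUDGET = 500
ST_SHARE_PERCENT = 60


def estimate_tokens(text: str) -> int:
    # Crude assumption: roughly 4 characters per token.
    return max(1, len(text) // 4)


def take_within(items: list[str], budget: int) -> list[str]:
    """Keep items in order until the next one would exceed the budget."""
    kept, used = [], 0
    for item in items:
        cost = estimate_tokens(item)
        if used + cost > budget:
            break
        kept.append(item)
        used += cost
    return kept


def build_recall(st_turns: list[str], lt_facts: list[str]) -> dict:
    st_budget = RECALL_BUDGET * ST_SHARE_PERCENT // 100   # 300 tokens
    lt_budget = RECALL_BUDGET - st_budget                 # 200 tokens
    # Prefer the newest turns, then restore chronological order.
    newest_first = take_within(list(reversed(st_turns)), st_budget)
    return {
        "short_term": list(reversed(newest_first)),
        "long_term": take_within(lt_facts, lt_budget),
    }
```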
These exist and are implemented, but they are not on the default orchestration path today:
- `mem_episodes` (`src/memory/episodic_store.py`): stores summarized conversation episodes with pgvector summaries.
- `mem_procedures` (`src/memory/procedural_store.py`): stores semantically searchable workflows and procedures.
That distinction matters: the repo contains a broader memory architecture than the current default agent runtime actively consumes.
The retrieval path is implemented in src/services/chat_service/ and src/agents/tools/rag_tool.py.
RAGTool is the public tool used by the agent for product, catalog, and internal FAQ retrieval.
Under the hood:
- embed the query
- search Qdrant
- retrieve parent-child-aware context
- build a grounded prompt
- synthesize an answer
The retriever in src/services/chat_service/rag_service.py:
- is a LangChain-compatible `BaseRetriever`
- deduplicates by `parent_id`
- passes `parent_text` as the LLM-facing `page_content`
- preserves child text and metadata in the payload
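The parent-level deduplication described above can be illustrated with plain dictionaries. This is a sketch rather than the retriever in `rag_service.py`: the hit shape and the `child_text` key are assumptions, while `parent_id`, `parent_text`, and `page_content` are the names used in this README.

```python
def collapse_to_parents(hits: list[dict]) -> list[dict]:
    """Keep the best-scoring child per parent_id and surface parent_text."""
    seen: set[str] = set()
    docs: list[dict] = []
    for hit in sorted(hits, key=lambda h: h["score"], reverse=True):
        payload = hit["payload"]
        if payload["parent_id"] in seen:
            continue  # a higher-scoring sibling already represents this parent
        seen.add(payload["parent_id"])
        docs.append({
            "page_content": payload["parent_text"],   # what the LLM reads
            "metadata": {
                "parent_id": payload["parent_id"],
                "child_text": payload["child_text"],  # the chunk that matched
                "score": hit["score"],
            },
        })
    return docs
```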
CAGCache in src/services/chat_service/cag_cache.py is a semantic cache backed by a dedicated Qdrant collection.
Current behavior:
- collection: `cag_cache`
- threshold: `0.90`
- TTL: `24h`
- lookup: KNN-1 over query embeddings
- duplicate cleanup on set: near-identical entries above `0.99`
Why this is useful:
- repeated questions return instantly
- paraphrases can still hit the cache
- cached answers do not pollute the product corpus because the cache lives in its own collection
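The lookup behavior (nearest neighbor, similarity threshold, TTL) can be sketched in memory. `TinySemanticCache` is a hypothetical stand-in that scans a Python list instead of querying the dedicated Qdrant collection, and it assumes cosine similarity as the metric.

```python
import math

CACHE_THRESHOLD = 0.90
CACHE_TTL_SECONDS = 24 * 60 * 60


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class TinySemanticCache:
    def __init__(self) -> None:
        self.entries: list[tuple[list[float], str, float]] = []

    def set(self, embedding: list[float], answer: str, now: float) -> None:
        self.entries.append((embedding, answer, now))

    def get(self, embedding: list[float], now: float):
        """Return the nearest unexpired answer if it clears the threshold."""
        live = [e for e in self.entries if now - e[2] < CACHE_TTL_SECONDS]
        if not live:
            return None
        best = max(live, key=lambda e: cosine(embedding, e[0]))  # KNN-1
        return best[1] if cosine(embedding, best[0]) >= CACHE_THRESHOLD else None
```

A paraphrase whose embedding lands close to a stored query returns the cached answer; anything below the threshold falls through to retrieval.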
CRAGService in src/services/chat_service/crag_service.py adds a corrective retrieval pass.
Current flow:
- initial retrieval with `k=4`
- confidence scoring
- if confidence `< 0.6`, expand retrieval to `k=8`
- generate from the better evidence set
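The corrective flow above reduces to a small control structure. In this sketch the retriever and the confidence scorer are injected as callables so that nothing about `CRAGService` internals is assumed; `corrective_retrieve` is a hypothetical name, and choosing the higher-scoring set is one interpretation of "the better evidence set".

```python
from typing import Callable

CONFIDENCE_THRESHOLD = 0.6


def corrective_retrieve(
    query: str,
    retrieve: Callable[[str, int], list[str]],
    score: Callable[[str, list[str]], float],
    k_initial: int = 4,
    k_expanded: int = 8,
) -> tuple[list[str], bool]:
    """Return (evidence, expanded?) following the low-confidence expansion rule."""
    docs = retrieve(query, k_initial)
    confidence = score(query, docs)
    if confidence >= CONFIDENCE_THRESHOLD:
        return docs, False            # first pass was good enough
    expanded = retrieve(query, k_expanded)
    if score(query, expanded) >= confidence:
        return expanded, True         # wider pass gave better evidence
    return docs, True                 # expansion did not help; keep the original
```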
Confidence is currently heuristic-based in src/infrastructure/utils.py:
- keyword overlap
- content richness
- strategy diversity
The agent-facing RAGTool uses:
query -> semantic cache -> cache miss -> CRAG -> answer -> cache set
That gives the system two operating modes:
- low-latency path for repeated/common questions
- higher-quality corrective path for uncertain retrieval
The ingestion pipeline lives in src/services/ingest_services/.
The repo currently contains two product corpora derived from the Kapruka crawl:
- `data/kapruka_docs.jsonl`: structured product records
- `data/kapruka_markdown/`: rendered markdown product pages
The current CLI-exposed ingestion source is jsonl, via scripts/ingest_to_qdrant.py.
src/services/ingest_services/chunkers.py implements:
- `semantic_chunk`
- `fixed_chunk`
- `sliding_chunk`
- `parent_child_chunk`
- `late_chunk_index`
- `late_chunk_split`
The actual current ingestion CLI default is:
- source: `jsonl`
- strategy: `parent_child`
Relevant config values:
- parent size: `1200`
- child size: `250`
- child overlap: `50`
- retrieval top-k: `4`
- retrieval similarity threshold: `0.7`
The Kapruka dataset is product-centric and fairly compact. Parent-child chunking works well because:
- child chunks improve retrieval precision
- parent text gives the generator richer context
- repeated field structures such as price, partner, options, and descriptions stay connected during synthesis
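For intuition, parent-child chunking with the configured sizes might look like the sketch below. It assumes the sizes are measured in characters (the README does not say whether they are characters or tokens) and is not the `parent_child_chunk` implementation in `chunkers.py`; `parent_child_chunks` and the `doc_id:index` parent id format are hypothetical.

```python
PARENT_SIZE = 1200
CHILD_SIZE = 250
CHILD_OVERLAP = 50


def parent_child_chunks(text: str, doc_id: str) -> list[dict]:
    """Split text into parents, then into overlapping children per parent."""
    chunks: list[dict] = []
    step = CHILD_SIZE - CHILD_OVERLAP  # each child starts 200 chars after the last
    for p_idx, p_start in enumerate(range(0, len(text), PARENT_SIZE)):
        parent_text = text[p_start:p_start + PARENT_SIZE]
        parent_id = f"{doc_id}:{p_idx}"
        for c_start in range(0, len(parent_text), step):
            chunks.append({
                "parent_id": parent_id,
                "parent_text": parent_text,   # returned to the generator
                "child_text": parent_text[c_start:c_start + CHILD_SIZE],  # embedded
            })
            if c_start + CHILD_SIZE >= len(parent_text):
                break  # this child already reaches the end of the parent
    return chunks
```

Only the child text would be embedded for search, while the parent text travels in the payload so the generator sees the full product record.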
src/services/ingest_services/web_crawler.py contains an async Playwright crawler that:
- prioritizes product detail pages
- extracts product metadata and option values
- converts crawled HTML into structured content
- keeps discovery order stable
- enforces max-depth, max-pages, and max-saved-docs limits
The notebooks show the crawl process that produced the current dataset snapshot.
The database schema covers both the memory system and the operational CRM/logistics model:
- conversation memory in `st_turns`
- semantic memory in `mem_facts`
- episodic memory in `mem_episodes`
- procedural memory in `mem_procedures`
- customer identity in `users`
- delivery planning in `delivery_zones`, `delivery_slots`, and `courier_profiles`
- product constraints in `product_delivery_rules`
- historical fulfillment signals in `delivery_history`
The structured business-data path is intentionally relational.
The schema generator and SQL snapshot define:
- `users`
- `delivery_zones`
- `delivery_slots`
- `courier_profiles`
- `product_delivery_rules`
- `delivery_history`
- plus memory tables:
  - `st_turns`
  - `mem_facts`
  - `mem_episodes`
  - `mem_procedures`
From data/logistics/:
- `25` delivery zones
- `125` delivery slots
- `1000` courier profiles
- `10` product delivery rule rows
- `10000` delivery history rows
Delivery coverage, courier capacity, slot availability, and product constraints are deterministic business queries. They should not be hallucinated from text retrieval.
That is why the CRM/logistics tool path exists separately from product RAG.
src/agents/tools/crm_tool.py supports:
- `lookup_user`
- `create_user`
- `update_user`
- `deactivate_user`
- `list_users`
- `get_delivery_zone`
- `list_delivery_slots`
- `search_couriers`
- `get_product_delivery_rule`
- `lookup_delivery_history`
- `check_delivery_coverage`
check_delivery_coverage is especially important because it synthesizes:
- district-level availability
- same-day feasibility
- slot availability
- product delivery rules
- top available couriers
- historical delivery summary
The observability and prompt-ops design lives in:
- `src/infrastructure/observability.py`
- `src/agents/prompts/agent_prompts.py`
- `src/memory/prompts.py`
Langfuse is used for:
- tracing graph nodes
- tracking token usage and latency
- routing and memory-generation visibility
- prompt management with live override capability
Prompts are fetched from Langfuse by name, but every prompt has a local fallback in code.
That gives you:
- editable prompts in Langfuse without code redeploy
- safe local execution when Langfuse prompts do not exist yet
- versionable agent behavior across router, synthesis, memory distillation, and specialist prompts
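The fetch-with-fallback pattern can be sketched independently of the Langfuse SDK. Here the remote fetcher is passed in as a callable so no client API is assumed; `get_prompt`, `LOCAL_FALLBACKS`, and the example prompt text are hypothetical.

```python
from typing import Callable, Optional

# Hypothetical local fallback table; the real prompts live in the repo's prompt modules.
LOCAL_FALLBACKS = {
    "router": "You are a routing assistant. Reply with strict JSON.",
}


def get_prompt(name: str, fetch_remote: Callable[[str], Optional[str]]) -> str:
    """Prefer the remotely managed prompt; fall back to the in-code default."""
    try:
        remote = fetch_remote(name)
    except Exception:
        remote = None  # network error, missing prompt, bad credentials, ...
    return remote if remote else LOCAL_FALLBACKS[name]
```

With this shape, a missing or unreachable remote prompt degrades to the in-code default instead of failing the request.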
Key traced units include:
- router invocation
- recall node
- CRM dispatch
- RAG search
- CAG generation
- web search
- memory distillation
- top-level chat request
The screenshots below show a single end-to-end concierge flow moving from gift discovery to delivery confirmation.
- Initial gift recommendation
- Follow-up recommendations
- Additional cake options
- Confirmed item pricing
- Delivery coverage check
- Delivery slot selection
These screenshots show the tracing, cost, and prompt-management views wired into the agent runtime.
- Langfuse home overview
- Langfuse cost dashboard
- Langfuse tracing view
- Langfuse prompt management
The FastAPI app lives in src/api/.
- `POST /chat`: synchronous final-answer endpoint.
- `POST /chat/stream`: SSE stream of node-by-node progress using LangGraph `astream()`.
- `GET /health`: reports agent readiness and tool availability.
- `GET /graph`: returns Mermaid and structured edge metadata for the compiled graph.
- `GET /memory/{user_id}`: returns stored long-term facts for a user.
- `POST /memory/clear`: clears short-term memory for a session.

Implementation notes:
- typed request/response schemas live in `src/api/schemas.py`
- startup builds the agent once in FastAPI lifespan
- blocking startup work is moved to `asyncio.to_thread`
- CORS is open for experimentation
- streaming summarizes per-node state instead of dumping raw graph internals
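The build-once-at-startup idea, with blocking work pushed off the event loop, can be shown with the standard library alone. This sketch uses a plain dict in place of the FastAPI app object; `build_agent_blocking` and this `lifespan` function are hypothetical stand-ins, not the code in `src/api/`.

```python
import asyncio
import time
from contextlib import asynccontextmanager


def build_agent_blocking() -> dict:
    # Stands in for slow, synchronous client and graph construction.
    time.sleep(0.05)
    return {"ready": True}


@asynccontextmanager
async def lifespan(app_state: dict):
    # Offload the blocking build so the event loop stays responsive.
    app_state["agent"] = await asyncio.to_thread(build_agent_blocking)
    yield
    app_state.clear()  # teardown on shutdown
```

Request handlers would then read the prebuilt agent from shared state instead of constructing it per request.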
- `pyproject.toml`: package metadata and Hatch configuration.
- `requirements.txt`: broader runtime and notebook dependency list.
- `Makefile`: workflow shortcuts for install, schema init, seeding, ingestion, status, and tests.
- `assets/kapruka_system_architecture.png`: system architecture diagram.
- `assets/supabase_schema.png`: Supabase schema reference.

- `config/param.yaml`: retrieval, chunking, cache, crawling, and path defaults.
- `config/models.yaml`: provider/model catalog.
- `config/faqs.yaml`: curated FAQ query/answer pairs used to warm the semantic cache.

- `data/kapruka_docs.jsonl`: structured product corpus for ingestion.
- `data/kapruka_markdown/*.md`: markdown-rendered crawl output.
- `data/logistics/*.json`: structured logistics seed data.

- `src/agents/`: router, state, orchestrator, prompts, tools.
- `src/api/`: FastAPI app and schemas.
- `src/infrastructure/`: config, logging, utils, observability, LLM providers, DB clients.
- `src/memory/`: ST/LT/episodic/procedural memory implementations and policies.
- `src/services/chat_service/`: RAG, CAG, CRAG, cache, and prompt templates.
- `src/services/ingest_services/`: crawler, chunkers, ingestion pipeline.
- `src/services/crm_service/`: CRM DB client and synthetic data generation.

- `scripts/init_supabase.py`: initialize Supabase schema.
- `scripts/test_supabase.py`: verify connection and pgvector extension.
- `scripts/seed_crm_unified.py`: seed CRM users plus logistics reference data.
- `scripts/ingest_to_qdrant.py`: ingest the product corpus into Qdrant.
- `scripts/rebuild_cag_cache.py`: clear and warm the semantic FAQ cache from `config/faqs.yaml`.

- `sql/supabase_schema.sql`: SQL schema snapshot.
- `src/infrastructure/db/supabase_schema.py`: dynamic schema generator used by setup scripts.
- `sql/01_users.sql`: deterministic user seed data.
- `sql/02_delivery_zones.sql` through `sql/06_delivery_history.sql`: logistics seed snapshots.

- `notebooks/01_crawl_kapruka.ipynb`: crawler workflow and crawl export.
- `notebooks/02_find_chunk_size.ipynb`: chunk-size analysis over the product corpus.
- `notebooks/03_routing_memory_and_tools.ipynb`: routing, memory, and tool-path walkthrough.
- `notebooks/04_multi_agent_langgraph.ipynb`: LangGraph visualization and multi-agent demos.

- `tests/test_logistics_flow.py`: verifies logistics rerouting, CRM feasibility formatting, and end-to-end orchestrator behavior.
Current repository snapshot:
- `96` JSONL product records
- `96` markdown product documents
- `40` curated FAQ cache entries
- `25` delivery zones
- `125` delivery slots
- `1000` courier profiles
- `10` product delivery rules
- `10000` delivery-history rows
Install dependencies with `pip install -r requirements.txt`.

Create a `.env` with the keys your chosen runtime path needs.
Common keys used by this repo:
- `OPENAI_API_KEY`
- `QDRANT_URL`
- `QDRANT_API_KEY`
- `SUPABASE_DB_URL`
- `SUPABASE_URL`
- `SUPABASE_KEY`
- `TAVILY_API_KEY`
- `LANGFUSE_SECRET_KEY`
- `LANGFUSE_PUBLIC_KEY`
- `LANGFUSE_BASE_URL`
Run the setup steps in order:
- `python scripts/init_supabase.py`
- `python scripts/test_supabase.py`
- `python scripts/seed_crm_unified.py --mode template --storage database --n-users 20 --tz Asia/Colombo --rand-seed 42`
- `python scripts/ingest_to_qdrant.py --source jsonl --strategy parent_child`
- `python scripts/rebuild_cag_cache.py`
- `python src/api/run.py`

Use `--mode llm` with the seeding script if you want LLM-generated CRM users instead of deterministic templates.

Docs:
- http://localhost:8000/docs
- http://localhost:8000/redoc

Example prompts:
I want a birthday gift under Rs. 5000. I prefer chocolates and flowers.
Expected path:
- memory recall
- `rag`
- optional CAG hit or CRAG correction
- memory write-back
Can you check same-day delivery availability in Kandy for a cake?
Expected path:
- router may infer or repair this to `crm` / `check_delivery_coverage`
- CRM tool composes coverage + rule + slot + history summary
Recommend a chocolate gift and also tell me if Kandy has an available delivery slot.
Expected path:
- router emits `rag` and `crm`
- LangGraph fans out
- `merge_responses` synthesizes one answer
Run the regression tests with `pytest tests/test_logistics_flow.py -v`.

The project does not treat "context" as one big prompt field.
It treats context as a composed system:
- conversational context with TTL and trimming
- persistent user facts with semantic recall
- structured business context from CRM/logistics tables
- retrieved product context from Qdrant
- cached answer context from CAG
- route context for branch execution
- merged multi-agent context for final synthesis
- prompt context controlled through Langfuse
That is the real engineering value in this codebase: context is modeled, stored, routed, budgeted, traced, and tested.











