From e38a1521153e57d07920d9c28c7ee9a6a92bc132 Mon Sep 17 00:00:00 2001 From: Will Sargent Date: Tue, 8 Apr 2025 09:41:08 -0700 Subject: [PATCH] Add timer to hayhooks --- hayhooks/app.py | 214 ++++++++++++++++++++++++++++++++++------ hayhooks/pyproject.toml | 2 + hayhooks/uv.lock | 16 +++ 3 files changed, 203 insertions(+), 29 deletions(-) diff --git a/hayhooks/app.py b/hayhooks/app.py index 2a22545..471084e 100644 --- a/hayhooks/app.py +++ b/hayhooks/app.py @@ -1,40 +1,179 @@ -import uvicorn +import asyncio +import datetime +import logging +import sys +from contextlib import asynccontextmanager from typing import List -from hayhooks.settings import settings -from hayhooks import create_app -from hayhooks.server.utils.mcp_utils import list_pipelines_as_tools, run_pipeline_as_tool -from hayhooks.server.logger import log +import uvicorn +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from hayhooks.server.app import ( + lifespan as hayhooks_lifespan +) +from hayhooks.server.logger import log +from hayhooks.server.routers import ( + deploy_router, + draw_router, + openai_router, + status_router, + undeploy_router, +) +from hayhooks.server.utils.mcp_utils import ( + list_pipelines_as_tools, + run_pipeline_as_tool, +) +from hayhooks.settings import check_cors_settings, settings from haystack import tracing -from haystack.tracing.logging_tracer import LoggingTracer from haystack.lazy_imports import LazyImport +from haystack.tracing.logging_tracer import LoggingTracer -import logging +# MCP Imports with LazyImport("Run 'pip install \"mcp\"' to install MCP.") as mcp_import: - from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource from mcp.server import Server from mcp.server.sse import SseServerTransport + from mcp.types import EmbeddedResource, ImageContent, TextContent, Tool ########### -# This class adds MCP support and logging beyond what running `hayhooks run` would get us. +# Configuration +########### -#uvicorn_access = logging.getLogger("uvicorn.access") -#uvicorn_access.disabled = True +# uvicorn_access = logging.getLogger("uvicorn.access") +# uvicorn_access.disabled = True -HAYSTACK_DETAILED_TRACING=False +HAYSTACK_DETAILED_TRACING = True if HAYSTACK_DETAILED_TRACING: - logging.basicConfig(format="%(levelname)s - %(name)s - %(message)s", level=logging.WARNING) + logging.basicConfig( + format="%(levelname)s - %(name)s - %(message)s", level=logging.WARNING + ) logging.getLogger("haystack").setLevel(logging.DEBUG) + logging.getLogger('apscheduler').setLevel(logging.DEBUG) + # https://docs.haystack.deepset.ai/docs/logging - tracing.tracer.is_content_tracing_enabled = True # to enable tracing/logging content (inputs/outputs) - tracing.enable_tracing(LoggingTracer(tags_color_strings={"haystack.component.input": "\x1b[1;31m", "haystack.component.name": "\x1b[1;34m"})) + tracing.tracer.is_content_tracing_enabled = ( + True # to enable tracing/logging content (inputs/outputs) + ) + tracing.enable_tracing( + LoggingTracer( + tags_color_strings={ + "haystack.component.input": "\x1b[1;31m", + "haystack.component.name": "\x1b[1;34m", + } + ) + ) -# https://github.com/deepset-ai/hayhooks/tree/main?tab=readme-ov-file#run-hayhooks-programmatically -hayhooks = create_app() +########### +# Custom Scheduler Logic +########### + +scheduler = AsyncIOScheduler(timezone="UTC") # Choose your timezone + + +async def my_periodic_task(): + """ + This is the function that will be executed every 10 seconds. + """ + logger = logging.getLogger("app") + current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"Periodic task executed at: {current_time}") + # --- Add your actual code here --- + await asyncio.sleep(1) # Simulate some async work if needed + logger.info("Periodic task finished.") + + +@asynccontextmanager +async def scheduler_lifespan(app: FastAPI): + """Manages the scheduler lifecycle.""" + log.info("Starting scheduler...") + # Add the job to the scheduler + scheduler.add_job( + my_periodic_task, + trigger=IntervalTrigger(seconds=10), + id="my_periodic_task_job", # Give the job a unique ID + name="Run my task every 10 seconds", + replace_existing=True, + ) + # Start the scheduler + scheduler.start() + log.info("Scheduler started.") + try: + yield # Application runs + finally: + log.info("Shutting down scheduler...") + scheduler.shutdown() + log.info("Scheduler shut down.") + + +########### +# Combined Lifespan +########### -# --- MCP Server Integration --- + +@asynccontextmanager +async def combined_lifespan(app: FastAPI): + """Combines custom scheduler lifespan with hayhooks lifespan.""" + async with scheduler_lifespan(app): + async with hayhooks_lifespan(app): + yield # Application runs here + + +########### +# FastAPI App Creation (Replicating hayhooks.create_app but with combined lifespan) +########### + + +def create_my_app() -> FastAPI: + """ + Create and configure a FastAPI application with combined lifespan. + """ + if additional_path := settings.additional_python_path: + sys.path.append(additional_path) + log.trace(f"Added {additional_path} to sys.path") + + # Instantiate FastAPI with the *combined* lifespan + if root_path := settings.root_path: + app = FastAPI(root_path=root_path, lifespan=combined_lifespan) + else: + app = FastAPI(lifespan=combined_lifespan) + + # Add CORS middleware (copied from hayhooks) + check_cors_settings() + app.add_middleware( + CORSMiddleware, + allow_origins=settings.cors_allow_origins, + allow_methods=settings.cors_allow_methods, + allow_headers=settings.cors_allow_headers, + allow_credentials=settings.cors_allow_credentials, + allow_origin_regex=settings.cors_allow_origin_regex, + expose_headers=settings.cors_expose_headers, + max_age=settings.cors_max_age, + ) + + # Include hayhooks routers (copied from hayhooks) + app.include_router(status_router) + app.include_router(draw_router) + app.include_router(deploy_router) + app.include_router(undeploy_router) + app.include_router(openai_router) + + # Include your own routers if any + # from .my_routers import custom_router + # app.include_router(custom_router) + + return app + + +# Create the app instance using your factory function +app = create_my_app() + +########### +# MCP Server Integration (Attached to the new 'app' instance) +########### mcp_import.check() # Setup the MCP server @@ -43,32 +182,49 @@ # Setup the SSE server transport for MCP mcp_sse = SseServerTransport("/messages/") + @mcp_server.list_tools() async def list_tools() -> List[Tool]: try: - return await list_pipelines_as_tools() + # Pass the app instance if needed by the underlying function, otherwise remove + return await list_pipelines_as_tools(app) except Exception as e: log.error(f"Error listing MCP tools: {e}") return [] + @mcp_server.call_tool() -async def call_tool(name: str, arguments: dict) -> List[TextContent | ImageContent | EmbeddedResource]: +async def call_tool( + name: str, arguments: dict +) -> List[TextContent | ImageContent | EmbeddedResource]: try: - return await run_pipeline_as_tool(name, arguments) + # Pass the app instance if needed by the underlying function, otherwise remove + return await run_pipeline_as_tool(app, name, arguments) except Exception as e: log.error(f"Error calling MCP tool '{name}': {e}") # Consider returning an error structure if MCP spec allows return [] + async def handle_sse(request): - async with mcp_sse.connect_sse(request.scope, request.receive, request._send) as streams: - await mcp_server.run(streams[0], streams[1], mcp_server.create_initialization_options()) + async with mcp_sse.connect_sse( + request.scope, request.receive, request._send + ) as streams: + await mcp_server.run( + streams[0], streams[1], mcp_server.create_initialization_options() + ) -# Add MCP routes directly to the main Hayhooks app -hayhooks.add_route("/sse", handle_sse) -hayhooks.mount("/messages", mcp_sse.handle_post_message) -# --- End MCP Server Integration --- + +# Add MCP routes directly to the *new* app instance +app.add_route("/sse", handle_sse) +app.mount("/messages", mcp_sse.handle_post_message) + +########### +# Run with Uvicorn +########### if __name__ == "__main__": - # Run the combined Hayhooks + MCP server - uvicorn.run("app:hayhooks", host=settings.host, port=settings.port) + # Run the application using the 'app' instance created by create_my_app() + uvicorn.run( + "app:app", host=settings.host, port=settings.port, reload=True + ) # Added reload for convenience diff --git a/hayhooks/pyproject.toml b/hayhooks/pyproject.toml index a84009e..f87c7ad 100644 --- a/hayhooks/pyproject.toml +++ b/hayhooks/pyproject.toml @@ -5,12 +5,14 @@ description = "Hayhooks" readme = "README.md" requires-python = "==3.12.*" dependencies = [ + "apscheduler>=3.11.0", "hayhooks[mcp]>=0.6.0", "haystack-ai>=2.12.0", "letta-client>=0.1.91", "openapi-llm>=0.4.2", "openapi3>=1.8.2", "pydantic-settings>=2.8.1", + "pytz>=2025.2", "rich>=13.7.1", "tavily-python>=0.5.3", "trafilatura>=2.0.0", diff --git a/hayhooks/uv.lock b/hayhooks/uv.lock index 7cd26c6..909cab5 100644 --- a/hayhooks/uv.lock +++ b/hayhooks/uv.lock @@ -79,6 +79,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a1/ee/48ca1a7c89ffec8b6a0c5d02b89c305671d5ffd8d3c94acf8b8c408575bb/anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c", size = 100916 }, ] +[[package]] +name = "apscheduler" +version = "3.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzlocal" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4e/00/6d6814ddc19be2df62c8c898c4df6b5b1914f3bd024b780028caa392d186/apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133", size = 107347 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/ae/9a053dd9229c0fde6b1f1f33f609ccff1ee79ddda364c756a924c6d8563b/APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da", size = 64004 }, +] + [[package]] name = "attrs" version = "25.3.0" @@ -289,12 +301,14 @@ name = "groundedllm-hayhooks" version = "0.1.0" source = { virtual = "." } dependencies = [ + { name = "apscheduler" }, { name = "hayhooks", extra = ["mcp"] }, { name = "haystack-ai" }, { name = "letta-client" }, { name = "openapi-llm" }, { name = "openapi3" }, { name = "pydantic-settings" }, + { name = "pytz" }, { name = "rich" }, { name = "tavily-python" }, { name = "trafilatura" }, @@ -308,6 +322,7 @@ test = [ [package.metadata] requires-dist = [ + { name = "apscheduler", specifier = ">=3.11.0" }, { name = "hayhooks", extras = ["mcp"], specifier = ">=0.6.0" }, { name = "haystack-ai", specifier = ">=2.12.0" }, { name = "letta-client", specifier = ">=0.1.91" }, @@ -316,6 +331,7 @@ requires-dist = [ { name = "pydantic-settings", specifier = ">=2.8.1" }, { name = "pytest", marker = "extra == 'test'", specifier = ">=8.0" }, { name = "pytest-mock", marker = "extra == 'test'", specifier = ">=3.12" }, + { name = "pytz", specifier = ">=2025.2" }, { name = "rich", specifier = ">=13.7.1" }, { name = "tavily-python", specifier = ">=0.5.3" }, { name = "trafilatura", specifier = ">=2.0.0" },