Skip to content

Commit 8d7fb12

Browse files
small updatesd
1 parent 4ade2fb commit 8d7fb12

File tree

4 files changed

+137
-121
lines changed

4 files changed

+137
-121
lines changed

backend/alembic/versions/495cb26ce93e_create_knowlege_graph_tables.py

Lines changed: 73 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,21 @@ def upgrade() -> None:
4646
op.execute(
4747
text(
4848
f"""
49-
DO $$
50-
BEGIN
51-
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
52-
EXECUTE format('CREATE USER %I WITH PASSWORD %L', '{DB_READONLY_USER}', '{DB_READONLY_PASSWORD}');
53-
-- Explicitly revoke all privileges including CONNECT
54-
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
55-
-- Grant only the CONNECT privilege
56-
EXECUTE format('GRANT CONNECT ON DATABASE %I TO %I', current_database(), '{DB_READONLY_USER}');
57-
END IF;
58-
END
59-
$$;
60-
"""
49+
DO $$
50+
BEGIN
51+
-- Check if the read-only user already exists
52+
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
53+
-- Create the read-only user with the specified password
54+
EXECUTE format('CREATE USER %I WITH PASSWORD %L', '{DB_READONLY_USER}', '{DB_READONLY_PASSWORD}');
55+
-- First revoke all privileges to ensure a clean slate
56+
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
57+
-- Grant only the CONNECT privilege to allow the user to connect to the database
58+
-- but not perform any operations without additional specific grants
59+
EXECUTE format('GRANT CONNECT ON DATABASE %I TO %I', current_database(), '{DB_READONLY_USER}');
60+
END IF;
61+
END
62+
$$;
63+
"""
6164
)
6265
)
6366

@@ -448,28 +451,34 @@ def upgrade() -> None:
448451
"ON kg_entity_extraction_staging USING GIN (clustering_name gin_trgm_ops)"
449452
)
450453

451-
if not MULTI_TENANT:
452-
# Create trigger to update clustering columns if entity w/ doc_id is created
453-
alphanum_pattern = r"[^a-z0-9]+"
454-
op.execute(
454+
# Create trigger to update clustering columns if entity w/ doc_id is created
455+
alphanum_pattern = r"[^a-z0-9]+"
456+
op.execute(
457+
text(
455458
f"""
456459
CREATE OR REPLACE FUNCTION update_kg_entity_clustering()
457460
RETURNS TRIGGER AS $$
458461
DECLARE
459462
doc_semantic_id text;
460463
cleaned_semantic_id text;
464+
max_length integer := 1000; -- Limit length for performance
461465
BEGIN
462466
-- Get semantic_id from document
463467
SELECT semantic_id INTO doc_semantic_id
464468
FROM document
465469
WHERE id = NEW.document_id;
466470
467-
-- Clean the semantic_id with regex patterns
471+
-- Clean the semantic_id with regex patterns and handle NULLs
468472
cleaned_semantic_id = regexp_replace(
469-
lower(COALESCE(doc_semantic_id, NEW.name)),
473+
lower(COALESCE(doc_semantic_id, NEW.name, '')),
470474
'{alphanum_pattern}', '', 'g'
471475
);
472476
477+
-- Truncate if too long for performance
478+
IF length(cleaned_semantic_id) > max_length THEN
479+
cleaned_semantic_id = left(cleaned_semantic_id, max_length);
480+
END IF;
481+
473482
-- Set clustering_name to cleaned version and generate trigrams
474483
NEW.clustering_name = cleaned_semantic_id;
475484
NEW.clustering_trigrams = show_trgm(cleaned_semantic_id);
@@ -478,50 +487,56 @@ def upgrade() -> None:
478487
$$ LANGUAGE plpgsql;
479488
"""
480489
)
481-
op.execute(
490+
)
491+
op.execute(
492+
text(
482493
"""
483494
CREATE OR REPLACE FUNCTION update_kg_entity_extraction_clustering()
484495
RETURNS TRIGGER AS $$
485496
DECLARE
486497
doc_semantic_id text;
487498
BEGIN
488499
-- Get semantic_id from document
500+
-- If no document is found, doc_semantic_id will be NULL and COALESCE will use NEW.name
489501
SELECT semantic_id INTO doc_semantic_id
490502
FROM document
491503
WHERE id = NEW.document_id;
492504
493505
-- Set clustering_name to semantic_id
494-
NEW.clustering_name = lower(COALESCE(doc_semantic_id, NEW.name));
506+
NEW.clustering_name = lower(COALESCE(doc_semantic_id, NEW.name, ''));
495507
RETURN NEW;
496508
END;
497509
$$ LANGUAGE plpgsql;
498510
"""
499511
)
500-
for table, function in (
501-
("kg_entity", "update_kg_entity_clustering"),
502-
("kg_entity_extraction_staging", "update_kg_entity_extraction_clustering"),
503-
):
504-
trigger = f"{function}_trigger"
505-
op.execute(f"DROP TRIGGER IF EXISTS {trigger} ON {table}")
506-
op.execute(
507-
f"""
508-
CREATE TRIGGER {trigger}
509-
BEFORE INSERT
510-
ON {table}
511-
FOR EACH ROW
512-
EXECUTE FUNCTION {function}();
513-
"""
514-
)
515-
516-
# Create trigger to update kg_entity clustering_name and its trigrams when document.clustering_name changes
512+
)
513+
for table, function in (
514+
("kg_entity", "update_kg_entity_clustering"),
515+
("kg_entity_extraction_staging", "update_kg_entity_extraction_clustering"),
516+
):
517+
trigger = f"{function}_trigger"
518+
op.execute(f"DROP TRIGGER IF EXISTS {trigger} ON {table}")
517519
op.execute(
520+
f"""
521+
CREATE TRIGGER {trigger}
522+
BEFORE INSERT
523+
ON {table}
524+
FOR EACH ROW
525+
EXECUTE FUNCTION {function}();
526+
"""
527+
)
528+
529+
# Create trigger to update kg_entity clustering_name and its trigrams when document.clustering_name changes
530+
op.execute(
531+
text(
518532
f"""
519533
CREATE OR REPLACE FUNCTION update_kg_entity_clustering_from_doc()
520534
RETURNS TRIGGER AS $$
521535
DECLARE
522536
cleaned_semantic_id text;
523537
BEGIN
524538
-- Clean the semantic_id with regex patterns
539+
-- If semantic_id is NULL, COALESCE will use empty string
525540
cleaned_semantic_id = regexp_replace(
526541
lower(COALESCE(NEW.semantic_id, '')),
527542
'{alphanum_pattern}', '', 'g'
@@ -538,11 +553,15 @@ def upgrade() -> None:
538553
$$ LANGUAGE plpgsql;
539554
"""
540555
)
541-
op.execute(
556+
)
557+
op.execute(
558+
text(
542559
"""
543560
CREATE OR REPLACE FUNCTION update_kg_entity_extraction_clustering_from_doc()
544561
RETURNS TRIGGER AS $$
545562
BEGIN
563+
-- Update clustering name for all entities in staging referencing this document
564+
-- If semantic_id is NULL, COALESCE will use empty string
546565
UPDATE kg_entity_extraction_staging
547566
SET
548567
clustering_name = lower(COALESCE(NEW.semantic_id, ''))
@@ -552,21 +571,22 @@ def upgrade() -> None:
552571
$$ LANGUAGE plpgsql;
553572
"""
554573
)
555-
for function in (
556-
"update_kg_entity_clustering_from_doc",
557-
"update_kg_entity_extraction_clustering_from_doc",
558-
):
559-
trigger = f"{function}_trigger"
560-
op.execute(f"DROP TRIGGER IF EXISTS {trigger} ON document")
561-
op.execute(
562-
f"""
563-
CREATE TRIGGER {trigger}
564-
AFTER UPDATE OF semantic_id
565-
ON document
566-
FOR EACH ROW
567-
EXECUTE FUNCTION {function}();
568-
"""
569-
)
574+
)
575+
for function in (
576+
"update_kg_entity_clustering_from_doc",
577+
"update_kg_entity_extraction_clustering_from_doc",
578+
):
579+
trigger = f"{function}_trigger"
580+
op.execute(f"DROP TRIGGER IF EXISTS {trigger} ON document")
581+
op.execute(
582+
f"""
583+
CREATE TRIGGER {trigger}
584+
AFTER UPDATE OF semantic_id
585+
ON document
586+
FOR EACH ROW
587+
EXECUTE FUNCTION {function}();
588+
"""
589+
)
570590

571591

572592
def downgrade() -> None:

backend/alembic_tenants/versions/3b9f09038764_add_read_only_kg_user.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,21 @@ def upgrade() -> None:
3131
op.execute(
3232
text(
3333
f"""
34-
DO $$
35-
BEGIN
36-
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
37-
EXECUTE format('CREATE USER %I WITH PASSWORD %L', '{DB_READONLY_USER}', '{DB_READONLY_PASSWORD}');
38-
-- Explicitly revoke all privileges including CONNECT
39-
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
40-
-- Grant only the CONNECT privilege
41-
EXECUTE format('GRANT CONNECT ON DATABASE %I TO %I', current_database(), '{DB_READONLY_USER}');
42-
END IF;
43-
END
44-
$$;
45-
"""
34+
DO $$
35+
BEGIN
36+
-- Check if the read-only user already exists
37+
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
38+
-- Create the read-only user with the specified password
39+
EXECUTE format('CREATE USER %I WITH PASSWORD %L', '{DB_READONLY_USER}', '{DB_READONLY_PASSWORD}');
40+
-- First revoke all privileges to ensure a clean slate
41+
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
42+
-- Grant only the CONNECT privilege to allow the user to connect to the database
43+
-- but not perform any operations without additional specific grants
44+
EXECUTE format('GRANT CONNECT ON DATABASE %I TO %I', current_database(), '{DB_READONLY_USER}');
45+
END IF;
46+
END
47+
$$;
48+
"""
4649
)
4750
)
4851

backend/onyx/agents/agent_search/kb_search/nodes/a3_generate_simple_sql.py

Lines changed: 43 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,24 @@
3131
from onyx.prompts.kg_prompts import SIMPLE_SQL_CORRECTION_PROMPT
3232
from onyx.prompts.kg_prompts import SIMPLE_SQL_PROMPT
3333
from onyx.prompts.kg_prompts import SOURCE_DETECTION_PROMPT
34-
from onyx.prompts.kg_prompts import SQL_AGGREGATION_REMOVAL_PROMPT
3534
from onyx.utils.logger import setup_logger
3635
from onyx.utils.threadpool_concurrency import run_with_timeout
3736

3837

3938
logger = setup_logger()
4039

4140

41+
def _drop_temp_views(
42+
allowed_docs_view_name: str, kg_relationships_view_name: str
43+
) -> None:
44+
with get_session_with_current_tenant() as db_session:
45+
drop_views(
46+
db_session,
47+
allowed_docs_view_name=allowed_docs_view_name,
48+
kg_relationships_view_name=kg_relationships_view_name,
49+
)
50+
51+
4252
def _build_entity_explanation_str(entity_normalization_map: dict[str, str]) -> str:
4353
"""
4454
Build a string of contextualized entities to avoid the model not being aware of
@@ -57,44 +67,12 @@ def _sql_is_aggregate_query(sql_statement: str) -> bool:
5767
)
5868

5969

60-
def _remove_aggregation(sql_statement: str, llm: LLM) -> str:
61-
"""
62-
Remove aggregate functions from the SQL statement.
63-
"""
64-
65-
sql_aggregation_removal_prompt = SQL_AGGREGATION_REMOVAL_PROMPT.replace(
66-
"---sql_statement---", sql_statement
67-
)
68-
69-
msg = [
70-
HumanMessage(
71-
content=sql_aggregation_removal_prompt,
72-
)
73-
]
74-
75-
try:
76-
llm_response = run_with_timeout(
77-
KG_SQL_GENERATION_TIMEOUT,
78-
llm.invoke,
79-
prompt=msg,
80-
timeout_override=25,
81-
max_tokens=800,
82-
)
83-
84-
cleaned_response = (
85-
str(llm_response.content).replace("```json\n", "").replace("\n```", "")
86-
)
87-
sql_statement = cleaned_response.split("<sql>")[1].split("</sql>")[0].strip()
88-
sql_statement = sql_statement.replace("sql", "").strip()
89-
90-
except Exception as e:
91-
logger.error(f"Error in strategy generation: {e}")
92-
raise e
93-
94-
return sql_statement
95-
96-
97-
def _get_source_documents(sql_statement: str, llm: LLM) -> str | None:
70+
def _get_source_documents(
71+
sql_statement: str,
72+
llm: LLM,
73+
allowed_docs_view_name: str,
74+
kg_relationships_view_name: str,
75+
) -> str | None:
9876
"""
9977
Generate SQL to retrieve source documents based on the input sql statement.
10078
"""
@@ -132,6 +110,11 @@ def _get_source_documents(sql_statement: str, llm: LLM) -> str | None:
132110
)
133111
else:
134112
logger.error(f"Could not generate source documents SQL: {e}")
113+
114+
_drop_temp_views(
115+
allowed_docs_view_name=allowed_docs_view_name,
116+
kg_relationships_view_name=kg_relationships_view_name,
117+
)
135118
return None
136119

137120
return sql_statement
@@ -282,6 +265,11 @@ def generate_simple_sql(
282265

283266
except Exception as e:
284267
logger.error(f"Error in strategy generation: {e}")
268+
269+
_drop_temp_views(
270+
allowed_docs_view_name=allowed_docs_view_name,
271+
kg_relationships_view_name=kg_relationships_view_name,
272+
)
285273
raise e
286274

287275
logger.debug(f"A3 - sql_statement: {sql_statement}")
@@ -321,13 +309,24 @@ def generate_simple_sql(
321309
logger.error(
322310
f"Error in generating the sql correction: {e}. Original model response: {cleaned_response}"
323311
)
312+
313+
_drop_temp_views(
314+
allowed_docs_view_name=allowed_docs_view_name,
315+
kg_relationships_view_name=kg_relationships_view_name,
316+
)
317+
324318
raise e
325319

326320
logger.debug(f"A3 - sql_statement after correction: {sql_statement}")
327321

328322
# Get SQL for source documents
329323

330-
source_documents_sql = _get_source_documents(sql_statement, llm=primary_llm)
324+
source_documents_sql = _get_source_documents(
325+
sql_statement,
326+
llm=primary_llm,
327+
allowed_docs_view_name=allowed_docs_view_name,
328+
kg_relationships_view_name=kg_relationships_view_name,
329+
)
331330

332331
logger.info(f"A3 source_documents_sql: {source_documents_sql}")
333332

@@ -373,12 +372,10 @@ def generate_simple_sql(
373372
else:
374373
source_document_results = None
375374

376-
with get_session_with_current_tenant() as db_session:
377-
drop_views(
378-
db_session,
379-
allowed_docs_view_name=allowed_docs_view_name,
380-
kg_relationships_view_name=kg_relationships_view_name,
381-
)
375+
_drop_temp_views(
376+
allowed_docs_view_name=allowed_docs_view_name,
377+
kg_relationships_view_name=kg_relationships_view_name,
378+
)
382379

383380
logger.info(f"A3 - Number of query_results: {len(query_results)}")
384381

0 commit comments

Comments
 (0)