Skip to content

Commit c24f8fd

Browse files
committed
fix: reverted
1 parent dfa95b8 commit c24f8fd

File tree

4 files changed

+61
-181
lines changed

4 files changed

+61
-181
lines changed

backend/alembic/versions/37c333d8e6e8_kg_normalization.py

Lines changed: 29 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def upgrade() -> None:
2323
# Enable pg_trgm extension if not already enabled
2424
op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
2525

26-
# Add clustering columns
26+
# Add clustering_name and trigrams columns
2727
op.add_column(
2828
"kg_entity",
2929
sa.Column("clustering_name", NullFilteredString, nullable=True),
@@ -32,28 +32,18 @@ def upgrade() -> None:
3232
"kg_entity",
3333
sa.Column("clustering_trigrams", postgresql.ARRAY(sa.String(3)), nullable=True),
3434
)
35-
op.add_column(
36-
"kg_entity_extraction_staging",
37-
sa.Column("clustering_name", NullFilteredString, nullable=True),
38-
)
39-
op.add_column(
40-
"kg_entity_extraction_staging",
41-
sa.Column("clustered", sa.Boolean, nullable=False, server_default="false"),
42-
)
4335

44-
# Create GIN index on clustering columns
36+
# Create GIN index on clustering_trigrams
4537
op.execute("COMMIT")
4638
op.execute(
4739
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_kg_entity_clustering_trigrams "
4840
"ON kg_entity USING GIN (clustering_trigrams)"
4941
)
50-
op.execute(
51-
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_kg_entity_extraction_clustering_trigrams "
52-
"ON kg_entity_extraction_staging USING GIN (clustering_name gin_trgm_ops)"
53-
)
5442

55-
# Create trigger to update clustering columns if document_id changes
43+
# Define regex patterns in Python
5644
alphanum_pattern = r"[^a-z0-9]+"
45+
46+
# Create trigger to update clustering_name and its trigrams if document_id changes
5747
op.execute(
5848
f"""
5949
CREATE OR REPLACE FUNCTION update_kg_entity_clustering()
@@ -69,7 +59,7 @@ def upgrade() -> None:
6959
7060
-- Clean the semantic_id with regex patterns
7161
cleaned_semantic_id = regexp_replace(
72-
lower(COALESCE(doc_semantic_id, NEW.name)),
62+
lower(COALESCE(doc_semantic_id, NEW.id_name)),
7363
'{alphanum_pattern}', '', 'g'
7464
);
7565
@@ -81,40 +71,16 @@ def upgrade() -> None:
8171
$$ LANGUAGE plpgsql;
8272
"""
8373
)
74+
op.execute("DROP TRIGGER IF EXISTS kg_entity_clustering_data_trigger ON kg_entity")
8475
op.execute(
8576
"""
86-
CREATE OR REPLACE FUNCTION update_kg_entity_extraction_clustering()
87-
RETURNS TRIGGER AS $$
88-
DECLARE
89-
doc_semantic_id text;
90-
BEGIN
91-
-- Get semantic_id from document
92-
SELECT semantic_id INTO doc_semantic_id
93-
FROM document
94-
WHERE id = NEW.document_id;
95-
96-
-- Set clustering_name to semantic_id
97-
NEW.clustering_name = lower(COALESCE(doc_semantic_id, NEW.name));
98-
RETURN NEW;
99-
END;
100-
$$ LANGUAGE plpgsql;
77+
CREATE TRIGGER kg_entity_clustering_data_trigger
78+
BEFORE INSERT OR UPDATE OF document_id
79+
ON kg_entity
80+
FOR EACH ROW
81+
EXECUTE FUNCTION update_kg_entity_clustering();
10182
"""
10283
)
103-
for table, function in (
104-
("kg_entity", "update_kg_entity_clustering"),
105-
("kg_entity_extraction_staging", "update_kg_entity_extraction_clustering"),
106-
):
107-
trigger = f"{function}_trigger"
108-
op.execute(f"DROP TRIGGER IF EXISTS {trigger} ON {table}")
109-
op.execute(
110-
f"""
111-
CREATE TRIGGER {trigger}
112-
BEFORE INSERT OR UPDATE OF document_id
113-
ON {table}
114-
FOR EACH ROW
115-
EXECUTE FUNCTION {function}();
116-
"""
117-
)
11884

11985
# Create trigger to update kg_entity clustering_name and its trigrams when document.semantic_id changes
12086
op.execute(
@@ -141,35 +107,18 @@ def upgrade() -> None:
141107
$$ LANGUAGE plpgsql;
142108
"""
143109
)
110+
op.execute(
111+
"DROP TRIGGER IF EXISTS kg_entity_clustering_from_doc_trigger ON document"
112+
)
144113
op.execute(
145114
"""
146-
CREATE OR REPLACE FUNCTION update_kg_entity_extraction_clustering_from_doc()
147-
RETURNS TRIGGER AS $$
148-
BEGIN
149-
UPDATE kg_entity_extraction_staging
150-
SET
151-
clustering_name = lower(COALESCE(NEW.semantic_id, ''))
152-
WHERE document_id = NEW.id;
153-
RETURN NEW;
154-
END;
155-
$$ LANGUAGE plpgsql;
115+
CREATE TRIGGER kg_entity_clustering_from_doc_trigger
116+
AFTER UPDATE OF semantic_id
117+
ON document
118+
FOR EACH ROW
119+
EXECUTE FUNCTION update_kg_entity_clustering_from_doc();
156120
"""
157121
)
158-
for function in (
159-
"update_kg_entity_clustering_from_doc",
160-
"update_kg_entity_extraction_clustering_from_doc",
161-
):
162-
trigger = f"{function}_trigger"
163-
op.execute(f"DROP TRIGGER IF EXISTS {trigger} ON document")
164-
op.execute(
165-
f"""
166-
CREATE TRIGGER {trigger}
167-
AFTER UPDATE OF semantic_id
168-
ON document
169-
FOR EACH ROW
170-
EXECUTE FUNCTION {function}();
171-
"""
172-
)
173122

174123
# Force update all existing rows by triggering the function
175124
op.execute(
@@ -178,34 +127,23 @@ def upgrade() -> None:
178127
SET document_id = document_id;
179128
"""
180129
)
181-
op.execute(
182-
"""
183-
UPDATE kg_entity_extraction_staging
184-
SET document_id = document_id;
185-
"""
186-
)
187130

188131

189132
def downgrade() -> None:
190-
# Drop triggers and functions
191-
for function in (
192-
"update_kg_entity_clustering",
193-
"update_kg_entity_extraction_clustering",
194-
"update_kg_entity_clustering_from_doc",
195-
"update_kg_entity_extraction_clustering_from_doc",
196-
):
197-
op.execute(f"DROP TRIGGER IF EXISTS {function}_trigger ON document")
198-
op.execute(f"DROP FUNCTION IF EXISTS {function}()")
133+
# Drop triggers
134+
op.execute("DROP TRIGGER IF EXISTS kg_entity_clustering_data_trigger ON kg_entity")
135+
op.execute(
136+
"DROP TRIGGER IF EXISTS kg_entity_clustering_from_doc_trigger ON document"
137+
)
138+
139+
# Drop functions
140+
op.execute("DROP FUNCTION IF EXISTS update_kg_entity_clustering()")
141+
op.execute("DROP FUNCTION IF EXISTS update_kg_entity_clustering_from_doc()")
199142

200143
# Drop index
201144
op.execute("COMMIT") # Commit to allow CONCURRENTLY
202145
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_kg_entity_clustering_trigrams")
203-
op.execute(
204-
"DROP INDEX CONCURRENTLY IF EXISTS idx_kg_entity_extraction_clustering_trigrams"
205-
)
206146

207147
# Drop columns
208148
op.drop_column("kg_entity", "clustering_trigrams")
209149
op.drop_column("kg_entity", "clustering_name")
210-
op.drop_column("kg_entity_extraction_staging", "clustering_name")
211-
op.drop_column("kg_entity_extraction_staging", "clustered")

backend/onyx/db/models.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -936,13 +936,6 @@ class KGEntityExtractionStaging(Base):
936936
NullFilteredString, nullable=True, index=True
937937
)
938938

939-
# Data for normalization and clustering
940-
clustering_name: Mapped[str] = mapped_column(
941-
NullFilteredString, nullable=True, index=True
942-
)
943-
944-
clustered: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
945-
946939
alternative_names: Mapped[list[str]] = mapped_column(
947940
postgresql.ARRAY(String), nullable=False, default=list
948941
)

backend/onyx/kg/clustering/initial_clustering.py

Lines changed: 32 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,12 @@
77
import numpy as np
88
from rapidfuzz.fuzz import ratio
99
from sklearn.cluster import SpectralClustering # type: ignore
10-
from sqlalchemy import text
11-
from thefuzz import fuzz # type: ignore
1210

1311
from onyx.db.document import update_document_kg_info
1412
from onyx.db.engine import get_session_with_current_tenant
1513
from onyx.db.entities import add_entity
14+
from onyx.db.entities import delete_entities_by_id_names
1615
from onyx.db.entities import get_entities_by_grounding
17-
from onyx.db.entities import KGEntityExtractionStaging
1816
from onyx.db.entity_type import get_determined_grounded_entity_types
1917
from onyx.db.relationships import add_relationship
2018
from onyx.db.relationships import add_relationship_type
@@ -357,17 +355,15 @@ def _match_ungrounded_ge_entities(
357355

358356
# Try fuzzy matching with grounded entities
359357
for grounded_entity in grounded_list:
360-
score = fuzz.ratio(
361-
ungrounded_entity.lower(), grounded_entity.lower()
362-
)
358+
score = ratio(ungrounded_entity.lower(), grounded_entity.lower())
363359
if score > fuzzy_match_threshold and score > best_score:
364360
best_match = grounded_entity
365361
best_score = score
366362

367363
# Try fuzzy matching with previously processed ungrounded entities
368364
if not best_match:
369365
for processed_entity in processed_entities[entity_type]:
370-
score = fuzz.ratio(
366+
score = ratio(
371367
ungrounded_entity.lower(), processed_entity.lower()
372368
)
373369
if score > fuzzy_match_threshold and score > best_score:
@@ -440,9 +436,7 @@ def _match_determined_ge_entities(
440436

441437
# Try fuzzy matching with grounded entities
442438
for grounded_entity in determined_entities_list:
443-
score = fuzz.ratio(
444-
ungrounded_entity.lower(), grounded_entity.lower()
445-
)
439+
score = ratio(ungrounded_entity.lower(), grounded_entity.lower())
446440
if score > fuzzy_match_threshold and score > best_score:
447441
best_match = grounded_entity
448442
best_score = score
@@ -504,86 +498,42 @@ def kg_clustering(
504498

505499
relationships = get_all_relationships(db_session, kg_stage=KGStage.EXTRACTED)
506500

507-
grounded_entities: set[KGEntityExtractionStaging] = set(
508-
get_entities_by_grounding(
509-
db_session, KGStage.EXTRACTED, KGGroundingType.GROUNDED
510-
)
501+
grounded_entities = get_entities_by_grounding(
502+
db_session, KGStage.EXTRACTED, KGGroundingType.GROUNDED
511503
)
512504

513505
## Clustering
514506

515507
# TODO: re-implement clustering of ungrounded entities as well as
516-
# grounded entities that do not have a source document with deep extraction enabled!
517-
# For now we would just dedupe grounded entities that have very similar names
508+
# grounded entities that do not have a source document with deep extraction
509+
# enabled!
510+
# For now we just create a trivial entity mapping from the
511+
# 'unclustered' entities to the 'clustered' entities, so we can simply
512+
# transfer the entity information from the Staging to the Normalized
513+
# tables.
518514
# This will be reimplemented when deep extraction is enabled.
519515

520-
THRESHOLD = 96
521-
while grounded_entities:
522-
clustered: list[str] = []
523-
524-
entity = grounded_entities.pop()
525-
clustered.append(entity.id_name)
516+
## Database operations
526517

527-
primary_entity = entity
528-
occurrences = entity.occurrences or 1
529-
names: set[str] = {entity.name}
518+
# create the clustered objects - entities
530519

531-
# find a list of entities with very similar names
520+
transferred_entities: list[str] = []
521+
for grounded_entity in grounded_entities:
532522
with get_session_with_current_tenant() as db_session:
533-
# uses GIN index, very efficient
534-
db_session.execute(text("SET pg_trgm.similarity_threshold = 0.6"))
535-
similar_entities = (
536-
db_session.query(KGEntityExtractionStaging)
537-
.filter(
538-
KGEntityExtractionStaging.entity_type_id_name
539-
== entity.entity_type_id_name,
540-
~KGEntityExtractionStaging.clustered,
541-
KGEntityExtractionStaging.id_name != entity.id_name,
542-
KGEntityExtractionStaging.clustering_name.op("%")(
543-
entity.clustering_name
544-
),
545-
)
546-
.all()
547-
)
548-
for similar in similar_entities:
549-
# skip those with number so we don't cluster version1 and version2
550-
if any(char.isdigit() for char in similar.clustering_name):
551-
continue
552-
if ratio(similar.clustering_name, entity.clustering_name) > THRESHOLD:
553-
if similar in grounded_entities:
554-
grounded_entities.remove(similar)
555-
clustered.append(similar.id_name)
556-
names.add(similar.name)
557-
occurrences += similar.occurrences or 1
558-
559-
if (
560-
primary_entity.document_id is None
561-
and similar.document_id is not None
562-
):
563-
primary_entity = similar
564-
565-
# only keep the primary entity
566-
names.remove(entity.name)
567523
added_entity = add_entity(
568524
db_session,
569525
KGStage.NORMALIZED,
570-
entity_type=primary_entity.entity_type_id_name,
571-
name=primary_entity.name,
572-
occurrences=occurrences,
573-
document_id=primary_entity.document_id,
574-
attributes=primary_entity.attributes or None,
575-
alternative_names=list(names),
526+
entity_type=grounded_entity.entity_type_id_name,
527+
name=grounded_entity.name,
528+
occurrences=grounded_entity.occurrences or 1,
529+
document_id=grounded_entity.document_id or None,
530+
attributes=grounded_entity.attributes or None,
576531
)
577-
if added_entity:
578-
db_session.query(KGEntityExtractionStaging).filter(
579-
KGEntityExtractionStaging.id_name.in_(clustered)
580-
).update({"clustered": True})
581-
db_session.commit()
582532

583-
0 / 0
584-
# TODO: delete all the clustered ones
533+
db_session.commit()
585534

586-
## Database operations
535+
if added_entity:
536+
transferred_entities.append(added_entity.id_name)
587537

588538
transferred_relationship_types: list[str] = []
589539
for relationship_type in relationship_types:
@@ -656,14 +606,14 @@ def kg_clustering(
656606
except Exception as e:
657607
logger.error(f"Error deleting relationship types: {e}")
658608

659-
# try:
660-
# with get_session_with_current_tenant() as db_session:
661-
# delete_entities_by_id_names(
662-
# db_session, transferred_entities, kg_stage=KGStage.EXTRACTED
663-
# )
664-
# db_session.commit()
665-
# except Exception as e:
666-
# logger.error(f"Error deleting entities: {e}")
609+
try:
610+
with get_session_with_current_tenant() as db_session:
611+
delete_entities_by_id_names(
612+
db_session, transferred_entities, kg_stage=KGStage.EXTRACTED
613+
)
614+
db_session.commit()
615+
except Exception as e:
616+
logger.error(f"Error deleting entities: {e}")
667617

668618
# Update document kg info
669619

backend/requirements/default.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ slack-sdk==3.20.2
8080
SQLAlchemy[mypy]==2.0.15
8181
starlette==0.46.1
8282
supervisor==4.2.5
83-
thefuzz==0.22.1
8483
RapidFuzz==3.13.0
8584
tiktoken==0.7.0
8685
timeago==1.0.16

0 commit comments

Comments
 (0)