Skip to content

Commit 4e6e009

Browse files
authored
Kg batch clustering (onyx-dot-app#4847)
* super genius kg_entity parent migration * feat: batched clustering * fix: nit
1 parent e847feb commit 4e6e009

File tree

5 files changed

+370
-224
lines changed

5 files changed

+370
-224
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""kgentity_parent
2+
3+
Revision ID: cec7ec36c505
4+
Revises: 495cb26ce93e
5+
Create Date: 2025-06-07 20:07:46.400770
6+
7+
"""
8+
9+
from alembic import op
10+
import sqlalchemy as sa
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "cec7ec36c505"
15+
down_revision = "495cb26ce93e"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade() -> None:
21+
op.add_column(
22+
"kg_entity",
23+
sa.Column("parent_key", sa.String(), nullable=True, index=True),
24+
)
25+
# NOTE: you will have to reindex the KG after this migration as the parent_key will be null
26+
27+
28+
def downgrade() -> None:
29+
op.drop_column("kg_entity", "parent_key")

backend/onyx/db/entities.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def transfer_entity(
130130
entity_class=entity.entity_class,
131131
entity_subtype=entity.entity_subtype,
132132
entity_key=entity.entity_key,
133+
parent_key=entity.parent_key,
133134
alternative_names=entity.alternative_names or [],
134135
entity_type_id_name=entity.entity_type_id_name,
135136
document_id=entity.document_id,

backend/onyx/db/models.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,9 @@ class KGEntity(Base):
849849
entity_key: Mapped[str] = mapped_column(
850850
NullFilteredString, nullable=True, index=True
851851
)
852+
parent_key: Mapped[str | None] = mapped_column(
853+
NullFilteredString, nullable=True, index=True
854+
)
852855
entity_subtype: Mapped[str] = mapped_column(
853856
NullFilteredString, nullable=True, index=True
854857
)
@@ -1003,7 +1006,7 @@ class KGEntityExtractionStaging(Base):
10031006
)
10041007

10051008
# Basic entity information
1006-
parent_key: Mapped[str] = mapped_column(
1009+
parent_key: Mapped[str | None] = mapped_column(
10071010
NullFilteredString, nullable=True, index=True
10081011
)
10091012

backend/onyx/db/relationships.py

Lines changed: 128 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
def upsert_staging_relationship(
2828
db_session: Session,
2929
relationship_id_name: str,
30-
source_document_id: str,
30+
source_document_id: str | None,
3131
occurrences: int = 1,
3232
) -> KGRelationshipExtractionStaging:
3333
"""
@@ -99,6 +99,72 @@ def upsert_staging_relationship(
9999
return result
100100

101101

102+
def upsert_relationship(
103+
db_session: Session,
104+
relationship_id_name: str,
105+
source_document_id: str | None,
106+
occurrences: int = 1,
107+
) -> KGRelationship:
108+
"""
109+
Upsert a new relationship directly to the database.
110+
111+
Args:
112+
db_session: SQLAlchemy database session
113+
relationship_id_name: The ID name of the relationship in format "source__relationship__target"
114+
source_document_id: ID of the source document
115+
occurrences: Number of times this relationship has been found
116+
Returns:
117+
The created or updated KGRelationship object
118+
119+
Raises:
120+
sqlalchemy.exc.IntegrityError: If there's an error with the database operation
121+
"""
122+
# Generate a unique ID for the relationship
123+
relationship_id_name = format_relationship_id(relationship_id_name)
124+
(
125+
source_entity_id_name,
126+
relationship_string,
127+
target_entity_id_name,
128+
) = split_relationship_id(relationship_id_name)
129+
130+
source_entity_type = get_entity_type(source_entity_id_name)
131+
target_entity_type = get_entity_type(target_entity_id_name)
132+
relationship_type = extract_relationship_type_id(relationship_id_name)
133+
134+
# Insert the new relationship
135+
stmt = (
136+
postgresql.insert(KGRelationship)
137+
.values(
138+
{
139+
"id_name": relationship_id_name,
140+
"source_node": source_entity_id_name,
141+
"target_node": target_entity_id_name,
142+
"source_node_type": source_entity_type,
143+
"target_node_type": target_entity_type,
144+
"type": relationship_string.lower(),
145+
"relationship_type_id_name": relationship_type,
146+
"source_document": source_document_id,
147+
"occurrences": occurrences,
148+
}
149+
)
150+
.on_conflict_do_update(
151+
index_elements=["id_name", "source_document"],
152+
set_=dict(
153+
occurrences=KGRelationship.occurrences + occurrences,
154+
),
155+
)
156+
.returning(KGRelationship)
157+
)
158+
159+
new_relationship = db_session.execute(stmt).scalar()
160+
if new_relationship is None:
161+
raise RuntimeError(
162+
f"Failed to upsert relationship with id_name: {relationship_id_name}"
163+
)
164+
db_session.flush()
165+
return new_relationship
166+
167+
102168
def transfer_relationship(
103169
db_session: Session,
104170
relationship: KGRelationshipExtractionStaging,
@@ -108,12 +174,8 @@ def transfer_relationship(
108174
Transfer a relationship from the staging table to the normalized table.
109175
"""
110176
# Translate the source and target nodes
111-
source_node = entity_translations.get(
112-
relationship.source_node, relationship.source_node
113-
)
114-
target_node = entity_translations.get(
115-
relationship.target_node, relationship.target_node
116-
)
177+
source_node = entity_translations[relationship.source_node]
178+
target_node = entity_translations[relationship.target_node]
117179
relationship_id_name = make_relationship_id(
118180
source_node, relationship.type, target_node
119181
)
@@ -218,6 +280,65 @@ def upsert_staging_relationship_type(
218280
return result
219281

220282

283+
def upsert_relationship_type(
284+
db_session: Session,
285+
source_entity_type: str,
286+
relationship_type: str,
287+
target_entity_type: str,
288+
definition: bool = False,
289+
extraction_count: int = 1,
290+
) -> KGRelationshipType:
291+
"""
292+
Upsert a new relationship type directly to the database.
293+
294+
Args:
295+
db_session: SQLAlchemy session
296+
source_entity_type: Type of the source entity
297+
relationship_type: Type of relationship
298+
target_entity_type: Type of the target entity
299+
definition: Whether this relationship type represents a definition (default False)
300+
301+
Returns:
302+
The created KGRelationshipType object
303+
"""
304+
305+
id_name = make_relationship_type_id(
306+
source_entity_type, relationship_type, target_entity_type
307+
)
308+
309+
# Create new relationship type
310+
stmt = (
311+
postgresql.insert(KGRelationshipType)
312+
.values(
313+
{
314+
"id_name": id_name,
315+
"name": relationship_type,
316+
"source_entity_type_id_name": source_entity_type.upper(),
317+
"target_entity_type_id_name": target_entity_type.upper(),
318+
"definition": definition,
319+
"occurrences": extraction_count,
320+
"type": relationship_type, # Using the relationship_type as the type
321+
"active": True, # Setting as active by default
322+
}
323+
)
324+
.on_conflict_do_update(
325+
index_elements=["id_name"],
326+
set_=dict(
327+
occurrences=KGRelationshipType.occurrences + extraction_count,
328+
),
329+
)
330+
.returning(KGRelationshipType)
331+
)
332+
333+
new_relationship_type = db_session.execute(stmt).scalar()
334+
if new_relationship_type is None:
335+
raise RuntimeError(
336+
f"Failed to upsert relationship type with id_name: {id_name}"
337+
)
338+
db_session.flush()
339+
return new_relationship_type
340+
341+
221342
def transfer_relationship_type(
222343
db_session: Session,
223344
relationship_type: KGRelationshipTypeExtractionStaging,
@@ -262,112 +383,6 @@ def transfer_relationship_type(
262383
return new_relationship_type
263384

264385

265-
def get_parent_child_relationships_and_types(
266-
db_session: Session,
267-
depth: int,
268-
) -> tuple[
269-
list[KGRelationshipExtractionStaging], list[KGRelationshipTypeExtractionStaging]
270-
]:
271-
"""
272-
Create parent-child relationships and relationship types from staging entities with
273-
a parent key, if the parent exists in the normalized entities table. Will create
274-
relationships up to depth levels. E.g., if depth is 2, a relationship will be created
275-
between the entity and its parent, and the entity and its grandparents (if any).
276-
A relationship will not be created if the parent does not exist.
277-
"""
278-
relationship_types: dict[str, KGRelationshipTypeExtractionStaging] = {}
279-
relationships: dict[tuple[str, str | None], KGRelationshipExtractionStaging] = {}
280-
281-
parented_entities = (
282-
db_session.query(KGEntityExtractionStaging)
283-
.filter(KGEntityExtractionStaging.parent_key.isnot(None))
284-
.all()
285-
)
286-
287-
# create has_subcomponent relationships and relationship types
288-
for entity in parented_entities:
289-
child = entity
290-
if entity.transferred_id_name is None:
291-
logger.warning(f"Entity {entity.id_name} has not yet been transferred")
292-
continue
293-
294-
for i in range(depth):
295-
if not child.parent_key:
296-
break
297-
298-
# find the transferred parent entity
299-
parent = (
300-
db_session.query(KGEntity)
301-
.filter(
302-
KGEntity.entity_class == child.entity_class,
303-
KGEntity.entity_key == child.parent_key,
304-
)
305-
.first()
306-
)
307-
if parent is None:
308-
logger.warning(f"Parent entity not found for {entity.id_name}")
309-
break
310-
311-
# create the relationship type
312-
relationship_type = upsert_staging_relationship_type(
313-
db_session=db_session,
314-
source_entity_type=parent.entity_type_id_name,
315-
relationship_type="has_subcomponent",
316-
target_entity_type=entity.entity_type_id_name,
317-
definition=False,
318-
extraction_count=1,
319-
)
320-
relationship_types[relationship_type.id_name] = relationship_type
321-
322-
# create the relationship
323-
# (don't add it to the table as we're using the transferred id, which breaks fk constraints)
324-
relationship_id_name = make_relationship_id(
325-
parent.id_name, "has_subcomponent", entity.transferred_id_name
326-
)
327-
if (parent.id_name, entity.document_id) not in relationships:
328-
(
329-
source_entity_id_name,
330-
relationship_string,
331-
target_entity_id_name,
332-
) = split_relationship_id(relationship_id_name)
333-
334-
source_entity_type = get_entity_type(source_entity_id_name)
335-
target_entity_type = get_entity_type(target_entity_id_name)
336-
relationship_type_id_name = extract_relationship_type_id(
337-
relationship_id_name
338-
)
339-
relationships[(relationship_id_name, entity.document_id)] = (
340-
KGRelationshipExtractionStaging(
341-
id_name=relationship_id_name,
342-
source_node=source_entity_id_name,
343-
target_node=target_entity_id_name,
344-
source_node_type=source_entity_type,
345-
target_node_type=target_entity_type,
346-
type=relationship_string,
347-
relationship_type_id_name=relationship_type_id_name,
348-
source_document=entity.document_id,
349-
occurrences=1,
350-
)
351-
)
352-
else:
353-
relationships[(parent.id_name, entity.document_id)].occurrences += 1
354-
355-
# set parent as the next child (unless we're at the max depth)
356-
if i < depth - 1:
357-
parent_staging = (
358-
db_session.query(KGEntityExtractionStaging)
359-
.filter(
360-
KGEntityExtractionStaging.transferred_id_name == parent.id_name
361-
)
362-
.first()
363-
)
364-
if parent_staging is None:
365-
break
366-
child = parent_staging
367-
368-
return list(relationships.values()), list(relationship_types.values())
369-
370-
371386
def delete_relationships_by_id_names(
372387
db_session: Session, id_names: list[str], kg_stage: KGStage
373388
) -> int:

0 commit comments

Comments
 (0)