Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions backend/alembic/versions/cec7ec36c505_kgentity_parent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""kgentity_parent

Revision ID: cec7ec36c505
Revises: 495cb26ce93e
Create Date: 2025-06-07 20:07:46.400770

"""

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "cec7ec36c505"
down_revision = "495cb26ce93e"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"kg_entity",
sa.Column("parent_key", sa.String(), nullable=True, index=True),
)
# NOTE: you will have to reindex the KG after this migration as the parent_key will be null


def downgrade() -> None:
op.drop_column("kg_entity", "parent_key")
1 change: 1 addition & 0 deletions backend/onyx/db/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def transfer_entity(
entity_class=entity.entity_class,
entity_subtype=entity.entity_subtype,
entity_key=entity.entity_key,
parent_key=entity.parent_key,
alternative_names=entity.alternative_names or [],
entity_type_id_name=entity.entity_type_id_name,
document_id=entity.document_id,
Expand Down
5 changes: 4 additions & 1 deletion backend/onyx/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,9 @@ class KGEntity(Base):
entity_key: Mapped[str] = mapped_column(
NullFilteredString, nullable=True, index=True
)
parent_key: Mapped[str | None] = mapped_column(
NullFilteredString, nullable=True, index=True
)
entity_subtype: Mapped[str] = mapped_column(
NullFilteredString, nullable=True, index=True
)
Expand Down Expand Up @@ -1003,7 +1006,7 @@ class KGEntityExtractionStaging(Base):
)

# Basic entity information
parent_key: Mapped[str] = mapped_column(
parent_key: Mapped[str | None] = mapped_column(
NullFilteredString, nullable=True, index=True
)

Expand Down
241 changes: 128 additions & 113 deletions backend/onyx/db/relationships.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
def upsert_staging_relationship(
db_session: Session,
relationship_id_name: str,
source_document_id: str,
source_document_id: str | None,
occurrences: int = 1,
) -> KGRelationshipExtractionStaging:
"""
Expand Down Expand Up @@ -99,6 +99,72 @@ def upsert_staging_relationship(
return result


def upsert_relationship(
db_session: Session,
relationship_id_name: str,
source_document_id: str | None,
occurrences: int = 1,
) -> KGRelationship:
"""
Upsert a new relationship directly to the database.

Args:
db_session: SQLAlchemy database session
relationship_id_name: The ID name of the relationship in format "source__relationship__target"
source_document_id: ID of the source document
occurrences: Number of times this relationship has been found
Returns:
The created or updated KGRelationship object

Raises:
sqlalchemy.exc.IntegrityError: If there's an error with the database operation
"""
# Generate a unique ID for the relationship
relationship_id_name = format_relationship_id(relationship_id_name)
(
source_entity_id_name,
relationship_string,
target_entity_id_name,
) = split_relationship_id(relationship_id_name)

source_entity_type = get_entity_type(source_entity_id_name)
target_entity_type = get_entity_type(target_entity_id_name)
relationship_type = extract_relationship_type_id(relationship_id_name)

# Insert the new relationship
stmt = (
postgresql.insert(KGRelationship)
.values(
{
"id_name": relationship_id_name,
"source_node": source_entity_id_name,
"target_node": target_entity_id_name,
"source_node_type": source_entity_type,
"target_node_type": target_entity_type,
"type": relationship_string.lower(),
"relationship_type_id_name": relationship_type,
"source_document": source_document_id,
"occurrences": occurrences,
}
)
.on_conflict_do_update(
index_elements=["id_name", "source_document"],
set_=dict(
occurrences=KGRelationship.occurrences + occurrences,
),
)
.returning(KGRelationship)
)

new_relationship = db_session.execute(stmt).scalar()
if new_relationship is None:
raise RuntimeError(
f"Failed to upsert relationship with id_name: {relationship_id_name}"
)
db_session.flush()
return new_relationship


def transfer_relationship(
db_session: Session,
relationship: KGRelationshipExtractionStaging,
Expand All @@ -108,12 +174,8 @@ def transfer_relationship(
Transfer a relationship from the staging table to the normalized table.
"""
# Translate the source and target nodes
source_node = entity_translations.get(
relationship.source_node, relationship.source_node
)
target_node = entity_translations.get(
relationship.target_node, relationship.target_node
)
source_node = entity_translations[relationship.source_node]
target_node = entity_translations[relationship.target_node]
relationship_id_name = make_relationship_id(
source_node, relationship.type, target_node
)
Expand Down Expand Up @@ -218,6 +280,65 @@ def upsert_staging_relationship_type(
return result


def upsert_relationship_type(
db_session: Session,
source_entity_type: str,
relationship_type: str,
target_entity_type: str,
definition: bool = False,
extraction_count: int = 1,
) -> KGRelationshipType:
"""
Upsert a new relationship type directly to the database.

Args:
db_session: SQLAlchemy session
source_entity_type: Type of the source entity
relationship_type: Type of relationship
target_entity_type: Type of the target entity
definition: Whether this relationship type represents a definition (default False)

Returns:
The created KGRelationshipType object
"""

id_name = make_relationship_type_id(
source_entity_type, relationship_type, target_entity_type
)

# Create new relationship type
stmt = (
postgresql.insert(KGRelationshipType)
.values(
{
"id_name": id_name,
"name": relationship_type,
"source_entity_type_id_name": source_entity_type.upper(),
"target_entity_type_id_name": target_entity_type.upper(),
"definition": definition,
"occurrences": extraction_count,
"type": relationship_type, # Using the relationship_type as the type
"active": True, # Setting as active by default
}
)
.on_conflict_do_update(
index_elements=["id_name"],
set_=dict(
occurrences=KGRelationshipType.occurrences + extraction_count,
),
)
.returning(KGRelationshipType)
)

new_relationship_type = db_session.execute(stmt).scalar()
if new_relationship_type is None:
raise RuntimeError(
f"Failed to upsert relationship type with id_name: {id_name}"
)
db_session.flush()
return new_relationship_type


def transfer_relationship_type(
db_session: Session,
relationship_type: KGRelationshipTypeExtractionStaging,
Expand Down Expand Up @@ -262,112 +383,6 @@ def transfer_relationship_type(
return new_relationship_type


def get_parent_child_relationships_and_types(
db_session: Session,
depth: int,
) -> tuple[
list[KGRelationshipExtractionStaging], list[KGRelationshipTypeExtractionStaging]
]:
"""
Create parent-child relationships and relationship types from staging entities with
a parent key, if the parent exists in the normalized entities table. Will create
relationships up to depth levels. E.g., if depth is 2, a relationship will be created
between the entity and its parent, and the entity and its grandparents (if any).
A relationship will not be created if the parent does not exist.
"""
relationship_types: dict[str, KGRelationshipTypeExtractionStaging] = {}
relationships: dict[tuple[str, str | None], KGRelationshipExtractionStaging] = {}

parented_entities = (
db_session.query(KGEntityExtractionStaging)
.filter(KGEntityExtractionStaging.parent_key.isnot(None))
.all()
)

# create has_subcomponent relationships and relationship types
for entity in parented_entities:
child = entity
if entity.transferred_id_name is None:
logger.warning(f"Entity {entity.id_name} has not yet been transferred")
continue

for i in range(depth):
if not child.parent_key:
break

# find the transferred parent entity
parent = (
db_session.query(KGEntity)
.filter(
KGEntity.entity_class == child.entity_class,
KGEntity.entity_key == child.parent_key,
)
.first()
)
if parent is None:
logger.warning(f"Parent entity not found for {entity.id_name}")
break

# create the relationship type
relationship_type = upsert_staging_relationship_type(
db_session=db_session,
source_entity_type=parent.entity_type_id_name,
relationship_type="has_subcomponent",
target_entity_type=entity.entity_type_id_name,
definition=False,
extraction_count=1,
)
relationship_types[relationship_type.id_name] = relationship_type

# create the relationship
# (don't add it to the table as we're using the transferred id, which breaks fk constraints)
relationship_id_name = make_relationship_id(
parent.id_name, "has_subcomponent", entity.transferred_id_name
)
if (parent.id_name, entity.document_id) not in relationships:
(
source_entity_id_name,
relationship_string,
target_entity_id_name,
) = split_relationship_id(relationship_id_name)

source_entity_type = get_entity_type(source_entity_id_name)
target_entity_type = get_entity_type(target_entity_id_name)
relationship_type_id_name = extract_relationship_type_id(
relationship_id_name
)
relationships[(relationship_id_name, entity.document_id)] = (
KGRelationshipExtractionStaging(
id_name=relationship_id_name,
source_node=source_entity_id_name,
target_node=target_entity_id_name,
source_node_type=source_entity_type,
target_node_type=target_entity_type,
type=relationship_string,
relationship_type_id_name=relationship_type_id_name,
source_document=entity.document_id,
occurrences=1,
)
)
else:
relationships[(parent.id_name, entity.document_id)].occurrences += 1

# set parent as the next child (unless we're at the max depth)
if i < depth - 1:
parent_staging = (
db_session.query(KGEntityExtractionStaging)
.filter(
KGEntityExtractionStaging.transferred_id_name == parent.id_name
)
.first()
)
if parent_staging is None:
break
child = parent_staging

return list(relationships.values()), list(relationship_types.values())


def delete_relationships_by_id_names(
db_session: Session, id_names: list[str], kg_stage: KGStage
) -> int:
Expand Down
Loading