From f168003cd3994a1082afd1126b665682b0d852f8 Mon Sep 17 00:00:00 2001 From: "sweep-ai[bot]" <128439645+sweep-ai[bot]@users.noreply.github.com> Date: Tue, 30 Apr 2024 09:29:54 +0000 Subject: [PATCH 1/2] feat: Updated 1 files --- src/vdf_io/import_vdf/lancedb_import.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/vdf_io/import_vdf/lancedb_import.py b/src/vdf_io/import_vdf/lancedb_import.py index d1e033b..19df453 100644 --- a/src/vdf_io/import_vdf/lancedb_import.py +++ b/src/vdf_io/import_vdf/lancedb_import.py @@ -5,6 +5,7 @@ import pyarrow.parquet as pq import lancedb +from lancedb import create_index from vdf_io.constants import DEFAULT_BATCH_SIZE, INT_MAX from vdf_io.meta_types import NamespaceMeta @@ -23,6 +24,7 @@ class ImportLanceDB(ImportVDB): DB_NAME_SLUG = DBNames.LANCEDB + ID_COLUMN = "id" @classmethod def import_vdb(cls, args): @@ -103,6 +105,21 @@ def upsert_data(self): table = self.db.open_table(new_index_name) tqdm.write(f"Opened table {new_index_name}") + # Get the ID column from the parquet file schema + parquet_schema = pq.read_schema(parquet_files[0]) + id_column = None + for field in parquet_schema: + if field.name == self.ID_COLUMN: + id_column = field.name + break + + if id_column: + # Create index on the table + create_index(table, id_column) + tqdm.write(f"Created index on {id_column} for table {new_index_name}") + else: + tqdm.write(f"Warning: No ID column {self.ID_COLUMN} found in schema. Skipping index creation for table {new_index_name}") + for file in tqdm(parquet_files, desc="Iterating parquet files"): file_path = self.get_file_path(final_data_path, file) df = self.read_parquet_progress( From 0899553ffb220c8341fcc9f976434086bdb80056 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 30 Apr 2024 09:32:05 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/vdf_io/import_vdf/astradb_import.py | 1 - src/vdf_io/import_vdf/lancedb_import.py | 14 ++++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/vdf_io/import_vdf/astradb_import.py b/src/vdf_io/import_vdf/astradb_import.py index aa12b15..5d20dc0 100644 --- a/src/vdf_io/import_vdf/astradb_import.py +++ b/src/vdf_io/import_vdf/astradb_import.py @@ -7,7 +7,6 @@ from astrapy.db import AstraDB from cassandra.cluster import Cluster from cassandra.auth import PlainTextAuthProvider -from qdrant_client.http.models import Distance from vdf_io.constants import INT_MAX from vdf_io.names import DBNames diff --git a/src/vdf_io/import_vdf/lancedb_import.py b/src/vdf_io/import_vdf/lancedb_import.py index 19df453..d9febbd 100644 --- a/src/vdf_io/import_vdf/lancedb_import.py +++ b/src/vdf_io/import_vdf/lancedb_import.py @@ -114,11 +114,15 @@ def upsert_data(self): break if id_column: - # Create index on the table + # Create index on the table create_index(table, id_column) - tqdm.write(f"Created index on {id_column} for table {new_index_name}") + tqdm.write( + f"Created index on {id_column} for table {new_index_name}" + ) else: - tqdm.write(f"Warning: No ID column {self.ID_COLUMN} found in schema. Skipping index creation for table {new_index_name}") + tqdm.write( + f"Warning: No ID column {self.ID_COLUMN} found in schema. Skipping index creation for table {new_index_name}" + ) for file in tqdm(parquet_files, desc="Iterating parquet files"): file_path = self.get_file_path(final_data_path, file) @@ -134,7 +138,9 @@ def upsert_data(self): for col in df.columns: if col not in [field.name for field in table.schema]: col_type = df[col].dtype - tqdm.write(f"Adding column {col} of type {col_type} to {new_index_name}") + tqdm.write( + f"Adding column {col} of type {col_type} to {new_index_name}" + ) table.add_columns( { col: get_default_value(col_type),