Skip to content

Commit 2b2720e

Browse files
committed
Encode additional parameters in Component.data and store in 1 table
1 parent a830a79 commit 2b2720e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+616
-622
lines changed

.github/workflows/ci_code.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ jobs:
103103
104104
- name: Unit Testing
105105
run: |
106+
sqlite3 test.db "create table t(f int); drop table t;"
106107
make unit_testing pytest_arguments="--cov=superduper --cov-report=xml" SUPERDUPER_CONFIG=test/configs/${{ matrix.config }}
107108
108109
- name: Usecase Testing

plugins/sql/superduper_sql/data_backend.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,10 @@ class SQLDatabackend(IbisDataBackend):
359359

360360
def __init__(self, uri, plugin, flavour=None):
361361
super().__init__(uri, plugin, flavour)
362-
self._create_sqlalchemy_engine()
362+
if 'sqlite://./' in uri:
363+
self._create_sqlalchemy_engine(uri.replace('./', '//'))
364+
else:
365+
self._create_sqlalchemy_engine(uri)
363366
self.sm = sessionmaker(bind=self.alchemy_engine)
364367

365368
@property
@@ -374,6 +377,8 @@ def update(self, table, condition, key, value):
374377
with self.sm() as session:
375378
metadata = MetaData()
376379

380+
assert table in self.list_tables()
381+
377382
metadata.reflect(bind=session.bind)
378383
table = Table(table, metadata, autoload_with=session.bind)
379384

@@ -422,16 +427,16 @@ def delete(self, table, condition):
422427
except NoSuchTableError:
423428
raise exceptions.NotFound("Table", table)
424429

425-
def _create_sqlalchemy_engine(self):
430+
def _create_sqlalchemy_engine(self, uri):
426431
with self.connection_manager.get_connection() as conn:
427-
self.alchemy_engine = create_engine(self.uri, creator=lambda: conn.con)
432+
self.alchemy_engine = create_engine(uri, creator=lambda: conn.con)
428433
if not self._test_engine():
429434
logging.warn(
430435
"Unable to reuse the ibis connection "
431436
"to create the SQLAlchemy engine. "
432437
"Creating a new connection with the URI."
433438
)
434-
self.alchemy_engine = create_engine(self.uri)
439+
self.alchemy_engine = create_engine(uri)
435440

436441
def _test_engine(self):
437442
"""Test the engine."""

superduper/backends/base/data_backend.py

Lines changed: 46 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import functools
23
import hashlib
34
import json
@@ -643,11 +644,7 @@ def delete(self, table, condition):
643644
:param table: The table to delete from.
644645
:param condition: The condition to delete.
645646
"""
646-
r_table = self._get_with_component_identifier('Table', table)
647-
if r_table is None:
648-
raise exceptions.NotFound("Table", table)
649-
650-
if not r_table['is_component']:
647+
if table not in {'Deployment', 'Component'}:
651648
pid = self.primary_id(table)
652649
if pid in condition:
653650
docs = self.get_many(table, condition[pid])
@@ -660,13 +657,16 @@ def delete(self, table, condition):
660657
if 'uuid' in condition:
661658
docs = self.get_many(table, '*', condition['uuid'])
662659
elif 'identifier' in condition:
663-
docs = self.get_many(table, condition['identifier'], '*')
660+
assert 'component' in condition
661+
docs = self.get_many(
662+
table, condition['component'], condition['identifier'], '*'
663+
)
664664
docs = self._do_filter(docs, condition)
665665
else:
666-
docs = self.get_many(table, '*', '*')
666+
docs = self.get_many(table, '*', '*', '*')
667667
docs = self._do_filter(docs, condition)
668668
for r in docs:
669-
del self[table, r['identifier'], r['uuid']]
669+
del self[table, r['component'], r['identifier'], r['uuid']]
670670

671671
def drop_table(self, table):
672672
"""Drop data from table.
@@ -735,9 +735,7 @@ def replace(self, table, condition, r):
735735
:param condition: The condition to update.
736736
:param r: The document to replace.
737737
"""
738-
r_table = self._get_with_component_identifier('Table', table)
739-
740-
if not r_table['is_component']:
738+
if table not in {'Component', 'Deployment'}:
741739
pid = self.primary_id(table)
742740
if pid in condition:
743741
docs = self.get_many(table, condition[pid])
@@ -749,18 +747,21 @@ def replace(self, table, condition, r):
749747
self[table, s[pid]] = r
750748
else:
751749
if 'uuid' in condition:
752-
s = self.get_many(table, '*', condition['uuid'])[0]
753-
self[table, s['identifier'], condition['uuid']] = r
750+
s = self.get_many(table, '*', '*', condition['uuid'])[0]
751+
self[table, s['component'], s['identifier'], condition['uuid']] = r
754752
elif 'identifier' in condition:
755-
docs = self.get_many(table, condition['identifier'], '*')
753+
assert 'component' in condition
754+
docs = self.get_many(
755+
table, condition['component'], condition['identifier'], '*'
756+
)
756757
docs = self._do_filter(docs, condition)
757758
for s in docs:
758-
self[table, s['identifier'], s['uuid']] = r
759+
self[table, s['component'], s['identifier'], s['uuid']] = r
759760
else:
760-
docs = self.get_many(table, '*', '*')
761+
docs = self.get_many(table, '*', '*', '*')
761762
docs = self._do_filter(docs, condition)
762763
for s in docs:
763-
self[table, s['identifier'], s['uuid']] = r
764+
self[table, s['component'], s['identifier'], s['uuid']] = r
764765

765766
def update(self, table, condition, key, value):
766767
"""Update data in the database.
@@ -770,9 +771,7 @@ def update(self, table, condition, key, value):
770771
:param key: The key to update.
771772
:param value: The value to update.
772773
"""
773-
r_table = self._get_with_component_identifier('Table', table)
774-
775-
if not r_table['is_component']:
774+
if table not in {'Component', 'Deployment'}:
776775
pid = self.primary_id(table)
777776
if pid in condition:
778777
docs = self.get_many(table, '*', condition[pid]) + self.get_many(
@@ -788,19 +787,21 @@ def update(self, table, condition, key, value):
788787
if 'uuid' in condition:
789788
s = self.get_many(table, '*', condition['uuid'])[0]
790789
s[key] = value
791-
self[table, s['identifier'], condition['uuid']] = s
790+
self[table, s['component'], s['identifier'], condition['uuid']] = s
792791
elif 'identifier' in condition:
793-
docs = self.get_many(table, condition['identifier'], '*')
792+
docs = self.get_many(
793+
table, condition['component'], condition['identifier'], '*'
794+
)
794795
docs = self._do_filter(docs, condition)
795796
for s in docs:
796797
s[key] = value
797-
self[table, s['identifier'], s['uuid']] = s
798+
self[table, s['component'], s['identifier'], s['uuid']] = s
798799
else:
799-
docs = self.get_many(table, '*', '*')
800+
docs = self.get_many(table, '*', '*', '*')
800801
docs = self._do_filter(docs, condition)
801802
for s in docs:
802803
s[key] = value
803-
self[table, s['identifier'], s['uuid']] = s
804+
self[table, s['component'], s['identifier'], s['uuid']] = s
804805

805806
@abstractmethod
806807
def keys(self, *pattern) -> t.List[t.Tuple[str, str, str]]:
@@ -882,9 +883,11 @@ def insert(self, table, documents):
882883
except exceptions.NotFound:
883884
pid = None
884885

885-
if ('uuid' == pid or not pid) and "uuid" in documents[0]:
886+
if table in {'Component', 'Deployment'}:
886887
for r in documents:
887-
self[table, r['identifier'], r['uuid']] = r
888+
self[table, r['component'], r['identifier'], r['uuid']] = copy.deepcopy(
889+
r
890+
)
888891
ids.append(r['uuid'])
889892
elif pid:
890893
pid = self.primary_id(table)
@@ -944,13 +947,7 @@ def do_test(r):
944947
return False
945948
return True
946949

947-
tables = self.get_many('Table', query.table, '*')
948-
if not tables:
949-
raise exceptions.NotFound("Table", query.table)
950-
951-
is_component = max(tables, key=lambda x: x['version'])['is_component']
952-
953-
if not is_component:
950+
if query.table not in {'Component', 'Deployment'}:
954951
pid = self.primary_id(query.table)
955952

956953
if pid in filter_kwargs:
@@ -966,41 +963,47 @@ def do_test(r):
966963
else:
967964

968965
if not filter_kwargs:
969-
keys = self.keys(query.table, '*', '*')
966+
keys = self.keys(query.table, '*', '*', '*')
970967
docs = [self[k] for k in keys]
971968
elif set(filter_kwargs.keys()) == {'uuid'}:
972-
keys = self.keys(query.table, '*', filter_kwargs['uuid']['value'])
969+
keys = self.keys(query.table, '*', '*', filter_kwargs['uuid']['value'])
973970
docs = [self[k] for k in keys]
974971
elif set(filter_kwargs.keys()) == {'identifier'}:
975972
assert filter_kwargs['identifier']['op'] == '=='
976-
977973
keys = self.keys(query.table, filter_kwargs['identifier']['value'], '*')
978974
docs = [self[k] for k in keys]
979-
elif set(filter_kwargs.keys()) == {'identifier', 'uuid'}:
975+
elif set(filter_kwargs.keys()) == {'component', 'identifier', 'uuid'}:
980976
assert filter_kwargs['identifier']['op'] == '=='
981977
assert filter_kwargs['uuid']['op'] == '=='
978+
assert filter_kwargs['component']['op'] == '=='
982979

983980
r = self[
984981
query.table,
982+
filter_kwargs['component']['value'],
985983
filter_kwargs['identifier']['value'],
986984
filter_kwargs['uuid']['value'],
987985
]
988986
if r is None:
989987
docs = []
990988
else:
991989
docs = [r]
992-
elif set(filter_kwargs.keys()) == {'identifier', 'version'}:
990+
elif set(filter_kwargs.keys()) == {'component', 'identifier', 'version'}:
993991
assert filter_kwargs['identifier']['op'] == '=='
992+
assert filter_kwargs['component']['op'] == '=='
994993
assert filter_kwargs['version']['op'] == '=='
995994

996-
keys = self.keys(query.table, filter_kwargs['identifier']['value'], '*')
995+
keys = self.keys(
996+
query.table,
997+
filter_kwargs['component']['value'],
998+
filter_kwargs['identifier']['value'],
999+
'*',
1000+
)
9971001
docs = [self[k] for k in keys]
9981002
docs = [
9991003
r for r in docs if r['version'] == filter_kwargs['version']['value']
10001004
]
10011005
else:
1002-
1003-
keys = self.keys(query.table, '*', '*')
1006+
keys = self.keys(query.table, '*', '*', '*')
10041007
docs = [self[k] for k in keys]
10051008
docs = [r for r in docs if do_test(r)]
10061009

@@ -1011,4 +1014,4 @@ def do_test(r):
10111014
cols = query.decomposition.select.args
10121015
for i, r in enumerate(docs):
10131016
docs[i] = {k: v for k, v in r.items() if k in cols}
1014-
return docs
1017+
return [copy.deepcopy(r) for r in docs]

superduper/backends/local/vector_search.py

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,38 +42,31 @@ def build_tool(self, component):
4242

4343
def initialize(self):
4444
"""Initialize the vector search."""
45-
components = []
46-
from superduper import VectorIndex
47-
48-
for cls in self.db.show('Table'):
49-
t = self.db.load('Table', identifier=cls)
50-
if t.is_component and t.cls is not None:
51-
if issubclass(t.cls, VectorIndex):
52-
components.append(t.identifier)
45+
t = self.db.metadata.db['Deployment']
46+
components = (
47+
t.filter(t['parent'] == 'VectorIndex')
48+
.select('component', 'uuid', 'identifier')
49+
.execute()
50+
)
51+
5352
for component in components:
5453
try:
55-
for identifier in self.db.show(component):
56-
try:
57-
vector_index = self.db.load(component, identifier=identifier)
58-
self.put_component(component, vector_index.uuid)
59-
vectors = vector_index.get_vectors()
60-
vectors = [VectorItem(**vector) for vector in vectors]
61-
self.get_tool(vector_index.uuid).add(vectors)
62-
63-
except FileNotFoundError:
64-
logging.error(
65-
f'Could not load vector index: {identifier} '
66-
'Is the artifact store correctly configured?'
67-
)
68-
continue
69-
except TypeError as e:
70-
import traceback
71-
72-
logging.error(
73-
f'Could not load vector index: {identifier} ' f'{e}'
74-
)
75-
logging.error(traceback.format_exc())
76-
continue
54+
self.put_component(component['component'], uuid=component['uuid'])
55+
56+
except FileNotFoundError:
57+
logging.error(
58+
f'Could not load vector index: {component["identifier"]} '
59+
'Is the artifact store correctly configured?'
60+
)
61+
continue
62+
except TypeError as e:
63+
import traceback
64+
65+
logging.error(
66+
f'Could not load vector index: {component["identifier"]} ' f'{e}'
67+
)
68+
logging.error(traceback.format_exc())
69+
continue
7770
except exceptions.NotFound:
7871
pass
7972

superduper/base/annotations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def decorated(self, *, context: str = '', job: bool = False, **kwargs):
4949

5050
if job:
5151
return Job(
52-
component=self.__class__.__name__,
52+
parent_component=self.__class__.__name__,
5353
identifier=self.identifier,
5454
uuid=self.uuid,
5555
method=f.__name__,

0 commit comments

Comments
 (0)