Skip to content

Commit 2140a26

Browse files
committed
Add adapter for CrateDB
It is not suitable for merging into mainline, because it modifies the data model inappropriately for other databases. The missing details would need to be put into the dialect somehow, to make it compatible with a vanilla SQLAlchemy application. C'est la vie. The patch includes a few monkeypatches and polyfills, which should be upstreamed to crate-python and/or cratedb-toolkit.
1 parent 8552d05 commit 2140a26

File tree

4 files changed

+146
-5
lines changed

4 files changed

+146
-5
lines changed

rdflib_sqlalchemy/cratedb_patch.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import sqlalchemy as sa
2+
from sqlalchemy.dialects.postgresql.base import RESERVED_WORDS as POSTGRESQL_RESERVED_WORDS
3+
4+
5+
def cratedb_patch_dialect():
6+
try:
7+
from crate.client.sqlalchemy import CrateDialect
8+
from crate.client.sqlalchemy.compiler import CrateDDLCompiler
9+
except ImportError:
10+
return
11+
12+
def visit_create_index(
13+
self, create, include_schema=False, include_table_schema=True, **kw
14+
):
15+
return "SELECT 1;"
16+
17+
CrateDDLCompiler.visit_create_index = visit_create_index
18+
CrateDialect.preparer = CrateIdentifierPreparer
19+
20+
21+
def cratedb_polyfill_refresh_after_dml_engine(engine: sa.engine.Engine):
22+
def receive_after_execute(
23+
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
24+
):
25+
"""
26+
Run a `REFRESH TABLE ...` command after each DML operation (INSERT, UPDATE, DELETE).
27+
"""
28+
29+
if isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)):
30+
if not isinstance(clauseelement.table, sa.sql.Join):
31+
full_table_name = f'"{clauseelement.table.name}"'
32+
if clauseelement.table.schema is not None:
33+
full_table_name = f'"{clauseelement.table.schema}".' + full_table_name
34+
conn.execute(sa.text(f'REFRESH TABLE {full_table_name};'))
35+
36+
sa.event.listen(engine, "after_execute", receive_after_execute)
37+
38+
39+
RESERVED_WORDS = set(list(POSTGRESQL_RESERVED_WORDS) + ["object"])
40+
41+
42+
class CrateIdentifierPreparer(sa.sql.compiler.IdentifierPreparer):
43+
44+
reserved_words = RESERVED_WORDS
45+
46+
def _unquote_identifier(self, value):
47+
if value[0] == self.initial_quote:
48+
value = value[1:-1].replace(
49+
self.escape_to_quote, self.escape_quote
50+
)
51+
return value
52+
53+
def format_type(self, type_, use_schema=True):
54+
if not type_.name:
55+
raise sa.exc.CompileError("PostgreSQL ENUM type requires a name.")
56+
57+
name = self.quote(type_.name)
58+
effective_schema = self.schema_for_object(type_)
59+
60+
if (
61+
not self.omit_schema
62+
and use_schema
63+
and effective_schema is not None
64+
):
65+
name = self.quote_schema(effective_schema) + "." + name
66+
return name

rdflib_sqlalchemy/store.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,11 @@ def open(self, configuration, create=True):
274274
kwargs = configuration
275275

276276
self.engine = sqlalchemy.create_engine(url, **kwargs)
277+
278+
# CrateDB needs a fix to synchronize write operations.
279+
from rdflib_sqlalchemy.cratedb_patch import cratedb_polyfill_refresh_after_dml_engine
280+
cratedb_polyfill_refresh_after_dml_engine(self.engine)
281+
277282
try:
278283
conn = self.engine.connect()
279284
except OperationalError:

rdflib_sqlalchemy/tables.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
from sqlalchemy import Column, Table, Index, types
1+
from sqlalchemy import Column, Table, Index, text, types
2+
from sqlalchemy.sql import quoted_name
23

4+
from rdflib_sqlalchemy.cratedb_patch import cratedb_patch_dialect
35
from rdflib_sqlalchemy.types import TermType
46

57

8+
cratedb_patch_dialect()
9+
610
MYSQL_MAX_INDEX_LENGTH = 200
711

812
TABLE_NAME_TEMPLATES = [
@@ -25,7 +29,7 @@ def create_asserted_statements_table(interned_id, metadata):
2529
return Table(
2630
"{interned_id}_asserted_statements".format(interned_id=interned_id),
2731
metadata,
28-
Column("id", types.Integer, nullable=False, primary_key=True),
32+
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
2933
Column("subject", TermType, nullable=False),
3034
Column("predicate", TermType, nullable=False),
3135
Column("object", TermType, nullable=False),
@@ -71,7 +75,7 @@ def create_type_statements_table(interned_id, metadata):
7175
return Table(
7276
"{interned_id}_type_statements".format(interned_id=interned_id),
7377
metadata,
74-
Column("id", types.Integer, nullable=False, primary_key=True),
78+
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
7579
Column("member", TermType, nullable=False),
7680
Column("klass", TermType, nullable=False),
7781
Column("context", TermType, nullable=False),
@@ -110,7 +114,7 @@ def create_literal_statements_table(interned_id, metadata):
110114
return Table(
111115
"{interned_id}_literal_statements".format(interned_id=interned_id),
112116
metadata,
113-
Column("id", types.Integer, nullable=False, primary_key=True),
117+
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
114118
Column("subject", TermType, nullable=False),
115119
Column("predicate", TermType, nullable=False),
116120
Column("object", TermType),
@@ -154,7 +158,7 @@ def create_quoted_statements_table(interned_id, metadata):
154158
return Table(
155159
"{interned_id}_quoted_statements".format(interned_id=interned_id),
156160
metadata,
157-
Column("id", types.Integer, nullable=False, primary_key=True),
161+
Column("id", types.BigInteger, nullable=False, primary_key=True, server_default=text("NOW()::LONG")),
158162
Column("subject", TermType, nullable=False),
159163
Column("predicate", TermType, nullable=False),
160164
Column("object", TermType),

test/test_sqlalchemy_cratedb.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import logging
2+
import os
3+
import unittest
4+
5+
import pytest
6+
try:
7+
import crate # noqa
8+
assert crate # quiets unused import warning
9+
except ImportError:
10+
pytest.skip("crate not installed, skipping CrateDB tests",
11+
allow_module_level=True)
12+
13+
from . import context_case
14+
from . import graph_case
15+
16+
17+
if os.environ.get("DB") != "crate":
18+
pytest.skip("CrateDB not under test", allow_module_level=True)
19+
20+
sqlalchemy_url = os.environ.get(
21+
"DBURI",
22+
"crate://crate@localhost/")
23+
24+
_logger = logging.getLogger(__name__)
25+
26+
27+
class SQLACrateDBGraphTestCase(graph_case.GraphTestCase):
28+
storetest = True
29+
storename = "SQLAlchemy"
30+
uri = sqlalchemy_url
31+
create = True
32+
33+
def setUp(self):
34+
super(SQLACrateDBGraphTestCase, self).setUp(
35+
uri=self.uri,
36+
storename=self.storename,
37+
)
38+
39+
def tearDown(self):
40+
super(SQLACrateDBGraphTestCase, self).tearDown(uri=self.uri)
41+
42+
43+
class SQLACrateDBContextTestCase(context_case.ContextTestCase):
44+
storetest = True
45+
storename = "SQLAlchemy"
46+
uri = sqlalchemy_url
47+
create = True
48+
49+
def setUp(self):
50+
super(SQLACrateDBContextTestCase, self).setUp(
51+
uri=self.uri,
52+
storename=self.storename,
53+
)
54+
55+
def tearDown(self):
56+
super(SQLACrateDBContextTestCase, self).tearDown(uri=self.uri)
57+
58+
def testLenInMultipleContexts(self):
59+
pytest.skip("Known issue.")
60+
61+
62+
SQLACrateDBGraphTestCase.storetest = True
63+
SQLACrateDBContextTestCase.storetest = True
64+
65+
if __name__ == "__main__":
66+
unittest.main()

0 commit comments

Comments
 (0)