@@ -29,10 +29,10 @@ def update_exception_db(self, file_name, exp_msg, retry_condition=None):
29
29
if retry_condition is not None :
30
30
retry_condition = None
31
31
self .graph .query ("""MERGE(d:Document {fileName :$fName}) SET d.status = $status, d.errorMessage = $error_msg, d.retry_condition = $retry_condition""" ,
32
- {"fName" :file_name , "status" :job_status , "error_msg" :exp_msg , "retry_condition" :retry_condition })
32
+ {"fName" :file_name , "status" :job_status , "error_msg" :exp_msg , "retry_condition" :retry_condition }, session_params = { "database" : self . graph . _database } )
33
33
else :
34
34
self .graph .query ("""MERGE(d:Document {fileName :$fName}) SET d.status = $status, d.errorMessage = $error_msg""" ,
35
- {"fName" :file_name , "status" :job_status , "error_msg" :exp_msg })
35
+ {"fName" :file_name , "status" :job_status , "error_msg" :exp_msg }, session_params = { "database" : self . graph . _database } )
36
36
except Exception as e :
37
37
error_message = str (e )
38
38
logging .error (f"Error in updating document node status as failed: { error_message } " )
@@ -66,7 +66,7 @@ def create_source_node(self, obj_source_node:sourceNode):
66
66
"entityEntityRelCount" :obj_source_node .entityEntityRelCount ,
67
67
"communityNodeCount" :obj_source_node .communityNodeCount ,
68
68
"communityRelCount" :obj_source_node .communityRelCount
69
- })
69
+ }, session_params = { "database" : self . graph . _database } )
70
70
except Exception as e :
71
71
error_message = str (e )
72
72
logging .info (f"error_message = { error_message } " )
@@ -118,7 +118,7 @@ def update_source_node(self, obj_source_node:sourceNode):
118
118
logging .info (f'Base Param value 1 : { param } ' )
119
119
query = "MERGE(d:Document {fileName :$props.fileName}) SET d += $props"
120
120
logging .info ("Update source node properties" )
121
- self .graph .query (query ,param )
121
+ self .graph .query (query ,param , session_params = { "database" : self . graph . _database } )
122
122
except Exception as e :
123
123
error_message = str (e )
124
124
self .update_exception_db (self ,self .file_name ,error_message )
@@ -139,15 +139,15 @@ def get_source_list(self):
139
139
"""
140
140
logging .info ("Get existing files list from graph" )
141
141
query = "MATCH(d:Document) WHERE d.fileName IS NOT NULL RETURN d ORDER BY d.updatedAt DESC"
142
- result = self .graph .query (query )
142
+ result = self .graph .query (query , session_params = { "database" : self . graph . _database } )
143
143
list_of_json_objects = [entry ['d' ] for entry in result ]
144
144
return list_of_json_objects
145
145
146
146
def update_KNN_graph (self ):
147
147
"""
148
148
Update the graph node with SIMILAR relationship where embedding scrore match
149
149
"""
150
- index = self .graph .query ("""show indexes yield * where type = 'VECTOR' and name = 'vector'""" )
150
+ index = self .graph .query ("""show indexes yield * where type = 'VECTOR' and name = 'vector'""" , session_params = { "database" : self . graph . _database } )
151
151
# logging.info(f'show index vector: {index}')
152
152
knn_min_score = os .environ .get ('KNN_MIN_SCORE' )
153
153
if len (index ) > 0 :
@@ -158,14 +158,14 @@ def update_KNN_graph(self):
158
158
WHERE node <> c and score >= $score MERGE (c)-[rel:SIMILAR]-(node) SET rel.score = score
159
159
""" ,
160
160
{"score" :float (knn_min_score )}
161
- )
161
+ , session_params = { "database" : self . graph . _database } )
162
162
else :
163
163
logging .info ("Vector index does not exist, So KNN graph not update" )
164
164
165
165
def check_account_access (self , database ):
166
166
try :
167
167
query_dbms_componenet = "call dbms.components() yield edition"
168
- result_dbms_componenet = self .graph .query (query_dbms_componenet )
168
+ result_dbms_componenet = self .graph .query (query_dbms_componenet , session_params = { "database" : self . graph . _database } )
169
169
170
170
if result_dbms_componenet [0 ]["edition" ] == "enterprise" :
171
171
query = """
@@ -177,7 +177,7 @@ def check_account_access(self, database):
177
177
178
178
logging .info (f"Checking access for database: { database } " )
179
179
180
- result = self .graph .query (query , params = {"database" : database })
180
+ result = self .graph .query (query , params = {"database" : database }, session_params = { "database" : self . graph . _database } )
181
181
read_access_count = result [0 ]["readAccessCount" ] if result else 0
182
182
183
183
logging .info (f"Read access count: { read_access_count } " )
@@ -202,7 +202,7 @@ def check_gds_version(self):
202
202
gds_procedure_count = """
203
203
SHOW FUNCTIONS YIELD name WHERE name STARTS WITH 'gds.version' RETURN COUNT(*) AS totalGdsProcedures
204
204
"""
205
- result = self .graph .query (gds_procedure_count )
205
+ result = self .graph .query (gds_procedure_count , session_params = { "database" : self . graph . _database } )
206
206
total_gds_procedures = result [0 ]['totalGdsProcedures' ] if result else 0
207
207
208
208
if total_gds_procedures > 0 :
@@ -231,11 +231,11 @@ def connection_check_and_get_vector_dimensions(self,database):
231
231
db_vector_dimension = self .graph .query ("""SHOW INDEXES YIELD *
232
232
WHERE type = 'VECTOR' AND name = 'vector'
233
233
RETURN options.indexConfig['vector.dimensions'] AS vector_dimensions
234
- """ )
234
+ """ , session_params = { "database" : self . graph . _database } )
235
235
236
236
result_chunks = self .graph .query ("""match (c:Chunk) return size(c.embedding) as embeddingSize, count(*) as chunks,
237
237
count(c.embedding) as hasEmbedding
238
- """ )
238
+ """ , session_params = { "database" : self . graph . _database } )
239
239
240
240
embedding_model = os .getenv ('EMBEDDING_MODEL' )
241
241
embeddings , application_dimension = load_embedding_model (embedding_model )
@@ -260,7 +260,7 @@ def execute_query(self, query, param=None,max_retries=3, delay=2):
260
260
retries = 0
261
261
while retries < max_retries :
262
262
try :
263
- return self .graph .query (query , param )
263
+ return self .graph .query (query , param , session_params = { "database" : self . graph . _database } )
264
264
except TransientError as e :
265
265
if "DeadlockDetected" in str (e ):
266
266
retries += 1
@@ -473,8 +473,8 @@ def drop_create_vector_index(self, isVectorIndexExist):
473
473
embeddings , dimension = load_embedding_model (embedding_model )
474
474
475
475
if isVectorIndexExist == 'true' :
476
- self .graph .query ("""drop index vector""" )
477
- # self.graph.query("""drop index vector""")
476
+ self .graph .query ("""drop index vector""" , session_params = { "database" : self . graph . _database } )
477
+
478
478
self .graph .query ("""CREATE VECTOR INDEX `vector` if not exists for (c:Chunk) on (c.embedding)
479
479
OPTIONS {indexConfig: {
480
480
`vector.dimensions`: $dimensions,
@@ -483,7 +483,7 @@ def drop_create_vector_index(self, isVectorIndexExist):
483
483
""" ,
484
484
{
485
485
"dimensions" : dimension
486
- }
486
+ }, session_params = { "database" : self . graph . _database }
487
487
)
488
488
return "Drop and Re-Create vector index succesfully"
489
489
0 commit comments