Skip to content

feat: trigger reactive queries for lists on executeBatch #299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 15 additions & 20 deletions cpp/DBHostObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ namespace react = facebook::react;

#ifdef OP_SQLITE_USE_LIBSQL
void DBHostObject::flush_pending_reactive_queries(
const std::shared_ptr<jsi::Value> &resolve) {
const std::shared_ptr<jsi::Value> &resolve, std::optional<int> rowsAffected) {
invoker->invokeAsync([this, resolve]() {
resolve->asObject(rt).asFunction(rt).call(rt, {});
});
}
#else
void DBHostObject::flush_pending_reactive_queries(
const std::shared_ptr<jsi::Value> &resolve) {
const std::shared_ptr<jsi::Value> &resolve, std::optional<int> rowsAffected) {
for (const auto &query_ptr : pending_reactive_queries) {
auto query = query_ptr.get();

Expand All @@ -36,14 +36,13 @@ void DBHostObject::flush_pending_reactive_queries(
auto status = opsqlite_execute_prepared_statement(db, query->stmt,
&results, metadata);

invoker->invokeAsync(
[this,
results = std::make_shared<std::vector<DumbHostObject>>(results),
callback = query->callback, metadata, status = std::move(status)] {
auto jsiResult =
create_result(rt, status, results.get(), metadata);
callback->asObject(rt).asFunction(rt).call(rt, jsiResult);
});
invoker->invokeAsync([this, resolve, rowsAffected]() {
auto res = jsi::Object(rt);
if (rowsAffected.has_value()) {
res.setProperty(rt, "rowsAffected", jsi::Value(rowsAffected.value()));
}
resolve->asObject(rt).asFunction(rt).call(rt, std::move(res));
});
}

pending_reactive_queries.clear();
Expand Down Expand Up @@ -530,19 +529,15 @@ void DBHostObject::create_jsi_functions() {
auto batchResult = opsqlite_execute_batch(db, &commands);
#endif

for (const auto& table : batchResult.modifiedTables) {
on_update(table, "UNKNOWN", -1);
}

if (invalidated) {
return;
}

invoker->invokeAsync([&rt,
batchResult = std::move(batchResult),
resolve] {
auto res = jsi::Object(rt);
res.setProperty(rt, "rowsAffected",
jsi::Value(batchResult.affectedRows));
resolve->asObject(rt).asFunction(rt).call(
rt, std::move(res));
});
flush_pending_reactive_queries(resolve, batchResult.affectedRows);
} catch (std::runtime_error &e) {
auto what = e.what();
invoker->invokeAsync([&rt, what, reject] {
Expand Down Expand Up @@ -795,7 +790,7 @@ void DBHostObject::create_jsi_functions() {
auto resolve = std::make_shared<jsi::Value>(rt, args[0]);

auto task = [&rt, this, resolve]() {
flush_pending_reactive_queries(resolve);
flush_pending_reactive_queries(resolve, std::nullopt);
};

_thread_pool->queueWork(task);
Expand Down
2 changes: 1 addition & 1 deletion cpp/DBHostObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class JSI_EXPORT DBHostObject : public jsi::HostObject {
void auto_register_update_hook();
void create_jsi_functions();
void
flush_pending_reactive_queries(const std::shared_ptr<jsi::Value> &resolve);
flush_pending_reactive_queries(const std::shared_ptr<jsi::Value> &resolve, std::optional<int> rowsAffected);

std::unordered_map<std::string, jsi::Value> function_map;
std::string base_path;
Expand Down
6 changes: 6 additions & 0 deletions cpp/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,12 +850,17 @@ opsqlite_execute_batch(sqlite3 *db,
}

int affectedRows = 0;
std::unordered_set<std::string> modifiedTables;
opsqlite_execute(db, "BEGIN EXCLUSIVE TRANSACTION", nullptr);
for (int i = 0; i < commandCount; i++) {
const auto &command = commands->at(i);
// We do not provide a datastructure to receive query data because we
// don't need/want to handle this results in a batch execution
try {
auto maybeTable = extract_modified_table(command.sql);
if (maybeTable.has_value()) {
modifiedTables.insert(*maybeTable);
}
auto result = opsqlite_execute(db, command.sql, &command.params);
affectedRows += result.affectedRows;
} catch (std::exception &exc) {
Expand All @@ -867,6 +872,7 @@ opsqlite_execute_batch(sqlite3 *db,
return BatchResult{
.affectedRows = affectedRows,
.commands = static_cast<int>(commandCount),
.modifiedTables = std::move(modifiedTables),
};
}

Expand Down
6 changes: 6 additions & 0 deletions cpp/libsql/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,9 +716,14 @@ opsqlite_libsql_execute_batch(DB const &db,

try {
int affectedRows = 0;
std::unordered_set<std::string> modifiedTables;
opsqlite_libsql_execute(db, "BEGIN EXCLUSIVE TRANSACTION", nullptr);
for (int i = 0; i < commandCount; i++) {
auto command = commands->at(i);
auto maybeTable = extract_modified_table(command.sql);
if (maybeTable.has_value()) {
modifiedTables.insert(*maybeTable);
}
// We do not provide a datastructure to receive query data because
// we don't need/want to handle this results in a batch execution
auto result =
Expand All @@ -729,6 +734,7 @@ opsqlite_libsql_execute_batch(DB const &db,
return BatchResult{
.affectedRows = affectedRows,
.commands = static_cast<int>(commandCount),
.modifiedTables = std::move(modifiedTables),
};
} catch (std::exception &exc) {
opsqlite_libsql_execute(db, "ROLLBACK", nullptr);
Expand Down
2 changes: 2 additions & 0 deletions cpp/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string>
#include <variant>
#include <vector>
#include <unordered_set>

struct ArrayBuffer {
std::shared_ptr<uint8_t> data;
Expand All @@ -25,6 +26,7 @@ struct BatchResult {
std::string message;
int affectedRows;
int commands;
std::unordered_set<std::string> modifiedTables;
};

struct BatchArguments {
Expand Down
41 changes: 41 additions & 0 deletions cpp/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#endif
#include <fstream>
#include <sys/stat.h>
#include <sstream>

namespace opsqlite {

Expand Down Expand Up @@ -324,4 +325,44 @@ void log_to_console(jsi::Runtime &runtime, const std::string &message) {
log.call(runtime, jsi::String::createFromUtf8(runtime, message));
}


std::optional<std::string> extract_modified_table(const std::string &sql) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... I'm sorry, but I will probably not merge this. This is brittle and maintaining when people complain it's not working for their very particular use-case is going to bring me too much work, which I don't feel like doing. Sorry :) but feel free to fork your own version, it's MIT

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I could live with a patch, but if you have a better idea to implement this, I would be happy to update my pr, maybe we could add an api to manually trigger update on js side ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No thoughts from the top of my head, sorry, it's been a while since I implemented reactive queries. But basically, how it's working on transactions is the only way that is generic enough and not crazy. I believe they flush the queued updates from JS already but also, I don't want to expose the internals functions again because people complain too much and don't read the internal C++ so they end up dumping work on my lap.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that you have to limit the api surface for maintenance purpose, I didn't see flushPendingReactiveQueries earlier, but without a way to update pending_reactive_queries, I can’t use it. Maybe we could find a mechanism on reactiveExecute itself, with a specific fireOn flag to react to external events

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pending_reactive_queries should be updated on their on based on the update hook, they don't need manual updating as far as I remember, but again, I already forgot what's in here. You need to read the whole C++ code to understand how things are connected.

std::istringstream stream(sql);
std::string token;
std::string table;

while (stream >> token) {
std::string upper = token;
std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper);

if (upper == "INTO" || upper == "UPDATE" || upper == "FROM") {
// Next token should be the table name
if (stream >> table) {
table.erase(std::remove_if(table.begin(), table.end(), [](char c) {
return c == '`' || c == '"' || c == '\'' || c == '(';
}), table.end());
return table;
}
}

// Early exit for UPDATE statements
if (upper == "UPDATE") {
if (stream >> table) {
table.erase(std::remove_if(table.begin(), table.end(), [](char c) {
return c == '`' || c == '"' || c == '\'' || c == '(';
}), table.end());
return table;
}
}

// When we reach a semicolon, we can stop processing
if (token.find(';') != std::string::npos) {
break;
}
}

return std::nullopt;
}


} // namespace opsqlite
2 changes: 2 additions & 0 deletions cpp/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ bool file_exists(const std::string &path);

void log_to_console(jsi::Runtime &rt, const std::string &message);

std::optional<std::string> extract_modified_table(std::string const& sql);

} // namespace opsqlite