diff --git a/lute/runtime/include/lute/runtime.h b/lute/runtime/include/lute/runtime.h index a4d41717..94c1cb26 100644 --- a/lute/runtime/include/lute/runtime.h +++ b/lute/runtime/include/lute/runtime.h @@ -2,14 +2,12 @@ #include "Luau/Variant.h" #include "lute/ref.h" - +#include "uv.h" #include -#include #include #include #include #include -#include #include struct lua_State; @@ -46,9 +44,6 @@ struct Runtime bool runToCompletion(); RuntimeStep runOnce(); - // For child runtimes, run a thread waiting for work - void runContinuously(); - // Reports an error for a specified lua state. void reportError(lua_State* L); @@ -70,6 +65,9 @@ struct Runtime void addPendingToken(); void releasePendingToken(); + // Process queued continuations (called by uv_async callback) + void processContinuations(); + // VM for this runtime std::unique_ptr globalState; @@ -85,12 +83,11 @@ struct Runtime std::mutex continuationMutex; std::vector> continuations; - // TODO: can this be handled by libuv? - std::atomic stop; - std::condition_variable runLoopCv; - std::thread runLoopThread; - std::atomic activeTokens; + std::atomic errorOccurred{false}; + + uv_async_t asyncHandle; + bool asyncInitialized = false; }; Runtime* getRuntime(lua_State* L); diff --git a/lute/runtime/src/runtime.cpp b/lute/runtime/src/runtime.cpp index 475a6c56..fc14b385 100644 --- a/lute/runtime/src/runtime.cpp +++ b/lute/runtime/src/runtime.cpp @@ -30,117 +30,80 @@ Runtime::Runtime() : globalState(nullptr, lua_close_checked) , dataCopy(nullptr, lua_close_checked) { - stop.store(false); activeTokens.store(0); + + // Set up the handle that we'll use to process continutations asynchronously + // Whenever a libuv callback gets, we can signal via a resume token to continue + // the original yielding thread + int err = uv_async_init(uv_default_loop(), &asyncHandle, + [](uv_async_t* handle) { + auto* runtime = static_cast(handle->data); + runtime->processContinuations(); + } + ); + + if (err != 0) + { + fprintf(stderr, "Failed to initialize uv_async_t: %s\n", uv_strerror(err)); + asyncInitialized = false; + } + else + { + asyncHandle.data = this; + asyncInitialized = true; + } } Runtime::~Runtime() { + if (asyncInitialized) { - std::unique_lock lock(continuationMutex); - - stop.store(true); - - runLoopCv.notify_one(); + uv_close((uv_handle_t*)&asyncHandle, nullptr); + // Run the loop once to process the close callback + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + // TODO: Do we need to set asyncInitialized to false? } - - if (runLoopThread.joinable()) - runLoopThread.join(); } bool Runtime::hasWork() { - // TODO: activeTokens and uv_loop_alive have a decent amount of overlap. - // Unfortunately, we do currently have some places where we add/release - // tokens that don't correspond to libuv activity, so for now we keep both. - // uv_ref/unref could be used to patch tokens into the libuv loop itself. - return hasContinuations() || hasThreads() || activeTokens.load() != 0 || uv_loop_alive(uv_default_loop()); + // Check if there's any work to do: + // - Continuations queued (includes deferred tasks) - e.g. resuming luau threads + // - Threads waiting to resume - e.g. http requests + // - Active tokens (pending async operations) + // Note: We don't use uv_loop_alive() because the async handle is always referenced + return hasContinuations() || hasThreads() || activeTokens.load() != 0; } RuntimeStep Runtime::runOnce() { - uv_run(uv_default_loop(), UV_RUN_NOWAIT); - - // Complete all C++ continuations - std::vector> copy; - - { - std::unique_lock lock(continuationMutex); - copy = std::move(continuations); - continuations.clear(); - } - - for (auto&& continuation : copy) - continuation(); - - if (runningThreads.empty()) - return StepEmpty{}; - - auto next = std::move(runningThreads.front()); - runningThreads.erase(runningThreads.begin()); - - next.ref->push(GL); - lua_State* L = lua_tothread(GL, -1); - - if (L == nullptr) - { - fprintf(stderr, "Cannot resume a non-thread reference"); - return StepErr{L}; - } - - // We still have 'next' on stack to hold on to thread we are about to run - lua_pop(GL, 1); + // Just run the event loop once + // processContinuations() is called automatically by uv_async callback + uv_run(uv_default_loop(), UV_RUN_ONCE); - int status = LUA_OK; + // Return status based on whether we still have work + if (hasWork()) + return StepSuccess{GL}; - if (!next.success) - status = lua_resumeerror(L, nullptr); - else - status = lua_resume(L, nullptr, next.argumentCount); - - if (status == LUA_YIELD) - { - return StepSuccess{L}; - } - - if (status != LUA_OK) - { - return StepErr{L}; - }; - - if (next.cont) - next.cont(); - - return StepSuccess{L}; + return StepEmpty{}; } bool Runtime::runToCompletion() { - while (hasWork()) - { - auto step = runOnce(); - - if (auto err = Luau::get_if(&step)) - { - if (err->L == nullptr) - { - fprintf(stderr, "lua_State* L is nullptr"); - return false; - } + // Simplified - just run event loop until no more work + // Error handling happens inside processContinuations() - reportError(err->L); + // Process any threads that were added directly to runningThreads + // before entering the event loop (e.g., the initial thread from runBytecode) + processContinuations(); - // ensure we exit the process with error code properly - if (!hasWork()) - return false; - } - else - { - continue; - } - }; + while (hasWork()) + { + uv_run(uv_default_loop(), UV_RUN_ONCE); + } - return true; + // Return false if any errors occurred during execution + return !errorOccurred.load(); } void Runtime::reportError(lua_State* L) @@ -156,33 +119,6 @@ void Runtime::reportError(lua_State* L) fprintf(stderr, "%s", error.c_str()); } -void Runtime::runContinuously() -{ - // TODO: another place for libuv - runLoopThread = std::thread( - [this] - { - while (!stop) - { - // Block to wait on event - { - std::unique_lock lock(continuationMutex); - - runLoopCv.wait( - lock, - [this] - { - return !continuations.empty() || stop; - } - ); - } - - runToCompletion(); - } - } - ); -} - bool Runtime::hasContinuations() { std::unique_lock lock(continuationMutex); @@ -196,49 +132,65 @@ bool Runtime::hasThreads() void Runtime::schedule(std::function f) { - std::unique_lock lock(continuationMutex); - - continuations.push_back(std::move(f)); + { + std::unique_lock lock(continuationMutex); + continuations.push_back(std::move(f)); + } - runLoopCv.notify_one(); + if (asyncInitialized) + { + uv_async_send(&asyncHandle); + } } void Runtime::scheduleLuauError(std::shared_ptr ref, std::string error) { - std::unique_lock lock(continuationMutex); + { + std::unique_lock lock(continuationMutex); - continuations.push_back( - [this, ref, error = std::move(error)]() mutable - { - ref->push(GL); - lua_State* L = lua_tothread(GL, -1); - lua_pop(GL, 1); + continuations.push_back( + [this, ref, error = std::move(error)]() mutable + { + ref->push(GL); + lua_State* L = lua_tothread(GL, -1); + lua_pop(GL, 1); - lua_pushlstring(L, error.data(), error.size()); - runningThreads.push_back({false, ref, lua_gettop(L)}); - } - ); + lua_pushlstring(L, error.data(), error.size()); + runningThreads.push_back({false, ref, lua_gettop(L)}); + } + ); + } - runLoopCv.notify_one(); + // If you schedule a luau error, we send a notice to the event_loop to run the the callback + // we registered + if (asyncInitialized) + { + uv_async_send(&asyncHandle); + } } void Runtime::scheduleLuauResume(std::shared_ptr ref, std::function cont) { - std::unique_lock lock(continuationMutex); + { + std::unique_lock lock(continuationMutex); - continuations.push_back( - [this, ref, cont = std::move(cont)]() mutable - { - ref->push(GL); - lua_State* L = lua_tothread(GL, -1); - lua_pop(GL, 1); + continuations.push_back( + [this, ref, cont = std::move(cont)]() mutable + { + ref->push(GL); + lua_State* L = lua_tothread(GL, -1); + lua_pop(GL, 1); - int results = cont(L); - runningThreads.push_back({true, ref, results}); - } - ); + int results = cont(L); + runningThreads.push_back({true, ref, results}); + } + ); + } - runLoopCv.notify_one(); + if (asyncInitialized) + { + uv_async_send(&asyncHandle); + } } void Runtime::runInWorkQueue(std::function f) @@ -276,6 +228,61 @@ void Runtime::releasePendingToken() assert(before > 0); } +void Runtime::processContinuations() +{ + // Process continuations AND resume threads on the uv event loop + // This is called by the uv_async callback + std::vector> copy; + + { + std::unique_lock lock(continuationMutex); + copy = std::move(continuations); + continuations.clear(); + } + + // Execute all continuations (these populate runningThreads) + for (auto&& continuation : copy) + { + continuation(); + } + + // Now resume all pending Lua threads + // This happens on the event loop thread, not the separate thread + while (!runningThreads.empty()) + { + auto next = std::move(runningThreads.front()); + runningThreads.erase(runningThreads.begin()); + + next.ref->push(GL); + lua_State* L = lua_tothread(GL, -1); + + if (L == nullptr) + { + fprintf(stderr, "Cannot resume a non-thread reference\n"); + lua_pop(GL, 1); + continue; + } + + lua_pop(GL, 1); + + int status = LUA_OK; + + if (!next.success) + status = lua_resumeerror(L, nullptr); + else + status = lua_resume(L, nullptr, next.argumentCount); + + if (status != LUA_OK && status != LUA_YIELD) + { + reportError(L); + errorOccurred.store(true); + } + + if (next.cont) + next.cont(); + } +} + Runtime* getRuntime(lua_State* L) { return static_cast(lua_getthreaddata(lua_mainthread(L))); diff --git a/lute/task/src/task.cpp b/lute/task/src/task.cpp index e235992b..825f17a2 100644 --- a/lute/task/src/task.cpp +++ b/lute/task/src/task.cpp @@ -70,7 +70,14 @@ int lua_defer(lua_State* L) { Runtime* runtime = getRuntime(L); - runtime->runningThreads.push_back({true, getRefForThread(L), 0}); + // Schedule the deferred thread to run on the next event loop iteration + runtime->schedule( + [runtime, ref = getRefForThread(L)]() + { + runtime->runningThreads.push_back({true, ref, 0}); + } + ); + return lua_yield(L, 0); }; diff --git a/lute/vm/src/spawn.cpp b/lute/vm/src/spawn.cpp index 26bd7a36..bcd2b231 100644 --- a/lute/vm/src/spawn.cpp +++ b/lute/vm/src/spawn.cpp @@ -288,8 +288,8 @@ int lua_spawn(lua_State* L) lua_pop(child->GL, 1); - child->runContinuously(); - + // Child runtime is already integrated with the event loop via uv_async_t + // No need to explicitly start it - it will process continuations when scheduled return 1; }