Skip to content

Commit 4af661b

Browse files
authored
Merge pull request #5 from golemcloud/fix-stream-drop
Fix an issue with how stream drop could persist an end remote write marker in the oplog
2 parents 9cf063c + dc40160 commit 4af661b

File tree

4 files changed

+21
-8
lines changed

4 files changed

+21
-8
lines changed

llm-anthropic/src/bindings.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Generated by `wit-bindgen` 0.36.0. DO NOT EDIT!
22
// Options used:
33
// * runtime_path: "wit_bindgen_rt"
4-
// * with "wasi:io/poll@0.2.0" = "golem_rust::wasm_rpc::wasi::io::poll"
54
// * with "golem:llm/llm@1.0.0" = "golem_llm::golem::llm::llm"
5+
// * with "wasi:io/poll@0.2.0" = "golem_rust::wasm_rpc::wasi::io::poll"
66
// * generate_unused_types
77
use golem_rust::wasm_rpc::wasi::io::poll as __with_name0;
88
use golem_llm::golem::llm::llm as __with_name1;

llm-grok/src/bindings.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Generated by `wit-bindgen` 0.36.0. DO NOT EDIT!
22
// Options used:
33
// * runtime_path: "wit_bindgen_rt"
4-
// * with "wasi:io/poll@0.2.0" = "golem_rust::wasm_rpc::wasi::io::poll"
54
// * with "golem:llm/llm@1.0.0" = "golem_llm::golem::llm::llm"
5+
// * with "wasi:io/poll@0.2.0" = "golem_rust::wasm_rpc::wasi::io::poll"
66
// * generate_unused_types
77
use golem_rust::wasm_rpc::wasi::io::poll as __with_name0;
88
use golem_llm::golem::llm::llm as __with_name1;

llm-openai/src/bindings.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Generated by `wit-bindgen` 0.36.0. DO NOT EDIT!
22
// Options used:
33
// * runtime_path: "wit_bindgen_rt"
4-
// * with "golem:llm/llm@1.0.0" = "golem_llm::golem::llm::llm"
54
// * with "wasi:io/poll@0.2.0" = "golem_rust::wasm_rpc::wasi::io::poll"
5+
// * with "golem:llm/llm@1.0.0" = "golem_llm::golem::llm::llm"
66
// * generate_unused_types
77
use golem_rust::wasm_rpc::wasi::io::poll as __with_name0;
88
use golem_llm::golem::llm::llm as __with_name1;

llm/src/durability.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ mod durable_impl {
211211

212212
pub struct DurableChatStream<Impl: ExtendedGuest> {
213213
state: RefCell<Option<DurableChatStreamState<Impl>>>,
214+
subscription: RefCell<Option<Pollable>>,
214215
}
215216

216217
impl<Impl: ExtendedGuest> DurableChatStream<Impl> {
@@ -220,6 +221,7 @@ mod durable_impl {
220221
stream,
221222
pollables: Vec::new(),
222223
})),
224+
subscription: RefCell::new(None),
223225
}
224226
}
225227

@@ -232,16 +234,23 @@ mod durable_impl {
232234
partial_result: Vec::new(),
233235
finished: false,
234236
})),
237+
subscription: RefCell::new(None),
235238
}
236239
}
237240
}
238241

239242
impl<Impl: ExtendedGuest> Drop for DurableChatStream<Impl> {
240243
fn drop(&mut self) {
241-
// Pollables must be dropped first
244+
let _ = self.subscription.take();
242245
match self.state.take() {
243-
Some(DurableChatStreamState::Live { mut pollables, .. }) => {
244-
pollables.clear();
246+
Some(DurableChatStreamState::Live {
247+
mut pollables,
248+
stream,
249+
}) => {
250+
with_persistence_level(PersistenceLevel::PersistNothing, move || {
251+
pollables.clear();
252+
drop(stream);
253+
});
245254
}
246255
Some(DurableChatStreamState::Replay { mut pollables, .. }) => {
247256
pollables.clear();
@@ -354,10 +363,14 @@ mod durable_impl {
354363
}
355364

356365
fn blocking_get_next(&self) -> Vec<StreamEvent> {
357-
let pollable = self.subscribe();
366+
let mut subscription = self.subscription.borrow_mut();
367+
if subscription.is_none() {
368+
*subscription = Some(self.subscribe());
369+
}
370+
let subscription = subscription.as_mut().unwrap();
358371
let mut result = Vec::new();
359372
loop {
360-
pollable.block();
373+
subscription.block();
361374
match self.get_next() {
362375
Some(events) => {
363376
result.extend(events);

0 commit comments

Comments
 (0)