Skip to content

Commit 5de3e14

Browse files
author
Rutik Thakre
committed
Allows application/ndjson header & Add debug print statements for tracing execution flow in OllamaApi and EventStream
1 parent 58e2697 commit 5de3e14

File tree

4 files changed

+42
-4
lines changed

4 files changed

+42
-4
lines changed

llm-ollama/src/client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct OllamaApi {
2323

2424
impl OllamaApi {
2525
pub fn new(default_model: String) -> Self {
26+
println!("OllamaApi::new");
2627
let base_url =
2728
std::env::var("GOLEM_OLLAMA_BASE_URL").unwrap_or("http://localhost:11434".to_string());
2829
let client = Client::builder()
@@ -36,6 +37,7 @@ impl OllamaApi {
3637
}
3738

3839
pub fn send_chat(&self, params: CompletionsRequest) -> Result<CompletionsResponse, Error> {
40+
println!("OllamaApi:: send_chat");
3941
trace!("Sending request to Ollama API: {params:?}");
4042

4143
let mut modified_params = params;
@@ -60,6 +62,7 @@ impl OllamaApi {
6062
}
6163

6264
pub fn send_chat_stream(&self, params: CompletionsRequest) -> Result<EventSource, Error> {
65+
println!("OllamaApi::send_chat_stream");
6366
trace!("Sending request to Ollama API: {params:?}");
6467

6568
let mut modified_params = params;

llm-ollama/src/lib.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ mod conversions;
2020
struct OllamaChatStream {
2121
stream: RefCell<Option<EventSource>>,
2222
failure: Option<Error>,
23-
finished: RefCell<bool>,
23+
finished: RefCell<bool>,
2424
}
2525

2626
impl OllamaChatStream {
2727
pub fn new(stream: EventSource) -> LlmChatStream<Self> {
28-
// Remove EventSource dependency and use direct HTTP response
28+
println!("OllamaChatStream::new");
2929
LlmChatStream::new(OllamaChatStream {
3030
stream: RefCell::new(Some(stream)),
3131
failure: None,
@@ -34,6 +34,7 @@ impl OllamaChatStream {
3434
}
3535

3636
pub fn failed(error: Error) -> LlmChatStream<Self> {
37+
println!("OllamaChatStream::failed");
3738
LlmChatStream::new(OllamaChatStream {
3839
stream: RefCell::new(None),
3940
failure: Some(error),
@@ -44,25 +45,31 @@ impl OllamaChatStream {
4445

4546
impl LlmChatStreamState for OllamaChatStream {
4647
fn failure(&self) -> &Option<Error> {
48+
println!("OllamaChatStream::failure");
4749
&self.failure
4850
}
4951
fn is_finished(&self) -> bool {
52+
println!("OllamaChatStream::is_finished");
5053
*self.finished.borrow()
5154
}
5255

5356
fn set_finished(&self) {
57+
println!("OllamaChatStream::set_finished");
5458
*self.finished.borrow_mut() = true;
5559
}
5660

5761
fn stream(&self) -> Ref<Option<EventSource>> {
62+
println!("OllamaChatStream::stream");
5863
self.stream.borrow()
5964
}
6065

6166
fn stream_mut(&self) -> RefMut<Option<EventSource>> {
67+
println!("OllamaChatStream::stream_mut");
6268
self.stream.borrow_mut()
6369
}
6470

6571
fn decode_message(&self, raw: &str) -> Result<Option<StreamEvent>, String> {
72+
println!("OllamaChatStream::decode_message");
6673
trace!("Parsing NDJSON line: {raw}");
6774
let json: serde_json::Value = serde_json::from_str(raw.trim())
6875
.map_err(|e| format!("JSON parse error: {e}"))?;
@@ -102,6 +109,7 @@ struct OllamaComponent;
102109

103110
impl OllamaComponent {
104111
fn request(client: &OllamaApi, request: CompletionsRequest) -> ChatEvent {
112+
println!("OllamaComponent::request");
105113
match client.send_chat(request) {
106114
Ok(response) => process_response(response),
107115
Err(err) => ChatEvent::Error(err),
@@ -112,6 +120,7 @@ impl OllamaComponent {
112120
client: &OllamaApi,
113121
mut request: CompletionsRequest,
114122
) -> LlmChatStream<OllamaChatStream> {
123+
println!("OllamaComponent::streaming_request");
115124
request.stream = Some(true);
116125
match client.send_chat_stream(request) {
117126
Ok(stream) => OllamaChatStream::new(stream),
@@ -124,6 +133,7 @@ impl Guest for OllamaComponent {
124133
type ChatStream = LlmChatStream<OllamaChatStream>;
125134

126135
fn send(messages: Vec<Message>, config: Config) -> ChatEvent {
136+
println!("OllamaComponent::send");
127137
LOGGING_STATE.with_borrow_mut(|state| state.init());
128138

129139
let client = OllamaApi::new(config.model.clone());
@@ -138,6 +148,7 @@ impl Guest for OllamaComponent {
138148
tool_results: Vec<(ToolCall, ToolResult)>,
139149
config: Config,
140150
) -> ChatEvent {
151+
println!("OllamaComponent::continue_");
141152
LOGGING_STATE.with_borrow_mut(|state| state.init());
142153
let client = OllamaApi::new(config.model.clone());
143154
match messages_to_request(messages, config.clone()) {
@@ -147,12 +158,14 @@ impl Guest for OllamaComponent {
147158
}
148159

149160
fn stream(messages: Vec<Message>, config: Config) -> ChatStream {
150-
ChatStream::new(Self::unwrapped_stream(messages, config))
161+
println!("OllamaComponent::stream");
162+
ChatStream::new(Self::unwrapped_stream(messages, config.clone()))
151163
}
152164
}
153165

154166
impl ExtendedGuest for OllamaComponent {
155167
fn unwrapped_stream(messages: Vec<Message>, config: Config) -> LlmChatStream<OllamaChatStream> {
168+
println!("OllamaComponent::unwrapped_stream");
156169
LOGGING_STATE.with_borrow_mut(|state| state.init());
157170

158171
let client = OllamaApi::new(config.model.clone());
@@ -163,6 +176,7 @@ impl ExtendedGuest for OllamaComponent {
163176
}
164177

165178
fn retry_prompt(original_messages: &[Message], partial_result: &[StreamDelta]) -> Vec<Message> {
179+
println!("OllamaComponent::retry_prompt");
166180
let mut extended_messages = Vec::new();
167181

168182
extended_messages.push(Message {
@@ -214,6 +228,11 @@ impl ExtendedGuest for OllamaComponent {
214228

215229
extended_messages
216230
}
231+
232+
fn subscribe(stream: &Self::ChatStream) -> Pollable {
233+
println!("OllamaComponent::subscribe");
234+
stream.subscribe()
235+
}
217236
}
218237

219238
type DurableOllamaComponent = DurableLLM<OllamaComponent>;

llm/src/event_source/event_stream.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ pub struct EventStream {
136136
impl EventStream {
137137
/// Initialize the EventStream with a Stream
138138
pub fn new(stream: InputStream) -> Self {
139+
println!("EventStream::new");
139140
Self {
140141
stream: Utf8Stream::new(stream),
141142
buffer: String::new(),
@@ -148,21 +149,25 @@ impl EventStream {
148149
/// Set the last event ID of the stream. Useful for initializing the stream with a previous
149150
/// last event ID
150151
pub fn set_last_event_id(&mut self, id: impl Into<String>) {
152+
println!("EventStream::set_last_event_id");
151153
self.last_event_id = id.into();
152154
}
153155

154156
/// Get the last event ID of the stream
155157
pub fn last_event_id(&self) -> &str {
158+
println!("EventStream::last_event_id");
156159
&self.last_event_id
157160
}
158161

159162
pub fn subscribe(&self) -> Pollable {
163+
println!("EventStream::subscribe");
160164
self.stream.subscribe()
161165
}
162166

163167
pub fn poll_next(
164168
&mut self,
165169
) -> Poll<Option<Result<MessageEvent, EventStreamError<StreamError>>>> {
170+
println!("EventStream::poll_next");
166171
trace!("Polling for next event");
167172

168173
match parse_event(&mut self.buffer, &mut self.builder) {
@@ -230,6 +235,7 @@ pub enum EventStreamError<E> {
230235

231236
impl<E> From<Utf8StreamError<E>> for EventStreamError<E> {
232237
fn from(err: Utf8StreamError<E>) -> Self {
238+
println!("EventStreamError::from");
233239
match err {
234240
Utf8StreamError::Utf8(err) => Self::Utf8(err),
235241
Utf8StreamError::Transport(err) => Self::Transport(err),
@@ -239,6 +245,7 @@ impl<E> From<Utf8StreamError<E>> for EventStreamError<E> {
239245

240246
impl<E> From<NomError<&str>> for EventStreamError<E> {
241247
fn from(err: NomError<&str>) -> Self {
248+
println!("EventStreamError::from");
242249
EventStreamError::Parser(NomError::new(err.input.to_string(), err.code))
243250
}
244251
}
@@ -248,6 +255,7 @@ where
248255
E: fmt::Display,
249256
{
250257
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258+
println!("EventStreamError::fmt");
251259
match self {
252260
Self::Utf8(err) => f.write_fmt(format_args!("UTF8 error: {}", err)),
253261
Self::Parser(err) => f.write_fmt(format_args!("Parse error: {}", err)),
@@ -262,6 +270,7 @@ fn parse_event<E>(
262270
buffer: &mut String,
263271
builder: &mut EventBuilder,
264272
) -> Result<Option<MessageEvent>, EventStreamError<E>> {
273+
println!("poll_next->parse_event");
265274
if buffer.is_empty() {
266275
return Ok(None);
267276
}

llm/src/event_source/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub struct EventSource {
3636
impl EventSource {
3737
#[allow(clippy::result_large_err)]
3838
pub fn new(response: Response) -> Result<Self, Error> {
39+
println!("EventSource::new");
3940
match check_response(response) {
4041
Ok(mut response) => {
4142
let handle = unsafe {
@@ -57,11 +58,13 @@ impl EventSource {
5758

5859
/// Close the EventSource stream and stop trying to reconnect
5960
pub fn close(&mut self) {
61+
println!("EventSource::close");
6062
self.is_closed = true;
6163
}
6264

6365
/// Get the current ready state
6466
pub fn ready_state(&self) -> ReadyState {
67+
println!("EventSource::ready_state");
6568
if self.is_closed {
6669
ReadyState::Closed
6770
} else {
@@ -70,10 +73,12 @@ impl EventSource {
7073
}
7174

7275
pub fn subscribe(&self) -> Pollable {
76+
println!("EventSource::subscribe");
7377
self.stream.subscribe()
7478
}
7579

7680
pub fn poll_next(&mut self) -> Poll<Option<Result<Event, Error>>> {
81+
println!("EventSource::poll_next");
7782
if self.is_closed {
7883
return Poll::Ready(None);
7984
}
@@ -97,6 +102,7 @@ impl EventSource {
97102

98103
#[allow(clippy::result_large_err)]
99104
fn check_response(response: Response) -> Result<Response, Error> {
105+
println!("EventSource->check_response");
100106
match response.status() {
101107
StatusCode::OK => {}
102108
status => {
@@ -120,7 +126,7 @@ fn check_response(response: Response) -> Result<Response, Error> {
120126
matches!(
121127
(mime_type.type_(), mime_type.subtype()),
122128
(mime::TEXT, mime::EVENT_STREAM)
123-
)
129+
) || mime_type.subtype().as_str().contains("ndjson")
124130
})
125131
.unwrap_or(false)
126132
{
@@ -141,6 +147,7 @@ pub enum Event {
141147

142148
impl From<MessageEvent> for Event {
143149
fn from(event: MessageEvent) -> Self {
150+
println!("Event::from");
144151
Event::Message(event)
145152
}
146153
}

0 commit comments

Comments
 (0)