How CoreAI streams tokens from LLMs into your UI — end-to-end, with every layer you can override.
TL;DR. Both HTTP SSE and local LLMUnity paths produce
IAsyncEnumerable<LlmStreamChunk>. The chunks are scrubbed by a single stateful ThinkBlockStreamFilter (tag-safe across chunk boundaries) and delivered to CoreAiChatPanel on the Unity main thread. Whether streaming is used at all is decided by a three-layer flag hierarchy — UI → per-agent → global.
From any script (beginners and pros): use the static API CoreAi.StreamAsync / CoreAi.SmartAskAsync — they delegate to CoreAiChatService and the same chunk pipeline. Full guide: COREAI_SINGLETON_API.md. Orchestrator streaming (CoreAi.OrchestrateStreamAsync) is documented in §6 below.
┌──────────────────────────────────────────┐
│ Caller (CoreAiChatPanel, │
│ CoreAiChatService, AgentBuilder.Ask...) │
└──────────────┬───────────────────────────┘
               │ IAsyncEnumerable<LlmStreamChunk>
               ▼
┌──────────────────────────────────────────┐
│ MeaiLlmClient (wrapper) │
│ • routing (LLMUnity / OpenAI HTTP) │
│ • ThinkBlockStreamFilter (stateful) │
│ • yields final IsDone=true chunk │
└──────────────┬───────────────────────────┘
               │ MEAI ChatResponseUpdate
               ▼
        ┌──────┴──────────────────────────────────────┐
        │                                             │
┌───────▼────────────────────────────┐     ┌──────────▼───────────────┐
│ MeaiOpenAiChatClient (HTTP)        │     │ LlmUnityMeaiChatClient   │
│ • IOpenAiHttpTransport             │     │   (local GGUF)           │
│   – HttpClient (default)           │     │ • LLMAgent.Chat          │
│   – UnityWebRequest (WebGL player) │     │ • ConcurrentQueue        │
│ • SSE + simulated stream (see §2)  │     │ • frame callbacks        │
└───────┬────────────────────────────┘     └──────────┬───────────────┘
        │                                             │
        └──────────────► LLM backend ◄────────────────┘
Key files:
| Layer | File |
|---|---|
| Filter (portable) | Assets/CoreAI/Runtime/Core/Features/Orchestration/ThinkBlockStreamFilter.cs |
| Wrapper | Assets/CoreAiUnity/Runtime/Source/Features/Llm/Infrastructure/MeaiLlmClient.cs |
| HTTP client + transport | Assets/CoreAI/Runtime/Core/Features/Llm/MeaiOpenAiChatClient.cs |
| HTTP transports | HttpClientOpenAiTransport.cs, UnityWebRequestOpenAiTransport.cs (Unity) |
| LLMUnity | Assets/CoreAiUnity/Runtime/Source/Features/Llm/Infrastructure/LlmUnityMeaiChatClient.cs |
| Tool execution policy (portable) | Assets/CoreAI/Runtime/Core/Features/Llm/ToolExecutionPolicy.cs |
| Non-streaming tool loop (portable) | Assets/CoreAI/Runtime/Core/Features/Llm/SmartToolCallingChatClient.cs |
| UI | Assets/CoreAiUnity/Runtime/Source/Features/Chat/CoreAiChatPanel.cs |
| Service | Assets/CoreAiUnity/Runtime/Source/Features/Chat/CoreAiChatService.cs |
Default (Editor, standalone, mobile): MeaiOpenAiChatClient uses IOpenAiHttpTransport with HttpClientOpenAiTransport. GetStreamingResponseAsync sends stream: true, opens the response with HttpCompletionOption.ResponseHeadersRead, and parses SSE lines. Both the "data: " (with space) and "data:" (no space) prefixes are accepted. When choices[0].delta.content is empty, the parser may fall back to choices[0].message or choices[0].text. After response headers arrive, the client logs the HTTP status and Content-Type before reading the body; a stream that ends with no parsed deltas emits a Warn diagnostic.
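For orientation, here is a minimal sketch of that SSE line handling; the helper name and the use of System.Text.Json are illustrative, not the client's actual internals:

```csharp
// Illustrative sketch only, not the real MeaiOpenAiChatClient code.
// It mirrors the conventions above: accept "data:" with or without a space,
// stop at "[DONE]", and fall back when choices[0].delta.content is empty.
using System.Text.Json;

static string ParseSseLine(string line)
{
    if (line == null || !line.StartsWith("data:")) return null;   // ignore blanks / comments
    string payload = line.Substring("data:".Length).TrimStart();  // "data: " and "data:" both work
    if (payload.Length == 0 || payload == "[DONE]") return null;

    using var doc = JsonDocument.Parse(payload);
    var choice = doc.RootElement.GetProperty("choices")[0];

    if (choice.TryGetProperty("delta", out var delta) &&
        delta.TryGetProperty("content", out var content) &&
        content.ValueKind == JsonValueKind.String &&
        !string.IsNullOrEmpty(content.GetString()))
        return content.GetString();

    // Fallbacks some OpenAI-compatible servers use instead of delta.content.
    if (choice.TryGetProperty("message", out var message) &&
        message.TryGetProperty("content", out var messageContent))
        return messageContent.GetString();
    if (choice.TryGetProperty("text", out var text))
        return text.GetString();

    return null;
}
```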
WebGL player: the browser forbids System.Net / HttpClient. UnityWebRequestOpenAiTransport implements IOpenAiHttpTransport with SupportsSseStreaming = false. The chat client uses a non-streaming JSON completion and simulates ChatResponseUpdate yields so MEAI / tool loops stay unchanged (UX: typically one visible chunk). Servers must expose CORS for the game origin. See HTTP_TRANSPORT_SPEC.md and Unity Web networking.
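The simulated-stream path can be pictured roughly like this; PostJsonAsync and ExtractMessageContent are placeholder names, not the real transport API:

```csharp
// Sketch of the WebGL fallback: one non-streaming completion, surfaced through
// the same async-enumerable contract as true streaming so callers don't change.
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

async IAsyncEnumerable<string> SimulatedStreamAsync(
    string requestJson,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    // Hypothetical non-streaming call (UnityWebRequest under the hood on WebGL).
    string responseJson = await PostJsonAsync(requestJson, ct);

    // Parse choices[0].message.content and yield it as a single "chunk".
    string fullText = ExtractMessageContent(responseJson);
    yield return fullText;
}
```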
Future: true SSE in WebGL can add another IOpenAiHttpTransport with SupportsSseStreaming = true (e.g. .jslib + EventSource / fetch stream reader).
- Timeouts → long streams use a per-read stall budget (see RequestTimeoutSeconds) on the HttpClient path; HttpClient.Timeout on the streaming client is kept high (sketch below).
- Cancellation → cooperative via CancellationToken.
- Errors → logged; failures surface as LlmClientException / terminal stream chunks where supported.
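A per-read stall budget can be implemented along these lines (sketch; the actual read loop differs, and this desktop-only pattern relies on timers that WebGL lacks):

```csharp
// Sketch: cap how long a single SSE read may stall without capping the whole
// generation. HttpClient.Timeout stays high; every read gets its own budget.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

static async Task<string> ReadLineWithStallBudgetAsync(
    StreamReader reader, TimeSpan stallBudget, CancellationToken ct)
{
    using var stall = CancellationTokenSource.CreateLinkedTokenSource(ct);
    stall.CancelAfter(stallBudget); // fresh budget per read (HttpClient path only)

    var readTask = reader.ReadLineAsync();
    var winner = await Task.WhenAny(readTask, Task.Delay(Timeout.InfiniteTimeSpan, stall.Token));
    if (winner != readTask)
    {
        ct.ThrowIfCancellationRequested(); // caller cancelled, not a stall
        throw new TimeoutException($"SSE stream stalled for over {stallBudget.TotalSeconds:F0}s.");
    }
    return await readTask;
}
```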
LlmUnityMeaiChatClient.GetStreamingResponseAsync calls LLMAgent.Chat(prompt, callback). The delta is pushed onto a ConcurrentQueue<string> from LLMUnity's worker and drained on the Unity main thread via await foreach.
- Cancellation → cooperative; the async loop checks the token every iteration.
- Think blocks — the <think> regex-per-chunk filter that used to live here was removed in 0.20.2; filtering happens centrally in MeaiLlmClient.
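The hand-off between LLMUnity's callback thread and the Unity main thread is the classic queue-and-drain pattern; a sketch (the LLMAgent.Chat shape shown here is assumed from the description above, and _agent stands for the LLMUnity agent field):

```csharp
// Sketch: the worker-side callback enqueues deltas; the async iterator drains
// them on the Unity main thread and checks the token every pass.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

async IAsyncEnumerable<string> StreamFromLlmAgentAsync(
    string prompt, [EnumeratorCancellation] CancellationToken ct = default)
{
    var queue = new ConcurrentQueue<string>();
    bool completed = false;

    // Hypothetical call shape: the callback receives each new delta off-thread.
    _ = _agent.Chat(prompt, delta => queue.Enqueue(delta))
              .ContinueWith(_ => completed = true);

    while (!completed || !queue.IsEmpty)
    {
        ct.ThrowIfCancellationRequested();     // cooperative cancellation, every iteration
        while (queue.TryDequeue(out var delta))
            yield return delta;
        await Task.Yield();                    // yield back to Unity's player loop
    }
}
```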
Reasoning models (DeepSeek-R1, Qwen3 thinking, o1-class) emit chain-of-thought inside <think>…</think> tags. OpenAI-compatible HTTP (LM Studio, vLLM, etc.) may instead stream a separate delta.reasoning_content field; MeaiOpenAiChatClient does not forward that to MEAI/Chat (it never becomes update.Text for the think filter). The tag-based filter only sees in-content tags.
Idle / stall budgets for HTTP SSE are enforced in MeaiOpenAiChatClient (read-loop timeouts aligned with RequestTimeoutSeconds), separate from HttpClient.Timeout on the streaming client (kept high so long generations are not cut off at the transport level).
Those blocks must never reach the UI, but:
- Opening and closing tags can arrive in separate chunks (e.g. "<thi" + "nk>…").
- A stray < that is not part of a <think> tag must still be rendered.
- The stream can end in the middle of <think> — we must flush cleanly.
CoreAI.Ai.ThinkBlockStreamFilter solves all three. It's a pure C# state machine:
```csharp
var filter = new ThinkBlockStreamFilter();

await foreach (var chunk in client.GetStreamingResponseAsync(...))
{
    string visible = filter.ProcessChunk(chunk.Text);
    if (!string.IsNullOrEmpty(visible))
        ui.Append(visible);
}

string tail = filter.Flush(); // empty in normal termination
if (!string.IsNullOrEmpty(tail)) ui.Append(tail);
```

Covered by 24 EditMode tests (ThinkBlockStreamFilterEditModeTests) including split-tag boundary cases.
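For readers who want the mechanics, here is a simplified re-implementation of the same idea. It is an illustration only; the shipped ThinkBlockStreamFilter is the source of truth and is what those tests cover:

```csharp
// Simplified illustration of a boundary-safe <think> filter: keep a carry-over
// buffer, track whether we are inside a think block, and never emit a suffix
// that could still turn into "<think>" / "</think>".
using System;
using System.Text;

public sealed class ThinkFilterSketch
{
    const string Open = "<think>", Close = "</think>";
    readonly StringBuilder _pending = new StringBuilder();
    bool _inThink;

    public string ProcessChunk(string chunk)
    {
        _pending.Append(chunk);
        var visible = new StringBuilder();

        while (true)
        {
            string text = _pending.ToString();
            string tag = _inThink ? Close : Open;
            int idx = text.IndexOf(tag, StringComparison.Ordinal);

            if (idx >= 0)
            {
                if (!_inThink) visible.Append(text, 0, idx);   // emit text before <think>
                _pending.Clear();
                _pending.Append(text, idx + tag.Length, text.Length - idx - tag.Length);
                _inThink = !_inThink;
                continue;                                      // re-scan the remainder
            }

            // No full tag yet: hold back the longest suffix that could still be
            // the start of the tag (e.g. "<thi"); emit or drop the rest.
            int hold = LongestSuffixThatPrefixes(text, tag);
            if (!_inThink) visible.Append(text, 0, text.Length - hold);
            _pending.Clear();
            _pending.Append(text, text.Length - hold, hold);
            return visible.ToString();
        }
    }

    public string Flush()
    {
        // Stream ended: anything still held while not inside a think block is real text.
        string tail = _inThink ? string.Empty : _pending.ToString();
        _pending.Clear();
        return tail;
    }

    static int LongestSuffixThatPrefixes(string text, string tag)
    {
        int max = Math.Min(text.Length, tag.Length - 1);
        for (int len = max; len > 0; len--)
            if (string.CompareOrdinal(text, text.Length - len, tag, 0, len) == 0)
                return len;
        return 0;
    }
}
```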
Streaming is enabled when every layer agrees. First false wins.
| Priority | Layer | Where | Default |
|---|---|---|---|
| 1 (highest) | UI toggle | CoreAiChatConfig.EnableStreaming (Inspector) | true |
| 2 | Per-agent override | AgentBuilder.WithStreaming(bool) → AgentMemoryPolicy.SetStreamingEnabled(role, bool) | (unset) |
| 3 | Global | CoreAISettings.EnableStreaming (ScriptableObject / static CoreAISettings.EnableStreaming) | true |
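Read as code, the resolution order looks roughly like this (a sketch consistent with the builder examples below, not the exact CoreAiChatService implementation):

```csharp
// Sketch: the UI toggle can veto streaming outright; an explicit per-agent
// override beats the global default; otherwise the global setting decides.
bool ResolveStreaming(bool uiToggle, bool? perAgentOverride, bool globalDefault)
{
    if (!uiToggle) return false;                                   // 1. UI toggle (highest priority)
    if (perAgentOverride.HasValue) return perAgentOverride.Value;  // 2. AgentBuilder.WithStreaming
    return globalDefault;                                          // 3. CoreAISettings.EnableStreaming
}
```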
```csharp
// Always stream this NPC even if the project default is non-streaming
new AgentBuilder("SmartChat")
    .WithSystemPrompt("You are a friendly guide.")
    .WithStreaming(true)
    .Build();

// Never stream — caller wants the full JSON in one shot
new AgentBuilder("JsonParser")
    .WithSystemPrompt("You output a strict JSON object.")
    .WithStreaming(false)
    .Build();

// Resolve effective value
var service = CoreAiChatService.TryCreateFromScene();
bool useStream = service.IsStreamingEnabled("SmartChat", uiFallback: true);
```

Covered by CoreAiChatServiceEditModeTests.
CoreAiChatPanel.SendToAI owns an instance of ThinkBlockStreamFilter per message. As chunks arrive:
- The typing indicator stays visible while filter.ProcessChunk(...) returns empty (the model is still inside <think>).
- As soon as visible text appears, the current bubble is swapped from "typing" to "streaming" and grows incrementally.
- On cancellation or error, the bubble is finalised with what we have, and the HTTP request is aborted if applicable.
Programmatic consumers can bypass the panel entirely:
```csharp
await foreach (var chunk in service.SendMessageStreamingAsync("Hello", "SmartChat", ct))
{
    if (!string.IsNullOrEmpty(chunk.Text)) label.text += chunk.Text;
    if (chunk.IsDone) break;
}
```

Or use the static CoreAi singleton (see COREAI_SINGLETON_API.md) — no manual service resolution:

```csharp
await foreach (string chunk in CoreAi.StreamAsync("Hello", "SmartChat"))
    label.text += chunk;
```

Streaming is not limited to CoreAiChatService; it also flows through the full AI pipeline (IAiOrchestrationService). Differences:
| Layer | CoreAiChatService.SendMessageStreamingAsync | IAiOrchestrationService.RunStreamingAsync |
|---|---|---|
| Prompt composer | No (explicit system + user) | Yes — 3-layer prompt composer |
| Authority check | No | Yes — IAuthorityHost.CanRunAiTasks |
| Queue + MaxConcurrent | No | Yes — QueuedAiOrchestrator (fair, by priority) |
| CancellationScope (cancel prior task with same key) | No | Yes |
| Structured validation | No | Yes (after stream completes) |
| Publish ApplyAiGameCommand | No | Yes (after full response) |
| Metrics | No | Yes — IAiOrchestrationMetrics |
Use CoreAi.OrchestrateStreamAsync(task) for agent workflows (Creator / Programmer / Mechanic) and CoreAi.StreamAsync("text") for simple chat.
Inside AiOrchestrator.RunStreamingAsync:
1. Build snapshot + prompt composer (shared with RunTaskAsync, factored into BuildRequest).
2. Create LlmCompletionRequest with tools, history, temperature.
3. await foreach on ILlmClient.CompleteStreamingAsync (already includes ThinkBlockStreamFilter via MeaiLlmClient).
4. Accumulate the full text in a StringBuilder (required for step 5).
5. When the stream ends — structured validation, publish ApplyAiGameCommand, append chat history, record metrics.
QueuedAiOrchestrator.RunStreamingAsync forwards through its own producer/consumer queue (AsyncChunkQueue on SemaphoreSlim + ConcurrentQueue — no System.Threading.Channels, which is unavailable in this Unity build), respecting MaxConcurrent and CancellationScope.
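Such a queue needs nothing beyond SemaphoreSlim and ConcurrentQueue; a minimal sketch of the pattern (not the shipped AsyncChunkQueue; completion and fault signalling are omitted):

```csharp
// Sketch: an awaitable producer/consumer queue without System.Threading.Channels.
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class ChunkQueueSketch<T>
{
    readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void Enqueue(T item)
    {
        _items.Enqueue(item);
        _signal.Release();            // one Release per item: wakes exactly one consumer
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        await _signal.WaitAsync(ct);  // wait until a producer has enqueued something
        _items.TryDequeue(out var item);
        return item;
    }
}
```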
Since v0.24.0, streaming tool-calling uses a dual-path architecture:
The primary mechanism, designed for local models (Ollama, llama.cpp, LM Studio) that output tool calls as text.
MeaiLlmClient.TryExtractToolCallsFromText scans the accumulated visible text for JSON objects containing both "name" and "arguments" keys.
- Multi-tool: extracts multiple tool calls from a single response
- False-positive protection: ignores JSON inside fenced code blocks (```...```)
- Pattern-aware: only matches JSON with the required name + arguments structure
- Graceful: partial / malformed JSON is silently skipped
For cloud providers (OpenAI, Anthropic via OpenRouter) that emit delta.tool_calls in SSE chunks.
MeaiOpenAiChatClient.ExtractDeltaUpdate parses choices[0].delta.tool_calls and emits FunctionCallContent in ChatResponseUpdate.
If the SSE stream contains FunctionCallContent, MeaiLlmClient uses native detection instead of text extraction.
Both streaming and non-streaming paths use ToolExecutionPolicy for:
| Guarantee | Description |
|---|---|
| Duplicate detection | Signature-based (name + arguments hash). Blocks repeated identical calls within one request cycle. Per-tool AllowDuplicates flag overrides. |
| Consecutive error tracking | Counter resets on success, increments on failure. Agent aborts at MaxToolCallRetries threshold. |
| Notification | Every tool execution fires IToolCallEventPublisher.PublishStarted/Completed/Failed (portable) → MessagePipeToolCallEventPublisher adapter → GlobalMessagePipe. Also calls IToolExecutionNotifier.NotifyToolExecuted → CoreAiToolExecutionNotifier adapter → CoreAi.NotifyToolExecuted. |
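Duplicate detection of that kind reduces to a per-request signature set; a sketch (the shipped ToolExecutionPolicy also tracks consecutive errors and retry thresholds):

```csharp
// Sketch: signature-based duplicate detection within one request cycle.
using System.Collections.Generic;

sealed class DuplicateGuardSketch
{
    readonly HashSet<string> _seen = new HashSet<string>();

    // Returns false when an identical call (same tool, same arguments)
    // was already registered in this request cycle.
    public bool TryRegister(string toolName, string argumentsJson, bool allowDuplicates)
    {
        if (allowDuplicates) return true;                 // per-tool AllowDuplicates override
        string signature = $"{toolName}::{argumentsJson?.GetHashCode() ?? 0}";
        return _seen.Add(signature);
    }

    public void ResetForNewRequest() => _seen.Clear();
}
```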
- StopActiveGeneration() has a _isStopping re-entrancy guard — concurrent Escape + button click cannot double-fire.
- Send button stop mode (0.25.6+) stays enabled while a request is running; the button is the stop control in that state, so click events must reach StopActiveGeneration().
- StopAgent() delegates to StopActiveGeneration() and additionally resets the root CTS and cleans up UI.
- Cancellation cleanup (0.25.6+) cancels the active request CTS and resets streaming/sending UI state even when the static CoreAi.StopAgent(roleId) path is unavailable.
- ClearChat() calls StopActiveGeneration() before clearing history.
Rule: Timeout responsibility lives exclusively in the Unity layer (CoreAiChatService), not in the portable layer (AiOrchestrator, LoggingLlmClientDecorator).
Before v1.5.1, AiOrchestrator and LoggingLlmClientDecorator both used CancellationTokenSource.CancelAfter() to enforce request timeouts. This relies on System.Threading.Timer, which is non-functional in WebGL (Emscripten single-threaded model, no native timer callbacks), causing indefinite hangs.
In v1.5.1:
- AiOrchestrator and LoggingLlmClientDecorator pass cancellationToken through without wrapping it in timeout-linked sources.
- CoreAiChatService.SendMessageAsync and SendMessageStreamingAsync create a linked CancellationTokenSource with CancelAfterSlim(TimeSpan) from Cysharp.Threading.Tasks (UniTask), which uses Unity's PlayerLoop — fully WebGL-compatible.
- The timeout value comes from ICoreAISettings.LlmRequestTimeoutSeconds (default: 300 s).
```csharp
// Inside CoreAiChatService.SendMessageAsync (simplified)
CancellationTokenSource timeoutCts = null;
CancellationToken effectiveCt = ct;

float timeoutSec = _settings?.LlmRequestTimeoutSeconds ?? 0f;
if (timeoutSec > 0)
{
    timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    timeoutCts.CancelAfterSlim(TimeSpan.FromSeconds(timeoutSec)); // UniTask PlayerLoop timer
    effectiveCt = timeoutCts.Token;
}

string result = await _orchestrator.RunTaskAsync(request, effectiveCt);
```

Rule: Network-level retries (HTTP 429, 5xx, exponential backoff) are handled exclusively by LoggingLlmClientDecorator. The orchestrator invokes the LLM client exactly once per request.
Before v1.5.1, AiOrchestrator.RunTaskAsync had its own for (attempt...) retry loop, creating an M × N retry multiplier (e.g., 2 orchestrator retries × 3 decorator retries = 6 actual network requests on a single failure).
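With retries owned by the decorator alone, the loop is roughly the following; CompleteAsync, IsTransient, the exception filter, and the delays are illustrative rather than the decorator's literal code:

```csharp
// Sketch: one retry loop, in one layer. The orchestrator above calls the
// client once per request, so failures no longer multiply across layers.
using System;
using System.Threading;
using System.Threading.Tasks;

async Task<string> CompleteWithRetryAsync(LlmCompletionRequest request, CancellationToken ct)
{
    const int maxAttempts = 3;
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await _inner.CompleteAsync(request, ct);
        }
        catch (LlmClientException ex) when (attempt < maxAttempts && IsTransient(ex)) // 429 / 5xx
        {
            var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 2 s, 4 s, …
            await Task.Delay(delay, ct);
        }
    }
}
```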
CoreAiChatService no longer swallows exceptions. Errors from AiOrchestrator → LoggingLlmClientDecorator → ILlmClient propagate to CoreAiChatPanel, which catches Exception and displays the error message to the user.
- No output-length timeout — there is a per-request cancellation token but no total response length guard. Add one externally if you need it.
- Mobile — HTTP streaming behaviour depends on the OS / Mono / IL2CPP stack; measure before shipping.
- Partial SSE tool_calls — cloud providers may split tool-call arguments across multiple SSE chunks. The current implementation only handles complete delta.tool_calls with both name and fully-formed arguments in a single chunk. Progressive accumulation across chunks is not yet implemented.
- WebGL — the browser HttpClient / XHR layer may buffer SSE (incremental-delivery issues have historically been reported). In a built WebGL player, the response body sometimes reaches the parser in fewer, larger reads than on desktop. Symptoms include the log LLM ◀ (stream) chunks=1 for medium-length replies; setting CoreAiChatConfig.EnableStreaming = false under UNITY_WEBGL && !UNITY_EDITOR is the supported workaround (see STREAMING_WEBGL_TODO.md).
Related deep dives: LUA_SANDBOX_SECURITY (TODO) · TOOL_CALLING_BEST_PRACTICES (TODO).