Skip to main content

Workflow Streams - TypeScript SDK

Workflow Streams is a Temporal TypeScript SDK contrib library that gives a Workflow a durable, offset-addressed event channel built on Temporal's basic message primitives: Signals, Updates, and Queries. It batch-publishes events to amortize per-Signal cost, deduplicates batches for exactly-once delivery to the log, supports topic filtering, and carries state across Continue-As-New for long-running streams.

Use Workflow Streams when you want outside observers to follow the progress of a Workflow and its Activities: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, or reporting intermediate results from a data job. It is not suited to ultra-low-latency cases like real-time voice, and it targets modest fan-out: tens of publishers and subscribers per Workflow, not thousands.

The Workflow hosts the event log. Publishers append events — the Workflow itself, Activities, or external processes via WorkflowStreamClient. Subscribers attach to the Workflow ID, optionally filter by topic (a string label set when publishing; topics are implicit and created on first publish), and consume events by long-polling from an offset they store.

The package has no root entrypoint; import from one of two subpaths:

  • @temporalio/workflow-streams/workflow — the workflow-safe surface (WorkflowStream, WorkflowStreamState, ...). Bundles cleanly into Workflow code.
  • @temporalio/workflow-streams/client — the client surface (WorkflowStreamClient, ...). Pulls in crypto, @temporalio/activity, and @temporalio/client, none of which resolve inside the Workflow sandbox; do not import from a Workflow file.
SUPPORT, STABILITY, and DEPENDENCY INFO

The @temporalio/workflow-streams module is currently in Public Preview. Refer to the Temporal product release stages guide for more information.

Cross-language client support is on the roadmap. Only the TypeScript and Python clients are available today.

The API may change before general availability.

Looking for...

  • Runnable end-to-end samples (basic publish/subscribe, reconnecting subscriber, external publisher, bounded log, LLM streaming): Workflow Streams samples.
  • A complete LLM streaming example on this page (Activity publishes deltas, terminal consumer that resets on retry): Stream LLM output.
  • Delivery guarantees, ordering, and retry semantics: Delivery semantics.
  • History-size cost and tuning: Architecture.
  • Long-running streams that need Continue-As-New: Continue-As-New.

Choose where to host the stream

A WorkflowStream is hosted inside a Workflow, so the first design choice is whether one Workflow handles both the work and the stream, or whether a separate Workflow exists only to host the stream. The choice is mostly about lifecycle.

Host the stream on the Workflow that does the work when the events come from what that Workflow is already orchestrating: an agent run, an order pipeline, a chat session. The stream's lifecycle aligns with the run, starting when the run starts and ending when it returns. The Workflow ID you use to start the work is the same one subscribers attach to. This is the common shape for AI agents and most progress-streaming cases, where streaming is just one more thing the Workflow does as part of its job.

Use a dedicated Workflow for the stream alone when the stream should outlive any single producer, accept fan-in from multiple unrelated sources, or be subscribable before any work has started. Producers publish from outside the stream Workflow (Activities of other Workflows, or external WorkflowStreamClient instances). The trade-off is explicit lifecycle management: a dedicated stream Workflow does not terminate on its own, so you need a signal-driven shutdown or a Continue-As-New strategy.

Whichever shape you pick, the Workflow ID is the address subscribers use to attach. Multiple subscribers can attach to the same ID concurrently, which is the normal case for a UI with multiple browser tabs. Use distinct Workflow IDs for unrelated streams rather than packing them into one Workflow.

Enable streaming on a Workflow

The library ships as @temporalio/workflow-streams; import the Workflow-side surface from @temporalio/workflow-streams/workflow. Enable streaming by constructing a WorkflowStream at the very top of your Workflow function, before any await. Construction must happen there because the stream's handlers have to be registered before the first publish Signal arrives; doing it after an await would miss any publishes that arrived before the run body resumed.

import { WorkflowStream } from '@temporalio/workflow-streams/workflow';

export interface OrderInput {
orderId: string;
}

export async function orderWorkflow(input: OrderInput): Promise<void> {
const stream = new WorkflowStream();
// ... rest of the workflow
}

Constructing WorkflowStream creates the in-memory event log and registers the publish Signal, subscribe Update, and offset Query handlers on the current Workflow. Constructing more than one WorkflowStream on the same Workflow silently replaces the handlers — the TypeScript Workflow runtime does not expose an inspection API for existing handlers, so the library cannot raise on a duplicate the way the Python SDK does. Construct exactly one WorkflowStream at the top of the Workflow function.

If your Workflow uses Continue-As-New, see Continue-As-New below for how to carry stream state across runs so subscribers see no gap.

Publish from a Workflow

Bind a topic name to its event type once via stream.topic<T>(name), then call publish() on the returned handle to append events. The handle carries the topic name and the type T so call sites don't have to repeat them on every publish, and so subscribers reading the same handle decode to the matching type. Repeated calls with the same name return the same handle instance.

import { WorkflowStream } from '@temporalio/workflow-streams/workflow';

export interface StatusEvent {
state: string;
progress?: number;
detail?: string;
}

export interface OrderInput {
orderId: string;
}

export async function orderWorkflow(input: OrderInput): Promise<void> {
const stream = new WorkflowStream();
const status = stream.topic<StatusEvent>('status');

status.publish({ state: 'validating', detail: 'checking inventory' });
await validateOrder(input.orderId);

status.publish({ state: 'charging', progress: 33, detail: 'authorizing payment' });
await chargePayment(input.orderId);

status.publish({ state: 'shipping', progress: 66, detail: 'dispatching to warehouse' });
await dispatchOrder(input.orderId);

status.publish({ state: 'completed', progress: 100 });
}

publish() runs the default payload converter to encode each value. The codec chain (encryption, compression, and so on) runs once on the Signal or Update envelope that carries the batch, never per item, so encryption and compression are applied exactly once each direction.

Unlike the Python SDK, T here is a compile-time annotation only: TypeScript has no runtime type representation, so the library cannot enforce per-topic type uniformity at the publish site. If two publishers bind the same topic name to different types, the mismatch is not caught at publish; the subscriber gets a decode error when it processes events from the mismatched publisher. A pre-built Payload may be passed to publish() regardless of the handle's T, taking the zero-copy fast path.

Publish from a client

Any process that has a Temporal Client and the target Workflow ID can publish to that Workflow's stream by constructing a WorkflowStreamClient. This is the general pattern and covers HTTP backends, starters, one-off scripts, other Workflows' Activities, and standalone Activities. Construct one with WorkflowStreamClient.create(client, workflowId), then use it the same way you would the Workflow-side handle: bind a topic, publish through it, and let await using flush on scope exit.

When events originate in an Activity, publish from the Activity directly rather than returning them for the Workflow to forward. The Workflow hosts the stream but does not read its own stream; it processes the Activity's return value and emits its own lifecycle events. Keeping Workflow state independent of streamed output is what lets retried Activity attempts surface to subscribers without polluting the Workflow's durable state — see Delivery semantics.

import { Client } from '@temporalio/client';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';

export async function publishStatus(workflowId: string): Promise<void> {
const temporalClient = new Client();
await using streamClient = WorkflowStreamClient.create(temporalClient, workflowId, {
batchInterval: '200 milliseconds',
});

const status = streamClient.topic<StatusEvent>('status');
status.publish({ state: 'started' });
// ...
// Buffer is flushed automatically on `await using` scope exit.
}

The await using declaration relies on TypeScript 5.2+ and Node 20.11+; on older runtimes, call await streamClient[Symbol.asyncDispose]() explicitly at the end of the publishing scope.

Inside an Activity scheduled by a Workflow, WorkflowStreamClient.fromWithinActivity() is a convenience that infers the Temporal Client and the parent Workflow ID from the Activity context, so you don't have to thread them through the Activity's input:

import { Context } from '@temporalio/activity';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';

export interface Delta {
text: string;
}

export async function streamDeltas(orderId: string): Promise<void> {
await using client = WorkflowStreamClient.fromWithinActivity();
const deltas = client.topic<Delta>('delta');

for await (const delta of generateDeltas(orderId)) {
deltas.publish(delta);
Context.current().heartbeat();
}
// Buffer is flushed automatically on scope exit.
}

For a standalone Activity (one started directly via Client.workflow.start rather than from a Workflow), there is no parent Workflow context to infer, so fromWithinActivity() throws. Fall back to the general pattern with Context.current().client and the target Workflow ID threaded through the Activity's input.

Two operations give the application explicit control over when batches ship: forceFlush: true on a publish for latency, and await client.flush() for confirmation that prior publications have landed.

Pass { forceFlush: true } on a publish to wake the background flusher so the current buffer ships without waiting for the next interval. The flusher only runs while the client is alive (between first publish() and Symbol.asyncDispose); outside that, forceFlush: true queues the wake event but nothing ships until a flush or dispose occurs. The call returns immediately after appending to the buffer and signaling the flusher; it does not wait for delivery to the Workflow or to subscribers:

deltas.publish(delta, { forceFlush: true });

Use it for latency-sensitive events: the first delta of a response so the user sees something fast, or punctuated events like RETRY and STATUS_CHANGE. See Tuning for the trade-off against history pressure.

Use await client.flush() when you need a mid-stream barrier. Successful completion of the flush is proof that the Temporal server has received all prior publications, so subsequent work that depends on those events being durable can proceed. The client stays open for further publishing afterward. Exiting await using already flushes on its way out, so the explicit call is only for barriers in the middle:

await using client = WorkflowStreamClient.fromWithinActivity();
const deltas = client.topic<Delta>('delta');

for (const delta of firstPhase()) {
deltas.publish(delta);
}

await client.flush();
const checkpointId = await recordPhaseOneComplete(); // only safe once phase-one events are durable

for (const delta of secondPhase(checkpointId)) {
deltas.publish(delta);
}

publish() is non-blocking and applies no backpressure. From an Activity or other client, it appends to the client's in-memory buffer and returns; from inside a Workflow, it appends synchronously to the in-memory log (no buffer, nothing to flush). Subscribers pull from the Workflow's log on their own schedule, so a slow subscriber does not slow down publishers. If a publisher emits faster than batches can ship to the server, the buffer grows: the process uses more memory, the stream falls further behind real time, and at the limit Signals cannot keep up at all.

If your application needs to bound this (to cap memory, to keep the stream close to real time, or to apply a policy when the publisher overruns the network), apply that policy upstream of publish(). The choice (block, drop, error, sample) is application-specific, and Workflow Streams does not pick one for you.

Subscribe

Subscribing uses the same client construction as publishing: WorkflowStreamClient.create(client, workflowId) from any process that has a Temporal Client, or fromWithinActivity() inside an Activity. Subscribing from an Activity is less common in practice, so the general client case is the primary example below.

Subscribing from inside the host Workflow is intentionally unsupported. The Workflow only sees the successful return value of each Activity; the stream may carry partial output from attempts that failed and were retried. Letting the Workflow read its own stream would mix those two views and break the conduit role the Workflow is meant to play.

The Workflow is the single source of truth for stream state, so any process bridging events to the outside world (an SSE proxy serving a browser, a forwarding Activity) can stay stateless — store the last delivered item.offset, and reconnects resume from that offset without coordinating with anyone but the Workflow.

Once you have a client, iterate a topic handle's subscribe(), the counterpart to publish(). The handle's bound type drives decoding, so each item.data arrives as T via the client's payload converter. The codec chain is applied once at the Update envelope, not per item.

import { Client } from '@temporalio/client';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';

export async function watchOrder(orderId: string): Promise<void> {
const temporalClient = new Client();
const stream = WorkflowStreamClient.create(temporalClient, orderId);

const status = stream.topic<StatusEvent>('status');
for await (const item of status.subscribe()) {
const evt = item.data;
console.log(`[${(evt.progress ?? 0).toString().padStart(3)}%] ${evt.state}: ${evt.detail ?? ''}`);
if (evt.state === 'completed') break;
}
}

The iterator handles re-polling, pagination when a poll response hits the ~1 MB cap, and Workflow-side log truncation transparently. Callers don't need to wrap the iterator for the common cases. Two edge cases are worth knowing: an RPC timeout where Continue-As-New cannot be followed ends the iterator silently, and a validator rejection during a CAN handoff can surface as a WorkflowUpdateFailedError. A subscriber that does not need flushing can skip await using — the background flusher only runs for publishers.

Heterogeneous topics

A topic handle binds one name to one type, so it only fits a single-type subscription. To consume multiple topics whose payload types differ, call client.subscribe() directly with a list of names (or subscribe() with no arguments for every topic). The default overload yields WorkflowStreamItem<Payload>, so each item arrives as the raw Payload carrying encoding metadata. Dispatch on item.topic and decode the payload with defaultPayloadConverter.fromPayload<T>(item.data):

import { defaultPayloadConverter } from '@temporalio/common';

for await (const item of stream.subscribe(['status', 'progress'])) {
if (item.topic === 'status') {
const evt = defaultPayloadConverter.fromPayload<StatusEvent>(item.data);
console.log(`[status] ${evt.state}: ${evt.detail ?? ''}`);
} else if (item.topic === 'progress') {
const evt = defaultPayloadConverter.fromPayload<ProgressEvent>(item.data);
console.log(`[progress] ${evt.message}`);
}
}

A single iterator over multiple topics also avoids the cancellation race that two concurrent subscribers would create. Holding the raw Payload is also the right shape when you want to forward the bytes through to another system without decoding them.

Closing the stream

A subscriber's for await does not know when the publisher is done. End-of-stream is an application-level concern; Workflow Streams does not impose a marker. Without coordination, a subscriber will keep polling until the Workflow reaches a terminal state, and a Workflow that returns immediately after its last publish can lose that publish's poll round-trip in the gap.

How you close depends on what the application needs. As one example, a common pattern combines two pieces:

  1. An in-band terminator. The Workflow (or its Activity) publishes a sentinel event the subscriber recognizes and breaks on. In the watchOrder example above, { state: 'completed' } is the minimal form, and the consumer's if (evt.state === 'completed') break is the matching half. Each subscription decides what its own end-of-stream marker is.
  2. A brief overlap before the Workflow returns. A poll Update that is still in flight when the Workflow returns is surfaced to the iterator and consumed silently (the iterator either follows Continue-As-New or exits cleanly), and no new polls can complete after that. If the Workflow returns immediately after publishing the terminator, subscribers may miss it.

There are two ways to provide that overlap.

Fixed sleep (simplest). Sleep between the terminator and the return so any in-flight poll has time to fetch the terminator before the Workflow exits:

import { sleep } from '@temporalio/workflow';

// at the end of the workflow function
status.publish({ state: 'completed', progress: 100 });
await sleep('30 seconds');
return result;

The sleep needs to be long enough to cover the time between when the terminator becomes visible and when the subscriber's next poll reaches the server, including any client-side cooldown and network round-trips. A few hundred milliseconds is tight under realistic conditions; thirty seconds is a generous default. The cost is small: the Workflow Run stays open for that duration but does no other work.

Acknowledgment handshake. The subscriber sends a Signal once it has the terminator; the Workflow waits up to a timeout, returning as soon as the ack arrives:

import { condition, defineSignal, setHandler } from '@temporalio/workflow';
import { WorkflowStream } from '@temporalio/workflow-streams/workflow';

export const subscriberAcknowledgedTerminator = defineSignal('subscriberAcknowledgedTerminator');

export async function chatWorkflow(input: ChatInput): Promise<string> {
const stream = new WorkflowStream();
let subscriberDone = false;
setHandler(subscriberAcknowledgedTerminator, () => {
subscriberDone = true;
});

// ... do work and publish events ...

await condition(() => subscriberDone, '30 seconds');
// Returns true if the ack arrived, false on timeout — either way, fall through.
return result;
}

The timeout is still required because the subscriber may not be attached, or may have gone away. With the ack on top, the typical case (subscriber online) exits as soon as the subscriber confirms receipt, regardless of how long the fallback timeout is. The full pattern is wired into the Stream LLM output example below.

Inspecting terminal status. subscribe() exits cleanly when the Workflow reaches COMPLETED, FAILED, CANCELLED, TERMINATED, or TIMED_OUT, but does not distinguish among them. If your application needs to know which (to display success or failure to the user, log the outcome, or decide whether to retry), call await temporalClient.workflow.getHandle(workflowId).describe() after the loop returns to inspect the Workflow's status.

Continue-As-New

If your Workflow runs for minutes and finishes (a single chat completion, an order pipeline, a one-shot agent), you can skip this section. Continue-As-New becomes relevant for streams that run for hours or accumulate thousands of events, where you need to roll the run over to keep history bounded.

Subscribers automatically follow Continue-As-New chains, so a long-running Workflow can roll over without disrupting active consumers. Workflow IDs are stable across Continue-As-New, so the iterator simply fetches a fresh handle for the same Workflow ID and continues polling from the carried offset. CAN-following requires the client retained from WorkflowStreamClient.create() or fromWithinActivity(); clients constructed directly from a single WorkflowHandle cannot re-target the new run.

To roll a long-running streaming Workflow over without subscribers seeing a gap, carry both your application state and the stream state across the boundary. Add an optional streamState?: WorkflowStreamState field to your Workflow input, pass it to the constructor, and call stream.continueAsNew(buildArgs) to invoke the rollover. The helper drains waiting subscribers, waits for in-flight handlers to finish, then calls continueAsNew with the args produced by buildArgs(postDrainState):

import { workflowInfo } from '@temporalio/workflow';
import { WorkflowStream, type WorkflowStreamState } from '@temporalio/workflow-streams/workflow';

export interface WorkflowInput {
itemsProcessed: number;
streamState?: WorkflowStreamState;
}

export async function longRunningWorkflow(input: WorkflowInput): Promise<void> {
const stream = new WorkflowStream(input.streamState);
let itemsProcessed = input.itemsProcessed;

while (true) {
await doOneIteration(stream);
itemsProcessed++;

if (workflowInfo().continueAsNewSuggested) {
await stream.continueAsNew<typeof longRunningWorkflow>((state) => [
{
itemsProcessed,
streamState: state,
},
]);
}
}
}

The optional streamState? on the input field is required: priorState is undefined on a fresh start and a WorkflowStreamState after a rollover. The buildArgs lambda receives the post-detach WorkflowStreamState as its only argument so the snapshot is guaranteed to happen after pollers detach.

To pass other Continue-As-New parameters such as taskQueue, searchAttributes, or workflowRunTimeout, use the explicit recipe with makeContinueAsNewFunc instead:

import { allHandlersFinished, condition, makeContinueAsNewFunc } from '@temporalio/workflow';

stream.detachPollers();
await condition(allHandlersFinished);
const continueWithOptions = makeContinueAsNewFunc<typeof longRunningWorkflow>({
taskQueue: 'other-tq',
});
await continueWithOptions({
itemsProcessed,
streamState: stream.getState(),
});

The carried WorkflowStreamState includes the entire in-memory log of the previous run, so streams that carry large items can hit Temporal's per-payload size limit at the rollover. (Individual publish Signals and subscribe Update responses can also exceed the limit, but the carried state is the most acute case because it accumulates the full log window.) Offload the bytes via External Storage so each item is a small reference rather than the full payload, and combine that with truncate() to keep the carried log itself small.

Tuning

The most important question when tuning is: how often do you want to update your UI? That answer drives the trade-off between user-perceived latency and the number of history events your Workflow accumulates. The library defaults assume a slow-moving UI; LLM token streaming and other interactive cases need lower latency, which means tuning.

The trade-off is direct. Each batched publish is one Signal, and each subscriber poll is one Update. Each Signal and each Update accumulates against the Workflow's history. A more responsive UI means more messages and more history per second; messages drive workload (and on metered deployments, billing), while history accumulates against Temporal's per-run limits. For long-running streams, plan a Continue-As-New policy from the start.

Settings that matter most

  • batchInterval (default 2 seconds). Maximum time between automatic flushes from the client. Lower it to make the stream feel live; raise it to amortize Signal cost. For an LLM token stream feeding a chat UI, '200 milliseconds' is a good starting point: the user perceives it as live, and a 30-second response generates roughly 150 publish Signals rather than several hundred. Below 100 ms the per-Signal RPC overhead starts to dominate.

For per-publish overrides where one specific event needs lower latency than the batch interval (for example, the first delta of a response so the user sees something fast, or punctuated events like RETRY and STATUS_CHANGE), pass { forceFlush: true } on that publish. Don't make this the default mode: per-token forceFlush: true on a 500-token completion produces 500 publish Signals, which is meaningful but tractable; per-character forceFlush: true is not.

Other settings

You usually do not need to touch these, but they are available when the basic settings are not enough:

  • maxBatchSize (default unbounded). Caps the number of items per batch. With the default, only batchInterval bounds batch size, so a hot publisher can accumulate enough items between intervals that the resulting Signal exceeds Temporal's per-message gRPC payload limit. Set maxBatchSize to bound by item count, or call { forceFlush: true } after each logical chunk to bound by application boundaries (for example, publish per generated sentence in a TTS Activity so each Signal carries one audio chunk). For large items, offload via External Storage so each item is a small reference.
  • pollCooldown (subscriber-side, default 100 ms). The subscriber sleeps for this interval between polls. The cooldown is skipped only when a poll response was capped at the ~1 MB gRPC limit and more items remain (a more_ready flag in the response), so the next poll can drain the rest immediately. That path is an optimization for bursty producers; in the steady state, every poll waits the cooldown before the next. Hold a single iterator and consume from it rather than opening and closing subscriptions in a loop.
  • maxRetryDuration (default 10 minutes). How long the client retries a failed publish batch before giving up and raising FlushTimeoutError. Tune higher if your application can tolerate longer outages while a publisher retries through transient failures; lower if you want failures to surface quickly.
  • publisherTtl (default 15 minutes). How long the Workflow retains per-publisher deduplicate state. At each Continue-As-New, entries older than this are dropped. Tune higher if your publishers can be silent for extended windows.

The last two settings are related. Keep maxRetryDuration < publisherTtl so a long-running retry cannot outlast its dedup record and produce a duplicate when it finally succeeds. If you tune one, tune the other. See Delivery semantics for the full failure model.

Delivery semantics

Exactly-once at the execution layer. Each (publisherId, sequence) batch lands in the Workflow's event log at most once, even if the publisher's underlying Signal is retried by the SDK or the network. Once an event is in the log, every subscriber that polls past its offset will see it, and deduplicate state is carried across Continue-As-New so a retried publish that arrives after a rollover still lands at most once.

Ordering. The log imposes a single total order on all events, fixed once written: an event at offset N stays at offset N on every read. Within one publisher (one WorkflowStreamClient instance, or the Workflow itself), events appear in publish order. Across concurrent publishers, the interleaving is whatever the Workflow saw when serializing inbound Signals; the order is stable once recorded but not under application control. If event A must precede event B, publish them from the same publisher.

Activity retries surface to subscribers. When an Activity that publishes events fails partway through and Temporal retries it, both attempts' events appear in the stream. Concretely: an Activity that publishes three TEXT_DELTA events and then errors, then retries and publishes its full output, will deliver three partial events followed by the complete sequence. The Workflow itself sees only the successful attempt's return value (that's what durable execution hides), but a UI subscribed to the stream will see the partial output unless it dedupes. Consumers must reset or annotate on retry events; the library does not do this automatically.

The conventional pattern is for an Activity that detects it's on a retry attempt to publish a RETRY event with { forceFlush: true }, and for the consumer to clear or annotate prior-attempt output when it sees one. Treat the stream as an append-only log of attempts and let an idempotent consumer reducer reconcile them: overwrite on terminal events like STATUS_CHANGE or TEXT_COMPLETE, or reset an accumulator on a sentinel like AGENT_START before deltas resume. Because the Workflow processes only Activity return values rather than reading the stream itself, its own state stays independent of these retried events.

This is the price of streaming events as they happen rather than waiting for the Workflow's durable view to settle. If the library waited for a successful Activity return before surfacing anything, there would be nothing to stream.

Other failure modes. Events still in a publisher's in-memory client buffer are lost if the process crashes before they ship. Subscribers that handle an item and crash before persisting their next offset will reprocess that item on resume. Build consumer state with both in mind.

Two limits on the deduplication window are worth understanding:

  • publisherTtl (default 15 minutes). Retention for the per-publisher deduplicate state. At each Continue-As-New, deduplicate entries whose lastSeen is older than this are dropped. lastSeen is updated on each successful publish (not on each retry attempt), so a publisher that retries through a long partition without success can still age out. A publisher that returns after a longer pause may produce a duplicate. Tune upward via stream.continueAsNew(buildArgs, { publisherTtl: '...' }) if your publishers can be silent for extended windows.

  • maxRetryDuration (default 10 minutes). A WorkflowStreamClient retries a failed batch for up to this long. If the duration elapses with the batch still pending (for example, during a sustained network partition), the client gives up, the pending batch is dropped, and a FlushTimeoutError is raised.

    On timeout, the dropped batch is at-most-once: it may or may not have reached the Workflow. Subsequent publishes resume cleanly with the next sequence. One operational caveat: the FlushTimeoutError is raised from inside the background flusher task and terminates it. Until you call await client.flush() or exit the await using scope, subsequent publishes accumulate in the buffer with no flusher to ship them.

The two limits must satisfy maxRetryDuration < publisherTtl. If a publisher's retry window exceeds the dedup retention, the dedup state for that publisher can age out (at the next Continue-As-New) before the retry lands. A retry that arrives after its dedup record has been pruned is treated as a fresh publish, and if the original delivery had also succeeded, the same logical batch lands twice. The defaults (10 minutes < 15 minutes) satisfy this; if you tune one, tune the other to preserve the relationship.

Architecture

The user-facing API hides three pieces of machinery worth understanding when you tune throughput, debug delivery, or reason about history size.

Append-only log inside the Workflow. WorkflowStream keeps an in-memory list of (topic, data) entries inside the Workflow's state, each with a monotonically increasing global offset. Subscribers maintain their own cursor and on each long-poll receive the next range past it. Because the log lives in Workflow state, it is replay-safe and is carried across Continue-As-New via WorkflowStreamState.

Two mechanisms bound log growth, and they do different jobs:

  • stream.truncate(upToOffset) drops entries from the in-memory log (and therefore from the carried Continue-As-New payload). It does not remove publish Signals already recorded in history. Calling with an offset past the end raises an ApplicationFailure with type: 'TruncateOutOfRange'.
  • Continue-As-New starts a fresh history. This is the only way to shrink history; truncate alone cannot.

A subscriber whose offset falls below the new base after a truncate() is silently advanced. Internally, the poll raises ApplicationFailure with type: 'TruncatedOffset'; the TypeScript client catches it and resets to offset 0, which the Workflow reads as "from the current base." The iterator does not raise, but the subscriber may re-receive items already in the log past the new base. Applications that depend on seeing every event exactly once must keep subscribers ahead of truncation or implement their own gap and re-delivery handling using item.offset.

Wire-level handlers. The three handlers registered when you construct a WorkflowStream are __temporal_workflow_stream_publish (the Signal that receives batched publishes), __temporal_workflow_stream_poll (the long-poll Update that subscribers use), and __temporal_workflow_stream_offset (the Query that reports the current head offset). Poll responses are capped at roughly 1 MB by accumulating items until the next would exceed the budget, so high-throughput producers see a steady stream of small batches. A single item that exceeds 1 MB on its own is admitted unconditionally; offload large items via External Storage so each item is a small reference. Each item's wire data is a base64-encoded temporal.api.common.v1.Payload protobuf, preserving payload metadata for typed decode and cross-language interop.

Batching and deduplicating. Every batch carries a unique identifier (the client's id paired with a monotonic batch sequence number), so a Signal retried by the SDK or the network deduplicates to a single landing in the Workflow's event log. Deduplicate state is part of the Workflow's carried state, so the guarantee survives Continue-As-New (subject to publisherTtl).

This dedup applies at the Signal layer, not the Activity layer. An Activity retry is a different concept from a publish retry: when Temporal retries the Activity, the retried execution constructs a new WorkflowStreamClient with its own client id, so from the stream's perspective every attempt is a fresh publisher whose batches will not deduplicate against the prior attempt's. That is why retried-attempt events appear in the stream alongside the successful attempt's output.

Gotchas

A few details worth knowing about, mostly relevant if you're writing custom message handlers or pushing the library to its limits.

  • WorkflowStreamClient is single-event-loop. The client buffer is mutated on the publish path and read from the background flusher inside one Node event loop. Don't call publish() from a Worker thread; route events back to the loop that owns the client.
  • Constructing two WorkflowStreams silently replaces handlers. The TypeScript Workflow runtime does not expose an inspection API for existing handlers, so the library cannot raise on a duplicate the way the Python SDK does. Construct exactly one WorkflowStream per Workflow at the top of the function.
  • Type bindings aren't shared across publishers. Each WorkflowStream and each WorkflowStreamClient records topic types only for its own instance, and the type parameter T is erased at compile time, so no runtime check enforces uniformity. If two publishers bind the same topic name to different types, the mismatch is not caught at publish, and the subscriber gets a decode error when it processes events from the mismatched publisher.
  • Custom payload converters. A WorkflowStreamClient created via WorkflowStreamClient.create(client, ...) picks up the client's configured payload converter; subscribers decode through the same converter. The Workflow side always uses defaultPayloadConverter. If you ship a custom converter, make sure both sides agree, or stick to types the default converter handles.
  • Cross-realm Uint8Array for binary publishes. Hand-publishing a Uint8Array from Workflow code uses a dedicated code path that constructs a binary/plain Payload directly, because the sandbox's TextEncoder returns a host-realm Uint8Array that fails instanceof checks against the sandbox's own globals. You generally don't need to think about this, but if you bypass the workflow-side handle and construct payloads manually, prefer the workflow-side WorkflowStream API rather than building payloads by hand.

Application: Stream LLM output

The headline use case fits the publish/subscribe shapes documented above. An Activity calls the model and publishes deltas as they arrive; the Workflow kicks off the Activity and waits for the consumer to acknowledge end-of-stream; the consumer subscribes, accumulates the deltas, and clears its accumulated state on RETRY before continuing. The shape works for a terminal client, a desktop UI, or a Server-Sent Events (SSE) endpoint forwarding to a browser; whatever holds the displayed state calls render() to display it.

If your Activity can retry, the consumer side has to account for it: a retried attempt is a fresh publisher, so its output appears in the stream alongside the previous attempt's. In the LLM streaming pattern below, that means the failed attempt's partial deltas and the retried attempt's full output both reach a subscribed UI unless the UI resets on a RETRY event. The example wires up that pattern; see Delivery semantics for the precise guarantees.

// activities.ts
import { Context } from '@temporalio/activity';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import OpenAI from 'openai';

export interface TextDelta {
text: string;
}

export interface RetryEvent {
attempt: number;
}

export async function streamCompletion(prompt: string): Promise<string> {
const attempt = Context.current().info.attempt;
await using streamClient = WorkflowStreamClient.fromWithinActivity({
batchInterval: '200 milliseconds',
});
// Disable provider-side retries; let Temporal own retry policy at the Activity layer.
const openai = new OpenAI({ maxRetries: 0 });

const deltas = streamClient.topic<TextDelta>('delta');
const retry = streamClient.topic<RetryEvent>('retry');
const close = streamClient.topic<Record<string, never>>('close');

// Tell consumers an earlier attempt's deltas are stale.
if (attempt > 1) {
retry.publish({ attempt }, { forceFlush: true });
}

const oaiStream = await openai.chat.completions.create({
model: 'gpt-4o-mini',
messages: [{ role: 'user', content: prompt }],
stream: true,
});

const full: string[] = [];
let first = true;
for await (const chunk of oaiStream) {
const text = chunk.choices[0]?.delta?.content;
if (!text) continue;
// forceFlush only on the first delta so the user sees something
// immediately; subsequent deltas batch at the 200 ms interval.
deltas.publish({ text }, first ? { forceFlush: true } : undefined);
first = false;
full.push(text);
}
close.publish({});
return full.join('');
}
// workflows.ts
import { condition, defineSignal, executeActivity, setHandler } from '@temporalio/workflow';
import { WorkflowStream } from '@temporalio/workflow-streams/workflow';
import type * as activities from './activities';

export const subscriberAcknowledgedTerminator = defineSignal('subscriberAcknowledgedTerminator');

export interface ChatInput {
prompt: string;
}

export async function chatWorkflow(input: ChatInput): Promise<string> {
const stream = new WorkflowStream();
let subscriberDone = false;
setHandler(subscriberAcknowledgedTerminator, () => {
subscriberDone = true;
});

const result = await executeActivity<typeof activities.streamCompletion>('streamCompletion', input.prompt, {
startToCloseTimeout: '5 minutes',
});

// Wait for the subscriber to ack the terminal `close` event. The timeout
// is a fallback for when no subscriber is attached; with the ack, the
// typical case exits as soon as the subscriber confirms.
await condition(() => subscriberDone, '30 seconds');
return result;
}
// consumer.ts: accumulates the model's output and resets on retry
import { Client } from '@temporalio/client';
import { defaultPayloadConverter } from '@temporalio/common';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import { subscriberAcknowledgedTerminator } from './workflows';

export async function streamChat(chatId: string): Promise<string> {
const temporalClient = new Client();
// Subscribe-only; no `await using` needed because the flusher only runs for publishers.
const stream = WorkflowStreamClient.create(temporalClient, chatId);
const output: string[] = [];

function render(): void {
// ... display the accumulated output (terminal redraw, UI update, etc.)
}

for await (const item of stream.subscribe(['delta', 'retry', 'close'])) {
if (item.topic === 'retry') {
// Earlier attempt's deltas are stale; drop what we've shown.
output.length = 0;
render();
} else if (item.topic === 'delta') {
const delta = defaultPayloadConverter.fromPayload<TextDelta>(item.data);
output.push(delta.text);
render();
} else if (item.topic === 'close') {
// Acknowledge so the Workflow can return without waiting on the fallback timeout.
await temporalClient.workflow.getHandle(chatId).signal(subscriberAcknowledgedTerminator);
break;
}
}

return output.join('');
}

A few choices in this shape are deliberate:

  • The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes only the Activity's return value, never reading its own stream — see Publish from a client for why.
  • The Activity publishes a RETRY event when Context.current().info.attempt > 1. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see Delivery semantics).
  • Termination uses an ack handshake: the consumer signals the Workflow once it has received the close event, so the Workflow can return as soon as the subscriber confirms. The condition timeout is the fallback when no subscriber is attached (see Closing the stream for the simpler fixed-sleep alternative).
  • { forceFlush: true } is used only on the first delta and on the RETRY sentinel, where latency matters. Subsequent deltas batch at the 200 ms batchInterval; per-delta forceFlush: true would generate one Signal per token (see Tuning for the trade-off).

See also