# Streaming SSE from Durable Endpoints (Developer Preview)
Durable Endpoints can stream data back to clients in real time using Server-Sent Events (SSE). This lets you stream AI inference tokens, progress updates, or any other data — while keeping the durability guarantees of durable steps.
Streaming works across multiple steps within a single endpoint invocation, and handles the transition from sync to async mode seamlessly. If a step fails and retries, any data streamed during that step is automatically rolled back on the client.
Streaming SSE from Durable Endpoints is currently only available in the TypeScript SDK. This guide assumes you've already set up a Durable Endpoint.
## When to use streaming
- AI inference — Stream LLM tokens to the browser as they're generated, so users see results immediately.
- Status updates — Send progress messages during long-running endpoint executions.
- Making existing streaming endpoints durable — Wrap your existing streaming HTTP endpoints in steps to add retries and observability without sacrificing functionality.
If you don't need to stream data directly to an HTTP client, consider using Realtime to push updates from background Inngest functions via pub/sub channels.
## Quick start
### Server
Import `step` from `inngest` and `stream` from `inngest/experimental/durable-endpoints`, then use `stream.push()` or `stream.pipe()` inside your endpoint handler:
```ts
import Anthropic from "@anthropic-ai/sdk";
import { step } from "inngest";
import { stream } from "inngest/experimental/durable-endpoints";

import { inngest } from "@/inngest";

export const GET = inngest.endpoint(async () => {
  // Option A: push() with an SDK event callback
  const text = await step.run("generate", async () => {
    stream.push("Generating...\n");

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 512,
      messages: [{ role: "user", content: "Write a haiku about durability." }],
    });

    response.on("text", (token) => stream.push(token));
    return await response.finalText();
  });

  // Option B: pipe() — streams each chunk AND returns the collected text
  await step.run("translate", async () => {
    stream.push(`\nTranslating...\n`);

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 256,
      messages: [{ role: "user", content: `Translate to French: ${text}` }],
    });

    return stream.pipe(async function* () {
      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          yield event.delta.text;
        }
      }
    });
  });

  return Response.json("\nDone!");
});
```
### Client
Use `fetchWithStream()` from `inngest/experimental/durable-endpoints/client` to consume the stream. It connects to your endpoint, parses the SSE frames, follows any sync-to-async redirects, and manages commit/rollback — all automatically.

Each `stream.push()` call or `stream.pipe()` yield on the server fires the `onData` callback on the client with that chunk. Using the server example above, the client would receive chunks in this order: `"Generating...\n"`, then each LLM token from the generate step, then `"\nTranslating...\n"`, then each LLM token from the translate step. The final `"\nDone!"` is available as the returned `Response` body.
```tsx
"use client";

import { useState, useRef } from "react";
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";

export default function Generate() {
  const [chunks, setChunks] = useState<string[]>([]);
  const uncommittedCountRef = useRef(0);

  async function run() {
    setChunks([]);
    uncommittedCountRef.current = 0;

    const resp = await fetchWithStream("/api/generate", {
      onData: ({ data }) => {
        if (typeof data === "string") {
          uncommittedCountRef.current++;
          setChunks((prev) => [...prev, data]);
        }
      },
      onRollback: () => {
        // A step failed and will retry — remove the chunks it produced
        const count = uncommittedCountRef.current;
        setChunks((prev) => prev.slice(0, prev.length - count));
        uncommittedCountRef.current = 0;
      },
      onCommit: () => {
        // Step completed — its chunks are now permanent
        uncommittedCountRef.current = 0;
      },
      onStreamError: (error) => {
        setChunks((prev) => [...prev, `Error: ${error}`]);
      },
    });

    // The endpoint's return value is available as the Response body
    const result = await resp.text();
    setChunks((prev) => [...prev, result]);
  }

  return (
    <div>
      <button onClick={run}>Generate</button>
      <pre>{chunks.join("")}</pre>
    </div>
  );
}
```
## Server API
### `stream.push(data)`
Send a single chunk of data to the client as an SSE frame.
```ts
stream.push("Loading...");
stream.push({ progress: 50, message: "Halfway there" });
```
- Accepts any JSON-serializable value.
- Fire-and-forget — does not return a value or block execution.
- Synchronous from the caller's perspective.
- Silent no-op outside of an Inngest execution context, so your code works the same when called outside of a durable endpoint.
push() is ideal for one-off status messages. It can also be used with provider SDK event callbacks for streaming, though pipe() is usually simpler for that use case:
```ts
// Alternative to pipe() — use push() with an event callback
const response = client.messages.stream({ /* ... */ });
response.on("text", (text) => stream.push(text));
const fullText = await response.finalText();
```
### `stream.pipe(source)`

Pipe a stream of data to the client. Each chunk from the source is sent as an SSE frame in real time. When the source is fully consumed, `pipe()` resolves with the concatenated text of all chunks — so it both streams to the client and collects the result for you.
The simplest case is piping a ReadableStream, like a fetch response body:
```ts
const response = await fetch("https://api.example.com/stream");
const text = await stream.pipe(response.body);
// `text` contains the full response; the client received it chunk by chunk
```
When you need to transform or filter chunks before they're sent, pass an async generator function. Each yield sends one chunk to the client:
```ts
const text = await stream.pipe(async function* () {
  for await (const event of response) {
    // Only yield the parts you want the client to see
    if (event.type === "content_block_delta") {
      yield event.delta.text;
    }
  }
});
```
`pipe()` accepts three source types:

- `ReadableStream` — piped directly, decoded from bytes to string chunks.
- `AsyncIterable<string>` — each value in the iterable becomes a chunk.
- `() => AsyncIterable<string>` — a function that returns an async iterable. This is what lets you pass `async function*` generators directly to `pipe()`.
Outside of an Inngest execution context, pipe() resolves with an empty string.
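To make the three source shapes concrete, here is an illustrative sketch of how a `pipe()`-style implementation might reduce all of them to one chunk loop. The `normalize` and `collect` helpers are invented names for illustration, not SDK APIs; the sketch assumes the standard `ReadableStream`, `TextDecoder`, and `TextEncoder` globals:

```typescript
// Illustrative only: how pipe()'s three accepted source types can be
// normalized into a single AsyncIterable<string>. Not part of the Inngest SDK.
type PipeSource =
  | ReadableStream<Uint8Array>
  | AsyncIterable<string>
  | (() => AsyncIterable<string>);

async function* normalize(source: PipeSource): AsyncIterable<string> {
  if (typeof source === "function") {
    // () => AsyncIterable<string>: call it, then iterate the result
    yield* source();
  } else if (source instanceof ReadableStream) {
    // ReadableStream: decode byte chunks into string chunks
    const decoder = new TextDecoder();
    const reader = source.getReader();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        yield decoder.decode(value, { stream: true });
      }
    } finally {
      reader.releaseLock();
    }
  } else {
    // AsyncIterable<string>: pass each value through unchanged
    yield* source;
  }
}

// Mirrors how pipe() resolves with the concatenated text of all chunks.
async function collect(source: PipeSource): Promise<string> {
  let text = "";
  for await (const chunk of normalize(source)) text += chunk;
  return text;
}
```

Under this shape, `pipe(response.body)` and `pipe(async function* () { ... })` reduce to the same loop, which is why `pipe()` can both stream each chunk and return the concatenated text.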
## Client API
### `fetchWithStream(url, options)`
The primary way to consume a streaming Durable Endpoint. Import it from inngest/experimental/durable-endpoints/client:
```ts
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";
```
fetchWithStream() returns a Promise<Response> — await it to drive the stream to completion. When the endpoint finishes, the returned Response contains the endpoint's final return value. If the endpoint does not use streaming, fetchWithStream() returns the raw Response as-is.
The core callbacks handle the majority of streaming use cases:
- `onData({ data, hashedStepId })` — Called for each chunk. Each `stream.push()` call or `stream.pipe()` yield on the server produces one `onData` call on the client. `data` is the deserialized value; `hashedStepId` identifies which step produced it (or `null` if streamed outside a step).
- `onRollback({ hashedStepId })` — Called when a step fails and will retry. Your code is responsible for tracking and removing the chunks produced by that step (see the Quick start example for a pattern using a ref counter).
- `onCommit({ hashedStepId })` — Called when a step completes successfully. Chunks from that step are now permanent and will never be rolled back.
Because stream.push() accepts any JSON-serializable value, data in the onData callback is typed as unknown. Narrow the type in your callback as needed:
```ts
const uncommittedCount = { current: 0 };

const resp = await fetchWithStream("/api/generate", {
  onData: ({ data }) => {
    if (typeof data === "string") {
      uncommittedCount.current++;
      console.log("Chunk:", data);
    }
  },
  onRollback: () => {
    // Discard uncommitted chunks and reset counter
    uncommittedCount.current = 0;
  },
  onCommit: () => {
    // Chunks are permanent — reset counter
    uncommittedCount.current = 0;
  },
});

const result = await resp.text();
```
#### Additional options
| Option | Type | Description |
|---|---|---|
| `fetch` | `typeof fetch` | Custom fetch implementation. |
| `fetchOpts` | `RequestInit` | Options passed to the underlying fetch call (e.g. `{ signal }` for cancellation). |
| `onMetadata` | `(args: { runId: string }) => void` | Run metadata received (always first). |
| `onStreamError` | `(error: string) => void` | Terminal stream error (the endpoint failed with a non-retriable error). |
| `onDone` | `() => void` | Stream fully consumed. |
## How it works
### Sync-to-async transitions
When a client calls a streaming Durable Endpoint, the SSE stream flows directly from your app to the client. If the endpoint needs to go async (e.g. due to step.sleep(), step.waitForEvent(), or a retry), the SDK sends a redirect frame telling the client where to reconnect, and the stream continues through the Inngest server.
fetchWithStream() handles this redirect automatically — the client sees a single continuous stream regardless of sync-to-async transitions.
### Streaming activation
Streaming is activated lazily. The endpoint only sends an SSE response if:
- The client sends the `Accept: text/event-stream` header (which `fetchWithStream()` does automatically), and
- Your code calls `stream.push()` or `stream.pipe()` during execution.
If neither push() nor pipe() is called, the endpoint behaves like a regular non-streaming Durable Endpoint.
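The activation rule amounts to a two-part predicate. The sketch below is illustrative only; `responseMode` and `ActivationInput` are hypothetical names, not SDK APIs:

```typescript
// Illustrative predicate for the activation rule: SSE only when the client
// asked for it AND handler code actually streamed. Not part of the SDK.
interface ActivationInput {
  acceptHeader: string | null; // the request's Accept header, if any
  streamUsed: boolean; // did the handler call stream.push() or stream.pipe()?
}

function responseMode(input: ActivationInput): "sse" | "plain" {
  const wantsSSE =
    input.acceptHeader
      ?.split(",")
      .some((v) => v.trim().toLowerCase().startsWith("text/event-stream")) ??
    false;
  return wantsSSE && input.streamUsed ? "sse" : "plain";
}
```

Both conditions must hold; if either fails, the endpoint responds like a regular non-streaming Durable Endpoint.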
### Rollback on retry
When a step fails and retries, any data streamed during that step is automatically rolled back on the client:
- Each chunk is tagged with the step that produced it (via `hashedStepId`).
- When a step completes successfully, an `inngest.commit` event is sent and the `onCommit` callback fires — those chunks can never be rolled back.
- When a step errors, an `inngest.rollback` event is sent and the `onRollback` callback fires. Your client code is responsible for discarding the uncommitted chunks from that step.
- On the retry attempt, the step streams fresh data that replaces what was rolled back.
Data streamed outside of a step.run() is never rolled back.
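Because commits and rollbacks are scoped to a step, client state can be tracked per `hashedStepId` rather than with the single counter used in the Quick start. The class below is an illustrative sketch, not an SDK API; its methods mirror the `onData`, `onCommit`, and `onRollback` callbacks:

```typescript
// Illustrative client-side buffer applying the rules above: chunks stay
// pending until their step commits; a rollback discards only that step's
// pending chunks. Not an SDK API.
class StreamBuffer {
  private committedChunks: string[] = [];
  private pending = new Map<string, string[]>();

  // Mirror of the onData callback.
  onData(hashedStepId: string | null, data: string) {
    if (hashedStepId === null) {
      // Streamed outside a step: never rolled back, so commit immediately.
      this.committedChunks.push(data);
      return;
    }
    const chunks = this.pending.get(hashedStepId) ?? [];
    chunks.push(data);
    this.pending.set(hashedStepId, chunks);
  }

  // Mirror of onCommit: the step's chunks become permanent.
  onCommit(hashedStepId: string) {
    this.committedChunks.push(...(this.pending.get(hashedStepId) ?? []));
    this.pending.delete(hashedStepId);
  }

  // Mirror of onRollback: drop only the failed step's uncommitted chunks.
  onRollback(hashedStepId: string) {
    this.pending.delete(hashedStepId);
  }

  // Committed text first, then any still-pending chunks.
  text(): string {
    let out = this.committedChunks.join("");
    for (const chunks of this.pending.values()) out += chunks.join("");
    return out;
  }
}
```

Wiring each method into the matching `fetchWithStream()` callback and re-rendering from `text()` after every event keeps the display rollback-correct even when several steps stream during one run.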
### SSE frame types

The stream uses SSE with these frame types. The `inngest.*` frames are internal protocol frames handled by `fetchWithStream()` automatically — only `inngest.stream` frames contain user data.
| Event name | Payload | Purpose |
|---|---|---|
| `inngest.metadata` | `{ runId }` | Always first. Identifies the run. |
| `inngest.stream` | `{ data, stepId? }` | User data from `push()` / `pipe()`. |
| `inngest.commit` | `{ hashedStepId }` | Step succeeded — its streamed data is permanent. |
| `inngest.rollback` | `{ hashedStepId }` | Step failed — discard its uncommitted data. |
| `inngest.redirect_info` | `{ runId, url }` | Tells the client to reconnect for async continuation. |
| `inngest.response` | `{ status, response: { body, headers, statusCode } }` | Terminal frame — closes the stream. |
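To make the wire format concrete, here is a simplified parser for frames in the `event:`/`data:` layout above. It is a sketch, not the SDK's parser: it assumes single-line JSON `data:` payloads and ignores SSE `id:`, `retry:`, and comment lines, all of which `fetchWithStream()` handles for you:

```typescript
// Simplified parser for the frames above. Illustrative only: assumes
// single-line JSON data payloads; fetchWithStream() handles the real protocol.
interface Frame {
  event: string; // e.g. "inngest.stream", "inngest.commit"
  payload: unknown; // parsed JSON from the data: line
}

function parseFrames(raw: string): Frame[] {
  const frames: Frame[] = [];
  // SSE frames are separated by a blank line.
  for (const block of raw.split("\n\n")) {
    let event = "message"; // SSE's default event name
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      if (line.startsWith("event:")) event = line.slice(6).trim();
      else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
    }
    if (dataLines.length === 0) continue; // skip empty or comment-only blocks
    frames.push({ event, payload: JSON.parse(dataLines.join("\n")) });
  }
  return frames;
}
```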
## Limitations
Streaming SSE from Durable Endpoints is currently in developer preview. In addition to any general Durable Endpoint limitations, the following apply:
- 15 minute timeout — Client connections time out after 15 minutes, meaning your endpoint should complete within this window (including any retries) to ensure the stream is delivered end-to-end.
- No rollback outside of steps — Data streamed outside of a `step.run()` is never rolled back. If you need rollback guarantees, stream from within a step.
- One streaming parallel step — You can stream from at most one parallel step. Streaming from multiple parallel steps results in interleaved output that the client cannot disambiguate.
- No streaming from child functions — `step.invoke()` calls cannot stream data back to the parent function's client.
- Raw `Response` objects may be lost on async transition — If your endpoint returns a `Response` (like a file download) and the run goes async, the `Response` is lost because it can't be memoized. Use `stream.push()` or `stream.pipe()` instead.
## SDK support
| SDK | Support | Version |
|---|---|---|
| TypeScript | Developer Preview | >= 4.x (with `endpointAdapter`) |