feat: Complete zCode CLI X with Telegram bot integration

- Add full Telegram bot functionality with Z.AI API integration
- Implement 4 tools: Bash, FileEdit, WebSearch, Git
- Add 3 agents: Code Reviewer, Architect, DevOps Engineer
- Add 6 skills for common coding tasks
- Add systemd service file for 24/7 operation
- Add nginx configuration for HTTPS webhook
- Add comprehensive documentation
- Implement WebSocket server for real-time updates
- Add logging system with Winston
- Add environment validation

🤖 zCode CLI X - Agentic coder with Z.AI + Telegram integration
This commit is contained in:
admin
2026-05-05 09:01:26 +00:00
Unverified
parent 4a7035dd92
commit 875c7f9b91
24688 changed files with 3224957 additions and 221 deletions

View File

@@ -0,0 +1,4 @@
File generated from our OpenAPI spec by Stainless.
This directory can be used to store custom files to expand the SDK.
It is ignored by Stainless code generation and its content (other than this keep file) won't be touched.

View File

@@ -0,0 +1,764 @@
import { partialParse } from '../_vendor/partial-json-parser/parser';
import type { Logger } from '../client';
import { AnthropicError, APIUserAbortError } from '../error';
import { isAbortError } from '../internal/errors';
import { type RequestOptions } from '../internal/request-options';
import {
type BetaContentBlock,
type BetaMCPToolUseBlock,
type BetaMessage,
type BetaMessageParam,
Messages as BetaMessages,
type BetaRawMessageStreamEvent as BetaMessageStreamEvent,
type BetaServerToolUseBlock,
type BetaTextBlock,
type BetaTextCitation,
type BetaToolUseBlock,
type MessageCreateParams,
type MessageCreateParamsBase,
MessageCreateParamsStreaming,
} from '../resources/beta/messages/messages';
import { Stream } from '../streaming';
import { maybeParseBetaMessage, type ParsedBetaMessage } from './beta-parser';
export interface MessageStreamEvents {
connect: () => void;
streamEvent: (event: BetaMessageStreamEvent, snapshot: BetaMessage) => void;
text: (textDelta: string, textSnapshot: string) => void;
citation: (citation: BetaTextCitation, citationsSnapshot: BetaTextCitation[]) => void;
inputJson: (partialJson: string, jsonSnapshot: unknown) => void;
thinking: (thinkingDelta: string, thinkingSnapshot: string) => void;
signature: (signature: string) => void;
compaction: (compactedContent: string) => void;
message: (message: BetaMessage) => void;
contentBlock: (content: BetaContentBlock) => void;
finalMessage: (message: BetaMessage) => void;
error: (error: AnthropicError) => void;
abort: (error: APIUserAbortError) => void;
end: () => void;
}
type MessageStreamEventListeners<Event extends keyof MessageStreamEvents> = {
listener: MessageStreamEvents[Event];
once?: boolean;
}[];
const JSON_BUF_PROPERTY = '__json_buf';
export type TracksToolInput = BetaToolUseBlock | BetaServerToolUseBlock | BetaMCPToolUseBlock;
function tracksToolInput(content: BetaContentBlock): content is TracksToolInput {
return content.type === 'tool_use' || content.type === 'server_tool_use' || content.type === 'mcp_tool_use';
}
export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMessageStreamEvent> {
messages: BetaMessageParam[] = [];
receivedMessages: ParsedBetaMessage<ParsedT>[] = [];
#currentMessageSnapshot: BetaMessage | undefined;
#params: MessageCreateParams | null = null;
controller: AbortController = new AbortController();
#connectedPromise: Promise<Response | null>;
#resolveConnectedPromise: (response: Response | null) => void = () => {};
#rejectConnectedPromise: (error: AnthropicError) => void = () => {};
#endPromise: Promise<void>;
#resolveEndPromise: () => void = () => {};
#rejectEndPromise: (error: AnthropicError) => void = () => {};
#listeners: { [Event in keyof MessageStreamEvents]?: MessageStreamEventListeners<Event> } = {};
#ended = false;
#errored = false;
#aborted = false;
#catchingPromiseCreated = false;
#response: Response | null | undefined;
#request_id: string | null | undefined;
#logger: Logger;
constructor(params: MessageCreateParamsBase | null, opts?: { logger?: Logger | undefined }) {
this.#connectedPromise = new Promise<Response | null>((resolve, reject) => {
this.#resolveConnectedPromise = resolve;
this.#rejectConnectedPromise = reject;
});
this.#endPromise = new Promise<void>((resolve, reject) => {
this.#resolveEndPromise = resolve;
this.#rejectEndPromise = reject;
});
// Don't let these promises cause unhandled rejection errors.
// we will manually cause an unhandled rejection error later
// if the user hasn't registered any error listener or called
// any promise-returning method.
this.#connectedPromise.catch(() => {});
this.#endPromise.catch(() => {});
this.#params = params;
this.#logger = opts?.logger ?? console;
}
get response(): Response | null | undefined {
return this.#response;
}
get request_id(): string | null | undefined {
return this.#request_id;
}
/**
* Returns the `MessageStream` data, the raw `Response` instance and the ID of the request,
* returned vie the `request-id` header which is useful for debugging requests and resporting
* issues to Anthropic.
*
* This is the same as the `APIPromise.withResponse()` method.
*
* This method will raise an error if you created the stream using `MessageStream.fromReadableStream`
* as no `Response` is available.
*/
async withResponse(): Promise<{
data: BetaMessageStream<ParsedT>;
response: Response;
request_id: string | null | undefined;
}> {
this.#catchingPromiseCreated = true;
const response = await this.#connectedPromise;
if (!response) {
throw new Error('Could not resolve a `Response` object');
}
return {
data: this,
response,
request_id: response.headers.get('request-id'),
};
}
/**
* Intended for use on the frontend, consuming a stream produced with
* `.toReadableStream()` on the backend.
*
* Note that messages sent to the model do not appear in `.on('message')`
* in this context.
*/
static fromReadableStream(stream: ReadableStream): BetaMessageStream {
const runner = new BetaMessageStream(null);
runner._run(() => runner._fromReadableStream(stream));
return runner;
}
static createMessage<ParsedT>(
messages: BetaMessages,
params: MessageCreateParamsBase,
options?: RequestOptions,
{ logger }: { logger?: Logger | undefined } = {},
): BetaMessageStream<ParsedT> {
const runner = new BetaMessageStream<ParsedT>(params as MessageCreateParamsStreaming, { logger });
for (const message of params.messages) {
runner._addMessageParam(message);
}
runner.#params = { ...params, stream: true };
runner._run(() =>
runner._createMessage(
messages,
{ ...params, stream: true },
{ ...options, headers: { ...options?.headers, 'X-Stainless-Helper-Method': 'stream' } },
),
);
return runner;
}
protected _run(executor: () => Promise<any>) {
executor().then(() => {
this._emitFinal();
this._emit('end');
}, this.#handleError);
}
protected _addMessageParam(message: BetaMessageParam) {
this.messages.push(message);
}
protected _addMessage(message: ParsedBetaMessage<ParsedT>, emit = true) {
this.receivedMessages.push(message);
if (emit) {
this._emit('message', message);
}
}
protected async _createMessage(
messages: BetaMessages,
params: MessageCreateParams,
options?: RequestOptions,
): Promise<void> {
const signal = options?.signal;
let abortHandler: (() => void) | undefined;
if (signal) {
if (signal.aborted) this.controller.abort();
abortHandler = this.controller.abort.bind(this.controller);
signal.addEventListener('abort', abortHandler);
}
try {
this.#beginRequest();
const { response, data: stream } = await messages
.create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
.withResponse();
this._connected(response);
for await (const event of stream) {
this.#addStreamEvent(event);
}
if (stream.controller.signal?.aborted) {
throw new APIUserAbortError();
}
this.#endRequest();
} finally {
if (signal && abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
}
}
protected _connected(response: Response | null) {
if (this.ended) return;
this.#response = response;
this.#request_id = response?.headers.get('request-id');
this.#resolveConnectedPromise(response);
this._emit('connect');
}
get ended(): boolean {
return this.#ended;
}
get errored(): boolean {
return this.#errored;
}
get aborted(): boolean {
return this.#aborted;
}
abort() {
this.controller.abort();
}
/**
* Adds the listener function to the end of the listeners array for the event.
* No checks are made to see if the listener has already been added. Multiple calls passing
* the same combination of event and listener will result in the listener being added, and
* called, multiple times.
* @returns this MessageStream, so that calls can be chained
*/
on<Event extends keyof MessageStreamEvents>(event: Event, listener: MessageStreamEvents[Event]): this {
const listeners: MessageStreamEventListeners<Event> =
this.#listeners[event] || (this.#listeners[event] = []);
listeners.push({ listener });
return this;
}
/**
* Removes the specified listener from the listener array for the event.
* off() will remove, at most, one instance of a listener from the listener array. If any single
* listener has been added multiple times to the listener array for the specified event, then
* off() must be called multiple times to remove each instance.
* @returns this MessageStream, so that calls can be chained
*/
off<Event extends keyof MessageStreamEvents>(event: Event, listener: MessageStreamEvents[Event]): this {
const listeners = this.#listeners[event];
if (!listeners) return this;
const index = listeners.findIndex((l) => l.listener === listener);
if (index >= 0) listeners.splice(index, 1);
return this;
}
/**
* Adds a one-time listener function for the event. The next time the event is triggered,
* this listener is removed and then invoked.
* @returns this MessageStream, so that calls can be chained
*/
once<Event extends keyof MessageStreamEvents>(event: Event, listener: MessageStreamEvents[Event]): this {
const listeners: MessageStreamEventListeners<Event> =
this.#listeners[event] || (this.#listeners[event] = []);
listeners.push({ listener, once: true });
return this;
}
/**
* This is similar to `.once()`, but returns a Promise that resolves the next time
* the event is triggered, instead of calling a listener callback.
* @returns a Promise that resolves the next time given event is triggered,
* or rejects if an error is emitted. (If you request the 'error' event,
* returns a promise that resolves with the error).
*
* Example:
*
* const message = await stream.emitted('message') // rejects if the stream errors
*/
emitted<Event extends keyof MessageStreamEvents>(
event: Event,
): Promise<
Parameters<MessageStreamEvents[Event]> extends [infer Param] ? Param
: Parameters<MessageStreamEvents[Event]> extends [] ? void
: Parameters<MessageStreamEvents[Event]>
> {
return new Promise((resolve, reject) => {
this.#catchingPromiseCreated = true;
if (event !== 'error') this.once('error', reject);
this.once(event, resolve as any);
});
}
async done(): Promise<void> {
this.#catchingPromiseCreated = true;
await this.#endPromise;
}
get currentMessage(): BetaMessage | undefined {
return this.#currentMessageSnapshot;
}
#getFinalMessage(): ParsedBetaMessage<ParsedT> {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
return this.receivedMessages.at(-1)!;
}
/**
* @returns a promise that resolves with the the final assistant Message response,
* or rejects if an error occurred or the stream ended prematurely without producing a Message.
* If structured outputs were used, this will be a ParsedMessage with a `parsed` field.
*/
async finalMessage(): Promise<ParsedBetaMessage<ParsedT>> {
await this.done();
return this.#getFinalMessage();
}
#getFinalText(): string {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
const textBlocks = this.receivedMessages
.at(-1)!
.content.filter((block): block is BetaTextBlock => block.type === 'text')
.map((block) => block.text);
if (textBlocks.length === 0) {
throw new AnthropicError('stream ended without producing a content block with type=text');
}
return textBlocks.join(' ');
}
/**
* @returns a promise that resolves with the the final assistant Message's text response, concatenated
* together if there are more than one text blocks.
* Rejects if an error occurred or the stream ended prematurely without producing a Message.
*/
async finalText(): Promise<string> {
await this.done();
return this.#getFinalText();
}
#handleError = (error: unknown) => {
this.#errored = true;
if (isAbortError(error)) {
error = new APIUserAbortError();
}
if (error instanceof APIUserAbortError) {
this.#aborted = true;
return this._emit('abort', error);
}
if (error instanceof AnthropicError) {
return this._emit('error', error);
}
if (error instanceof Error) {
const anthropicError: AnthropicError = new AnthropicError(error.message);
// @ts-ignore
anthropicError.cause = error;
return this._emit('error', anthropicError);
}
return this._emit('error', new AnthropicError(String(error)));
};
protected _emit<Event extends keyof MessageStreamEvents>(
event: Event,
...args: Parameters<MessageStreamEvents[Event]>
) {
// make sure we don't emit any MessageStreamEvents after end
if (this.#ended) return;
if (event === 'end') {
this.#ended = true;
this.#resolveEndPromise();
}
const listeners: MessageStreamEventListeners<Event> | undefined = this.#listeners[event];
if (listeners) {
this.#listeners[event] = listeners.filter((l) => !l.once) as any;
listeners.forEach(({ listener }: any) => listener(...args));
}
if (event === 'abort') {
const error = args[0] as APIUserAbortError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
Promise.reject(error);
}
this.#rejectConnectedPromise(error);
this.#rejectEndPromise(error);
this._emit('end');
return;
}
if (event === 'error') {
// NOTE: _emit('error', error) should only be called from #handleError().
const error = args[0] as AnthropicError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
// Trigger an unhandled rejection if the user hasn't registered any error handlers.
// If you are seeing stack traces here, make sure to handle errors via either:
// - runner.on('error', () => ...)
// - await runner.done()
// - await runner.final...()
// - etc.
Promise.reject(error);
}
this.#rejectConnectedPromise(error);
this.#rejectEndPromise(error);
this._emit('end');
}
}
protected _emitFinal() {
const finalMessage = this.receivedMessages.at(-1);
if (finalMessage) {
this._emit('finalMessage', this.#getFinalMessage());
}
}
#beginRequest() {
if (this.ended) return;
this.#currentMessageSnapshot = undefined;
}
#addStreamEvent(event: BetaMessageStreamEvent) {
if (this.ended) return;
const messageSnapshot = this.#accumulateMessage(event);
this._emit('streamEvent', event, messageSnapshot);
switch (event.type) {
case 'content_block_delta': {
const content = messageSnapshot.content.at(-1)!;
switch (event.delta.type) {
case 'text_delta': {
if (content.type === 'text') {
this._emit('text', event.delta.text, content.text || '');
}
break;
}
case 'citations_delta': {
if (content.type === 'text') {
this._emit('citation', event.delta.citation, content.citations ?? []);
}
break;
}
case 'input_json_delta': {
if (tracksToolInput(content) && content.input) {
this._emit('inputJson', event.delta.partial_json, content.input);
}
break;
}
case 'thinking_delta': {
if (content.type === 'thinking') {
this._emit('thinking', event.delta.thinking, content.thinking);
}
break;
}
case 'signature_delta': {
if (content.type === 'thinking') {
this._emit('signature', content.signature);
}
break;
}
case 'compaction_delta': {
if (content.type === 'compaction' && content.content) {
this._emit('compaction', content.content);
}
break;
}
default:
checkNever(event.delta);
}
break;
}
case 'message_stop': {
this._addMessageParam(messageSnapshot);
this._addMessage(
maybeParseBetaMessage(messageSnapshot, this.#params, { logger: this.#logger }),
true,
);
break;
}
case 'content_block_stop': {
this._emit('contentBlock', messageSnapshot.content.at(-1)!);
break;
}
case 'message_start': {
this.#currentMessageSnapshot = messageSnapshot;
break;
}
case 'content_block_start':
case 'message_delta':
break;
}
}
#endRequest(): ParsedBetaMessage<ParsedT> {
if (this.ended) {
throw new AnthropicError(`stream has ended, this shouldn't happen`);
}
const snapshot = this.#currentMessageSnapshot;
if (!snapshot) {
throw new AnthropicError(`request ended without sending any chunks`);
}
this.#currentMessageSnapshot = undefined;
return maybeParseBetaMessage(snapshot, this.#params, { logger: this.#logger });
}
protected async _fromReadableStream(
readableStream: ReadableStream,
options?: RequestOptions,
): Promise<void> {
const signal = options?.signal;
let abortHandler: (() => void) | undefined;
if (signal) {
if (signal.aborted) this.controller.abort();
abortHandler = this.controller.abort.bind(this.controller);
signal.addEventListener('abort', abortHandler);
}
try {
this.#beginRequest();
this._connected(null);
const stream = Stream.fromReadableStream<BetaMessageStreamEvent>(readableStream, this.controller);
for await (const event of stream) {
this.#addStreamEvent(event);
}
if (stream.controller.signal?.aborted) {
throw new APIUserAbortError();
}
this.#endRequest();
} finally {
if (signal && abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
}
}
/**
* Mutates this.#currentMessage with the current event. Handling the accumulation of multiple messages
* will be needed to be handled by the caller, this method will throw if you try to accumulate for multiple
* messages.
*/
#accumulateMessage(event: BetaMessageStreamEvent): BetaMessage {
let snapshot = this.#currentMessageSnapshot;
if (event.type === 'message_start') {
if (snapshot) {
throw new AnthropicError(`Unexpected event order, got ${event.type} before receiving "message_stop"`);
}
return event.message;
}
if (!snapshot) {
throw new AnthropicError(`Unexpected event order, got ${event.type} before "message_start"`);
}
switch (event.type) {
case 'message_stop':
return snapshot;
case 'message_delta':
snapshot.container = event.delta.container;
snapshot.stop_reason = event.delta.stop_reason;
snapshot.stop_sequence = event.delta.stop_sequence;
snapshot.usage.output_tokens = event.usage.output_tokens;
snapshot.context_management = event.context_management;
if (event.usage.input_tokens != null) {
snapshot.usage.input_tokens = event.usage.input_tokens;
}
if (event.usage.cache_creation_input_tokens != null) {
snapshot.usage.cache_creation_input_tokens = event.usage.cache_creation_input_tokens;
}
if (event.usage.cache_read_input_tokens != null) {
snapshot.usage.cache_read_input_tokens = event.usage.cache_read_input_tokens;
}
if (event.usage.server_tool_use != null) {
snapshot.usage.server_tool_use = event.usage.server_tool_use;
}
if (event.usage.iterations != null) {
snapshot.usage.iterations = event.usage.iterations;
}
return snapshot;
case 'content_block_start':
snapshot.content.push(event.content_block);
return snapshot;
case 'content_block_delta': {
const snapshotContent = snapshot.content.at(event.index);
switch (event.delta.type) {
case 'text_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
text: (snapshotContent.text || '') + event.delta.text,
};
}
break;
}
case 'citations_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
citations: [...(snapshotContent.citations ?? []), event.delta.citation],
};
}
break;
}
case 'input_json_delta': {
if (snapshotContent && tracksToolInput(snapshotContent)) {
// we need to keep track of the raw JSON string as well so that we can
// re-parse it for each delta, for now we just store it as an untyped
// non-enumerable property on the snapshot
let jsonBuf = (snapshotContent as any)[JSON_BUF_PROPERTY] || '';
jsonBuf += event.delta.partial_json;
const newContent = { ...snapshotContent };
Object.defineProperty(newContent, JSON_BUF_PROPERTY, {
value: jsonBuf,
enumerable: false,
writable: true,
});
if (jsonBuf) {
try {
newContent.input = partialParse(jsonBuf);
} catch (err) {
const error = new AnthropicError(
`Unable to parse tool parameter JSON from model. Please retry your request or adjust your prompt. Error: ${err}. JSON: ${jsonBuf}`,
);
this.#handleError(error);
}
}
snapshot.content[event.index] = newContent;
}
break;
}
case 'thinking_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
thinking: snapshotContent.thinking + event.delta.thinking,
};
}
break;
}
case 'signature_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
signature: event.delta.signature,
};
}
break;
}
case 'compaction_delta': {
if (snapshotContent?.type === 'compaction') {
snapshot.content[event.index] = {
...snapshotContent,
content: (snapshotContent.content || '') + event.delta.content,
};
}
break;
}
default:
checkNever(event.delta);
}
return snapshot;
}
case 'content_block_stop':
return snapshot;
}
}
[Symbol.asyncIterator](): AsyncIterator<BetaMessageStreamEvent> {
const pushQueue: BetaMessageStreamEvent[] = [];
const readQueue: {
resolve: (chunk: BetaMessageStreamEvent | undefined) => void;
reject: (error: unknown) => void;
}[] = [];
let done = false;
this.on('streamEvent', (event) => {
const reader = readQueue.shift();
if (reader) {
reader.resolve(event);
} else {
pushQueue.push(event);
}
});
this.on('end', () => {
done = true;
for (const reader of readQueue) {
reader.resolve(undefined);
}
readQueue.length = 0;
});
this.on('abort', (err) => {
done = true;
for (const reader of readQueue) {
reader.reject(err);
}
readQueue.length = 0;
});
this.on('error', (err) => {
done = true;
for (const reader of readQueue) {
reader.reject(err);
}
readQueue.length = 0;
});
return {
next: async (): Promise<IteratorResult<BetaMessageStreamEvent>> => {
if (!pushQueue.length) {
if (done) {
return { value: undefined, done: true };
}
return new Promise<BetaMessageStreamEvent | undefined>((resolve, reject) =>
readQueue.push({ resolve, reject }),
).then((chunk) => (chunk ? { value: chunk, done: false } : { value: undefined, done: true }));
}
const chunk = pushQueue.shift()!;
return { value: chunk, done: false };
},
return: async () => {
this.abort();
return { value: undefined, done: true };
},
};
}
toReadableStream(): ReadableStream {
const stream = new Stream(this[Symbol.asyncIterator].bind(this), this.controller);
return stream.toReadableStream();
}
}
// used to ensure exhaustive case matching without throwing a runtime error
function checkNever(x: never) {}

View File

@@ -0,0 +1,743 @@
import { isAbortError } from '../internal/errors';
import { AnthropicError, APIUserAbortError } from '../error';
import {
type ContentBlock,
Messages,
type Message,
type MessageStreamEvent,
type MessageParam,
type MessageCreateParams,
type MessageCreateParamsBase,
type TextBlock,
type TextCitation,
type ToolUseBlock,
type ServerToolUseBlock,
} from '../resources/messages';
import { Stream } from '../streaming';
import { partialParse } from '../_vendor/partial-json-parser/parser';
import { RequestOptions } from '../internal/request-options';
import type { Logger } from '../client';
import { maybeParseMessage, type ParsedMessage } from './parser';
export interface MessageStreamEvents<ParsedT = null> {
connect: () => void;
streamEvent: (event: MessageStreamEvent, snapshot: Message) => void;
text: (textDelta: string, textSnapshot: string) => void;
citation: (citation: TextCitation, citationsSnapshot: TextCitation[]) => void;
inputJson: (partialJson: string, jsonSnapshot: unknown) => void;
thinking: (thinkingDelta: string, thinkingSnapshot: string) => void;
signature: (signature: string) => void;
message: (message: ParsedMessage<ParsedT>) => void;
contentBlock: (content: ContentBlock) => void;
finalMessage: (message: ParsedMessage<ParsedT>) => void;
error: (error: AnthropicError) => void;
abort: (error: APIUserAbortError) => void;
end: () => void;
}
type MessageStreamEventListeners<ParsedT, Event extends keyof MessageStreamEvents<ParsedT>> = {
listener: MessageStreamEvents<ParsedT>[Event];
once?: boolean;
}[];
const JSON_BUF_PROPERTY = '__json_buf';
export type TracksToolInput = ToolUseBlock | ServerToolUseBlock;
function tracksToolInput(content: ContentBlock): content is TracksToolInput {
return content.type === 'tool_use' || content.type === 'server_tool_use';
}
export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStreamEvent> {
messages: MessageParam[] = [];
receivedMessages: ParsedMessage<ParsedT>[] = [];
#currentMessageSnapshot: Message | undefined;
#params: MessageCreateParams | null = null;
controller: AbortController = new AbortController();
#connectedPromise: Promise<Response | null>;
#resolveConnectedPromise: (response: Response | null) => void = () => {};
#rejectConnectedPromise: (error: AnthropicError) => void = () => {};
#endPromise: Promise<void>;
#resolveEndPromise: () => void = () => {};
#rejectEndPromise: (error: AnthropicError) => void = () => {};
#listeners: {
[Event in keyof MessageStreamEvents<ParsedT>]?: MessageStreamEventListeners<ParsedT, Event>;
} = {};
#ended = false;
#errored = false;
#aborted = false;
#catchingPromiseCreated = false;
#response: Response | null | undefined;
#request_id: string | null | undefined;
#logger: Logger;
constructor(params: MessageCreateParamsBase | null, opts?: { logger?: Logger | undefined }) {
this.#connectedPromise = new Promise<Response | null>((resolve, reject) => {
this.#resolveConnectedPromise = resolve;
this.#rejectConnectedPromise = reject;
});
this.#endPromise = new Promise<void>((resolve, reject) => {
this.#resolveEndPromise = resolve;
this.#rejectEndPromise = reject;
});
// Don't let these promises cause unhandled rejection errors.
// we will manually cause an unhandled rejection error later
// if the user hasn't registered any error listener or called
// any promise-returning method.
this.#connectedPromise.catch(() => {});
this.#endPromise.catch(() => {});
this.#params = params;
this.#logger = opts?.logger ?? console;
}
get response(): Response | null | undefined {
return this.#response;
}
get request_id(): string | null | undefined {
return this.#request_id;
}
/**
* Returns the `MessageStream` data, the raw `Response` instance and the ID of the request,
* returned vie the `request-id` header which is useful for debugging requests and resporting
* issues to Anthropic.
*
* This is the same as the `APIPromise.withResponse()` method.
*
* This method will raise an error if you created the stream using `MessageStream.fromReadableStream`
* as no `Response` is available.
*/
async withResponse(): Promise<{
data: MessageStream<ParsedT>;
response: Response;
request_id: string | null | undefined;
}> {
this.#catchingPromiseCreated = true;
const response = await this.#connectedPromise;
if (!response) {
throw new Error('Could not resolve a `Response` object');
}
return {
data: this,
response,
request_id: response.headers.get('request-id'),
};
}
/**
* Intended for use on the frontend, consuming a stream produced with
* `.toReadableStream()` on the backend.
*
* Note that messages sent to the model do not appear in `.on('message')`
* in this context.
*/
static fromReadableStream(stream: ReadableStream): MessageStream {
const runner = new MessageStream(null);
runner._run(() => runner._fromReadableStream(stream));
return runner;
}
static createMessage<ParsedT>(
messages: Messages,
params: MessageCreateParamsBase,
options?: RequestOptions,
{ logger }: { logger?: Logger | undefined } = {},
): MessageStream<ParsedT> {
const runner = new MessageStream<ParsedT>(params, { logger });
for (const message of params.messages) {
runner._addMessageParam(message);
}
runner.#params = { ...params, stream: true };
runner._run(() =>
runner._createMessage(
messages,
{ ...params, stream: true },
{ ...options, headers: { ...options?.headers, 'X-Stainless-Helper-Method': 'stream' } },
),
);
return runner;
}
protected _run(executor: () => Promise<any>) {
executor().then(() => {
this._emitFinal();
this._emit('end');
}, this.#handleError);
}
protected _addMessageParam(message: MessageParam) {
this.messages.push(message);
}
protected _addMessage(message: ParsedMessage<ParsedT>, emit = true) {
this.receivedMessages.push(message);
if (emit) {
this._emit('message', message);
}
}
protected async _createMessage(
messages: Messages,
params: MessageCreateParams,
options?: RequestOptions,
): Promise<void> {
const signal = options?.signal;
let abortHandler: (() => void) | undefined;
if (signal) {
if (signal.aborted) this.controller.abort();
abortHandler = this.controller.abort.bind(this.controller);
signal.addEventListener('abort', abortHandler);
}
try {
this.#beginRequest();
const { response, data: stream } = await messages
.create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
.withResponse();
this._connected(response);
for await (const event of stream) {
this.#addStreamEvent(event);
}
if (stream.controller.signal?.aborted) {
throw new APIUserAbortError();
}
this.#endRequest();
} finally {
if (signal && abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
}
}
protected _connected(response: Response | null) {
if (this.ended) return;
this.#response = response;
this.#request_id = response?.headers.get('request-id');
this.#resolveConnectedPromise(response);
this._emit('connect');
}
get ended(): boolean {
return this.#ended;
}
get errored(): boolean {
return this.#errored;
}
get aborted(): boolean {
return this.#aborted;
}
abort() {
this.controller.abort();
}
/**
* Adds the listener function to the end of the listeners array for the event.
* No checks are made to see if the listener has already been added. Multiple calls passing
* the same combination of event and listener will result in the listener being added, and
* called, multiple times.
* @returns this MessageStream, so that calls can be chained
*/
on<Event extends keyof MessageStreamEvents<ParsedT>>(
event: Event,
listener: MessageStreamEvents<ParsedT>[Event],
): this {
const listeners: MessageStreamEventListeners<ParsedT, Event> =
this.#listeners[event] || (this.#listeners[event] = []);
listeners.push({ listener });
return this;
}
/**
* Removes the specified listener from the listener array for the event.
* off() will remove, at most, one instance of a listener from the listener array. If any single
* listener has been added multiple times to the listener array for the specified event, then
* off() must be called multiple times to remove each instance.
* @returns this MessageStream, so that calls can be chained
*/
off<Event extends keyof MessageStreamEvents<ParsedT>>(
event: Event,
listener: MessageStreamEvents<ParsedT>[Event],
): this {
const listeners = this.#listeners[event];
if (!listeners) return this;
const index = listeners.findIndex((l) => l.listener === listener);
if (index >= 0) listeners.splice(index, 1);
return this;
}
/**
* Adds a one-time listener function for the event. The next time the event is triggered,
* this listener is removed and then invoked.
* @returns this MessageStream, so that calls can be chained
*/
once<Event extends keyof MessageStreamEvents<ParsedT>>(
event: Event,
listener: MessageStreamEvents<ParsedT>[Event],
): this {
const listeners: MessageStreamEventListeners<ParsedT, Event> =
this.#listeners[event] || (this.#listeners[event] = []);
listeners.push({ listener, once: true });
return this;
}
/**
* This is similar to `.once()`, but returns a Promise that resolves the next time
* the event is triggered, instead of calling a listener callback.
* @returns a Promise that resolves the next time given event is triggered,
* or rejects if an error is emitted. (If you request the 'error' event,
* returns a promise that resolves with the error).
*
* Example:
*
* const message = await stream.emitted('message') // rejects if the stream errors
*/
emitted<Event extends keyof MessageStreamEvents<ParsedT>>(
event: Event,
): Promise<
Parameters<MessageStreamEvents<ParsedT>[Event]> extends [infer Param] ? Param
: Parameters<MessageStreamEvents<ParsedT>[Event]> extends [] ? void
: Parameters<MessageStreamEvents<ParsedT>[Event]>
> {
return new Promise((resolve, reject) => {
this.#catchingPromiseCreated = true;
if (event !== 'error') this.once('error', reject);
this.once(event, resolve as any);
});
}
async done(): Promise<void> {
this.#catchingPromiseCreated = true;
await this.#endPromise;
}
get currentMessage(): Message | undefined {
return this.#currentMessageSnapshot;
}
#getFinalMessage(): ParsedMessage<ParsedT> {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
return this.receivedMessages.at(-1)!;
}
/**
* @returns a promise that resolves with the the final assistant Message response,
* or rejects if an error occurred or the stream ended prematurely without producing a Message.
* If structured outputs were used, this will be a ParsedMessage with a `parsed_output` field.
*/
async finalMessage(): Promise<ParsedMessage<ParsedT>> {
await this.done();
return this.#getFinalMessage();
}
#getFinalText(): string {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
const textBlocks = this.receivedMessages
.at(-1)!
.content.filter((block): block is TextBlock => block.type === 'text')
.map((block) => block.text);
if (textBlocks.length === 0) {
throw new AnthropicError('stream ended without producing a content block with type=text');
}
return textBlocks.join(' ');
}
/**
* @returns a promise that resolves with the the final assistant Message's text response, concatenated
* together if there are more than one text blocks.
* Rejects if an error occurred or the stream ended prematurely without producing a Message.
*/
async finalText(): Promise<string> {
await this.done();
return this.#getFinalText();
}
#handleError = (error: unknown) => {
this.#errored = true;
if (isAbortError(error)) {
error = new APIUserAbortError();
}
if (error instanceof APIUserAbortError) {
this.#aborted = true;
return this._emit('abort', error);
}
if (error instanceof AnthropicError) {
return this._emit('error', error);
}
if (error instanceof Error) {
const anthropicError: AnthropicError = new AnthropicError(error.message);
// @ts-ignore
anthropicError.cause = error;
return this._emit('error', anthropicError);
}
return this._emit('error', new AnthropicError(String(error)));
};
protected _emit<Event extends keyof MessageStreamEvents<ParsedT>>(
event: Event,
...args: Parameters<MessageStreamEvents<ParsedT>[Event]>
) {
// make sure we don't emit any MessageStreamEvents after end
if (this.#ended) return;
if (event === 'end') {
this.#ended = true;
this.#resolveEndPromise();
}
const listeners: MessageStreamEventListeners<ParsedT, Event> | undefined = this.#listeners[event];
if (listeners) {
this.#listeners[event] = listeners.filter((l: { once?: boolean }) => !l.once) as any;
listeners.forEach(({ listener }: any) => listener(...args));
}
if (event === 'abort') {
const error = args[0] as APIUserAbortError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
Promise.reject(error);
}
this.#rejectConnectedPromise(error);
this.#rejectEndPromise(error);
this._emit('end');
return;
}
if (event === 'error') {
// NOTE: _emit('error', error) should only be called from #handleError().
const error = args[0] as AnthropicError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
// Trigger an unhandled rejection if the user hasn't registered any error handlers.
// If you are seeing stack traces here, make sure to handle errors via either:
// - runner.on('error', () => ...)
// - await runner.done()
// - await runner.final...()
// - etc.
Promise.reject(error);
}
this.#rejectConnectedPromise(error);
this.#rejectEndPromise(error);
this._emit('end');
}
}
protected _emitFinal() {
const finalMessage = this.receivedMessages.at(-1);
if (finalMessage) {
this._emit('finalMessage', this.#getFinalMessage());
}
}
#beginRequest() {
if (this.ended) return;
this.#currentMessageSnapshot = undefined;
}
#addStreamEvent(event: MessageStreamEvent) {
if (this.ended) return;
const messageSnapshot = this.#accumulateMessage(event);
this._emit('streamEvent', event, messageSnapshot);
switch (event.type) {
case 'content_block_delta': {
const content = messageSnapshot.content.at(-1)!;
switch (event.delta.type) {
case 'text_delta': {
if (content.type === 'text') {
this._emit('text', event.delta.text, content.text || '');
}
break;
}
case 'citations_delta': {
if (content.type === 'text') {
this._emit('citation', event.delta.citation, content.citations ?? []);
}
break;
}
case 'input_json_delta': {
if (tracksToolInput(content) && content.input) {
this._emit('inputJson', event.delta.partial_json, content.input);
}
break;
}
case 'thinking_delta': {
if (content.type === 'thinking') {
this._emit('thinking', event.delta.thinking, content.thinking);
}
break;
}
case 'signature_delta': {
if (content.type === 'thinking') {
this._emit('signature', content.signature);
}
break;
}
default:
checkNever(event.delta);
}
break;
}
case 'message_stop': {
this._addMessageParam(messageSnapshot);
this._addMessage(maybeParseMessage(messageSnapshot, this.#params, { logger: this.#logger }), true);
break;
}
case 'content_block_stop': {
this._emit('contentBlock', messageSnapshot.content.at(-1)!);
break;
}
case 'message_start': {
this.#currentMessageSnapshot = messageSnapshot;
break;
}
case 'content_block_start':
case 'message_delta':
break;
}
}
#endRequest(): ParsedMessage<ParsedT> {
if (this.ended) {
throw new AnthropicError(`stream has ended, this shouldn't happen`);
}
const snapshot = this.#currentMessageSnapshot;
if (!snapshot) {
throw new AnthropicError(`request ended without sending any chunks`);
}
this.#currentMessageSnapshot = undefined;
return maybeParseMessage(snapshot, this.#params, { logger: this.#logger });
}
protected async _fromReadableStream(
readableStream: ReadableStream,
options?: RequestOptions,
): Promise<void> {
const signal = options?.signal;
let abortHandler: (() => void) | undefined;
if (signal) {
if (signal.aborted) this.controller.abort();
abortHandler = this.controller.abort.bind(this.controller);
signal.addEventListener('abort', abortHandler);
}
try {
this.#beginRequest();
this._connected(null);
const stream = Stream.fromReadableStream<MessageStreamEvent>(readableStream, this.controller);
for await (const event of stream) {
this.#addStreamEvent(event);
}
if (stream.controller.signal?.aborted) {
throw new APIUserAbortError();
}
this.#endRequest();
} finally {
if (signal && abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
}
}
/**
* Mutates this.#currentMessage with the current event. Handling the accumulation of multiple messages
* will be needed to be handled by the caller, this method will throw if you try to accumulate for multiple
* messages.
*/
#accumulateMessage(event: MessageStreamEvent): Message {
let snapshot = this.#currentMessageSnapshot;
if (event.type === 'message_start') {
if (snapshot) {
throw new AnthropicError(`Unexpected event order, got ${event.type} before receiving "message_stop"`);
}
return event.message;
}
if (!snapshot) {
throw new AnthropicError(`Unexpected event order, got ${event.type} before "message_start"`);
}
switch (event.type) {
case 'message_stop':
return snapshot;
case 'message_delta':
snapshot.stop_reason = event.delta.stop_reason;
snapshot.stop_sequence = event.delta.stop_sequence;
snapshot.usage.output_tokens = event.usage.output_tokens;
// Update other usage fields if they exist in the event
if (event.usage.input_tokens != null) {
snapshot.usage.input_tokens = event.usage.input_tokens;
}
if (event.usage.cache_creation_input_tokens != null) {
snapshot.usage.cache_creation_input_tokens = event.usage.cache_creation_input_tokens;
}
if (event.usage.cache_read_input_tokens != null) {
snapshot.usage.cache_read_input_tokens = event.usage.cache_read_input_tokens;
}
if (event.usage.server_tool_use != null) {
snapshot.usage.server_tool_use = event.usage.server_tool_use;
}
return snapshot;
case 'content_block_start':
snapshot.content.push({ ...event.content_block });
return snapshot;
case 'content_block_delta': {
const snapshotContent = snapshot.content.at(event.index);
switch (event.delta.type) {
case 'text_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
text: (snapshotContent.text || '') + event.delta.text,
};
}
break;
}
case 'citations_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
citations: [...(snapshotContent.citations ?? []), event.delta.citation],
};
}
break;
}
case 'input_json_delta': {
if (snapshotContent && tracksToolInput(snapshotContent)) {
// we need to keep track of the raw JSON string as well so that we can
// re-parse it for each delta, for now we just store it as an untyped
// non-enumerable property on the snapshot
let jsonBuf = (snapshotContent as any)[JSON_BUF_PROPERTY] || '';
jsonBuf += event.delta.partial_json;
const newContent = { ...snapshotContent };
Object.defineProperty(newContent, JSON_BUF_PROPERTY, {
value: jsonBuf,
enumerable: false,
writable: true,
});
if (jsonBuf) {
newContent.input = partialParse(jsonBuf);
}
snapshot.content[event.index] = newContent;
}
break;
}
case 'thinking_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
thinking: snapshotContent.thinking + event.delta.thinking,
};
}
break;
}
case 'signature_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
signature: event.delta.signature,
};
}
break;
}
default:
checkNever(event.delta);
}
return snapshot;
}
case 'content_block_stop':
return snapshot;
}
}
[Symbol.asyncIterator](): AsyncIterator<MessageStreamEvent> {
const pushQueue: MessageStreamEvent[] = [];
const readQueue: {
resolve: (chunk: MessageStreamEvent | undefined) => void;
reject: (error: unknown) => void;
}[] = [];
let done = false;
this.on('streamEvent', (event) => {
const reader = readQueue.shift();
if (reader) {
reader.resolve(event);
} else {
pushQueue.push(event);
}
});
this.on('end', () => {
done = true;
for (const reader of readQueue) {
reader.resolve(undefined);
}
readQueue.length = 0;
});
this.on('abort', (err) => {
done = true;
for (const reader of readQueue) {
reader.reject(err);
}
readQueue.length = 0;
});
this.on('error', (err) => {
done = true;
for (const reader of readQueue) {
reader.reject(err);
}
readQueue.length = 0;
});
return {
next: async (): Promise<IteratorResult<MessageStreamEvent>> => {
if (!pushQueue.length) {
if (done) {
return { value: undefined, done: true };
}
return new Promise<MessageStreamEvent | undefined>((resolve, reject) =>
readQueue.push({ resolve, reject }),
).then((chunk) => (chunk ? { value: chunk, done: false } : { value: undefined, done: true }));
}
const chunk = pushQueue.shift()!;
return { value: chunk, done: false };
},
return: async () => {
this.abort();
return { value: undefined, done: true };
},
};
}
toReadableStream(): ReadableStream {
const stream = new Stream(this[Symbol.asyncIterator].bind(this), this.controller);
return stream.toReadableStream();
}
}
// used to ensure exhaustive case matching without throwing a runtime error
function checkNever(x: never) {}

View File

@@ -0,0 +1,148 @@
import type { Logger } from '../client';
import { AnthropicError } from '../core/error';
import {
BetaContentBlock,
BetaJSONOutputFormat,
BetaMessage,
BetaOutputConfig,
BetaTextBlock,
MessageCreateParams,
} from '../resources/beta/messages/messages';
// vendored from typefest just to make things look a bit nicer on hover
type Simplify<T> = { [KeyType in keyof T]: T[KeyType] } & {};
type AutoParseableBetaOutputConfig = Omit<BetaOutputConfig, 'format'> & {
format?: BetaJSONOutputFormat | AutoParseableBetaOutputFormat<any> | null;
};
export type BetaParseableMessageCreateParams = Simplify<
Omit<MessageCreateParams, 'output_format' | 'output_config'> & {
/**
* @deprecated Use `output_config.format` instead. This parameter will be removed in a future
* release.
*/
output_format?: BetaJSONOutputFormat | AutoParseableBetaOutputFormat<any> | null;
output_config?: AutoParseableBetaOutputConfig | null;
}
>;
export type ExtractParsedContentFromBetaParams<Params extends BetaParseableMessageCreateParams> =
Params['output_format'] extends AutoParseableBetaOutputFormat<infer P> ? P
: Params['output_config'] extends { format: AutoParseableBetaOutputFormat<infer P> } ? P
: null;
export type AutoParseableBetaOutputFormat<ParsedT> = BetaJSONOutputFormat & {
parse(content: string): ParsedT;
};
export type ParsedBetaMessage<ParsedT> = BetaMessage & {
content: Array<ParsedBetaContentBlock<ParsedT>>;
parsed_output: ParsedT | null;
};
export type ParsedBetaContentBlock<ParsedT> =
| (BetaTextBlock & { parsed_output: ParsedT | null })
| Exclude<BetaContentBlock, BetaTextBlock>;
function getOutputFormat(
params: BetaParseableMessageCreateParams | null,
): BetaJSONOutputFormat | AutoParseableBetaOutputFormat<any> | null | undefined {
// Prefer output_format (deprecated) over output_config.format for backward compatibility
return params?.output_format ?? params?.output_config?.format;
}
export function maybeParseBetaMessage<Params extends BetaParseableMessageCreateParams | null>(
message: BetaMessage,
params: Params,
opts: { logger: Logger },
): ParsedBetaMessage<ExtractParsedContentFromBetaParams<NonNullable<Params>>> {
const outputFormat = getOutputFormat(params);
if (!params || !('parse' in (outputFormat ?? {}))) {
return {
...message,
content: message.content.map((block) => {
if (block.type === 'text') {
const parsedBlock = Object.defineProperty({ ...block }, 'parsed_output', {
value: null,
enumerable: false,
}) as ParsedBetaContentBlock<ExtractParsedContentFromBetaParams<NonNullable<Params>>>;
return Object.defineProperty(parsedBlock, 'parsed', {
get() {
opts.logger.warn(
'The `parsed` property on `text` blocks is deprecated, please use `parsed_output` instead.',
);
return null;
},
enumerable: false,
});
}
return block;
}),
parsed_output: null,
} as ParsedBetaMessage<ExtractParsedContentFromBetaParams<NonNullable<Params>>>;
}
return parseBetaMessage(message, params, opts);
}
export function parseBetaMessage<Params extends BetaParseableMessageCreateParams>(
message: BetaMessage,
params: Params,
opts: { logger: Logger },
): ParsedBetaMessage<ExtractParsedContentFromBetaParams<Params>> {
let firstParsedOutput: ReturnType<typeof parseBetaOutputFormat<Params>> | null = null;
const content: Array<ParsedBetaContentBlock<ExtractParsedContentFromBetaParams<Params>>> =
message.content.map((block) => {
if (block.type === 'text') {
const parsedOutput = parseBetaOutputFormat(params, block.text);
if (firstParsedOutput === null) {
firstParsedOutput = parsedOutput;
}
const parsedBlock = Object.defineProperty({ ...block }, 'parsed_output', {
value: parsedOutput,
enumerable: false,
}) as ParsedBetaContentBlock<ExtractParsedContentFromBetaParams<Params>>;
return Object.defineProperty(parsedBlock, 'parsed', {
get() {
opts.logger.warn(
'The `parsed` property on `text` blocks is deprecated, please use `parsed_output` instead.',
);
return parsedOutput;
},
enumerable: false,
});
}
return block;
});
return {
...message,
content,
parsed_output: firstParsedOutput,
} as ParsedBetaMessage<ExtractParsedContentFromBetaParams<Params>>;
}
function parseBetaOutputFormat<Params extends BetaParseableMessageCreateParams>(
params: Params,
content: string,
): ExtractParsedContentFromBetaParams<Params> | null {
const outputFormat = getOutputFormat(params);
if (outputFormat?.type !== 'json_schema') {
return null;
}
try {
if ('parse' in outputFormat) {
return outputFormat.parse(content);
}
return JSON.parse(content);
} catch (error) {
throw new AnthropicError(`Failed to parse structured output: ${error}`);
}
}

View File

@@ -0,0 +1,125 @@
import type { Logger } from '../client';
import { AnthropicError } from '../core/error';
import {
ContentBlock,
JSONOutputFormat,
Message,
OutputConfig,
TextBlock,
MessageCreateParams,
} from '../resources/messages/messages';
// vendored from typefest just to make things look a bit nicer on hover
type Simplify<T> = { [KeyType in keyof T]: T[KeyType] } & {};
type AutoParseableOutputConfig = Omit<OutputConfig, 'format'> & {
format?: JSONOutputFormat | AutoParseableOutputFormat<any> | null;
};
export type ParseableMessageCreateParams = Simplify<
Omit<MessageCreateParams, 'output_config'> & {
output_config?: AutoParseableOutputConfig | null;
}
>;
export type ExtractParsedContentFromParams<Params extends ParseableMessageCreateParams> =
Params['output_config'] extends { format: AutoParseableOutputFormat<infer P> } ? P : null;
export type AutoParseableOutputFormat<ParsedT> = JSONOutputFormat & {
parse(content: string): ParsedT;
};
export type ParsedMessage<ParsedT> = Message & {
content: Array<ParsedContentBlock<ParsedT>>;
parsed_output: ParsedT | null;
};
export type ParsedContentBlock<ParsedT> =
| (TextBlock & { parsed_output: ParsedT | null })
| Exclude<ContentBlock, TextBlock>;
function getOutputFormat(
params: ParseableMessageCreateParams | null,
): JSONOutputFormat | AutoParseableOutputFormat<any> | null | undefined {
return params?.output_config?.format;
}
export function maybeParseMessage<Params extends ParseableMessageCreateParams | null>(
message: Message,
params: Params,
opts: { logger: Logger },
): ParsedMessage<ExtractParsedContentFromParams<NonNullable<Params>>> {
const outputFormat = getOutputFormat(params);
if (!params || !('parse' in (outputFormat ?? {}))) {
return {
...message,
content: message.content.map((block) => {
if (block.type === 'text') {
const parsedBlock = Object.defineProperty({ ...block }, 'parsed_output', {
value: null,
enumerable: false,
}) as ParsedContentBlock<ExtractParsedContentFromParams<NonNullable<Params>>>;
return parsedBlock;
}
return block;
}),
parsed_output: null,
} as ParsedMessage<ExtractParsedContentFromParams<NonNullable<Params>>>;
}
return parseMessage(message, params, opts);
}
export function parseMessage<Params extends ParseableMessageCreateParams>(
message: Message,
params: Params,
opts: { logger: Logger },
): ParsedMessage<ExtractParsedContentFromParams<Params>> {
let firstParsedOutput: ReturnType<typeof parseOutputFormat<Params>> | null = null;
const content: Array<ParsedContentBlock<ExtractParsedContentFromParams<Params>>> = message.content.map(
(block) => {
if (block.type === 'text') {
const parsedOutput = parseOutputFormat(params, block.text);
if (firstParsedOutput === null) {
firstParsedOutput = parsedOutput;
}
const parsedBlock = Object.defineProperty({ ...block }, 'parsed_output', {
value: parsedOutput,
enumerable: false,
}) as ParsedContentBlock<ExtractParsedContentFromParams<Params>>;
return parsedBlock;
}
return block;
},
);
return {
...message,
content,
parsed_output: firstParsedOutput,
} as ParsedMessage<ExtractParsedContentFromParams<Params>>;
}
function parseOutputFormat<Params extends ParseableMessageCreateParams>(
params: Params,
content: string,
): ExtractParsedContentFromParams<Params> | null {
const outputFormat = getOutputFormat(params);
if (outputFormat?.type !== 'json_schema') {
return null;
}
try {
if ('parse' in outputFormat) {
return outputFormat.parse(content);
}
return JSON.parse(content);
} catch (error) {
throw new AnthropicError(`Failed to parse structured output: ${error}`);
}
}

View File

@@ -0,0 +1,80 @@
/**
* Shared utilities for tracking SDK helper usage.
*/
import type { BetaMessageParam, BetaToolUnion } from '../resources/beta';
/**
* Symbol used to mark objects created by SDK helpers for tracking.
* The value is the helper name (e.g., 'mcpTool', 'betaZodTool').
*/
export const SDK_HELPER_SYMBOL = Symbol('anthropic.sdk.stainlessHelper');
type StainlessHelperObject = { [SDK_HELPER_SYMBOL]: string };
export function wasCreatedByStainlessHelper(value: unknown): value is StainlessHelperObject {
return typeof value === 'object' && value !== null && SDK_HELPER_SYMBOL in value;
}
/**
* Collects helper names from tools and messages arrays.
* Returns a deduplicated array of helper names found.
*/
export function collectStainlessHelpers(
tools: BetaToolUnion[] | undefined,
messages: BetaMessageParam[] | undefined,
): string[] {
const helpers = new Set<string>();
// Collect from tools
if (tools) {
for (const tool of tools) {
if (wasCreatedByStainlessHelper(tool)) {
helpers.add(tool[SDK_HELPER_SYMBOL]);
}
}
}
// Collect from messages and their content blocks
if (messages) {
for (const message of messages) {
if (wasCreatedByStainlessHelper(message)) {
helpers.add(message[SDK_HELPER_SYMBOL]);
}
if (Array.isArray(message.content)) {
for (const block of message.content) {
if (wasCreatedByStainlessHelper(block)) {
helpers.add(block[SDK_HELPER_SYMBOL]);
}
}
}
}
}
return Array.from(helpers);
}
/**
* Builds x-stainless-helper header value from tools and messages.
* Returns an empty object if no helpers are found.
*/
export function stainlessHelperHeader(
tools: BetaToolUnion[] | undefined,
messages: BetaMessageParam[] | undefined,
): { 'x-stainless-helper'?: string } {
const helpers = collectStainlessHelpers(tools, messages);
if (helpers.length === 0) return {};
return { 'x-stainless-helper': helpers.join(', ') };
}
/**
* Builds x-stainless-helper header value from a file object.
* Returns an empty object if the file is not marked with a helper.
*/
export function stainlessHelperHeaderFromFile(file: unknown): { 'x-stainless-helper'?: string } {
if (wasCreatedByStainlessHelper(file)) {
return { 'x-stainless-helper': file[SDK_HELPER_SYMBOL] };
}
return {};
}

View File

@@ -0,0 +1,40 @@
import {
BetaMemoryTool20250818,
BetaTool,
BetaToolBash20241022,
BetaToolBash20250124,
BetaToolComputerUse20241022,
BetaToolComputerUse20250124,
BetaToolComputerUse20251124,
BetaToolResultContentBlockParam,
BetaToolTextEditor20241022,
BetaToolTextEditor20250124,
BetaToolTextEditor20250429,
BetaToolTextEditor20250728,
} from '../../resources/beta';
export type Promisable<T> = T | Promise<T>;
/**
* Tool types that can be implemented on the client.
* Excludes server-side tools like code execution, web search, and MCP toolsets.
*/
export type BetaClientRunnableToolType =
| BetaTool
| BetaMemoryTool20250818
| BetaToolBash20241022
| BetaToolBash20250124
| BetaToolComputerUse20241022
| BetaToolComputerUse20250124
| BetaToolComputerUse20251124
| BetaToolTextEditor20241022
| BetaToolTextEditor20250124
| BetaToolTextEditor20250429
| BetaToolTextEditor20250728;
// this type is just an extension of BetaTool with a run and parse method
// that will be called by `toolRunner()` helpers
export type BetaRunnableTool<Input = any> = BetaClientRunnableToolType & {
run: (args: Input) => Promisable<string | Array<BetaToolResultContentBlockParam>>;
parse: (content: unknown) => Input;
};

View File

@@ -0,0 +1,481 @@
import { BetaRunnableTool } from './BetaRunnableTool';
import { ToolError } from './ToolError';
import { Anthropic } from '../..';
import { AnthropicError } from '../../core/error';
import { BetaMessage, BetaMessageParam, BetaToolUnion, MessageCreateParams } from '../../resources/beta';
import { BetaMessageStream } from '../BetaMessageStream';
import { RequestOptions } from '../../internal/request-options';
import { buildHeaders } from '../../internal/headers';
import { CompactionControl, DEFAULT_SUMMARY_PROMPT, DEFAULT_TOKEN_THRESHOLD } from './CompactionControl';
import { collectStainlessHelpers } from '../stainless-helper-header';
/**
* Just Promise.withResolvers(), which is not available in all environments.
*/
function promiseWithResolvers<T>(): {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (reason?: any) => void;
} {
let resolve: (value: T) => void;
let reject: (reason?: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve: resolve!, reject: reject! };
}
/**
* A ToolRunner handles the automatic conversation loop between the assistant and tools.
*
* A ToolRunner is an async iterable that yields either BetaMessage or BetaMessageStream objects
* depending on the streaming configuration.
*/
export class BetaToolRunner<Stream extends boolean> {
/** Whether the async iterator has been consumed */
#consumed = false;
/** Whether parameters have been mutated since the last API call */
#mutated = false;
/** Current state containing the request parameters */
#state: { params: BetaToolRunnerParams };
#options: BetaToolRunnerRequestOptions;
/** Promise for the last message received from the assistant */
#message?: Promise<BetaMessage> | undefined;
/** Cached tool response to avoid redundant executions */
#toolResponse?: Promise<BetaMessageParam | null> | undefined;
/** Promise resolvers for waiting on completion */
#completion: {
promise: Promise<BetaMessage>;
resolve: (value: BetaMessage) => void;
reject: (reason?: any) => void;
};
/** Number of iterations (API requests) made so far */
#iterationCount = 0;
constructor(
private client: Anthropic,
params: BetaToolRunnerParams,
options?: BetaToolRunnerRequestOptions,
) {
this.#state = {
params: {
// You can't clone the entire params since there are functions as handlers.
// You also don't really need to clone params.messages, but it probably will prevent a foot gun
// somewhere.
...params,
messages: structuredClone(params.messages),
},
};
const helpers = collectStainlessHelpers(params.tools, params.messages);
const helperValue = ['BetaToolRunner', ...helpers].join(', ');
this.#options = {
...options,
headers: buildHeaders([{ 'x-stainless-helper': helperValue }, options?.headers]),
};
this.#completion = promiseWithResolvers();
}
async #checkAndCompact(): Promise<boolean> {
const compactionControl = this.#state.params.compactionControl;
if (!compactionControl || !compactionControl.enabled) {
return false;
}
let tokensUsed = 0;
if (this.#message !== undefined) {
try {
const message = await this.#message;
const totalInputTokens =
message.usage.input_tokens +
(message.usage.cache_creation_input_tokens ?? 0) +
(message.usage.cache_read_input_tokens ?? 0);
tokensUsed = totalInputTokens + message.usage.output_tokens;
} catch {
// If we can't get the message, skip compaction
return false;
}
}
const threshold = compactionControl.contextTokenThreshold ?? DEFAULT_TOKEN_THRESHOLD;
if (tokensUsed < threshold) {
return false;
}
const model = compactionControl.model ?? this.#state.params.model;
const summaryPrompt = compactionControl.summaryPrompt ?? DEFAULT_SUMMARY_PROMPT;
const messages = this.#state.params.messages;
if (messages[messages.length - 1]!.role === 'assistant') {
// Remove tool_use blocks from the last message to avoid 400 error
// (tool_use requires tool_result, which we don't have yet)
const lastMessage = messages[messages.length - 1]!;
if (Array.isArray(lastMessage.content)) {
const nonToolBlocks = lastMessage.content.filter((block) => block.type !== 'tool_use');
if (nonToolBlocks.length === 0) {
// If all blocks were tool_use, just remove the message entirely
messages.pop();
} else {
lastMessage.content = nonToolBlocks;
}
}
}
const response = await this.client.beta.messages.create(
{
model,
messages: [
...messages,
{
role: 'user',
content: [
{
type: 'text',
text: summaryPrompt,
},
],
},
],
max_tokens: this.#state.params.max_tokens,
},
{
headers: { 'x-stainless-helper': 'compaction' },
},
);
if (response.content[0]?.type !== 'text') {
throw new AnthropicError('Expected text response for compaction');
}
this.#state.params.messages = [
{
role: 'user',
content: response.content,
},
];
return true;
}
async *[Symbol.asyncIterator](): AsyncIterator<
Stream extends true ? BetaMessageStream
: Stream extends false ? BetaMessage
: BetaMessage | BetaMessageStream
> {
if (this.#consumed) {
throw new AnthropicError('Cannot iterate over a consumed stream');
}
this.#consumed = true;
this.#mutated = true;
this.#toolResponse = undefined;
try {
while (true) {
let stream;
try {
if (
this.#state.params.max_iterations &&
this.#iterationCount >= this.#state.params.max_iterations
) {
break;
}
this.#mutated = false;
this.#toolResponse = undefined;
this.#iterationCount++;
this.#message = undefined;
const { max_iterations, compactionControl, ...params } = this.#state.params;
if (params.stream) {
stream = this.client.beta.messages.stream({ ...params }, this.#options);
this.#message = stream.finalMessage();
// Make sure that this promise doesn't throw before we get the option to do something about it.
// Error will be caught when we call await this.#message ultimately
this.#message.catch(() => {});
yield stream as any;
} else {
this.#message = this.client.beta.messages.create({ ...params, stream: false }, this.#options);
yield this.#message as any;
}
const isCompacted = await this.#checkAndCompact();
if (!isCompacted) {
if (!this.#mutated) {
const { role, content } = await this.#message;
this.#state.params.messages.push({ role, content });
}
const toolMessage = await this.#generateToolResponse(this.#state.params.messages.at(-1)!);
if (toolMessage) {
this.#state.params.messages.push(toolMessage);
} else if (!this.#mutated) {
break;
}
}
} finally {
if (stream) {
stream.abort();
}
}
}
if (!this.#message) {
throw new AnthropicError('ToolRunner concluded without a message from the server');
}
this.#completion.resolve(await this.#message);
} catch (error) {
this.#consumed = false;
// Silence unhandled promise errors
this.#completion.promise.catch(() => {});
this.#completion.reject(error);
this.#completion = promiseWithResolvers();
throw error;
}
}
/**
* Update the parameters for the next API call. This invalidates any cached tool responses.
*
* @param paramsOrMutator - Either new parameters or a function to mutate existing parameters
*
* @example
* // Direct parameter update
* runner.setMessagesParams({
* model: 'claude-haiku-4-5',
* max_tokens: 500,
* });
*
* @example
* // Using a mutator function
* runner.setMessagesParams((params) => ({
* ...params,
* max_tokens: 100,
* }));
*/
setMessagesParams(params: BetaToolRunnerParams): void;
setMessagesParams(mutator: (prevParams: BetaToolRunnerParams) => BetaToolRunnerParams): void;
setMessagesParams(
paramsOrMutator: BetaToolRunnerParams | ((prevParams: BetaToolRunnerParams) => BetaToolRunnerParams),
) {
if (typeof paramsOrMutator === 'function') {
this.#state.params = paramsOrMutator(this.#state.params);
} else {
this.#state.params = paramsOrMutator;
}
this.#mutated = true;
// Invalidate cached tool response since parameters changed
this.#toolResponse = undefined;
}
/**
* Get the tool response for the last message from the assistant.
* Avoids redundant tool executions by caching results.
*
* @returns A promise that resolves to a BetaMessageParam containing tool results, or null if no tools need to be executed
*
* @example
* const toolResponse = await runner.generateToolResponse();
* if (toolResponse) {
* console.log('Tool results:', toolResponse.content);
* }
*/
async generateToolResponse() {
const message = (await this.#message) ?? this.params.messages.at(-1);
if (!message) {
return null;
}
return this.#generateToolResponse(message);
}
async #generateToolResponse(lastMessage: BetaMessageParam) {
if (this.#toolResponse !== undefined) {
return this.#toolResponse;
}
this.#toolResponse = generateToolResponse(this.#state.params, lastMessage);
return this.#toolResponse;
}
/**
* Wait for the async iterator to complete. This works even if the async iterator hasn't yet started, and
* will wait for an instance to start and go to completion.
*
* @returns A promise that resolves to the final BetaMessage when the iterator completes
*
* @example
* // Start consuming the iterator
* for await (const message of runner) {
* console.log('Message:', message.content);
* }
*
* // Meanwhile, wait for completion from another part of the code
* const finalMessage = await runner.done();
* console.log('Final response:', finalMessage.content);
*/
done(): Promise<BetaMessage> {
return this.#completion.promise;
}
/**
* Returns a promise indicating that the stream is done. Unlike .done(), this will eagerly read the stream:
* * If the iterator has not been consumed, consume the entire iterator and return the final message from the
* assistant.
* * If the iterator has been consumed, waits for it to complete and returns the final message.
*
* @returns A promise that resolves to the final BetaMessage from the conversation
* @throws {AnthropicError} If no messages were processed during the conversation
*
* @example
* const finalMessage = await runner.runUntilDone();
* console.log('Final response:', finalMessage.content);
*/
async runUntilDone(): Promise<BetaMessage> {
// If not yet consumed, start consuming and wait for completion
if (!this.#consumed) {
for await (const _ of this) {
// Iterator naturally populates this.#message
}
}
// If consumed but not completed, wait for completion
return this.done();
}
/**
* Get the current parameters being used by the ToolRunner.
*
* @returns A readonly view of the current ToolRunnerParams
*
* @example
* const currentParams = runner.params;
* console.log('Current model:', currentParams.model);
* console.log('Message count:', currentParams.messages.length);
*/
get params(): Readonly<BetaToolRunnerParams> {
return this.#state.params as Readonly<BetaToolRunnerParams>;
}
/**
* Add one or more messages to the conversation history.
*
* @param messages - One or more BetaMessageParam objects to add to the conversation
*
* @example
* runner.pushMessages(
* { role: 'user', content: 'Also, what about the weather in NYC?' }
* );
*
* @example
* // Adding multiple messages
* runner.pushMessages(
* { role: 'user', content: 'What about NYC?' },
* { role: 'user', content: 'And Boston?' }
* );
*/
pushMessages(...messages: BetaMessageParam[]) {
this.setMessagesParams((params) => ({
...params,
messages: [...params.messages, ...messages],
}));
}
/**
* Makes the ToolRunner directly awaitable, equivalent to calling .runUntilDone()
* This allows using `await runner` instead of `await runner.runUntilDone()`
*/
then<TResult1 = BetaMessage, TResult2 = never>(
onfulfilled?: ((value: BetaMessage) => TResult1 | PromiseLike<TResult1>) | undefined | null,
onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null,
): Promise<TResult1 | TResult2> {
return this.runUntilDone().then(onfulfilled, onrejected);
}
}
async function generateToolResponse(
params: BetaToolRunnerParams,
lastMessage = params.messages.at(-1),
): Promise<BetaMessageParam | null> {
// Only process if the last message is from the assistant and has tool use blocks
if (
!lastMessage ||
lastMessage.role !== 'assistant' ||
!lastMessage.content ||
typeof lastMessage.content === 'string'
) {
return null;
}
const toolUseBlocks = lastMessage.content.filter((content) => content.type === 'tool_use');
if (toolUseBlocks.length === 0) {
return null;
}
const toolResults = await Promise.all(
toolUseBlocks.map(async (toolUse) => {
const tool = params.tools.find((t) => ('name' in t ? t.name : t.mcp_server_name) === toolUse.name);
if (!tool || !('run' in tool)) {
return {
type: 'tool_result' as const,
tool_use_id: toolUse.id,
content: `Error: Tool '${toolUse.name}' not found`,
is_error: true,
};
}
try {
let input = toolUse.input;
if ('parse' in tool && tool.parse) {
input = tool.parse(input);
}
const result = await tool.run(input);
return {
type: 'tool_result' as const,
tool_use_id: toolUse.id,
content: result,
};
} catch (error) {
return {
type: 'tool_result' as const,
tool_use_id: toolUse.id,
content:
error instanceof ToolError ?
error.content
: `Error: ${error instanceof Error ? error.message : String(error)}`,
is_error: true,
};
}
}),
);
return {
role: 'user' as const,
content: toolResults,
};
}
// vendored from typefest just to make things look a bit nicer on hover
type Simplify<T> = { [KeyType in keyof T]: T[KeyType] } & {};
/**
* Parameters for creating a ToolRunner, extending MessageCreateParams with runnable tools.
*/
export type BetaToolRunnerParams = Simplify<
Omit<MessageCreateParams, 'tools'> & {
tools: (BetaToolUnion | BetaRunnableTool<any>)[];
/**
* Maximum number of iterations (API requests) to make in the tool execution loop.
* Each iteration consists of: assistant response → tool execution → tool results.
* When exceeded, the loop will terminate even if tools are still being requested.
*/
max_iterations?: number;
compactionControl?: CompactionControl;
}
>;
export type BetaToolRunnerRequestOptions = Pick<RequestOptions, 'headers'>;

View File

@@ -0,0 +1,52 @@
import { Model } from '../../resources';
export const DEFAULT_TOKEN_THRESHOLD = 100_000;
export const DEFAULT_SUMMARY_PROMPT = `You have been working on the task described above but have not yet completed it. Write a continuation summary that will allow you (or another instance of yourself) to resume work efficiently in a future context window where the conversation history will be replaced with this summary. Your summary should be structured, concise, and actionable. Include:
1. Task Overview
The user's core request and success criteria
Any clarifications or constraints they specified
2. Current State
What has been completed so far
Files created, modified, or analyzed (with paths if relevant)
Key outputs or artifacts produced
3. Important Discoveries
Technical constraints or requirements uncovered
Decisions made and their rationale
Errors encountered and how they were resolved
What approaches were tried that didn't work (and why)
4. Next Steps
Specific actions needed to complete the task
Any blockers or open questions to resolve
Priority order if multiple steps remain
5. Context to Preserve
User preferences or style requirements
Domain-specific details that aren't obvious
Any promises made to the user
Be concise but complete—err on the side of including information that would prevent duplicate work or repeated mistakes. Write in a way that enables immediate resumption of the task.
Wrap your summary in <summary></summary> tags.`;
export interface CompactionControl {
/**
* The context token threshold at which to trigger compaction.
*
* When the cumulative token count (input + output) across all messages exceeds this threshold,
* the message history will be automatically summarized and compressed.
*
* @default 100000
*/
contextTokenThreshold?: number;
/**
* The model to use for generating the compaction summary.
* If not specified, defaults to the same model used for the tool runner.
*/
model?: Model;
/**
* The prompt used to instruct the model on how to generate the summary.
*/
summaryPrompt?: string;
enabled: boolean;
}

View File

@@ -0,0 +1,47 @@
import { BetaToolResultContentBlockParam } from '../../resources/beta';
/**
* An error that can be thrown from a tool's `run` method to return structured
* content blocks as the error result, rather than just a string message.
*
* When the ToolRunner catches this error, it will use the `content` property
* as the tool result with `is_error: true`.
*
* @example
* ```ts
* const tool = {
* name: 'my_tool',
* run: async (input) => {
* if (somethingWentWrong) {
* throw new ToolError([
* { type: 'text', text: 'Error details here' },
* { type: 'image', source: { type: 'base64', data: '...', media_type: 'image/png' } },
* ]);
* }
* return 'success';
* },
* };
* ```
*/
export class ToolError extends Error {
/**
* The content to return as the tool result. This will be sent back to the model
* with `is_error: true`.
*/
readonly content: string | Array<BetaToolResultContentBlockParam>;
constructor(content: string | Array<BetaToolResultContentBlockParam>) {
const message =
typeof content === 'string' ? content : (
content
.map((block) => {
if (block.type === 'text') return block.text;
return `[${block.type}]`;
})
.join(' ')
);
super(message);
this.name = 'ToolError';
this.content = content;
}
}

View File

@@ -0,0 +1,381 @@
import { BetaRunnableTool } from './BetaRunnableTool';
import { ToolError } from './ToolError';
import { Anthropic } from '../..';
import { AnthropicError } from '../../core/error';
import { BetaMessage, BetaMessageParam, BetaToolUnion, MessageCreateParams } from '../../resources/beta';
import { BetaMessageStream } from '../BetaMessageStream';
/**
* Just Promise.withResolvers(), which is not available in all environments.
*/
function promiseWithResolvers<T>(): {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (reason?: any) => void;
} {
let resolve: (value: T) => void;
let reject: (reason?: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve: resolve!, reject: reject! };
}
/**
* A ToolRunner handles the automatic conversation loop between the assistant and tools.
*
* A ToolRunner is an async iterable that yields either BetaMessage or BetaMessageStream objects
* depending on the streaming configuration.
*/
export class BetaToolRunner<Stream extends boolean> {
/** Whether the async iterator has been consumed */
#consumed = false;
/** Whether parameters have been mutated since the last API call */
#mutated = false;
/** Current state containing the request parameters */
#state: { params: BetaToolRunnerParams };
/** Promise for the last message received from the assistant */
#message?: Promise<BetaMessage> | undefined;
/** Cached tool response to avoid redundant executions */
#toolResponse?: Promise<BetaMessageParam | null> | undefined;
/** Promise resolvers for waiting on completion */
#completion: {
promise: Promise<BetaMessage>;
resolve: (value: BetaMessage) => void;
reject: (reason?: any) => void;
};
/** Number of iterations (API requests) made so far */
#iterationCount = 0;
constructor(
private client: Anthropic,
params: BetaToolRunnerParams,
) {
this.#state = {
params: {
// You can't clone the entire params since there are functions as handlers.
// You also don't really need to clone params.messages, but it probably will prevent a foot gun
// somewhere.
...params,
messages: structuredClone(params.messages),
},
};
this.#completion = promiseWithResolvers();
}
async *[Symbol.asyncIterator](): AsyncIterator<
Stream extends true ? BetaMessageStream
: Stream extends false ? BetaMessage
: BetaMessage | BetaMessageStream
> {
if (this.#consumed) {
throw new AnthropicError('Cannot iterate over a consumed stream');
}
this.#consumed = true;
this.#mutated = true;
this.#toolResponse = undefined;
try {
while (true) {
let stream;
try {
if (
this.#state.params.max_iterations &&
this.#iterationCount >= this.#state.params.max_iterations
) {
break;
}
this.#mutated = false;
this.#message = undefined;
this.#toolResponse = undefined;
this.#iterationCount++;
const { max_iterations, ...params } = this.#state.params;
if (params.stream) {
stream = this.client.beta.messages.stream({ ...params });
this.#message = stream.finalMessage();
// Make sure that this promise doesn't throw before we get the option to do something about it.
// Error will be caught when we call await this.#message ultimately
this.#message.catch(() => {});
yield stream as any;
} else {
this.#message = this.client.beta.messages.create({ ...params, stream: false });
yield this.#message as any;
}
if (!this.#mutated) {
const { role, content } = await this.#message;
this.#state.params.messages.push({ role, content });
}
const toolMessage = await this.#generateToolResponse(this.#state.params.messages.at(-1)!);
if (toolMessage) {
this.#state.params.messages.push(toolMessage);
}
if (!toolMessage && !this.#mutated) {
break;
}
} finally {
if (stream) {
stream.abort();
}
}
}
if (!this.#message) {
throw new AnthropicError('ToolRunner concluded without a message from the server');
}
this.#completion.resolve(await this.#message);
} catch (error) {
this.#consumed = false;
// Silence unhandled promise errors
this.#completion.promise.catch(() => {});
this.#completion.reject(error);
this.#completion = promiseWithResolvers();
throw error;
}
}
/**
* Update the parameters for the next API call. This invalidates any cached tool responses.
*
* @param paramsOrMutator - Either new parameters or a function to mutate existing parameters
*
* @example
* // Direct parameter update
* runner.setMessagesParams({
* model: 'claude-haiku-4-5',
* max_tokens: 500,
* });
*
* @example
* // Using a mutator function
* runner.setMessagesParams((params) => ({
* ...params,
* max_tokens: 100,
* }));
*/
setMessagesParams(params: BetaToolRunnerParams): void;
setMessagesParams(mutator: (prevParams: BetaToolRunnerParams) => BetaToolRunnerParams): void;
setMessagesParams(
paramsOrMutator: BetaToolRunnerParams | ((prevParams: BetaToolRunnerParams) => BetaToolRunnerParams),
) {
if (typeof paramsOrMutator === 'function') {
this.#state.params = paramsOrMutator(this.#state.params);
} else {
this.#state.params = paramsOrMutator;
}
this.#mutated = true;
// Invalidate cached tool response since parameters changed
this.#toolResponse = undefined;
}
/**
* Get the tool response for the last message from the assistant.
* Avoids redundant tool executions by caching results.
*
* @returns A promise that resolves to a BetaMessageParam containing tool results, or null if no tools need to be executed
*
* @example
* const toolResponse = await runner.generateToolResponse();
* if (toolResponse) {
* console.log('Tool results:', toolResponse.content);
* }
*/
async generateToolResponse() {
const message = (await this.#message) ?? this.params.messages.at(-1);
if (!message) {
return null;
}
return this.#generateToolResponse(message);
}
async #generateToolResponse(lastMessage: BetaMessageParam) {
if (this.#toolResponse !== undefined) {
return this.#toolResponse;
}
this.#toolResponse = generateToolResponse(this.#state.params, lastMessage);
return this.#toolResponse;
}
/**
* Wait for the async iterator to complete. This works even if the async iterator hasn't yet started, and
* will wait for an instance to start and go to completion.
*
* @returns A promise that resolves to the final BetaMessage when the iterator completes
*
* @example
* // Start consuming the iterator
* for await (const message of runner) {
* console.log('Message:', message.content);
* }
*
* // Meanwhile, wait for completion from another part of the code
* const finalMessage = await runner.done();
* console.log('Final response:', finalMessage.content);
*/
done(): Promise<BetaMessage> {
return this.#completion.promise;
}
/**
* Returns a promise indicating that the stream is done. Unlike .done(), this will eagerly read the stream:
* * If the iterator has not been consumed, consume the entire iterator and return the final message from the
* assistant.
* * If the iterator has been consumed, waits for it to complete and returns the final message.
*
* @returns A promise that resolves to the final BetaMessage from the conversation
* @throws {AnthropicError} If no messages were processed during the conversation
*
* @example
* const finalMessage = await runner.runUntilDone();
* console.log('Final response:', finalMessage.content);
*/
async runUntilDone(): Promise<BetaMessage> {
// If not yet consumed, start consuming and wait for completion
if (!this.#consumed) {
for await (const _ of this) {
// Iterator naturally populates this.#message
}
}
// If consumed but not completed, wait for completion
return this.done();
}
/**
* Get the current parameters being used by the ToolRunner.
*
* @returns A readonly view of the current ToolRunnerParams
*
* @example
* const currentParams = runner.params;
* console.log('Current model:', currentParams.model);
* console.log('Message count:', currentParams.messages.length);
*/
get params(): Readonly<BetaToolRunnerParams> {
return this.#state.params as Readonly<BetaToolRunnerParams>;
}
/**
* Add one or more messages to the conversation history.
*
* @param messages - One or more BetaMessageParam objects to add to the conversation
*
* @example
* runner.pushMessages(
* { role: 'user', content: 'Also, what about the weather in NYC?' }
* );
*
* @example
* // Adding multiple messages
* runner.pushMessages(
* { role: 'user', content: 'What about NYC?' },
* { role: 'user', content: 'And Boston?' }
* );
*/
pushMessages(...messages: BetaMessageParam[]) {
this.setMessagesParams((params) => ({
...params,
messages: [...params.messages, ...messages],
}));
}
/**
* Makes the ToolRunner directly awaitable, equivalent to calling .runUntilDone()
* This allows using `await runner` instead of `await runner.runUntilDone()`
*/
then<TResult1 = BetaMessage, TResult2 = never>(
onfulfilled?: ((value: BetaMessage) => TResult1 | PromiseLike<TResult1>) | undefined | null,
onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null,
): Promise<TResult1 | TResult2> {
return this.runUntilDone().then(onfulfilled, onrejected);
}
}
async function generateToolResponse(
params: BetaToolRunnerParams,
lastMessage = params.messages.at(-1),
): Promise<BetaMessageParam | null> {
// Only process if the last message is from the assistant and has tool use blocks
if (
!lastMessage ||
lastMessage.role !== 'assistant' ||
!lastMessage.content ||
typeof lastMessage.content === 'string'
) {
return null;
}
const toolUseBlocks = lastMessage.content.filter((content) => content.type === 'tool_use');
if (toolUseBlocks.length === 0) {
return null;
}
const toolResults = await Promise.all(
toolUseBlocks.map(async (toolUse) => {
const tool = params.tools.find((t) => ('name' in t ? t.name : t.mcp_server_name) === toolUse.name);
if (!tool || !('run' in tool)) {
return {
type: 'tool_result' as const,
tool_use_id: toolUse.id,
content: `Error: Tool '${toolUse.name}' not found`,
is_error: true,
};
}
try {
let input = toolUse.input;
if ('parse' in tool && tool.parse) {
input = tool.parse(input);
}
const result = await tool.run(input);
return {
type: 'tool_result' as const,
tool_use_id: toolUse.id,
content: result,
};
} catch (error) {
return {
type: 'tool_result' as const,
tool_use_id: toolUse.id,
content:
error instanceof ToolError ?
error.content
: `Error: ${error instanceof Error ? error.message : String(error)}`,
is_error: true,
};
}
}),
);
return {
role: 'user' as const,
content: toolResults,
};
}
// vendored from typefest just to make things look a bit nicer on hover
type Simplify<T> = { [KeyType in keyof T]: T[KeyType] } & {};
/**
* Parameters for creating a ToolRunner, extending MessageCreateParams with runnable tools.
*/
export type BetaToolRunnerParams = Simplify<
Omit<MessageCreateParams, 'tools'> & {
tools: (BetaToolUnion | BetaRunnableTool<any>)[];
/**
* Maximum number of iterations (API requests) to make in the tool execution loop.
* Each iteration consists of: assistant response → tool execution → tool results.
* When exceeded, the loop will terminate even if tools are still being requested.
*/
max_iterations?: number;
}
>;

View File

@@ -0,0 +1,124 @@
import { pop } from '../internal/utils';
// Supported string formats
const SUPPORTED_STRING_FORMATS = new Set([
'date-time',
'time',
'date',
'duration',
'email',
'hostname',
'uri',
'ipv4',
'ipv6',
'uuid',
]);
export type JSONSchema = Record<string, any>;
function deepClone<T>(obj: T): T {
return JSON.parse(JSON.stringify(obj));
}
export function transformJSONSchema(jsonSchema: JSONSchema): JSONSchema {
const workingCopy = deepClone(jsonSchema);
return _transformJSONSchema(workingCopy);
}
function _transformJSONSchema(jsonSchema: JSONSchema): JSONSchema {
const strictSchema: JSONSchema = {};
const ref = pop(jsonSchema, '$ref');
if (ref !== undefined) {
strictSchema['$ref'] = ref;
return strictSchema;
}
const defs = pop(jsonSchema, '$defs');
if (defs !== undefined) {
const strictDefs: Record<string, any> = {};
strictSchema['$defs'] = strictDefs;
for (const [name, defSchema] of Object.entries(defs)) {
strictDefs[name] = _transformJSONSchema(defSchema as JSONSchema);
}
}
const type = pop(jsonSchema, 'type');
const anyOf = pop(jsonSchema, 'anyOf');
const oneOf = pop(jsonSchema, 'oneOf');
const allOf = pop(jsonSchema, 'allOf');
if (Array.isArray(anyOf)) {
strictSchema['anyOf'] = anyOf.map((variant) => _transformJSONSchema(variant as JSONSchema));
} else if (Array.isArray(oneOf)) {
strictSchema['anyOf'] = oneOf.map((variant) => _transformJSONSchema(variant as JSONSchema));
} else if (Array.isArray(allOf)) {
strictSchema['allOf'] = allOf.map((entry) => _transformJSONSchema(entry as JSONSchema));
} else {
if (type === undefined) {
throw new Error('JSON schema must have a type defined if anyOf/oneOf/allOf are not used');
}
strictSchema['type'] = type;
}
const description = pop(jsonSchema, 'description');
if (description !== undefined) {
strictSchema['description'] = description;
}
const title = pop(jsonSchema, 'title');
if (title !== undefined) {
strictSchema['title'] = title;
}
if (type === 'object') {
const properties = pop(jsonSchema, 'properties') || {};
strictSchema['properties'] = Object.fromEntries(
Object.entries(properties).map(([key, propSchema]) => [
key,
_transformJSONSchema(propSchema as JSONSchema),
]),
);
pop(jsonSchema, 'additionalProperties');
strictSchema['additionalProperties'] = false;
const required = pop(jsonSchema, 'required');
if (required !== undefined) {
strictSchema['required'] = required;
}
} else if (type === 'string') {
const format = pop(jsonSchema, 'format');
if (format !== undefined && SUPPORTED_STRING_FORMATS.has(format)) {
strictSchema['format'] = format;
} else if (format !== undefined) {
jsonSchema['format'] = format;
}
} else if (type === 'array') {
const items = pop(jsonSchema, 'items');
if (items !== undefined) {
strictSchema['items'] = _transformJSONSchema(items as JSONSchema);
}
const minItems = pop(jsonSchema, 'minItems');
if (minItems !== undefined && (minItems === 0 || minItems === 1)) {
strictSchema['minItems'] = minItems;
} else if (minItems !== undefined) {
jsonSchema['minItems'] = minItems;
}
}
if (Object.keys(jsonSchema).length > 0) {
const existingDescription = strictSchema['description'];
strictSchema['description'] =
(existingDescription ? existingDescription + '\n\n' : '') +
'{' +
Object.entries(jsonSchema)
.map(([key, value]) => `${key}: ${JSON.stringify(value)}`)
.join(', ') +
'}';
}
return strictSchema;
}