feat: all sync tools should respect aborting
This commit is contained in:
@@ -21,7 +21,7 @@ import type {
|
||||
MacroToolInput,
|
||||
MacroToolResult,
|
||||
} from './types'
|
||||
import { assert, fetchLlmsTxt, normalizeResponse, uid, waitFor } from './utils'
|
||||
import { assert, fetchLlmsTxt, normalizeResponse, onAbortTimeout, uid, waitFor } from './utils'
|
||||
|
||||
export { tool, type PageAgentTool } from './tools'
|
||||
export type * from './types'
|
||||
@@ -75,12 +75,20 @@ export class PageAgentCore extends EventTarget {
|
||||
/**
|
||||
* Called when the agent needs to ask the user questions.
|
||||
* If unset, the `ask_user` tool will be disabled.
|
||||
* The optional `signal` aborts when the task is stopped or disposed —
|
||||
* implementations should reject the promise when it fires.
|
||||
* @example onAskUser: (q) => window.prompt(q) || ''
|
||||
*/
|
||||
onAskUser?: (question: string) => Promise<string>
|
||||
onAskUser?: (question: string, options?: { signal: AbortSignal }) => Promise<string>
|
||||
|
||||
#status: AgentStatus = 'idle'
|
||||
#llm: LLM
|
||||
/**
|
||||
* Task cancellation primitive: its signal reaches the LLM fetch, tools
|
||||
* (via `ctx.signal`) and async callbacks. Aborted only by `stop`/`dispose`
|
||||
* (during a task) or task setup, always WITHOUT a reason so `signal.reason`
|
||||
* stays a standard `AbortError`. Never abort as a cleanup/error shortcut.
|
||||
*/
|
||||
#abortController = new AbortController()
|
||||
#observations: string[] = []
|
||||
|
||||
@@ -140,6 +148,11 @@ export class PageAgentCore extends EventTarget {
|
||||
return this.#status
|
||||
}
|
||||
|
||||
/** Abort signal for the current task. Tools get it via `ctx.signal`. */
|
||||
get abortSignal(): AbortSignal {
|
||||
return this.#abortController.signal
|
||||
}
|
||||
|
||||
/** Emit statuschange event */
|
||||
#emitStatusChange(): void {
|
||||
this.dispatchEvent(new Event('statuschange'))
|
||||
@@ -302,7 +315,8 @@ export class PageAgentCore extends EventTarget {
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
console.groupEnd() // to prevent nested groups
|
||||
const isAbortError = (error as any)?.name === 'AbortError'
|
||||
// Canonical abort check, independent of how the error was wrapped.
|
||||
const isAbortError = this.#abortController.signal.aborted
|
||||
|
||||
if (!isAbortError) console.error('Task failed', error)
|
||||
const errorMessage = isAbortError ? 'Task stopped' : String(error)
|
||||
@@ -400,8 +414,24 @@ export class PageAgentCore extends EventTarget {
|
||||
|
||||
const startTime = Date.now()
|
||||
|
||||
// Execute tool, bind `this` to PageAgent
|
||||
const result = await tool.execute.bind(this)(toolInput)
|
||||
// Run the tool with `this` = agent and the abort signal exposed.
|
||||
// The deadline warning surfaces tools that ignore the signal
|
||||
// without unblocking the loop, keeping the bug visible.
|
||||
const signal = this.#abortController.signal
|
||||
const unsubscribe = onAbortTimeout(signal, 3000, () => {
|
||||
console.warn(
|
||||
`[PageAgent] Tool "${toolName}" did not respond to abort signal within 3s. ` +
|
||||
`Tools MUST honor ctx.signal for proper cancellation. ` +
|
||||
`See: https://page-agent.dev/docs/custom-tools#abort`
|
||||
)
|
||||
})
|
||||
|
||||
let result: string
|
||||
try {
|
||||
result = await tool.execute.bind(this)(toolInput, { signal })
|
||||
} finally {
|
||||
unsubscribe()
|
||||
}
|
||||
|
||||
const duration = Date.now() - startTime
|
||||
console.log(chalk.green.bold(`Tool (${toolName}) executed for ${duration}ms`), result)
|
||||
|
||||
@@ -7,6 +7,14 @@ import * as z from 'zod/v4'
|
||||
import type { PageAgentCore } from '../PageAgentCore'
|
||||
import { waitFor } from '../utils'
|
||||
|
||||
/**
|
||||
* Per-invocation context passed to every tool execution.
|
||||
* Tools MUST honor `signal` to support cooperative cancellation.
|
||||
*/
|
||||
export interface ToolContext {
|
||||
signal: AbortSignal
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal tool definition that has access to PageAgent `this` context
|
||||
*/
|
||||
@@ -14,7 +22,7 @@ export interface PageAgentTool<TParams = any> {
|
||||
// name: string
|
||||
description: string
|
||||
inputSchema: z.ZodType<TParams>
|
||||
execute: (this: PageAgentCore, args: TParams) => Promise<string>
|
||||
execute: (this: PageAgentCore, args: TParams, ctx: ToolContext) => Promise<string>
|
||||
}
|
||||
|
||||
export function tool<TParams>(options: PageAgentTool<TParams>): PageAgentTool<TParams> {
|
||||
@@ -50,12 +58,12 @@ tools.set(
|
||||
inputSchema: z.object({
|
||||
seconds: z.number().min(1).max(10).default(1),
|
||||
}),
|
||||
execute: async function (this: PageAgentCore, input) {
|
||||
execute: async function (this: PageAgentCore, input, { signal }) {
|
||||
// try to subtract LLM calling time from the actual wait time
|
||||
const lastTimeUpdate = await this.pageController.getLastUpdateTime()
|
||||
const actualWaitTime = Math.max(0, input.seconds - (Date.now() - lastTimeUpdate) / 1000)
|
||||
console.log(`actualWaitTime: ${actualWaitTime} seconds`)
|
||||
await waitFor(actualWaitTime)
|
||||
await waitFor(actualWaitTime, signal)
|
||||
|
||||
return `✅ Waited for ${input.seconds} seconds.`
|
||||
},
|
||||
@@ -70,11 +78,11 @@ tools.set(
|
||||
inputSchema: z.object({
|
||||
question: z.string(),
|
||||
}),
|
||||
execute: async function (this: PageAgentCore, input) {
|
||||
execute: async function (this: PageAgentCore, input, { signal }) {
|
||||
if (!this.onAskUser) {
|
||||
throw new Error('ask_user tool requires onAskUser callback to be set')
|
||||
}
|
||||
const answer = await this.onAskUser(input.question)
|
||||
const answer = await this.onAskUser(input.question, { signal })
|
||||
return `User answered: ${answer}`
|
||||
},
|
||||
})
|
||||
|
||||
@@ -2,8 +2,49 @@ import chalk from 'chalk'
|
||||
|
||||
export * from './autoFixer'
|
||||
|
||||
export async function waitFor(seconds: number): Promise<void> {
|
||||
await new Promise((resolve) => setTimeout(resolve, seconds * 1000))
|
||||
/**
|
||||
* Wait for `seconds`. If a `signal` is provided, the wait is cancellable:
|
||||
* aborting rejects with the signal's reason (an `AbortError`).
|
||||
*/
|
||||
export async function waitFor(seconds: number, signal?: AbortSignal): Promise<void> {
|
||||
if (!signal) {
|
||||
await new Promise((resolve) => setTimeout(resolve, seconds * 1000))
|
||||
return
|
||||
}
|
||||
signal.throwIfAborted()
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
signal.removeEventListener('abort', onAbort)
|
||||
resolve()
|
||||
}, seconds * 1000)
|
||||
const onAbort = () => {
|
||||
clearTimeout(timer)
|
||||
// reason is a DOMException AbortError.
|
||||
reject(signal.reason as DOMException)
|
||||
}
|
||||
signal.addEventListener('abort', onAbort, { once: true })
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Diagnostic deadline: fire `callback` if `signal` stays aborted for `ms`,
|
||||
* surfacing async work that ignores abort. Returns an unsubscribe to call once
|
||||
* the awaited work settles.
|
||||
*/
|
||||
export function onAbortTimeout(signal: AbortSignal, ms: number, callback: () => void): () => void {
|
||||
if (signal.aborted) {
|
||||
callback()
|
||||
return () => {}
|
||||
}
|
||||
let timer: ReturnType<typeof setTimeout> | null = null
|
||||
const onAbort = () => {
|
||||
timer = setTimeout(callback, ms)
|
||||
}
|
||||
signal.addEventListener('abort', onAbort, { once: true })
|
||||
return () => {
|
||||
signal.removeEventListener('abort', onAbort)
|
||||
if (timer) clearTimeout(timer)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -68,7 +68,7 @@ export class Panel {
|
||||
this.#i18n = new I18n(config.language ?? 'en-US')
|
||||
|
||||
// Set up askUser callback on agent
|
||||
this.#agent.onAskUser = (question) => this.#askUser(question)
|
||||
this.#agent.onAskUser = (question, options) => this.#askUser(question, options?.signal)
|
||||
|
||||
// Create UI elements
|
||||
this.#wrapper = this.#createWrapper()
|
||||
@@ -169,10 +169,12 @@ export class Panel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Ask for user input (internal, called by agent via onAskUser)
|
||||
* Ask for user input (internal, called by agent via onAskUser).
|
||||
* Rejects when `signal` aborts (task stopped or disposed), cleaning up the
|
||||
* question card and pending state so the agent loop can settle.
|
||||
*/
|
||||
#askUser(question: string): Promise<string> {
|
||||
return new Promise((resolve) => {
|
||||
#askUser(question: string, signal?: AbortSignal): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Set `waiting for user answer` state
|
||||
this.#isWaitingForUserAnswer = true
|
||||
this.#userAnswerResolver = resolve
|
||||
@@ -195,6 +197,27 @@ export class Panel {
|
||||
this.#scrollToBottom()
|
||||
|
||||
this.#showInputArea(this.#i18n.t('ui.panel.userAnswerPrompt'))
|
||||
|
||||
signal?.addEventListener(
|
||||
'abort',
|
||||
() => {
|
||||
this.#removeTempCards()
|
||||
this.#isWaitingForUserAnswer = false
|
||||
this.#userAnswerResolver = null
|
||||
// reason is a DOMException AbortError (abort() takes no args).
|
||||
reject(signal.reason as DOMException)
|
||||
},
|
||||
{ once: true }
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/** Remove temporary question cards (only direct children for safety) */
|
||||
#removeTempCards(): void {
|
||||
Array.from(this.#historySection.children).forEach((child) => {
|
||||
if (child.getAttribute('data-temp-card') === 'true') {
|
||||
child.remove()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -307,12 +330,7 @@ export class Panel {
|
||||
* Handle user answer
|
||||
*/
|
||||
#handleUserAnswer(input: string): void {
|
||||
// Remove temporary question cards (only direct children for safety)
|
||||
Array.from(this.#historySection.children).forEach((child) => {
|
||||
if (child.getAttribute('data-temp-card') === 'true') {
|
||||
child.remove()
|
||||
}
|
||||
})
|
||||
this.#removeTempCards()
|
||||
|
||||
// Reset state
|
||||
this.#isWaitingForUserAnswer = false
|
||||
|
||||
@@ -63,8 +63,9 @@ export interface PanelAgentAdapter extends EventTarget {
|
||||
* Called when the agent needs to ask the user questions.
|
||||
* If unset, the `ask_user` tool will be disabled.
|
||||
* Panel will set this to handle user questions via its UI.
|
||||
* The optional `signal` aborts when the task is stopped or disposed.
|
||||
*/
|
||||
onAskUser?: (question: string) => Promise<string>
|
||||
onAskUser?: (question: string, options?: { signal: AbortSignal }) => Promise<string>
|
||||
|
||||
/** Execute a task */
|
||||
execute(task: string): Promise<unknown>
|
||||
|
||||
Reference in New Issue
Block a user