Merge pull request #532 from alibaba/feat/rubost-abort

feat: robust aborting
This commit is contained in:
Simon
2026-06-08 21:53:05 +08:00
committed by GitHub
5 changed files with 104 additions and 60 deletions

View File

@@ -75,12 +75,19 @@ export class PageAgentCore extends EventTarget {
/** /**
* Called when the agent needs to ask the user questions. * Called when the agent needs to ask the user questions.
* If unset, the `ask_user` tool will be disabled. * If unset, the `ask_user` tool will be disabled.
* Implementations should reject the promise when `signal` aborts.
* @example onAskUser: (q) => window.prompt(q) || '' * @example onAskUser: (q) => window.prompt(q) || ''
*/ */
onAskUser?: (question: string) => Promise<string> onAskUser?: (question: string, options?: { signal: AbortSignal }) => Promise<string>
#status: AgentStatus = 'idle' #status: AgentStatus = 'idle'
#llm: LLM #llm: LLM
/**
* Task cancellation primitive: its signal reaches the LLM fetch, tools
* (via `ctx.signal`) and async callbacks. Aborted only by `stop`/`dispose`
* (during a task) or task setup, always WITHOUT a reason so `signal.reason`
* stays a standard `AbortError`.
*/
#abortController = new AbortController() #abortController = new AbortController()
#observations: string[] = [] #observations: string[] = []
@@ -218,6 +225,8 @@ export class PageAgentCore extends EventTarget {
this.#states = { totalWaitTime: 0, lastURL: '', browserState: null } this.#states = { totalWaitTime: 0, lastURL: '', browserState: null }
let step = 0 let step = 0
let taskSuccess: boolean
let taskResult: string
while (true) { while (true) {
try { try {
@@ -279,63 +288,49 @@ export class PageAgentCore extends EventTarget {
} as AgentStepEvent) } as AgentStepEvent)
this.#emitHistoryChange() this.#emitHistoryChange()
//
await onAfterStep?.(this, this.history) await onAfterStep?.(this, this.history)
console.groupEnd() console.groupEnd()
// finish task if done
if (actionName === 'done') { if (actionName === 'done') {
const success = action.input?.success ?? false taskSuccess = action.input?.success ?? false
const text = action.input?.text || 'no text provided' taskResult = action.input?.text || 'no text provided'
console.log(chalk.green.bold('Task completed'), success, text) console.log(chalk.green.bold('Task completed'), taskSuccess, taskResult)
this.#onDone(success) break
const result: ExecutionResult = {
success,
data: text,
history: this.history,
}
await onAfterTask?.(this, result)
return result
} }
} catch (error: unknown) { } catch (error: unknown) {
console.groupEnd() // to prevent nested groups console.groupEnd()
const isAbortError = (error as any)?.name === 'AbortError' const isAbortError = (error as any)?.name === 'AbortError'
if (!isAbortError) console.error('Task failed', error) if (!isAbortError) console.error('Task failed', error)
const errorMessage = isAbortError ? 'Task stopped' : String(error) taskResult = isAbortError ? 'Task aborted' : String(error)
this.#emitActivity({ type: 'error', message: errorMessage }) taskSuccess = false
this.history.push({ type: 'error', message: errorMessage, rawResponse: error }) this.#emitActivity({ type: 'error', message: taskResult })
this.history.push({ type: 'error', message: taskResult, rawResponse: error })
this.#emitHistoryChange() this.#emitHistoryChange()
this.#onDone(false) break
const result: ExecutionResult = {
success: false,
data: errorMessage,
history: this.history,
}
await onAfterTask?.(this, result)
return result
} }
step++ step++
if (step > this.config.maxSteps) { if (step > this.config.maxSteps) {
const errorMessage = 'Step count exceeded maximum limit' taskResult = 'Step count exceeded maximum limit'
this.history.push({ type: 'error', message: errorMessage }) taskSuccess = false
this.#emitActivity({ type: 'error', message: taskResult })
this.history.push({ type: 'error', message: taskResult })
this.#emitHistoryChange() this.#emitHistoryChange()
this.#onDone(false) break
const result: ExecutionResult = {
success: false,
data: errorMessage,
history: this.history,
}
await onAfterTask?.(this, result)
return result
} }
await waitFor(this.config.stepDelay ?? 0.4) await waitFor(this.config.stepDelay ?? 0.4)
} }
this.#onDone(taskSuccess)
const result: ExecutionResult = {
success: taskSuccess,
data: taskResult,
history: this.history,
}
await onAfterTask?.(this, result)
return result
} }
/** /**
@@ -368,7 +363,8 @@ export class PageAgentCore extends EventTarget {
description: 'You MUST call this tool every step!', description: 'You MUST call this tool every step!',
inputSchema: macroToolSchema as z.ZodType<MacroToolInput>, inputSchema: macroToolSchema as z.ZodType<MacroToolInput>,
execute: async (input: MacroToolInput): Promise<MacroToolResult> => { execute: async (input: MacroToolInput): Promise<MacroToolResult> => {
this.#abortController.signal.throwIfAborted() const signal = this.#abortController.signal
signal.throwIfAborted()
console.log(chalk.blue.bold('MacroTool input'), input) console.log(chalk.blue.bold('MacroTool input'), input)
const action = input.action const action = input.action
@@ -400,8 +396,9 @@ export class PageAgentCore extends EventTarget {
const startTime = Date.now() const startTime = Date.now()
// Execute tool, bind `this` to PageAgent const result = await tool.execute.bind(this)(toolInput, { signal })
const result = await tool.execute.bind(this)(toolInput) // Enforce abort even if the tool ignored the signal and resolved normally.
signal.throwIfAborted()
const duration = Date.now() - startTime const duration = Date.now() - startTime
console.log(chalk.green.bold(`Tool (${toolName}) executed for ${duration}ms`), result) console.log(chalk.green.bold(`Tool (${toolName}) executed for ${duration}ms`), result)

View File

@@ -7,6 +7,14 @@ import * as z from 'zod/v4'
import type { PageAgentCore } from '../PageAgentCore' import type { PageAgentCore } from '../PageAgentCore'
import { waitFor } from '../utils' import { waitFor } from '../utils'
/**
* Per-invocation context handed to a tool's `execute` function as its
* second argument (after the parsed tool input).
*
* Tools MUST honor `signal` to support cooperative cancellation: pass it on
* to any cancellable wait or async callback they start, and let the
* resulting rejection propagate to the caller instead of swallowing it.
*/
export interface ToolContext {
/** Abort signal for this invocation; aborts when the running task is cancelled. */
signal: AbortSignal
}
/** /**
* Internal tool definition that has access to PageAgent `this` context * Internal tool definition that has access to PageAgent `this` context
*/ */
@@ -14,7 +22,7 @@ export interface PageAgentTool<TParams = any> {
// name: string // name: string
description: string description: string
inputSchema: z.ZodType<TParams> inputSchema: z.ZodType<TParams>
execute: (this: PageAgentCore, args: TParams) => Promise<string> execute: (this: PageAgentCore, args: TParams, ctx: ToolContext) => Promise<string>
} }
export function tool<TParams>(options: PageAgentTool<TParams>): PageAgentTool<TParams> { export function tool<TParams>(options: PageAgentTool<TParams>): PageAgentTool<TParams> {
@@ -50,12 +58,12 @@ tools.set(
inputSchema: z.object({ inputSchema: z.object({
seconds: z.number().min(1).max(10).default(1), seconds: z.number().min(1).max(10).default(1),
}), }),
execute: async function (this: PageAgentCore, input) { execute: async function (this: PageAgentCore, input, { signal }) {
// try to subtract LLM calling time from the actual wait time // try to subtract LLM calling time from the actual wait time
const lastTimeUpdate = await this.pageController.getLastUpdateTime() const lastTimeUpdate = await this.pageController.getLastUpdateTime()
const actualWaitTime = Math.max(0, input.seconds - (Date.now() - lastTimeUpdate) / 1000) const actualWaitTime = Math.max(0, input.seconds - (Date.now() - lastTimeUpdate) / 1000)
console.log(`actualWaitTime: ${actualWaitTime} seconds`) console.log(`actualWaitTime: ${actualWaitTime} seconds`)
await waitFor(actualWaitTime) await waitFor(actualWaitTime, signal)
return `✅ Waited for ${input.seconds} seconds.` return `✅ Waited for ${input.seconds} seconds.`
}, },
@@ -70,11 +78,11 @@ tools.set(
inputSchema: z.object({ inputSchema: z.object({
question: z.string(), question: z.string(),
}), }),
execute: async function (this: PageAgentCore, input) { execute: async function (this: PageAgentCore, input, { signal }) {
if (!this.onAskUser) { if (!this.onAskUser) {
throw new Error('ask_user tool requires onAskUser callback to be set') throw new Error('ask_user tool requires onAskUser callback to be set')
} }
const answer = await this.onAskUser(input.question) const answer = await this.onAskUser(input.question, { signal })
return `User answered: ${answer}` return `User answered: ${answer}`
}, },
}) })

View File

@@ -2,8 +2,28 @@ import chalk from 'chalk'
export * from './autoFixer' export * from './autoFixer'
export async function waitFor(seconds: number): Promise<void> { /**
await new Promise((resolve) => setTimeout(resolve, seconds * 1000)) * Wait for `seconds`. If a `signal` is provided, the wait is cancellable:
* aborting rejects with the signal's reason (an `AbortError`).
*/
export async function waitFor(seconds: number, signal?: AbortSignal): Promise<void> {
// No signal supplied: plain timer that cannot be cancelled.
if (!signal) {
await new Promise((resolve) => setTimeout(resolve, seconds * 1000))
return
}
// Already aborted: throw right away instead of scheduling a timer.
signal.throwIfAborted()
// Race the timer against the abort event; whichever fires first tears down the other.
await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
// Timer won: detach the abort listener so it does not linger on the signal.
// `onAbort` is declared below; that is safe because this callback only
// runs asynchronously, after the executor has finished initialising it.
signal.removeEventListener('abort', onAbort)
resolve()
}, seconds * 1000)
const onAbort = () => {
// Abort won: cancel the pending timer so it never resolves the promise.
clearTimeout(timer)
// When abort() is called without a reason, `signal.reason` is a DOMException
// named "AbortError". The cast assumes callers abort that way.
reject(signal.reason as DOMException)
}
// `once` makes the listener remove itself after it has fired.
signal.addEventListener('abort', onAbort, { once: true })
})
} }
// //

View File

@@ -68,7 +68,7 @@ export class Panel {
this.#i18n = new I18n(config.language ?? 'en-US') this.#i18n = new I18n(config.language ?? 'en-US')
// Set up askUser callback on agent // Set up askUser callback on agent
this.#agent.onAskUser = (question) => this.#askUser(question) this.#agent.onAskUser = (question, options) => this.#askUser(question, options?.signal)
// Create UI elements // Create UI elements
this.#wrapper = this.#createWrapper() this.#wrapper = this.#createWrapper()
@@ -169,10 +169,12 @@ export class Panel {
} }
/** /**
* Ask for user input (internal, called by agent via onAskUser) * Ask for user input (internal, called by agent via onAskUser).
* Rejects when `signal` aborts (task stopped or disposed), cleaning up the
* question card and pending state so the agent loop can settle.
*/ */
#askUser(question: string): Promise<string> { #askUser(question: string, signal?: AbortSignal): Promise<string> {
return new Promise((resolve) => { return new Promise((resolve, reject) => {
// Set `waiting for user answer` state // Set `waiting for user answer` state
this.#isWaitingForUserAnswer = true this.#isWaitingForUserAnswer = true
this.#userAnswerResolver = resolve this.#userAnswerResolver = resolve
@@ -195,6 +197,27 @@ export class Panel {
this.#scrollToBottom() this.#scrollToBottom()
this.#showInputArea(this.#i18n.t('ui.panel.userAnswerPrompt')) this.#showInputArea(this.#i18n.t('ui.panel.userAnswerPrompt'))
signal?.addEventListener(
'abort',
() => {
this.#removeTempCards()
this.#isWaitingForUserAnswer = false
this.#userAnswerResolver = null
// reason is a DOMException AbortError (abort() takes no args).
reject(signal.reason as DOMException)
},
{ once: true }
)
})
}
/**
* Remove temporary question cards from the history section.
*
* Only direct children of `#historySection` are inspected (for safety), and
* only those whose `data-temp-card` attribute equals the string `'true'`
* are removed; nested elements are never touched.
*/
#removeTempCards(): void {
// Array.from copies the child list first, so removing nodes inside the
// loop cannot disturb the iteration.
Array.from(this.#historySection.children).forEach((child) => {
if (child.getAttribute('data-temp-card') === 'true') {
child.remove()
}
}) })
} }
@@ -307,12 +330,7 @@ export class Panel {
* Handle user answer * Handle user answer
*/ */
#handleUserAnswer(input: string): void { #handleUserAnswer(input: string): void {
// Remove temporary question cards (only direct children for safety) this.#removeTempCards()
Array.from(this.#historySection.children).forEach((child) => {
if (child.getAttribute('data-temp-card') === 'true') {
child.remove()
}
})
// Reset state // Reset state
this.#isWaitingForUserAnswer = false this.#isWaitingForUserAnswer = false

View File

@@ -63,8 +63,9 @@ export interface PanelAgentAdapter extends EventTarget {
* Called when the agent needs to ask the user questions. * Called when the agent needs to ask the user questions.
* If unset, the `ask_user` tool will be disabled. * If unset, the `ask_user` tool will be disabled.
* Panel will set this to handle user questions via its UI. * Panel will set this to handle user questions via its UI.
* The optional `signal` aborts when the task is stopped or disposed.
*/ */
onAskUser?: (question: string) => Promise<string> onAskUser?: (question: string, options?: { signal: AbortSignal }) => Promise<string>
/** Execute a task */ /** Execute a task */
execute(task: string): Promise<unknown> execute(task: string): Promise<unknown>