Skip to content

[Bug]: DefaultRequestHandler hangs on_message_send for consumer failures #609

@vinoo999

Description

@vinoo999

What happened?

DefaultRequestHandler hangs in on_message_send when an error occurs in the consumer. on_message_stream does not hang because it runs _cleanup_producer in a background task, but that task is left in a bad state because the producer is still awaiting an updater.x call.

A minimal reproducible example using a failing TaskStore is shown below.

import asyncio
import uuid

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.context import ServerCallContext
from a2a.server.events import EventQueue
from a2a.server.request_handlers.default_request_handler import (
    DefaultRequestHandler,
)
from a2a.server.tasks import TaskStore
from a2a.server.tasks.task_updater import TaskUpdater
from a2a.types import (
    Message,
    MessageSendParams,
    Part,
    Role,
    Task,
    TaskState,
    TextPart,
)


class FailingTaskStore(TaskStore):
    """TaskStore stand-in whose ``save`` always raises.

    Used to mimic a misconfigured or broken storage backend: lookups
    find nothing, deletes do nothing, and every write fails.
    """

    async def get(
        self,
        task_id: str,
        context: ServerCallContext | None = None,
    ) -> Task | None:
        """Report every task as missing by returning ``None``."""
        return None

    async def save(
        self,
        task: Task,
        context: ServerCallContext | None = None,
    ) -> None:
        """Reject every write.

        Raises:
            RuntimeError: Always, regardless of the task passed in.
        """
        failure_message = 'This is an Error!'
        raise RuntimeError(failure_message)

    async def delete(
        self,
        task_id: str,
        context: ServerCallContext | None = None,
    ) -> None:
        """Accept the call and do nothing."""


class HelloWorldAgent:
    """Trivial agent that always answers with a fixed greeting."""

    async def invoke(self) -> str:
        """Return the constant greeting string."""
        greeting = 'Hello World'
        return greeting

class HelloWorldAgentExecutor(AgentExecutor):
    """AgentExecutor that publishes a HelloWorldAgent reply via a TaskUpdater."""

    def __init__(self):
        self.agent = HelloWorldAgent()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Run the agent once and push its progress onto ``event_queue``.

        The task and context ids come from ``context`` when present;
        otherwise a fresh UUID string is generated for each. The updater
        is then driven through: ``submit`` (only when the request had no
        task id), ``update_status(TaskState.working)``, ``add_artifact``
        with the agent's reply as a single text part, and ``complete``.
        """
        resolved_task_id = context.task_id or str(uuid.uuid4())
        resolved_context_id = context.context_id or str(uuid.uuid4())
        progress = TaskUpdater(
            event_queue,
            task_id=resolved_task_id,
            context_id=resolved_context_id,
        )

        # To simulate a failure inside the executor itself, uncomment:
        # raise ValueError("Simulated error during task execution")
        if not context.task_id:
            await progress.submit()
        await progress.update_status(TaskState.working)

        reply = await self.agent.invoke()
        reply_part = Part(root=TextPart(text=reply))
        await progress.add_artifact([reply_part])
        await progress.complete()

    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Cancellation is unsupported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError('cancel not supported')

async def test_hanging_on_task_save_error() -> None:
    """Reproduce the ``on_message_send`` hang when saving a task fails.

    Builds a DefaultRequestHandler around a HelloWorldAgentExecutor and a
    FailingTaskStore, sends one user message, and bounds the call with a
    10 second ``asyncio.wait_for`` timeout so a hang surfaces as
    ``TimeoutError`` instead of blocking forever.
    """
    handler = DefaultRequestHandler(
        agent_executor=HelloWorldAgentExecutor(),
        task_store=FailingTaskStore(),
    )

    user_message = Message(
        role=Role.user,
        parts=[TextPart(text='Test message')],
        message_id=str(uuid.uuid4()),
    )
    params = MessageSendParams(message=user_message)

    pending_send = handler.on_message_send(params)
    await asyncio.wait_for(pending_send, timeout=10.0)

# Script entry point: run the reproduction coroutine on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(test_hanging_on_task_save_error())

Log Output

Agent execution failed
Traceback (most recent call last):
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 325, in on_message_send
    ) = await result_aggregator.consume_and_break_on_interrupt(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 132, in consume_and_break_on_interrupt
    await self.task_manager.process(event)
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 206, in process
    await self.save_task_event(event)
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 137, in save_task_event
    task: Task = await self.ensure_task(event)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 187, in ensure_task
    await self._save_task(task)
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 240, in _save_task
    await self.task_store.save(task, self._call_context)
  File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 36, in save
    raise RuntimeError(
        'This is an Error!'
    )
RuntimeError: This is an Error!
Traceback (most recent call last):
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 325, in on_message_send
    ) = await result_aggregator.consume_and_break_on_interrupt(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 132, in consume_and_break_on_interrupt
    await self.task_manager.process(event)
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 206, in process
    await self.save_task_event(event)
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 137, in save_task_event
    task: Task = await self.ensure_task(event)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 187, in ensure_task
    await self._save_task(task)
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 240, in _save_task
    await self.task_store.save(task, self._call_context)
  File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 36, in save
    raise RuntimeError(
        'This is an Error!'
    )
RuntimeError: This is an Error!

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/xxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
    return await fut
           ^^^^^^^^^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 342, in on_message_send
    await self._cleanup_producer(producer_task, task_id)
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 438, in _cleanup_producer
    await producer_task
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 197, in _run_event_stream
    await queue.close()
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/events/event_queue.py", line 175, in close
    await asyncio.gather(
        self.queue.join(), *(child.close() for child in self._children)
    )
  File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/queues.py", line 248, in join
    await self._finished.wait()
  File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 104, in <module>
    asyncio.run(test_hanging_on_task_save_error())
    ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/base_events.py", line 725, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 99, in test_hanging_on_task_save_error
    await asyncio.wait_for(
        handler.on_message_send(params), timeout=10.0
    )
  File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/tasks.py", line 506, in wait_for
    async with timeouts.timeout(timeout):
               ~~~~~~~~~~~~~~~~^^^^^^^^^
  File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/timeouts.py", line 116, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions