Async & Streaming

data-harness ships AsyncAgent and AsyncAgentSession for async workflows and token-level streaming. Both mirror the synchronous API exactly: you await the coroutine methods and iterate the streaming methods with async for.


AsyncAgent

import asyncio
from data_harness import AsyncAgent
from data_harness.providers.anthropic import AnthropicAdapter

async def main():
    agent = AsyncAgent(
        adapter=AnthropicAdapter(model="claude-sonnet-4-6"),
        system="You are a data analyst.",
    )
    result = await agent.run("Compute the mean of [1, 2, 3, 4, 5].")
    print(result)

asyncio.run(main())

AsyncAgent requires an AsyncProviderAdapter. The built-in AnthropicAdapter implements both ProviderAdapter and AsyncProviderAdapter.
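
A common reason to use the async API is to run several independent prompts concurrently. The sketch below shows the pattern with asyncio.gather. StubAsyncAgent is a hypothetical stand-in that only imitates the `await agent.run(prompt)` call shape, so the example runs without network access. The source does not say whether one AsyncAgent can safely serve concurrent runs, so this sketch assumes a fresh agent per prompt.

```python
import asyncio
from typing import Callable


class StubAsyncAgent:
    """Hypothetical stand-in with the same `await agent.run(prompt)` shape as AsyncAgent."""

    async def run(self, prompt: str) -> str:
        await asyncio.sleep(0.01)  # simulate provider latency
        return f"answer to: {prompt}"


async def run_all(make_agent: Callable[[], StubAsyncAgent], prompts: list[str]) -> list[str]:
    # One agent per prompt (an assumption; sharing an agent may also be fine).
    # gather() runs the coroutines concurrently and returns results in input order.
    return await asyncio.gather(*(make_agent().run(p) for p in prompts))


if __name__ == "__main__":
    prompts = ["Compute the mean of [1, 2, 3].", "Compute the median of [4, 5, 6]."]
    for answer in asyncio.run(run_all(StubAsyncAgent, prompts)):
        print(answer)
```

With the real library, `make_agent` would be a function that builds an AsyncAgent with the adapter and system prompt shown above.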


Streaming

Use AsyncAgent.run_stream() to receive token-level events as they arrive:

from data_harness import AsyncAgent, TextDelta
from data_harness.providers.anthropic import AnthropicAdapter

async def stream_example():
    agent = AsyncAgent(
        adapter=AnthropicAdapter(model="claude-sonnet-4-6"),
        system="You are a data analyst.",
    )

    async for event in agent.run_stream("Describe the unemployment trend."):
        match event.type:
            case "content_block_delta":
                # A delta carries either text or JSON; print only text.
                if isinstance(event.delta, TextDelta):
                    print(event.delta.text, end="", flush=True)
            case "tool_result":
                # Show the first 80 characters of each tool result.
                print(f"\n[tool: {event.tool_name}] {event.content[:80]}")

Stream event types

The stream emits a sequence of typed events. Each event has a type discriminator field matching the Anthropic SDK's raw SSE event shape:

Event type            When
message_start         Before the first content block
content_block_start   A new text or tool-use block begins
content_block_delta   A text or JSON delta arrives
content_block_stop    The current block is complete
message_delta         Token counts and stop reason for this turn
message_stop          After the last content block
tool_result           After the harness dispatches a tool call

The tool_result event (ToolResultEvent) is specific to data-harness; the raw provider stream does not emit it.
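
To make the event flow concrete, here is a minimal sketch of a consumer that joins text deltas into a single string and records tool results. The Event and TextDelta classes are hypothetical stand-ins, not the library's own types. They model only the fields used in the streaming example (type, delta.text, tool_name, content), and fake_stream() replays a canned sequence so the code runs without a provider.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


@dataclass
class TextDelta:
    """Hypothetical stand-in for data_harness.TextDelta."""
    text: str


@dataclass
class Event:
    """Hypothetical stand-in for a stream event; only the fields used here are modelled."""
    type: str
    delta: Any = None
    tool_name: str = ""
    content: str = ""


async def collect(events: AsyncIterator[Event]) -> tuple[str, list[tuple[str, str]]]:
    """Join text deltas into one string and list (tool_name, content) pairs."""
    parts: list[str] = []
    tool_results: list[tuple[str, str]] = []
    async for event in events:
        if event.type == "content_block_delta" and isinstance(event.delta, TextDelta):
            parts.append(event.delta.text)
        elif event.type == "tool_result":
            tool_results.append((event.tool_name, event.content))
        # Every other event type is ignored.
    return "".join(parts), tool_results


async def fake_stream() -> AsyncIterator[Event]:
    """Replay a canned event sequence (illustrative, not captured from a real run)."""
    yield Event("message_start")
    yield Event("content_block_start")
    yield Event("content_block_delta", delta=TextDelta("Unemployment "))
    yield Event("content_block_delta", delta=TextDelta("fell."))
    yield Event("content_block_stop")
    yield Event("message_delta")
    yield Event("message_stop")
    yield Event("tool_result", tool_name="query", content="rows=12")


if __name__ == "__main__":
    text, tools = asyncio.run(collect(fake_stream()))
    print(text)
    print(tools)
```

Because collect() accepts any async iterator of events, the same function should work when handed `agent.run_stream(...)`, provided the real events expose the same fields.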


Async sessions

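import pandas as pd
from data_harness import AsyncAgent
from data_harness.providers.anthropic import AnthropicAdapter

async def session_example():
    agent = AsyncAgent(
        adapter=AnthropicAdapter(model="claude-sonnet-4-6"),
        system="You are a data analyst.",
    )
    session = agent.async_session()

    # Store the DataFrame in the session under the name "sales".
    session.put("sales", pd.read_csv("sales.csv"))

    # Each ask() is a further turn in the same session.
    print(await session.ask("What is the total revenue?"))
    print(await session.ask("Which product category was highest?"))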

Streaming follow-up turns use ask_stream():

from data_harness import TextDelta  # module-level import

# Inside a coroutine, e.g. the body of session_example():
async for event in session.ask_stream("Summarise the analysis."):
    if event.type == "content_block_delta" and isinstance(event.delta, TextDelta):
        print(event.delta.text, end="", flush=True)

Implementing a streaming adapter

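Override stream_events() in your AsyncProviderAdapter to emit real token-level events. The default implementation calls chat() and synthesises the standard event sequence from the assembled response, so without an override no event arrives until the full response is ready. A custom adapter has this shape: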

from data_harness.providers.base import AsyncProviderAdapter, NormalizedResponse
from data_harness.streaming import StreamEvent
from collections.abc import AsyncGenerator

class MyAdapter(AsyncProviderAdapter):
    async def chat(self, system, messages, tools) -> NormalizedResponse:
        ...

    def format_cache_control(self, obj):
        return obj

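    async def stream_events(
        self, system, messages, tools
    ) -> AsyncGenerator[StreamEvent, None]:
        # Yield real token events from your provider SDK here.
        # The body must contain at least one `yield`; without it this is a
        # plain coroutine and cannot be consumed with `async for`.
        ...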
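
For intuition about the fallback behaviour described above, the following sketch shows one way an adapter could synthesise the standard event sequence from a completed chat() response. It is not the library's actual implementation. FakeResponse and FakeEvent are hypothetical stand-ins for NormalizedResponse and StreamEvent, and emitting the whole text as a single delta is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class FakeResponse:
    """Hypothetical stand-in for NormalizedResponse; only `text` is modelled."""
    text: str


@dataclass
class FakeEvent:
    """Hypothetical stand-in for StreamEvent."""
    type: str
    text: str = ""


class FallbackAdapter:
    async def chat(self, system, messages, tools) -> FakeResponse:
        # A real adapter would call the provider here.
        return FakeResponse(text="The mean is 3.")

    async def stream_events(self, system, messages, tools) -> AsyncIterator[FakeEvent]:
        # Wait for the complete response, then replay it as one text block.
        response = await self.chat(system, messages, tools)
        yield FakeEvent("message_start")
        yield FakeEvent("content_block_start")
        yield FakeEvent("content_block_delta", text=response.text)  # whole text, one delta
        yield FakeEvent("content_block_stop")
        yield FakeEvent("message_delta")
        yield FakeEvent("message_stop")


async def event_types(adapter: FallbackAdapter) -> list[str]:
    # Drain the stream and keep only each event's type.
    return [event.type async for event in adapter.stream_events("system", [], [])]


if __name__ == "__main__":
    print(asyncio.run(event_types(FallbackAdapter())))
```

Consumers see the same event types either way. The difference is latency: a synthesised stream produces nothing until chat() returns, while a real streaming override can yield deltas as the provider sends them.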