
Agent

The high-level entry point for most use cases. Agent and AsyncAgent compose a Harness, SessionCache, and optional tools from a single configuration.


Agent

data_harness.Agent

Agent(
    adapter: ProviderAdapter,
    system: str,
    *,
    max_turns: int = 25,
    cache: SessionCache | None = None,
    run_dir: str | Path | None = None,
)

High-level synchronous agent.

Agent composes a Harness, a SessionCache, and optional tools from a single configuration. Each call to run builds a fresh Harness with a fresh message history. Use session when you need multi-turn conversation state to persist across questions.
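The difference between run and session comes down to who owns the message history. The sketch below models it with two hypothetical toy classes, `ToyAgent` and `ToySession`. They record user messages in a plain list instead of calling a provider. It illustrates the documented behavior and is not the library's implementation.

```python
class ToySession:
    """Hypothetical stand-in for AgentSession: one history shared across asks."""

    def __init__(self) -> None:
        self.history: list = []

    def ask(self, message: str) -> int:
        self.history.append(message)
        # Number of user messages the "model" can see on this turn.
        return len(self.history)


class ToyAgent:
    """Hypothetical stand-in for Agent: every run starts from an empty history."""

    def run(self, message: str) -> int:
        history = [message]  # fresh history, discarded when the call returns
        return len(history)

    def session(self) -> ToySession:
        return ToySession()


agent = ToyAgent()
print(agent.run("Load sales.csv."), agent.run("Now plot it."))  # 1 1

chat = agent.session()
print(chat.ask("Load sales.csv."), chat.ask("Now plot it."))  # 1 2
```

The second run() call cannot see the first question. The second ask() can.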

Example::

from data_harness import Agent
from data_harness.providers.anthropic import AnthropicAdapter

agent = Agent(
    adapter=AnthropicAdapter(model="claude-sonnet-4-6"),
    system="You are a data analyst.",
)
print(agent.run("Compute the mean of [1, 2, 3]."))

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
adapter | ProviderAdapter | Synchronous provider adapter. | required
system | str | System prompt passed unchanged to every Harness run. | required
max_turns | int | Hard cap on provider turns per run call. | 25
cache | SessionCache \| None | Shared SessionCache. A fresh cache is created when None. | None
run_dir | str \| Path \| None | Directory for JSONL logs. Defaults to ./runs. | None

Source code in data_harness/agent.py
def __init__(
    self,
    adapter: ProviderAdapter,
    system: str,
    *,
    max_turns: int = 25,
    cache: SessionCache | None = None,
    run_dir: str | Path | None = None,
) -> None:
    self._adapter = adapter
    self._system = system
    self._max_turns = max_turns
    self._cache = cache if cache is not None else SessionCache()
    self._run_dir = run_dir
    self._last_harness: Harness | None = None
    self._last_run_file: str | None = None
    self._connectors: dict[str, _ConnectorDefinition] = {}
    self._connector_tools: list[_ConnectorToolDefinition] = []
    self._planner_enabled = False
    self._subagent_factory: Callable[[], ProviderAdapter] | None = None

connector

connector(
    name: str, *, description: str
) -> ConnectorBuilder

Register a named connector and return a builder for attaching tools.

Connector tools start hidden; the model must call load_connectors before it can use them (progressive disclosure).

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
name | str | Unique connector name. Used as the tool-name prefix. | required
description | str | Shown to the model when it lists available connectors. | required

Returns:

Type | Description
--- | ---
ConnectorBuilder | A ConnectorBuilder for registering tools under this connector.

Source code in data_harness/agent.py
def connector(self, name: str, *, description: str) -> ConnectorBuilder:
    """Register a named connector and return a builder for attaching tools.

    Connector tools start hidden; the model must call ``load_connectors``
    before it can use them (progressive disclosure).

    Args:
        name: Unique connector name. Used as the tool-name prefix.
        description: Shown to the model when it lists available connectors.

    Returns:
        A `ConnectorBuilder` for registering tools under this connector.
    """
    self._connectors[name] = _ConnectorDefinition(
        name=name, description=description
    )
    return ConnectorBuilder(self, name)
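Progressive disclosure means the model first sees only connector names and descriptions, and a connector's tools become callable only after load_connectors names it. The sketch below models that with a hypothetical `ToyConnectorRegistry`. The `"__"` separator between connector and tool name is an assumption, since the docs only say the connector name is used as a prefix.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set


@dataclass
class ToyConnectorRegistry:
    """Hypothetical model of connector tools that start hidden."""

    descriptions: Dict[str, str] = field(default_factory=dict)
    tools: Dict[str, Dict[str, Callable[..., object]]] = field(default_factory=dict)
    loaded: Set[str] = field(default_factory=set)

    def connector(self, name: str, description: str) -> None:
        self.descriptions[name] = description
        self.tools.setdefault(name, {})

    def add_tool(self, connector: str, tool: str, fn: Callable[..., object]) -> None:
        self.tools[connector][tool] = fn

    def list_connectors(self) -> Dict[str, str]:
        # What the model sees before loading anything: names and descriptions.
        return dict(self.descriptions)

    def load_connectors(self, names: List[str]) -> None:
        # Only registered connectors can be loaded; unknown names are ignored.
        self.loaded.update(n for n in names if n in self.descriptions)

    def visible_tools(self) -> List[str]:
        # Connector name as prefix; the "__" separator is an assumption.
        return sorted(f"{c}__{t}" for c in self.loaded for t in self.tools[c])


registry = ToyConnectorRegistry()
registry.connector("warehouse", "Read-only SQL over the sales warehouse.")
registry.add_tool("warehouse", "query", lambda sql: [])
print(registry.visible_tools())  # []
registry.load_connectors(["warehouse"])
print(registry.visible_tools())  # ['warehouse__query']
```

Keeping tools hidden until they are requested keeps the initial tool list short when many connectors are registered.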

enable_planner

enable_planner() -> Agent

Enable the planning tool and suffix-based nag reminders.

The planner escalates reminders at turns 4, 8, and 12 when no progress has been recorded. Call this before run or session.

Returns:

Type | Description
--- | ---
Agent | self, for method chaining.

Source code in data_harness/agent.py
def enable_planner(self) -> Agent:
    """Enable the planning tool and suffix-based nag reminders.

    The planner escalates reminders at turns 4, 8, and 12 when no progress
    has been recorded. Call this before `run` or `session`.

    Returns:
        ``self``, for method chaining.
    """
    self._planner_enabled = True
    return self
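The reminder schedule can be pictured as a lookup on the turn number. The sketch below is a hypothetical model. Turns 4, 8, and 12 come from the docs. The severity labels, the reminder wording, and appending the reminder to the end of the existing message text (one reading of "suffix-based") are assumptions. It also assumes turns are counted from the start of the run. Whether recording progress resets the schedule is not documented.

```python
from typing import Optional

# Turns named in the docs; the severity labels are hypothetical.
NAG_LEVELS = {4: "reminder", 8: "warning", 12: "final warning"}


def planner_suffix(turn: int, progress_recorded: bool) -> Optional[str]:
    """Return reminder text for this turn, or None if no nag is due."""
    if progress_recorded or turn not in NAG_LEVELS:
        return None
    return f"[planner {NAG_LEVELS[turn]}] No progress recorded; update the plan."


def with_suffix(message: str, turn: int, progress_recorded: bool) -> str:
    # Assumed placement: the reminder is appended to the end of existing
    # message text instead of being sent as a separate message.
    suffix = planner_suffix(turn, progress_recorded)
    return message if suffix is None else f"{message}\n\n{suffix}"


print([t for t in range(1, 15) if planner_suffix(t, progress_recorded=False)])
# [4, 8, 12]
```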

enable_subagents

enable_subagents(
    *, adapter_factory: Callable[[], ProviderAdapter]
) -> Agent

Enable the subagent tool, using adapter_factory for spawned agents.

Each spawned subagent gets a fresh adapter, fresh message history, and fresh cache. State crosses the boundary only through explicit input_handles.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
adapter_factory | Callable[[], ProviderAdapter] | Zero-argument callable that returns a fresh ProviderAdapter for each subagent. | required

Returns:

Type | Description
--- | ---
Agent | self, for method chaining.

Source code in data_harness/agent.py
def enable_subagents(
    self, *, adapter_factory: Callable[[], ProviderAdapter]
) -> Agent:
    """Enable the subagent tool, using ``adapter_factory`` for spawned agents.

    Each spawned subagent gets a fresh adapter, fresh message history, and
    fresh cache. State crosses the boundary only through explicit
    ``input_handles``.

    Args:
        adapter_factory: Zero-argument callable that returns a fresh
            `ProviderAdapter` for each subagent.

    Returns:
        ``self``, for method chaining.
    """
    self._subagent_factory = adapter_factory
    return self
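The factory signature is what gives each subagent its own adapter. With the real API it would be a zero-argument callable such as `lambda: AnthropicAdapter(model="claude-sonnet-4-6")`, reusing the adapter from the Agent example. An already-built adapter instance does not match `Callable[[], ProviderAdapter]`. The sketch below models the isolation rule with a hypothetical `spawn_subagent` helper and a `ToyAdapter` class. Only the handles listed in `input_handles` reach the child's cache.

```python
from typing import Any, Callable, Dict, List, Tuple


class ToyAdapter:
    """Hypothetical stand-in for a ProviderAdapter."""


def spawn_subagent(
    adapter_factory: Callable[[], ToyAdapter],
    parent_cache: Dict[str, Any],
    input_handles: List[str],
) -> Tuple[ToyAdapter, List[dict], Dict[str, Any]]:
    adapter = adapter_factory()  # a new adapter for every spawn
    history: List[dict] = []     # no messages inherited from the parent
    # The child's cache holds only the handles passed explicitly. Values are
    # shared by reference here; whether the library copies them is unknown.
    cache = {name: parent_cache[name] for name in input_handles}
    return adapter, history, cache


parent_cache = {"sales": [120, 95, 143], "api_key": "not for subagents"}
a1, _, child_cache = spawn_subagent(ToyAdapter, parent_cache, ["sales"])
a2, _, _ = spawn_subagent(ToyAdapter, parent_cache, [])
print(a1 is a2, sorted(child_cache))  # False ['sales']
```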

session

session() -> AgentSession

Create a stateful AgentSession for multi-turn conversations.

Returns:

Type | Description
--- | ---
AgentSession | A new AgentSession backed by a copy of this agent's cache.

Source code in data_harness/agent.py
def session(self) -> AgentSession:
    """Create a stateful `AgentSession` for multi-turn conversations.

    Returns:
        A new `AgentSession` backed by a copy of this agent's cache.
    """
    return AgentSession(self)

run_result

run_result(user_message: str) -> RunResult

Run the agent and return the full RunResult.

Builds a fresh Harness with a fresh message history for each call.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
user_message | str | The user prompt to send. | required

Returns:

Type | Description
--- | ---
RunResult | A RunResult with the text response, token usage, and cache state.

Source code in data_harness/agent.py
def run_result(self, user_message: str) -> RunResult:
    """Run the agent and return the full `RunResult`.

    Builds a fresh `Harness` with a fresh message history for each call.

    Args:
        user_message: The user prompt to send.

    Returns:
        A `RunResult` with the text response, token usage, and cache state.
    """
    harness = self._make_harness()
    self._last_harness = harness
    result = harness.run_result(
        user_message, run_id=str(uuid.uuid4()), session_id=None
    )
    self._last_run_file = harness.run_file
    return result

run

run(user_message: str) -> str

Run the agent and return the final text response.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
user_message | str | The user prompt to send. | required

Returns:

Type | Description
--- | ---
str | The model's final text response.

Raises:

Type | Description
--- | ---
MaxTurnsExceeded | If the loop reaches max_turns.
RuntimeError | If the provider raises an exception.

Source code in data_harness/agent.py
def run(self, user_message: str) -> str:
    """Run the agent and return the final text response.

    Args:
        user_message: The user prompt to send.

    Returns:
        The model's final text response.

    Raises:
        MaxTurnsExceeded: If the loop reaches ``max_turns``.
        RuntimeError: If the provider raises an exception.
    """
    harness = self._make_harness()
    self._last_harness = harness
    result = harness.run(user_message)
    self._last_run_file = harness.run_file
    return result

explain

explain() -> str

Return a string showing the equivalent explicit Harness wiring.

Source code in data_harness/agent.py
def explain(self) -> str:
    """Return a string showing the equivalent explicit `Harness` wiring."""
    return _EXPLAIN_TEMPLATE.format(
        system=_truncate(self._system),
        max_turns=self._max_turns,
        run_dir=self._run_dir if self._run_dir is not None else "./runs",
    )

AgentSession

data_harness.AgentSession

AgentSession(agent: Agent)

Stateful chat session built from an Agent definition.

Agent.run() intentionally stays one-shot for examples and tests. Use Agent.session() when an application needs follow-up questions over the same message history and cache handles.

Source code in data_harness/agent.py
def __init__(self, agent: Agent) -> None:
    self._agent = agent
    self._cache = SessionCache(
        sample_size=agent.cache.sample_size,
        storage_dir=None,
        hot_limit=agent.cache.hot_limit,
    )
    for name, value in agent.cache.items():
        self._cache.put(name, _copy_cache_value(value))
    self._harness = agent._make_harness(cache=self._cache)
    self._id: str = str(uuid.uuid4())
    self._last_result: RunResult | None = None
    self._turns: int = 0
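`AgentSession.__init__` builds a new SessionCache and puts a copy of every agent handle into it. Edits made during the session should therefore not leak back into the agent's cache. The sketch below models this with plain dicts and a hypothetical `fork_cache` helper. `copy.deepcopy` stands in for the private `_copy_cache_value`, whose copying depth is not shown here.

```python
import copy
from typing import Any, Dict


def fork_cache(agent_cache: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical model of the session's cache setup: copy every entry.

    copy.deepcopy is a stand-in for the library's _copy_cache_value.
    """
    return {name: copy.deepcopy(value) for name, value in agent_cache.items()}


agent_cache = {"prices": [10, 20, 30]}
session_cache = fork_cache(agent_cache)
session_cache["prices"].append(40)  # mutate an existing handle in the session
session_cache["summary"] = "draft"  # add a new handle in the session
print(agent_cache)  # {'prices': [10, 20, 30]}
```

A consequence of this design is that two sessions created from the same Agent start from identical handles but diverge independently.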

put

put(
    name: str, value: Any, *, overwrite: bool = False
) -> str

Store a value in the session cache and return the handle used.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
name | str | Desired handle name. Must be a valid Python identifier. | required
value | Any | Any Python object to store. | required
overwrite | bool | Replace the existing handle if True. | False

Returns:

Type | Description
--- | ---
str | The handle name under which the value was stored.

Source code in data_harness/agent.py
def put(self, name: str, value: Any, *, overwrite: bool = False) -> str:
    """Store a value in the session cache and return the handle used.

    Args:
        name: Desired handle name. Must be a valid Python identifier.
        value: Any Python object to store.
        overwrite: Replace the existing handle if ``True``.

    Returns:
        The handle name under which the value was stored.
    """
    return self._cache.put(name, value, overwrite=overwrite)
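The put() contract can be sketched with a hypothetical `ToyCache`. The identifier check follows the documented rule, using `str.isidentifier()`. What the real SessionCache does when the name already exists and `overwrite` is False is not documented here. Because the method returns "the handle used", it may pick another name rather than fail. This sketch simply raises `KeyError`, which is an assumption.

```python
from typing import Any, Dict


class ToyCache:
    """Hypothetical model of the put() contract; not the real SessionCache."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}

    def put(self, name: str, value: Any, *, overwrite: bool = False) -> str:
        # Documented rule: the handle name must be a valid Python identifier.
        if not name.isidentifier():
            raise ValueError(f"invalid handle name: {name!r}")
        # Assumption: without overwrite=True an existing handle is an error.
        # The real cache may behave differently (e.g. choose another name).
        if name in self._values and not overwrite:
            raise KeyError(f"handle already exists: {name!r}")
        self._values[name] = value
        return name  # the handle the value was stored under


cache = ToyCache()
cache.put("daily_sales", [120, 95, 143])
cache.put("daily_sales", [120, 95, 150], overwrite=True)  # replaces the value
```

Requiring identifiers means the model can refer to a handle by name inside generated Python code.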

list_handles

list_handles() -> dict[str, str]

Return a mapping of all cache handle names to their snapshot strings.

Source code in data_harness/agent.py
def list_handles(self) -> dict[str, str]:
    """Return a mapping of all cache handle names to their snapshot strings."""
    return self._cache.list_handles()

ask_result

ask_result(user_message: str) -> RunResult

Send a follow-up message and return the full RunResult.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
user_message | str | The follow-up user prompt. | required

Returns:

Type | Description
--- | ---
RunResult | A RunResult for this follow-up, covering every provider turn it took.

Source code in data_harness/agent.py
def ask_result(self, user_message: str) -> RunResult:
    """Send a follow-up message and return the full `RunResult`.

    Args:
        user_message: The follow-up user prompt.

    Returns:
        A `RunResult` for this turn sequence.
    """
    result = self._harness.ask_result(
        user_message, run_id=str(uuid.uuid4()), session_id=self._id
    )
    self._last_result = result
    self._turns += result.turns
    self._agent._last_harness = self._harness
    self._agent._last_run_file = self._harness.run_file
    return result

ask

ask(user_message: str) -> str

Send a follow-up message and return the final text response.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
user_message | str | The follow-up user prompt. | required

Returns:

Type | Description
--- | ---
str | The model's final text response.

Raises:

Type | Description
--- | ---
MaxTurnsExceeded | If the loop reaches max_turns.
RuntimeError | If the provider raises an exception.

Source code in data_harness/agent.py
def ask(self, user_message: str) -> str:
    """Send a follow-up message and return the final text response.

    Args:
        user_message: The follow-up user prompt.

    Returns:
        The model's final text response.

    Raises:
        MaxTurnsExceeded: If the loop reaches ``max_turns``.
        RuntimeError: If the provider raises an exception.
    """
    result = self.ask_result(user_message)
    if result.status == "max_turns_exceeded":
        from data_harness.exceptions import MaxTurnsExceeded

        raise MaxTurnsExceeded(result.turns)
    if result.status == "error":
        raise RuntimeError(result.error or "unknown error")
    return result.text

AsyncAgent

data_harness.AsyncAgent

AsyncAgent(
    adapter: AsyncProviderAdapter,
    system: str,
    *,
    max_turns: int = 25,
    cache: SessionCache | None = None,
    run_dir: str | Path | None = None,
)

Async agent for use with AsyncProviderAdapter.

run() and run_result() are coroutines. run_stream() is an async generator that yields StreamEvent objects as they arrive from the provider. Use async_session() for multi-turn streaming conversations.

Source code in data_harness/agent.py
def __init__(
    self,
    adapter: AsyncProviderAdapter,
    system: str,
    *,
    max_turns: int = 25,
    cache: SessionCache | None = None,
    run_dir: str | Path | None = None,
) -> None:
    self._adapter = adapter
    self._system = system
    self._max_turns = max_turns
    self._cache = cache if cache is not None else SessionCache()
    self._run_dir = run_dir
    self._last_harness: AsyncHarness | None = None
    self._last_run_file: str | None = None
    self._connectors: dict[str, _ConnectorDefinition] = {}
    self._connector_tools: list[_ConnectorToolDefinition] = []
    self._planner_enabled = False

run_stream async

run_stream(
    user_message: str,
) -> AsyncGenerator[StreamEvent, None]

Stream events for a one-shot run.

Yields StreamEvent objects (message_start, content_block_*, message_delta, message_stop, tool_result) following the Claude Agent SDK protocol.

Usage::

from data_harness.streaming import TextDelta

async def print_reply(agent):
    async for event in agent.run_stream("hello"):
        if event.type == "content_block_delta" and isinstance(event.delta, TextDelta):
            print(event.delta.text, end="", flush=True)

Source code in data_harness/agent.py
async def run_stream(self, user_message: str) -> AsyncGenerator[StreamEvent, None]:
    """Stream events for a one-shot run.

    Yields StreamEvent objects (message_start, content_block_*, message_delta,
    message_stop, tool_result) following the Claude Agent SDK protocol.

    Usage::

        async for event in agent.run_stream("hello"):
            if event.type == "content_block_delta":
                from data_harness.streaming import TextDelta
                if isinstance(event.delta, TextDelta):
                    print(event.delta.text, end="", flush=True)
    """
    harness = self._make_harness()
    self._last_harness = harness
    async for event in harness.run_stream(user_message):
        yield event
    self._last_run_file = harness.run_file
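The sketch below replays the consumption pattern from the usage example against a hypothetical `fake_run_stream` generator. It uses toy `StreamEvent` and `TextDelta` dataclasses, which are stand-ins and not the `data_harness.streaming` types, so it runs without a provider. It accumulates text deltas into the final reply and ignores every other event type, including tool_result.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional


@dataclass
class TextDelta:
    """Toy stand-in for data_harness.streaming.TextDelta."""
    text: str


@dataclass
class StreamEvent:
    """Toy stand-in for the library's StreamEvent."""
    type: str
    delta: Optional[TextDelta] = None


async def fake_run_stream(message: str) -> AsyncGenerator[StreamEvent, None]:
    """Hypothetical stand-in for AsyncAgent.run_stream; yields a canned reply."""
    yield StreamEvent("message_start")
    yield StreamEvent("content_block_start")
    for chunk in ("The mean ", "is 2.0."):
        await asyncio.sleep(0)  # simulate waiting on the network
        yield StreamEvent("content_block_delta", TextDelta(chunk))
    yield StreamEvent("content_block_stop")
    yield StreamEvent("message_stop")


async def collect_text(stream: AsyncGenerator[StreamEvent, None]) -> str:
    parts: List[str] = []
    async for event in stream:
        # Keep only text deltas; other delta kinds and event types are skipped.
        if event.type == "content_block_delta" and isinstance(event.delta, TextDelta):
            parts.append(event.delta.text)
    return "".join(parts)


print(asyncio.run(collect_text(fake_run_stream("Compute the mean of [1, 2, 3]."))))
```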

AsyncAgentSession

data_harness.AsyncAgentSession

AsyncAgentSession(agent: AsyncAgent)

Stateful async chat session built from an AsyncAgent definition.

Source code in data_harness/agent.py
def __init__(self, agent: AsyncAgent) -> None:
    self._agent = agent
    self._cache = SessionCache(
        sample_size=agent.cache.sample_size,
        storage_dir=None,
        hot_limit=agent.cache.hot_limit,
    )
    for name, value in agent.cache.items():
        self._cache.put(name, _copy_cache_value(value))
    self._harness = agent._make_harness(cache=self._cache)
    self._id: str = str(uuid.uuid4())
    self._last_result: RunResult | None = None
    self._turns: int = 0

ask_stream async

ask_stream(
    user_message: str,
) -> AsyncGenerator[StreamEvent, None]

Stream events for a follow-up turn.

Source code in data_harness/agent.py
async def ask_stream(self, user_message: str) -> AsyncGenerator[StreamEvent, None]:
    """Stream events for a follow-up turn."""
    async for event in self._harness.ask_stream(user_message):
        yield event
    self._agent._last_harness = self._harness
    self._agent._last_run_file = self._harness.run_file
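In both ask_stream and run_stream, the bookkeeping after the `async for` loop only runs once the generator is exhausted. A caller that breaks out early therefore appears to leave the agent's last-run file un-updated. The toy generator below, a hypothetical `stream_then_record` with a made-up path, demonstrates that standard Python async-generator behavior.

```python
import asyncio
from typing import AsyncGenerator, Dict


async def stream_then_record(state: Dict[str, str]) -> AsyncGenerator[int, None]:
    # Same shape as ask_stream: yield every event, then do bookkeeping.
    for event in range(3):
        yield event
    state["run_file"] = "runs/example.jsonl"  # hypothetical path


async def consume(stop_early: bool) -> Dict[str, str]:
    state: Dict[str, str] = {}
    stream = stream_then_record(state)
    async for _ in stream:
        if stop_early:
            break
    # After a break, aclose() raises GeneratorExit at the paused yield,
    # so the bookkeeping line after the loop never executes.
    await stream.aclose()
    return state


print(asyncio.run(consume(stop_early=False)))  # {'run_file': 'runs/example.jsonl'}
print(asyncio.run(consume(stop_early=True)))   # {}
```

To keep the run-file bookkeeping consistent, consume the stream to the end even if only part of the output is displayed.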

RunResult

data_harness.RunResult dataclass

RunResult(
    text: str,
    status: Literal[
        "success", "max_turns_exceeded", "error"
    ],
    turns: int,
    run_file: str | None,
    stop_reason: StopReason | None,
    usage: Usage,
    cache_snapshots: dict[str, str] = dict(),
    cache_storage: dict[str, CacheStorageInfo] = dict(),
    error: str | None = None,
    run_id: str | None = None,
    session_id: str | None = None,
)

The complete outcome of a single run, as returned by Agent.run_result(), AgentSession.ask_result(), and the underlying Harness.run_result() and Harness.ask_result().

Attributes:

Name | Type | Description
--- | --- | ---
text | str | The final text response from the model.
status | Literal['success', 'max_turns_exceeded', 'error'] | Outcome of the run: "success", "max_turns_exceeded", or "error".
turns | int | Number of provider turns executed.
run_file | str \| None | Path to the JSONL log for this run, or None if logging was disabled.
stop_reason | StopReason \| None | Provider stop reason from the final turn, or None on error/max-turns.
usage | Usage | Cumulative token counts across all turns.
cache_snapshots | dict[str, str] | Mapping of handle name → compact snapshot string for every value in the session cache at the end of the run.
cache_storage | dict[str, CacheStorageInfo] | Mapping of handle name → CacheStorageInfo describing where each handle is stored.
error | str \| None | Exception repr when status == "error", otherwise None.
run_id | str \| None | UUID assigned per call by Agent.run_result() and AgentSession.ask_result(); when calling Harness directly, None unless you pass a run_id.
session_id | str \| None | Optional session UUID when the run is part of an AgentSession; None for one-shot runs.
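The ask() source shows the text-returning wrappers turning status into exceptions. Callers of run_result() or ask_result() can presumably branch on status directly instead. The sketch below uses a hypothetical `ToyResult` dataclass that mirrors only a subset of RunResult's fields, plus a made-up `describe` helper.

```python
from dataclasses import dataclass
from typing import Literal, Optional

Status = Literal["success", "max_turns_exceeded", "error"]


@dataclass
class ToyResult:
    """Hypothetical subset of RunResult's fields."""

    text: str
    status: Status
    turns: int
    error: Optional[str] = None


def describe(result: ToyResult) -> str:
    # Branch on status instead of catching exceptions.
    if result.status == "success":
        return result.text
    if result.status == "max_turns_exceeded":
        return f"gave up after {result.turns} turns"
    return f"provider error: {result.error or 'unknown error'}"


print(describe(ToyResult("Mean is 2.0.", "success", 2)))
print(describe(ToyResult("", "max_turns_exceeded", 25)))
```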


Usage

data_harness.Usage dataclass

Usage(
    input_tokens: int = 0,
    output_tokens: int = 0,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
)

Accumulated token counts for a run or session turn.

Attributes:

Name | Type | Description
--- | --- | ---
input_tokens | int | Tokens in the prompt sent to the provider.
output_tokens | int | Tokens in the provider's response.
cache_read_tokens | int | Prompt tokens served from the provider's KV cache.
cache_write_tokens | int | Prompt tokens written to the provider's KV cache.
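"Cumulative token counts across all turns" amounts to a field-wise sum of per-turn counts. The sketch below uses a hypothetical `ToyUsage` mirror of Usage with an `__add__` method. Whether the real Usage class supports `+` is not documented, and the per-turn numbers are made up for illustration.

```python
from dataclasses import dataclass, fields


@dataclass
class ToyUsage:
    """Hypothetical mirror of Usage with field-wise addition."""

    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0

    def __add__(self, other: "ToyUsage") -> "ToyUsage":
        # Add each counter independently.
        return ToyUsage(
            **{f.name: getattr(self, f.name) + getattr(other, f.name) for f in fields(self)}
        )


per_turn = [ToyUsage(1200, 80, 0, 1100), ToyUsage(1350, 60, 1100, 0)]
total = sum(per_turn, ToyUsage())  # start from an all-zero Usage
print(total)
```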


CacheStorageInfo

data_harness.CacheStorageInfo dataclass

CacheStorageInfo(
    location: Literal["memory", "disk"], storage_type: str
)

Where a named handle is physically stored in the SessionCache.

Attributes:

Name | Type | Description
--- | --- | ---
location | Literal['memory', 'disk'] | Either "memory" (hot) or "disk" (spilled cold).
storage_type | str | Format used on disk, e.g. "dataframe_parquet" or "numpy_npy". Always "memory" for hot entries.
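RunResult.cache_storage maps each handle to a CacheStorageInfo. That makes it straightforward to see which handles were spilled to disk and in what format. The sketch below uses a hypothetical `ToyStorageInfo` mirror of CacheStorageInfo and a made-up `spilled_handles` helper. The example handles are illustrative.

```python
from dataclasses import dataclass
from typing import Dict, Literal


@dataclass(frozen=True)
class ToyStorageInfo:
    """Hypothetical mirror of CacheStorageInfo."""

    location: Literal["memory", "disk"]
    storage_type: str


def spilled_handles(cache_storage: Dict[str, ToyStorageInfo]) -> Dict[str, str]:
    """Map each handle that was spilled to disk to its on-disk format."""
    return {
        name: info.storage_type
        for name, info in cache_storage.items()
        if info.location == "disk"
    }


storage = {
    "orders": ToyStorageInfo("disk", "dataframe_parquet"),
    "weights": ToyStorageInfo("disk", "numpy_npy"),
    "note": ToyStorageInfo("memory", "memory"),
}
print(spilled_handles(storage))  # {'orders': 'dataframe_parquet', 'weights': 'numpy_npy'}
```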