Skip to content

Harness

The core ReAct loop implementation. Agent is a convenience layer over Harness. Use Harness directly when you need explicit control over tool wiring — see examples/advanced_wiring.py for a complete example.


Harness

data_harness.loop.Harness

Harness(
    adapter: ProviderAdapter,
    system: str,
    tools: list[ToolSpec],
    max_turns: int = 25,
    run_dir: str = "./runs",
    cache: SessionCache | None = None,
)

The core synchronous ReAct loop.

Harness owns the message list, dispatches tools, applies suffix-only reminder hooks, and logs every turn to a JSONL file. It is the central implementation boundary in data-harness: everything above it (Agent, AgentSession) is a convenience layer; everything below it (ProviderAdapter, SessionCache, ToolSpec) is a pure dependency.

The system prompt is never mutated between turns. Reminders, nags, and dynamic state are always appended to the conversation suffix so the provider's KV cache is not invalidated.

For most use cases, prefer Agent over constructing Harness directly. Use Harness when you need full control over tool wiring, as shown in examples/advanced_wiring.py.

Parameters:

Name Type Description Default
adapter ProviderAdapter

Synchronous provider adapter that translates provider SDK objects into harness types.

required
system str

System prompt. Kept byte-identical across all turns.

required
tools list[ToolSpec]

Full tool list. Invisible tools (visible=False) are excluded from the provider call but can still be dispatched.

required
max_turns int

Hard cap on provider turns before the loop stops and returns a "max_turns_exceeded" result.

25
run_dir str

Directory where JSONL logs are written. Created on first run.

'./runs'
cache SessionCache | None

Shared SessionCache. A fresh cache is created if None.

None
Source code in data_harness/loop.py
def __init__(
    self,
    adapter: ProviderAdapter,
    system: str,
    tools: list[ToolSpec],
    max_turns: int = 25,
    run_dir: str = "./runs",
    cache: SessionCache | None = None,
) -> None:
    if max_turns < 1:
        raise ValueError(f"max_turns must be at least 1, got {max_turns!r}")
    self._adapter = adapter
    self._system = system
    self._tools = list(tools)
    self._max_turns = max_turns
    self._run_dir = run_dir
    self._cache = cache if cache is not None else SessionCache()
    self._messages: list[Message] = []
    self._reminders: list[Callable[[int, int], str | None]] = []
    self._run_file: str | None = None

run_file property

run_file: str | None

Path to the JSONL log for this run, or None before the first run.

register_reminder

register_reminder(
    hook: Callable[[int, int], str | None],
) -> None

Register a suffix reminder hook called before each provider turn.

The hook receives (current_turn, max_turns) and returns a reminder string to append to the conversation suffix, or None to skip.

Parameters:

Name Type Description Default
hook Callable[[int, int], str | None]

Callable with signature (turn: int, max_turns: int) -> str | None.

required
Source code in data_harness/loop.py
def register_reminder(self, hook: Callable[[int, int], str | None]) -> None:
    """Register a suffix reminder hook called before each provider turn.

    The hook receives ``(current_turn, max_turns)`` and returns a reminder
    string to append to the conversation suffix, or ``None`` to skip.

    Args:
        hook: Callable with signature ``(turn: int, max_turns: int) -> str | None``.
    """
    self._reminders.append(hook)

run_result

run_result(
    user_message: str,
    *,
    run_id: str | None = None,
    session_id: str | None = None,
) -> RunResult

Start a fresh run and return the full RunResult.

Resets message history. Use ask_result for follow-up turns on the same history.

Parameters:

Name Type Description Default
user_message str

The initial user prompt.

required
run_id str | None

Optional identifier stamped into the RunResult.

None
session_id str | None

Optional session identifier stamped into the RunResult.

None

Returns:

Type Description
RunResult

A RunResult describing the outcome, token usage, and cache state.

Source code in data_harness/loop.py
def run_result(
    self,
    user_message: str,
    *,
    run_id: str | None = None,
    session_id: str | None = None,
) -> RunResult:
    """Start a fresh run and return the full `RunResult`.

    Resets message history. Use `ask_result` for follow-up turns on the
    same history.

    Args:
        user_message: The initial user prompt.
        run_id: Optional identifier stamped into the `RunResult`.
        session_id: Optional session identifier stamped into the `RunResult`.

    Returns:
        A `RunResult` describing the outcome, token usage, and cache state.
    """
    self._run_file = setup_logger(self._run_dir)
    self._messages = [Message(role="user", content=[TextBlock(text=user_message)])]
    result = self._run_loop_result()
    return dataclasses.replace(result, run_id=run_id, session_id=session_id)

ask_result

ask_result(
    user_message: str,
    *,
    run_id: str | None = None,
    session_id: str | None = None,
) -> RunResult

Append a follow-up message and continue the existing run.

Appends user_message to the current history without resetting it. Useful for multi-turn sessions when driving Harness directly.

Parameters:

Name Type Description Default
user_message str

The follow-up user prompt.

required
run_id str | None

Optional identifier stamped into the RunResult.

None
session_id str | None

Optional session identifier stamped into the RunResult.

None

Returns:

Type Description
RunResult

A RunResult describing the outcome of this turn sequence.

Source code in data_harness/loop.py
def ask_result(
    self,
    user_message: str,
    *,
    run_id: str | None = None,
    session_id: str | None = None,
) -> RunResult:
    """Append a follow-up message and continue the existing run.

    Appends ``user_message`` to the current history without resetting it.
    Useful for multi-turn sessions when driving `Harness` directly.

    Args:
        user_message: The follow-up user prompt.
        run_id: Optional identifier stamped into the `RunResult`.
        session_id: Optional session identifier stamped into the `RunResult`.

    Returns:
        A `RunResult` describing the outcome of this turn sequence.
    """
    if self._run_file is None:
        self._run_file = setup_logger(self._run_dir)
    self._messages.append(
        Message(role="user", content=[TextBlock(text=user_message)])
    )
    result = self._run_loop_result()
    return dataclasses.replace(result, run_id=run_id, session_id=session_id)

run

run(user_message: str) -> str

Start a fresh run and return the final text response.

Raises MaxTurnsExceeded if the loop hits max_turns.

Parameters:

Name Type Description Default
user_message str

The initial user prompt.

required

Returns:

Type Description
str

The model's final text response.

Raises:

Type Description
MaxTurnsExceeded

If the loop reaches max_turns without stopping.

RuntimeError

If the provider raises an exception during the run.

Source code in data_harness/loop.py
def run(self, user_message: str) -> str:
    """Start a fresh run and return the final text response.

    Raises `MaxTurnsExceeded` if the loop hits ``max_turns``.

    Args:
        user_message: The initial user prompt.

    Returns:
        The model's final text response.

    Raises:
        MaxTurnsExceeded: If the loop reaches ``max_turns`` without stopping.
        RuntimeError: If the provider raises an exception during the run.
    """
    result = self.run_result(user_message)
    if result.status == "max_turns_exceeded":
        raise MaxTurnsExceeded(result.turns)
    if result.status == "error":
        raise RuntimeError(result.error or "unknown error")
    return result.text

ask

ask(user_message: str) -> str

Append a follow-up message and return the final text response.

Parameters:

Name Type Description Default
user_message str

The follow-up user prompt.

required

Returns:

Type Description
str

The model's final text response.

Raises:

Type Description
MaxTurnsExceeded

If the loop reaches max_turns without stopping.

RuntimeError

If the provider raises an exception during the run.

Source code in data_harness/loop.py
def ask(self, user_message: str) -> str:
    """Append a follow-up message and return the final text response.

    Args:
        user_message: The follow-up user prompt.

    Returns:
        The model's final text response.

    Raises:
        MaxTurnsExceeded: If the loop reaches ``max_turns`` without stopping.
        RuntimeError: If the provider raises an exception during the run.
    """
    result = self.ask_result(user_message)
    if result.status == "max_turns_exceeded":
        raise MaxTurnsExceeded(result.turns)
    if result.status == "error":
        raise RuntimeError(result.error or "unknown error")
    return result.text

AsyncHarness

data_harness.AsyncHarness

AsyncHarness(
    adapter: AsyncProviderAdapter,
    system: str,
    tools: list[ToolSpec],
    max_turns: int = 25,
    run_dir: str = "./runs",
    cache: SessionCache | None = None,
)

Async variant of Harness. Requires an AsyncProviderAdapter.

Exposes the same run_result / ask_result / run / ask surface as Harness, plus run_stream / ask_stream for token-level streaming.

Source code in data_harness/loop.py
def __init__(
    self,
    adapter: AsyncProviderAdapter,
    system: str,
    tools: list[ToolSpec],
    max_turns: int = 25,
    run_dir: str = "./runs",
    cache: SessionCache | None = None,
) -> None:
    if max_turns < 1:
        raise ValueError(f"max_turns must be at least 1, got {max_turns!r}")
    self._adapter = adapter
    self._system = system
    self._tools = list(tools)
    self._max_turns = max_turns
    self._run_dir = run_dir
    self._cache = cache if cache is not None else SessionCache()
    self._messages: list[Message] = []
    self._reminders: list[Callable[[int, int], str | None]] = []
    self._run_file: str | None = None

run_stream async

run_stream(
    user_message: str,
) -> AsyncGenerator[StreamEvent, None]

Stream events for a one-shot run.

Yields StreamEvent objects following the same protocol as the Claude Agent SDK. Each provider turn emits message_start, content_block_start/delta/stop, message_delta, and message_stop events. After the harness dispatches a tool call a ToolResultEvent is emitted. The JSONL logger records fully assembled messages, not individual events.

Source code in data_harness/loop.py
async def run_stream(self, user_message: str) -> AsyncGenerator[StreamEvent, None]:
    """Stream events for a one-shot run.

    Yields StreamEvent objects following the same protocol as the Claude
    Agent SDK.  Each provider turn emits message_start,
    content_block_start/delta/stop, message_delta, and message_stop events.
    After the harness dispatches a tool call a ToolResultEvent is emitted.
    The JSONL logger records fully assembled messages, not individual events.
    """
    self._run_file = setup_logger(self._run_dir)
    self._messages = [Message(role="user", content=[TextBlock(text=user_message)])]
    async for event in self._run_loop_stream():
        yield event

ask_stream async

ask_stream(
    user_message: str,
) -> AsyncGenerator[StreamEvent, None]

Stream events for a follow-up turn in a session.

Source code in data_harness/loop.py
async def ask_stream(self, user_message: str) -> AsyncGenerator[StreamEvent, None]:
    """Stream events for a follow-up turn in a session."""
    if self._run_file is None:
        self._run_file = setup_logger(self._run_dir)
    self._messages.append(
        Message(role="user", content=[TextBlock(text=user_message)])
    )
    async for event in self._run_loop_stream():
        yield event