Providers

Provider adapters translate provider SDK objects into data-harness's normalised types. The harness never imports provider SDK classes directly.


ProviderAdapter

data_harness.ProviderAdapter

Bases: ABC

Synchronous provider adapter interface.

Implement chat and format_cache_control to integrate a new model provider. The harness calls chat once per turn and never touches any provider SDK objects directly.

chat abstractmethod

chat(
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
) -> NormalizedResponse

Send one turn to the provider and return a normalised response.

Parameters:

Name      Type            Description                                                             Default
system    str             The system prompt (must be prefix-stable across turns).                 required
messages  list[Message]   Full conversation history up to and including the latest user message.  required
tools     list[ToolSpec]  Only the currently visible ToolSpec instances.                          required

Returns:

Type                Description
NormalizedResponse  A NormalizedResponse with token counts and assembled content.

Source code in data_harness/providers/base.py
@abstractmethod
def chat(
    self,
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
) -> NormalizedResponse:
    """Send one turn to the provider and return a normalised response.

    Args:
        system: The system prompt (must be prefix-stable across turns).
        messages: Full conversation history up to and including the latest
            user message.
        tools: Only the currently visible `ToolSpec` instances.

    Returns:
        A `NormalizedResponse` with token counts and assembled content.
    """
    ...

format_cache_control abstractmethod

format_cache_control(obj: dict) -> dict

Attach provider-specific cache-control metadata to a content object.

Source code in data_harness/providers/base.py
@abstractmethod
def format_cache_control(self, obj: dict) -> dict:
    """Attach provider-specific cache-control metadata to a content object."""
    ...
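
As a rough illustration of the two abstract methods above, the sketch below implements a toy adapter that echoes the last user message instead of calling a real SDK. The ProviderAdapter, NormalizedResponse, StopReason, TextBlock and Message classes defined here are simplified stand-ins so the example runs on its own; their shapes (in particular Message having role and content fields, and the enum value) are assumptions, not the real data_harness definitions. The cache-control marker shown follows the Anthropic-style "ephemeral" convention and is only one possible shape.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum


# --- Stand-ins for data_harness types (assumed shapes, not the real classes) ---
class StopReason(Enum):
    END_TURN = "end_turn"  # assumed value


@dataclass
class TextBlock:
    text: str


@dataclass
class Message:  # hypothetical shape: a role plus plain-text content
    role: str
    content: str


@dataclass
class NormalizedResponse:
    stop_reason: StopReason
    content: list
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int
    cache_write_tokens: int


class ProviderAdapter(ABC):
    @abstractmethod
    def chat(self, system, messages, tools) -> NormalizedResponse: ...

    @abstractmethod
    def format_cache_control(self, obj: dict) -> dict: ...


class EchoAdapter(ProviderAdapter):
    """Toy adapter: echoes the last user message instead of calling an SDK."""

    def chat(self, system, messages, tools) -> NormalizedResponse:
        reply = f"echo: {messages[-1].content}"
        return NormalizedResponse(
            stop_reason=StopReason.END_TURN,
            content=[TextBlock(text=reply)],
            # Whitespace word counts stand in for real provider token counts.
            input_tokens=sum(len(m.content.split()) for m in messages),
            output_tokens=len(reply.split()),
            cache_read_tokens=0,
            cache_write_tokens=0,
        )

    def format_cache_control(self, obj: dict) -> dict:
        # Return a copy so the caller's dict is not mutated.
        return {**obj, "cache_control": {"type": "ephemeral"}}


adapter = EchoAdapter()
response = adapter.chat("You are terse.", [Message("user", "hello there")], [])
tagged = adapter.format_cache_control({"type": "text", "text": "You are terse."})
```

A real adapter would translate the provider's response object into NormalizedResponse inside chat, so that no SDK type leaks out of the adapter.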

AsyncProviderAdapter

data_harness.AsyncProviderAdapter

Bases: ABC

Asynchronous provider adapter with optional token-level streaming.

Implement chat and format_cache_control. Override stream_events to emit real token-level StreamEvent objects; the default implementation synthesises events from the assembled chat response.

chat abstractmethod async

chat(
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
) -> NormalizedResponse

Send one turn to the provider and return a normalised response.

Parameters:

Name      Type            Description                                                             Default
system    str             The system prompt (must be prefix-stable across turns).                 required
messages  list[Message]   Full conversation history up to and including the latest user message.  required
tools     list[ToolSpec]  Only the currently visible ToolSpec instances.                          required

Returns:

Type                Description
NormalizedResponse  A NormalizedResponse with token counts and assembled content.

Source code in data_harness/providers/base.py
@abstractmethod
async def chat(
    self,
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
) -> NormalizedResponse:
    """Send one turn to the provider and return a normalised response.

    Args:
        system: The system prompt (must be prefix-stable across turns).
        messages: Full conversation history up to and including the latest
            user message.
        tools: Only the currently visible `ToolSpec` instances.

    Returns:
        A `NormalizedResponse` with token counts and assembled content.
    """
    ...

stream_events async

stream_events(
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
) -> AsyncGenerator[StreamEvent, None]

Yield stream events for one provider turn.

The default implementation calls chat() and synthesises the six standard event types from the assembled response. Override in provider subclasses to emit real token-level events.

Source code in data_harness/providers/base.py
async def stream_events(
    self,
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
) -> AsyncGenerator[StreamEvent, None]:
    """Yield stream events for one provider turn.

    The default implementation calls chat() and synthesises the six
    standard event types from the assembled response.  Override in
    provider subclasses to emit real token-level events.
    """
    from data_harness.streaming import (
        ContentBlockDeltaEvent,
        ContentBlockStartEvent,
        ContentBlockStopEvent,
        InputJSONDelta,
        MessageDeltaEvent,
        MessageStartEvent,
        MessageStopEvent,
        TextDelta,
    )

    response = await self.chat(system, messages, tools)
    yield MessageStartEvent()
    for i, block in enumerate(response.content):
        if isinstance(block, TextBlock):
            yield ContentBlockStartEvent(index=i, content_block=TextBlock(text=""))
            yield ContentBlockDeltaEvent(index=i, delta=TextDelta(text=block.text))
            yield ContentBlockStopEvent(index=i)
        elif isinstance(block, ToolUseBlock):
            yield ContentBlockStartEvent(
                index=i,
                content_block=ToolUseBlock(
                    tool_use_id=block.tool_use_id,
                    tool_name=block.tool_name,
                    tool_input={},
                ),
            )
            yield ContentBlockDeltaEvent(
                index=i,
                delta=InputJSONDelta(partial_json=json.dumps(block.tool_input)),
            )
            yield ContentBlockStopEvent(index=i)
    yield MessageDeltaEvent(
        stop_reason=response.stop_reason,
        input_tokens=response.input_tokens,
        output_tokens=response.output_tokens,
        cache_read_tokens=response.cache_read_tokens,
        cache_write_tokens=response.cache_write_tokens,
    )
    yield MessageStopEvent()
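
To make the override path more concrete, here is a minimal sketch of a stream_events implementation that emits one delta per chunk as it arrives, rather than synthesising events from a finished response. The event classes are simplified stand-ins for those in data_harness.streaming (their fields are assumptions), and fake_provider_stream is a hypothetical placeholder for a provider SDK's streaming call.

```python
import asyncio
from dataclasses import dataclass


# --- Stand-ins for data_harness.streaming types (assumed shapes) ---
@dataclass
class TextDelta:
    text: str


@dataclass
class MessageStartEvent:
    pass


@dataclass
class ContentBlockStartEvent:
    index: int


@dataclass
class ContentBlockDeltaEvent:
    index: int
    delta: TextDelta


@dataclass
class ContentBlockStopEvent:
    index: int


@dataclass
class MessageDeltaEvent:
    stop_reason: str
    output_tokens: int


@dataclass
class MessageStopEvent:
    pass


async def fake_provider_stream(prompt: str):
    """Hypothetical SDK stream: yields the reply one word at a time."""
    for word in f"you said {prompt}".split():
        await asyncio.sleep(0)  # hand control back to the event loop
        yield word + " "


class WordStreamingAdapter:
    """Sketch of an adapter whose stream_events emits one delta per chunk."""

    async def stream_events(self, system, messages, tools):
        yield MessageStartEvent()
        yield ContentBlockStartEvent(index=0)
        chunks = 0
        async for chunk in fake_provider_stream(messages[-1]):
            chunks += 1
            yield ContentBlockDeltaEvent(index=0, delta=TextDelta(text=chunk))
        yield ContentBlockStopEvent(index=0)
        # The chunk count stands in for a real output token count.
        yield MessageDeltaEvent(stop_reason="end_turn", output_tokens=chunks)
        yield MessageStopEvent()


async def collect():
    adapter = WordStreamingAdapter()
    return [evt async for evt in adapter.stream_events("", ["hi"], [])]


events = asyncio.run(collect())
deltas = [e.delta.text for e in events if isinstance(e, ContentBlockDeltaEvent)]
```

The event order mirrors the default implementation: message start, one start/delta/stop group per content block, a message delta carrying usage, then message stop.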

stream async

stream(
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
    *,
    on_chunk: Callable[[str], Awaitable[None]],
) -> NormalizedResponse

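Backward-compatible, text-only streaming. Calls stream_events() internally, awaits on_chunk with the text of each TextDelta as it arrives, and returns the NormalizedResponse accumulated from all events.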

Source code in data_harness/providers/base.py
async def stream(
    self,
    system: str,
    messages: list[Message],
    tools: list[ToolSpec],
    *,
    on_chunk: Callable[[str], Awaitable[None]],
) -> NormalizedResponse:
    """Backward-compat text-only streaming; calls stream_events() internally."""
    from data_harness.streaming import (
        ContentBlockDeltaEvent,
        TextDelta,
        accumulate_stream_events,
    )

    events = []
    async for evt in self.stream_events(system, messages, tools):
        events.append(evt)
        if isinstance(evt, ContentBlockDeltaEvent) and isinstance(
            evt.delta, TextDelta
        ):
            await on_chunk(evt.delta.text)
    return accumulate_stream_events(events)
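
The on_chunk contract can be sketched as follows. FakeAsyncAdapter is a hypothetical stand-in that only mimics the call shape of stream(); it returns a plain string where the real method returns a NormalizedResponse. The point illustrated is that on_chunk must be an async callable, because stream() awaits it once per text delta.

```python
import asyncio


class FakeAsyncAdapter:
    """Stand-in with the same stream() call shape; not the real adapter."""

    async def stream(self, system, messages, tools, *, on_chunk):
        pieces = ["Hel", "lo", "!"]
        for piece in pieces:
            await on_chunk(piece)  # text deltas are forwarded as they arrive
        return "".join(pieces)  # the real method returns a NormalizedResponse


async def main():
    received = []

    async def on_chunk(text: str) -> None:
        # Must be awaitable: stream() awaits the callback for every text delta.
        received.append(text)

    final = await FakeAsyncAdapter().stream("sys", [], [], on_chunk=on_chunk)
    return received, final


received, final = asyncio.run(main())
```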

NormalizedResponse

data_harness.NormalizedResponse dataclass

NormalizedResponse(
    stop_reason: StopReason,
    content: list[ContentBlock],
    input_tokens: int,
    output_tokens: int,
    cache_read_tokens: int,
    cache_write_tokens: int,
)

Provider-normalised response from a single chat call.

Adapters translate provider-specific response objects into this type so that the harness never touches provider SDK classes directly.

Attributes:

Name                Type                Description
stop_reason         StopReason          Why generation stopped.
content             list[ContentBlock]  Ordered list of TextBlock and ToolUseBlock items.
input_tokens        int                 Prompt tokens billed by the provider.
output_tokens       int                 Completion tokens billed by the provider.
cache_read_tokens   int                 Tokens served from the provider's prompt cache.
cache_write_tokens  int                 Tokens written to the provider's prompt cache.
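
Because every adapter reports the same four counters, usage can be summed across turns without provider-specific code. The sketch below assumes only that each response object exposes the four integer attributes listed above; Usage and total_usage are hypothetical helpers, not part of data_harness.

```python
from dataclasses import dataclass, fields


@dataclass
class Usage:
    """Stand-in holding only the four token counters of NormalizedResponse."""

    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0


def total_usage(responses) -> Usage:
    """Sum each counter over any objects exposing the four attributes."""
    total = Usage()
    for response in responses:
        for field in fields(Usage):
            current = getattr(total, field.name)
            setattr(total, field.name, current + getattr(response, field.name))
    return total


# Two illustrative turns: the first writes the cache, the second reads it.
turns = [Usage(1200, 80, 0, 1100), Usage(1300, 95, 1100, 0)]
total = total_usage(turns)
```

Whether input_tokens already includes cached tokens varies by provider, so the sketch keeps the counters separate rather than deriving a single combined figure.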


StopReason

data_harness.StopReason

Bases: Enum

Why the provider ended the current generation turn.

Attributes:

Name           Description
END_TURN       The model produced a complete response with no tool calls.
TOOL_USE       The model emitted one or more tool-use blocks.
MAX_TOKENS     The response was truncated at the token limit.
STOP_SEQUENCE  A configured stop sequence was matched.
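
A caller's loop would typically branch on the stop reason to decide what happens next. The following is a minimal sketch of such a dispatch; the enum is a stand-in with assumed member values, and next_action with its return strings is purely illustrative, not how the harness itself is implemented.

```python
from enum import Enum


class StopReason(Enum):  # stand-in; member values are assumed
    END_TURN = "end_turn"
    TOOL_USE = "tool_use"
    MAX_TOKENS = "max_tokens"
    STOP_SEQUENCE = "stop_sequence"


def next_action(reason: StopReason) -> str:
    """Map a stop reason to a hypothetical next step for the caller's loop."""
    if reason is StopReason.TOOL_USE:
        return "run_tools"  # execute the requested tools, then call chat again
    if reason is StopReason.MAX_TOKENS:
        return "warn_truncated"  # the reply was cut off at the token limit
    return "finish"  # END_TURN and STOP_SEQUENCE both end the turn


actions = {reason.name: next_action(reason) for reason in StopReason}
```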


Built-in adapters

AnthropicAdapter

data_harness.providers.anthropic.AnthropicAdapter

AnthropicAdapter(
    model: str = "claude-sonnet-4-6", max_tokens: int = 8096
)

Bases: _AnthropicHelpers, ProviderAdapter

Source code in data_harness/providers/anthropic.py
def __init__(
    self, model: str = "claude-sonnet-4-6", max_tokens: int = 8096
) -> None:
    self._model = model
    self._max_tokens = max_tokens
    self._client = anthropic.Anthropic()

OpenAIAdapter

data_harness.providers.openai.OpenAIAdapter

OpenAIAdapter(
    model: str = "gpt-4o-mini", max_tokens: int = 4096
)

Bases: _OpenAIHelpers, ProviderAdapter

Source code in data_harness/providers/openai.py
def __init__(self, model: str = "gpt-4o-mini", max_tokens: int = 4096) -> None:
    self._model = model
    self._max_tokens = max_tokens
    self._client = openai.OpenAI()