Skip to content

Reader

The Reader class executes queries against ClickHouse and returns results with optional Pydantic model validation. It supports both eager evaluation (loading all results at once) and streaming (for large result sets).


Creating a Reader

From a Table

Create a reader from a table definition:

from pyclickhouse import Reader, Client, Table
from pydantic import BaseModel

class Event(BaseModel):
    id: int
    name: str
    value: float

events = Table(model=Event, name="events")

client = Client()

async with client:
    reader = Reader(client, events)
    results = await reader.query()

From a Query

Use a Query builder result:

from pyclickhouse import Query

query = Query(events).filter(events.value > 10)

async with client:
    reader = Reader(client, query)
    results = await reader.query()

From Raw SQL

Pass a raw SQL string:

async with client:
    reader = Reader(client, "SELECT * FROM events WHERE id > 10")
    results = await reader.query()

With Result Model Validation

Automatically validate results against a Pydantic model:

async with client:
    reader = Reader(client, events, model=Event)
    results = await reader.query()

    for result in results:
        # Each result is an Event instance
        print(result.name, result.value)

With Parameter Model

Define a Pydantic model for query parameter validation. Pass the model class (not an instance) when creating the Reader:

from pydantic import BaseModel

class EventFilter(BaseModel):
    min_value: float
    max_value: float

async with client:
    # Pass the class, not an instance
    reader = Reader(
        client,
        "SELECT * FROM events WHERE value BETWEEN {min_value:Float32} AND {max_value:Float32}",
        parameters=EventFilter
    )

Query Execution

Load All Results at Once

Use query() to execute and return all results as a list:

async with client:
    reader = Reader(client, events)
    results = await reader.query()

    print(f"Got {len(results)} results")
    for event in results:
        print(event)

Parameter Handling

Parameters passed to query() or stream() can be either:

  • A dict[str, Any] — a raw dictionary with parameter values
  • A Pydantic model instance — an already-validated model instance

Using Dictionary Parameters

Pass parameters as a simple dictionary:

async with client:
    reader = Reader(
        client,
        "SELECT * FROM events WHERE value > {min_val:Float32}"
    )
    # Pass dict with parameter values
    results = await reader.query(
        parameters={"min_val": 10.0}
    )

Using Pydantic Model Instance

Pass an already-instantiated Pydantic model as parameters:

class EventFilter(BaseModel):
    min_value: float

async with client:
    reader = Reader(
        client,
        "SELECT * FROM events WHERE value > {min_value:Float32}"
    )
    # Pass a model instance
    results = await reader.query(
        parameters=EventFilter(min_value=10.0)
    )

Parameter Validation

Without Parameter Model

If no parameter model was specified in Reader, parameters are used as-is:

async with client:
    reader = Reader(
        client,
        "SELECT * FROM events WHERE value > {min_val:Float32}"
    )

    # Both dict and model instance work - no validation
    results = await reader.query(parameters={"min_val": 10.0})

    # Or pass a model instance
    class Filter(BaseModel):
        min_val: float

    results = await reader.query(parameters=Filter(min_val=20.0))

With Parameter Model

When a parameter model class is specified in Reader, parameters are validated against it:

class EventFilter(BaseModel):
    min_value: float
    max_value: float

async with client:
    reader = Reader(
        client,
        "SELECT * FROM events WHERE value BETWEEN {min_value:Float32} AND {max_value:Float32}",
        parameters=EventFilter  # Specify model class
    )

    # Option 1: Pass a dict - validated as EventFilter
    results = await reader.query(
        parameters={"min_value": 10.0, "max_value": 100.0}
    )

    # Option 2: Pass a model instance - must be EventFilter
    results = await reader.query(
        parameters=EventFilter(min_value=10.0, max_value=100.0)
    )

    # Option 3: Wrong type - raises TypeError
    class WrongFilter(BaseModel):
        min_value: float

    try:
        results = await reader.query(
            parameters=WrongFilter(min_value=10.0)  # TypeError!
        )
    except TypeError as e:
        print(f"Wrong model type: {e}")

Streaming Results

Stream Large Result Sets

Use stream() to iterate over results without loading everything into memory. Results from stream() are iterated using async for:

async with client:
    reader = Reader(client, events, model=Event)
    results = await reader.stream()

    async for event in results:
        print(event)

Stream with Dictionary Parameters

async with client:
    reader = Reader(client, events, model=Event)
    results = await reader.stream(
        parameters={"min_value": 10.0}
    )

    async for event in results:
        process(event)

Stream with Parameter Model

class EventFilter(BaseModel):
    min_value: float

async with client:
    reader = Reader(
        client,
        events,
        model=Event,
        parameters=EventFilter
    )

    # Pass dict - validated as EventFilter
    results = await reader.stream(
        parameters={"min_value": 10.0}
    )

    async for event in results:
        process(event)

Or pass a model instance:

async with client:
    reader = Reader(
        client,
        events,
        model=Event,
        parameters=EventFilter
    )

    # Pass model instance - must be EventFilter
    results = await reader.stream(
        parameters=EventFilter(min_value=10.0)
    )

    async for event in results:
        process(event)

Configure Block Size

Control the amount of data loaded at once when streaming:

async with client:
    reader = Reader(
        client,
        events,
        model=Event,
        max_block_size=100000  # Larger blocks = more memory
    )
    results = await reader.stream()

    async for event in results:
        print(event)

Result Model Validation

Automatic Deserialization

Reader automatically validates and deserializes rows:

class Event(BaseModel):
    id: int
    name: str
    timestamp: str
    value: float

async with client:
    reader = Reader(client, "SELECT * FROM events", model=Event)
    results = await reader.query()

    # All results are Event instances
    for event in results:
        assert isinstance(event, Event)
        print(f"Event {event.id}: {event.name}")

Without Model Validation

Get raw dictionaries without validation:

async with client:
    reader = Reader(client, "SELECT * FROM events")
    results = await reader.query()

    for row in results:
        # row is a dict
        print(row["id"], row["name"])

Advanced Configuration

Custom Settings

Pass ClickHouse-specific settings:

async with client:
    reader = Reader(
        client,
        events,
        model=Event,
        settings={
            "max_rows_to_read": 1000000,
            "read_overflow_mode": "throw",
            "max_execution_time": 300
        }
    )

Transport Settings

Configure transport behavior:

async with client:
    reader = Reader(
        client,
        events,
        model=Event,
        transport_settings={
            "connection_timeout": 10,
            "max_retries": 3
        }
    )

Progress Tracking

Monitor Rows Read During Streaming

Track the number of rows read:

async with client:
    reader = Reader(client, events, model=Event)
    results = await reader.stream()

    async for event in results:
        if reader.read_rows % 1000 == 0:
            print(f"Read {reader.read_rows} rows...")

Common Patterns

Filter and Count

from pyclickhouse import Query

async with client:
    query = Query(events).filter(events.value > 50)
    reader = Reader(client, query)
    results = await reader.query()

    print(f"Found {len(results)} high-value events")

Aggregate Data

from pyclickhouse import Query, F

async with client:
    query = Query(events).group(events.name).aggregate(
        total=F.sum(events.value),
        count=F.count()
    )
    reader = Reader(client, query)
    results = await reader.query()

    for result in results:
        print(f"{result['name']}: {result['total']} ({result['count']} events)")

Process Large Result Set in Batches

async with client:
    reader = Reader(client, events, model=Event)
    results = await reader.stream()

    batch = []
    async for event in results:
        batch.append(event)

        if len(batch) >= 100:
            await process_batch(batch)
            batch = []

    # Process remaining
    if batch:
        await process_batch(batch)

Stream with Filter Parameters

async with client:
    query = Query(events).filter(
        (events.timestamp >= "{start_date:String}") & 
        (events.timestamp <= "{end_date:String}")
    )

    reader = Reader(
        client,
        query,
        model=Event,
        parameters=DateRange
    )

    results = await reader.query(
        parameters=DateRange(start_date="2024-01-01", end_date="2024-01-31")
    )

    for event in results:
        print(event)

Multiple Readers in Parallel

import asyncio

async with client:
    reader1 = Reader(client, users, model=User)
    reader2 = Reader(client, events, model=Event)

    users_results, events_results = await asyncio.gather(
        reader1.query(),
        reader2.query()
    )

Error Handling

Handle Query Errors

from clickhouse_connect.exc import ClickHouseException

async with client:
    reader = Reader(client, "SELECT * FROM nonexistent")
    try:
        results = await reader.query()
    except ClickHouseException as e:
        print(f"Query failed: {e}")

Handle Parameter Validation Errors

from pydantic import ValidationError

class EventFilter(BaseModel):
    min_value: float

async with client:
    reader = Reader(
        client,
        "SELECT * FROM events WHERE value > {min_value:Float32}",
        parameters=EventFilter
    )

    try:
        # This dict is missing required field - validation fails
        results = await reader.query(
            parameters={"invalid_field": 10.0}
        )
    except (ValidationError, TypeError) as e:
        print(f"Invalid parameters: {e}")

Handle Result Validation Errors

from pydantic import ValidationError

async with client:
    reader = Reader(client, "SELECT * FROM events", model=Event)
    try:
        results = await reader.query()
    except ValidationError as e:
        print(f"Result validation failed: {e}")

Handle Streaming Errors

from clickhouse_connect.exc import ClickHouseException

async with client:
    reader = Reader(client, events, model=Event)
    try:
        results = await reader.stream()
        async for event in results:
            print(event)
    except ClickHouseException as e:
        print(f"Stream failed: {e}")

Complete Example

Here's a complete example combining query building, parameter validation, and both query and streaming patterns:

from pydantic import BaseModel
from pyclickhouse import Client, Reader, Query, Table, F

class Event(BaseModel):
    timestamp: str
    event_name: str
    value: float

class DateRange(BaseModel):
    start_date: str
    end_date: str

events = Table(model=Event, name="events")

async def analyze_events():
    client = Client()

    async with client:
        # Simple query - load all results
        reader = Reader(client, events, model=Event)
        all_events = await reader.query()
        print(f"Total events: {len(all_events)}")

        # Filtered query with parameter model
        filtered_reader = Reader(
            client,
            "SELECT * FROM events WHERE timestamp BETWEEN {start_date:String} AND {end_date:String}",
            model=Event,
            parameters=DateRange
        )
        # Parameters are validated against DateRange
        filtered = await filtered_reader.query(
            parameters=DateRange(start_date="2024-01-01", end_date="2024-12-31")
        )
        print(f"Events in 2024: {len(filtered)}")

        # Aggregation query
        agg_query = Query(events).group(events.event_name).aggregate(
            total=F.sum(events.value),
            count=F.count()
        )
        agg_reader = Reader(client, agg_query)
        stats = await agg_reader.query()

        for stat in stats:
            avg = stat['total'] / stat['count']
            print(f"{stat['event_name']}: total={stat['total']}, avg={avg:.2f}")

        # Stream large result set
        print("\nStreaming high-value events:")
        stream_query = Query(events).filter(events.value > 1000)
        stream_reader = Reader(
            client,
            stream_query,
            model=Event,
            max_block_size=50000
        )
        results = await stream_reader.stream()

        count = 0
        async for event in results:
            count += 1
            if count <= 5:
                print(f"  {event.event_name}: {event.value}")

        print(f"Total high-value events: {stream_reader.read_rows}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(analyze_events())

Choosing Between query() and stream()

Use query() when:

  • You need all results at once
  • The result set is small to medium sized
  • You need to know the total count
  • Random access to results is needed
  • Results fit comfortably in memory

# Load all results into a list
async with client:
    reader = Reader(client, Query(events).filter(events.id < 1000))
    results = await reader.query()
    print(f"Got {len(results)} results")

Use stream() when:

  • Processing large result sets
  • Memory usage is a concern
  • Processing rows one at a time
  • Real-time processing as data arrives
  • You only need to iterate once
  • Processing data immediately without storing

# Stream results without loading everything
async with client:
    reader = Reader(client, Query(events))
    results = await reader.stream()
    async for event in results:
        await process_event(event)

Learn more about the Reader API, Query Builder, and Tables.