Skip to content

Client

The Client is the main async interface for connecting to and interacting with ClickHouse.

It wraps the clickhouse_connect library and provides methods for executing queries, managing tables/views, and inserting data.


Creating a Client

Quick Start

Create and initialize a client in one step:

from pyclickhouse import create_async_client

client = await create_async_client()

This uses default settings or reads from environment variables.

Manual Initialization

For more control, create a Client instance and use it as an async context manager:

from pyclickhouse import Client

client = Client()

async with client:
    # execute queries
    result = await client.query("SELECT 1")

Configuration

Environment Variables

Configure ClickHouse connection using environment variables with the CLICKHOUSE_ prefix:

CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_DATABASE=default
CLICKHOUSE_USERNAME=default
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_SECURE=false

Then create a client:

from pyclickhouse import create_async_client

# Automatically reads from environment variables
client = await create_async_client()

Explicit Configuration

Pass settings directly as keyword arguments:

from pyclickhouse import Client

client = Client(
    host="localhost",
    port=8123,
    database="default",
    username="default",
    password="password"
)

async with client:
    result = await client.query("SELECT 1")

Or use the Settings class:

from pyclickhouse import Client, Settings

settings = Settings(
    host="localhost",
    port=8123,
    database="default",
    username="default",
    password="password"
)

client = Client(**settings.model_dump())

async with client:
    result = await client.query("SELECT 1")

See Settings for all available configuration options including SSL, proxy, compression, and timeouts.


Basic Usage

Execute Raw SQL

async with client:
    result = await client.query("SELECT * FROM system.tables LIMIT 5")
    print(result)

Execute with Parameters

async with client:
    result = await client.query(
        "SELECT * FROM my_table WHERE id = {id:Int32}",
        parameters={"id": 42}
    )

Insert Data Directly

async with client:
    await client.insert(
        "INSERT INTO my_table VALUES",
        [(1, "Alice"), (2, "Bob")]
    )

Execute Commands

async with client:
    await client.command("CREATE DATABASE IF NOT EXISTS my_db")

Always use the client as an async context manager to ensure the connection is properly closed:

async with client:
    # connection is active here
    result = await client.query("SELECT 1")

# connection is automatically closed

Working with Tables

Create a Table

Define a Pydantic model and create a table from it:

from pydantic import BaseModel
from pyclickhouse import Admin, Table

class User(BaseModel):
    id: int
    name: str
    email: str

users = Table(model=User, name="users")

async with client:
    admin = Admin(client)
    await admin.create_table(users)

Insert Data with Writer

Use the Writer class for efficient batch inserts with validation:

from pyclickhouse import Writer

users_data = [
    User(id=1, name="Alice", email="alice@example.com"),
    User(id=2, name="Bob", email="bob@example.com"),
]

async with client:
    writer = Writer(client, users)
    async with writer:
        for user in users_data:
            await writer.insert(user)

Query Data with Reader

Use the Reader class to query and validate results:

from pyclickhouse import Reader

async with client:
    reader = Reader(client, users, model=User)
    results = await reader.query()

    for user in results:
        print(user)  # User objects with validation

Stream Results

For large result sets, stream data without loading everything into memory:

async with client:
    reader = Reader(client, users, model=User)
    results = await reader.stream()

    async for user in results:
        print(user)

Query Builder

Compose queries programmatically using the Query builder:

from pyclickhouse import Query, F

query = (
    Query(users)
    .filter(users.id > 10)
    .select(users.name, users.email)
    .sort(-users.id)
    .take(100)
)

async with client:
    result = await client.query(query.compile())

See Query Builder for more details on composing queries.


Complete Example

Here's a complete example combining all features:

from pydantic import BaseModel
from pyclickhouse import Admin, Client, F, Query, Reader, Table, Writer

class Event(BaseModel):
    name: str
    value: int

async def main():
    client = Client()

    # Define a table
    events = Table(model=Event, name="events")

    async with client:
        # Create table
        admin = Admin(client)
        await admin.create_table(events)

        # Insert data
        writer = Writer(client, events)
        async with writer:
            await writer.insert(Event(name="first", value=1))
            await writer.insert(Event(name="second", value=2))

        # Query data
        reader = Reader(client, events, model=Event)
        results = await reader.stream()
        async for event in results:
            print(event)

        # Aggregate with query builder
        query = Query(events).aggregate(total=F.sum(events.value))
        reader = Reader(client, query)
        result = await reader.query()
        print(result)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Error Handling

The client raises exceptions for connection and query errors:

from clickhouse_connect.exc import ClickHouseException

async with client:
    try:
        result = await client.query("SELECT * FROM nonexistent_table")
    except ClickHouseException as e:
        print(f"ClickHouse error: {e}")

Advanced Configuration

SSL/TLS

client = Client(
    host="clickhouse.example.com",
    secure=True,
    verify=True,  # Verify SSL certificate
    ca_cert="/path/to/ca.crt"
)

Proxy

client = Client(
    host="localhost",
    http_proxy="http://proxy.example.com:8080",
    https_proxy="https://proxy.example.com:8443"
)

Connection Pooling

client = Client(
    host="localhost",
    connector_limit=100,  # Max total connections
    connector_limit_per_host=20  # Max connections per host
)

Compression

client = Client(
    host="localhost",
    compress=True  # Enable compression for queries and responses
)

Timeouts

client = Client(
    host="localhost",
    connect_timeout=10,  # Connection timeout in seconds
    send_receive_timeout=300  # Query timeout in seconds
)

Learn more about the Client API, Admin, Query Builder, and other ORM features.