Table
The Table class represents a ClickHouse table definition backed by a Pydantic model. It automatically maps Python types to ClickHouse column types and provides a type-safe way to define table schemas.
Creating a Table
From a Pydantic Model
Create a table from a Pydantic model:
from pydantic import BaseModel
from pyclickhouse import Table
class User(BaseModel):
id: int
name: str
email: str
users = Table(model=User)
By default, the table name is the snake_case version of the model name (user). Column names and types are automatically derived from the model fields.
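For example, get_name() (covered under Table Information below) returns the derived name:
users = Table(model=User)
print(users.get_name())  # "user" - snake_case of "User"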
Customizing the Table Name
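Pass name to override the default:
users = Table(model=User, name="users")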
With a Custom Engine
Specify the table engine using the engines module:
from pyclickhouse import Table, engines
users = Table(
model=User,
name="users",
engine=engines.MergeTree(
order_by="id",
partition_by="toYYYYMM(updated_at)"
)
)
Or use a string:
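A sketch, assuming the raw engine clause is accepted verbatim (it mirrors the format get_engine() returns under Table Information below; verify against your version):
users = Table(
    model=User,
    name="users",
    engine="MergeTree() ORDER BY id"  # assumed string form
)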
Available Engines
Common engines include:
from pyclickhouse import engines
# MergeTree family
engines.MergeTree(order_by="id")
engines.ReplacingMergeTree(order_by="id", ver="version")
engines.AggregatingMergeTree(order_by="id")
engines.CollapsingMergeTree(order_by="id", sign="sign")
engines.VersionedCollapsingMergeTree(order_by="id", sign="sign", version="version")
# Other engines
engines.Memory()
engines.Kafka(...)
engines.PostgreSQL(...)
With Comments
Add a comment to the table:
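users = Table(
    model=User,
    name="users",
    comment="Application users"
)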
Columns
Automatic Column Detection
Columns are automatically created from Pydantic model fields:
class Event(BaseModel):
id: int # Maps to Int64
name: str # Maps to String
value: float # Maps to Float64
timestamp: str # Maps to String
events = Table(model=Event)
# Access columns via the table
columns = events.get_columns()
for col_name, col in columns.items():
print(f"{col_name}: {col.type}")
Customizing Columns with Annotated
Use typing.Annotated to customize column properties:
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class Event(BaseModel):
id: int
event_name: Annotated[str, Column(comment="Name of the event")]
value: Annotated[float, Column(comment="Event value")]
events = Table(model=Event)
Customizing Columns with Annotated and Field
Combine Annotated with pydantic.Field for full control:
from typing import Annotated
from pydantic import BaseModel, Field
from pyclickhouse import Table, Column
class Event(BaseModel):
id: int = Field(description="Event ID")
timestamp: Annotated[str, Column(
comment="Event timestamp",
codec_expression="ZSTD(1)",
ttl_expression="timestamp + INTERVAL 30 DAY"
)] = Field(description="When the event occurred")
event_name: Annotated[str, Column(comment="Event name")]
value: Annotated[float, Column(comment="Event value")]
events = Table(model=Event)
Column with All Properties
Add comments, codecs, and TTL to columns:
from typing import Annotated
from pydantic import BaseModel, Field
from pyclickhouse import Column, Table
class Event(BaseModel):
timestamp: Annotated[str, Column(
comment="Event timestamp",
codec_expression="ZSTD(1)",
ttl_expression="timestamp + INTERVAL 30 DAY",
default_type="DEFAULT",
default_expression="now()"
)] = Field(description="When the event occurred")
event_name: str
value: float
events = Table(model=Event)
Column Properties Reference
The Column class accepts the following properties:
- name: Column name (auto-detected from the field name)
- type: ClickHouse type (auto-detected from the Python type)
- comment: Description of the column
- codec_expression: Compression codec (e.g., "ZSTD(1)")
- ttl_expression: TTL (Time To Live) expression
- default_type: DEFAULT or MATERIALIZED
- default_expression: Default value expression
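As a sketch, the same parameters can also be set on a standalone Column (all values here are illustrative):
col = Column(
    name="recorded_at",
    type="DateTime",
    comment="When the row was recorded",
    codec_expression="ZSTD(1)",
    ttl_expression="recorded_at + INTERVAL 90 DAY",
    default_type="DEFAULT",
    default_expression="now()"
)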
Custom Column Names and Types
Use the name parameter when the database column name should differ from the Python field name:
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class User(BaseModel):
user_id: Annotated[int, Column(name="id")] # Python field "user_id" -> DB column "id"
full_name: Annotated[str, Column(name="name")] # Python field "full_name" -> DB column "name"
email_address: Annotated[str, Column(name="email")]
users = Table(model=User)
# The table will have columns: id, name, email (not user_id, full_name, email_address)
Use the type parameter to specify a custom ClickHouse type:
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class Product(BaseModel):
id: int
price: Annotated[float, Column(type="Decimal(10, 2)")] # Custom Decimal with precision
description: Annotated[str, Column(type="String")] # Explicit String type
tags: Annotated[list[str], Column(type="Array(String)")] # Explicit Array type
products = Table(model=Product)
Combine custom name and type:
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class Order(BaseModel):
order_id: Annotated[int, Column(
name="id",
type="UInt32" # Use UInt32 instead of Int64
)]
order_amount: Annotated[float, Column(
name="amount",
type="Decimal(18, 4)",
comment="Order total in currency"
)]
order_date: Annotated[str, Column(
name="created_date",
type="Date",
comment="Date order was placed"
)]
orders = Table(model=Order)
# The table will have columns: id (UInt32), amount (Decimal(18, 4)), created_date (Date)
Advanced Column Configuration
Combine all Column parameters for full control:
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class Analytics(BaseModel):
metric_id: Annotated[int, Column(
name="id",
type="UInt64",
comment="Unique metric identifier"
)]
metric_value: Annotated[float, Column(
name="value",
type="Float32", # Use Float32 instead of Float64 to save space
comment="Metric measurement value",
codec_expression="ZSTD(3)", # Compression codec
default_type="DEFAULT",
default_expression="0.0"
)]
timestamp: Annotated[str, Column(
name="recorded_at",
type="DateTime",
comment="When metric was recorded",
ttl_expression="recorded_at + INTERVAL 1 YEAR" # Auto-delete after 1 year
)]
dimensions: Annotated[dict, Column(
name="tags",
type="Map(String, String)",
comment="Dimension tags"
)]
analytics = Table(model=Analytics)
Getting Column Information
events = Table(model=Event)
# Get all columns
columns = events.get_columns()
# Iterate over columns
for col_name, col in columns.items():
print(f"Column: {col_name}")
print(f" Type: {col.type}")
print(f" Comment: {col.comment}")
print(f" DB Name: {col.name}") # Shows the actual database column name
Custom Data Types for Columns
Using Optional Types
Create nullable columns using Optional or Union[Type, None]:
from typing import Optional
from pydantic import BaseModel
from pyclickhouse import Table
class Product(BaseModel):
id: int
name: str
description: Optional[str] = None
tags: Optional[list[str]] = None
price: float
discount: Optional[float] = None
products = Table(model=Product)
# Columns will be:
# - id: Int64
# - name: String
# - description: Nullable(String)
# - tags: Nullable(Array(String))
# - price: Float64
# - discount: Nullable(Float64)
Using Array Types
Store collections of data:
from typing import List, Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class BlogPost(BaseModel):
id: int
title: str
tags: List[str]
categories: Annotated[List[str], Column(comment="Content categories")]
ratings: List[float]
comments_count: int
posts = Table(model=BlogPost)
# Columns will be:
# - tags: Array(String)
# - categories: Array(String)
# - ratings: Array(Float64)
Using Tuple Types
Store fixed-size structured data:
from typing import Tuple, Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class LocationEvent(BaseModel):
event_id: int
coordinates: Tuple[float, float]
location_info: Annotated[Tuple[str, str, int], Column(comment="City, Country, Zip")]
timestamp: str
events = Table(model=LocationEvent)
# Columns will be:
# - coordinates: Tuple(Float64, Float64)
# - location_info: Tuple(String, String, Int64)
Using Map/Dict Types
Store key-value data:
from typing import Dict
from pydantic import BaseModel
from pyclickhouse import Table
class MetricsData(BaseModel):
event_id: int
properties: Dict[str, str]
numeric_metrics: Dict[str, float]
tags: Dict[str, int]
metrics = Table(model=MetricsData)
# Columns will be:
# - properties: Map(String, String)
# - numeric_metrics: Map(String, Float64)
# - tags: Map(String, Int64)
Using Date and DateTime Types
Store temporal data:
from datetime import date, datetime
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class TimeSeriesEvent(BaseModel):
event_date: date
created_at: datetime
updated_at: Annotated[datetime, Column(comment="Last update timestamp")]
events = Table(model=TimeSeriesEvent)
# Columns will be:
# - event_date: Date
# - created_at: DateTime
# - updated_at: DateTime
Using Enum Types
Store enumerated values:
from enum import StrEnum, auto
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class Status(StrEnum):
PENDING = auto()
ACTIVE = auto()
INACTIVE = auto()
ARCHIVED = auto()
class Task(BaseModel):
id: int
title: str
status: Annotated[Status, Column(comment="Task status")]
tasks = Table(model=Task)
# Columns will be:
# - status: Enum('PENDING', 'ACTIVE', 'INACTIVE', 'ARCHIVED')
Using UUID Type
Store universally unique identifiers:
from uuid import UUID
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class Account(BaseModel):
account_id: Annotated[UUID, Column(comment="Unique account identifier")]
user_id: UUID
created_at: str
accounts = Table(model=Account)
# Columns will be:
# - account_id: UUID
# - user_id: UUID
Using Decimal Type
Store precise decimal numbers:
from decimal import Decimal
from typing import Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class Transaction(BaseModel):
transaction_id: int
amount: Decimal
tax: Annotated[Decimal, Column(comment="Tax amount")]
total: Decimal
transactions = Table(model=Transaction)
# Columns will be:
# - amount: Decimal
# - tax: Decimal
# - total: Decimal
Using IP Address Types
Store IP addresses:
from ipaddress import IPv4Address, IPv6Address
from typing import Optional, Annotated
from pydantic import BaseModel
from pyclickhouse import Table, Column
class ConnectionLog(BaseModel):
connection_id: int
client_ipv4: Optional[IPv4Address] = None
client_ipv6: Optional[IPv6Address] = None
server_ip: Annotated[IPv4Address, Column(comment="Server IP address")]
logs = Table(model=ConnectionLog)
# Columns will be:
# - client_ipv4: Nullable(IPv4)
# - client_ipv6: Nullable(IPv6)
# - server_ip: IPv4
Expressions and Column References
Accessing Columns as Expressions
Access table columns using attribute notation. This returns an Expression that can be used in queries:
users = Table(model=User)
# Access column as expression
user_id_expr = users.id
user_name_expr = users.name
# Use in filters
print(str(users.id)) # "users.id"
print(str(users.name == "Alice")) # "users.name == 'Alice'"
The Expression class represents a query expression that can be combined with operators to build complex filter conditions.
Comparison Operators
Expressions support all comparison operators for creating filter conditions:
users = Table(model=User)
# Greater than
users.id > 10
# Greater than or equal
users.id >= 10
# Less than
users.id < 100
# Less than or equal
users.id <= 100
# Equals
users.name == "Alice"
# Not equals
users.name != "Bob"
Arithmetic Operations
Expressions also support arithmetic operators on columns:
events = Table(model=Event)
# Addition
events.value + 10
# Subtraction
events.value - 5
# Multiplication
events.value * 2
# Division
events.value / 2
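Arithmetic expressions compose with the query builder; for example, with derive and filter (both covered below):
from pyclickhouse import Query
# Illustrative: a computed column and a filter built from arithmetic expressions
query = Query(events).derive(doubled=events.value * 2)
query = Query(events).filter((events.value + 10) > 50)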
Logical Operations
Combine multiple conditions using logical operators:
from pyclickhouse import Query

# AND - both conditions must be true
query = Query(users).filter(
(users.id > 10) & (users.name == "Alice")
)
# OR - either condition can be true
query = Query(users).filter(
(users.name == "Alice") | (users.name == "Bob")
)
# NOT - negate a condition
query = Query(users).filter(~(users.id == 0))
IN and NOT IN
Check if a column value is in or not in a list:
# Check if column is in list
query = Query(users).filter(users.id.is_in([1, 2, 3, 4, 5]))
# Check if column is NOT in list
query = Query(users).filter(users.name.is_not_in(["spam", "bot"]))
Using Expressions in Filters
from pyclickhouse import Query
users = Table(model=User)
# Simple filter
query = Query(users).filter(users.id > 10)
# Complex filter with multiple conditions
query = Query(users).filter(
(users.id > 10) &
(users.name != "admin") &
(users.email.is_in(["alice@example.com", "bob@example.com"]))
)
Using Expressions in SELECT
Pass column expressions to select specific columns:
# Select specific columns
query = Query(users).select(users.id, users.name)
# With custom column names
query = Query(users).select(
user_id=users.id,
full_name=users.name
)
Using Expressions in DERIVE (Computed Columns)
Create computed columns based on existing columns:
from pyclickhouse import F
# Add computed columns
query = Query(users).derive(
name_lower=F.lower(users.name),
name_length=F.length(users.name)
)
Using Expressions in GROUP BY
# Group by a column
query = Query(events).group(events.event_name)
# Group by multiple columns
query = Query(events).group(events.event_name, events.user_id)
Using Expressions in Aggregations
from pyclickhouse import F
events = Table(model=Event)
# Group and aggregate
query = Query(events).group(events.event_name).aggregate(
total=F.sum(events.value),
count=F.count(),
avg_value=F.avg(events.value),
max_value=F.max(events.value)
)
Using Expressions in SORT
# Ascending order (default)
query = Query(users).sort(users.id)
# Descending order
query = Query(users).sort(-users.id)
# Sort by multiple columns
query = Query(events).sort(events.event_name, -events.value)
Expression String Representation
Get the SQL representation of an expression:
users = Table(model=User)
expr = users.id > 10
print(str(expr)) # "users.id > 10"
expr = users.name == "Alice"
print(str(expr)) # "users.name == 'Alice'"
expr = (users.id > 10) & (users.name != "Bob")
print(str(expr)) # "users.id > 10 && users.name != 'Bob'"
Table Information
Get Table Metadata
users = Table(model=User, name="users")
# Get the table name
name = users.get_name() # "users"
# Get the model class
model = users.get_model() # <class 'User'>
# Get the engine
engine = users.get_engine() # "MergeTree() ORDER BY ()"
# Get all columns
columns = users.get_columns() # dict of columns
# Get lifecycle
lifecycle = users.get_lifecycle() # Lifecycle.managed
String Representation
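The exact output format is library-defined; as a sketch, printing a table is a quick way to inspect it:
users = Table(model=User, name="users")
print(users)  # assumed to render the table definition (name, columns, engine)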
Table Lifecycle
Tables can have different lifecycle states that control how the ORM manages them. The lifecycle determines whether the ORM can create, alter, drop, or migrate the table.
Lifecycle States
PyClickHouse supports three lifecycle states:
- managed (default): The ORM has full control. Tables can be created, altered, dropped, and migrated.
- protected: The ORM can create and read tables, but cannot drop or modify columns. Use this for shared or production tables.
- external: The ORM treats the table as read-only and external. No creation, modification, or deletion is allowed.
Creating Tables with Lifecycle
from pyclickhouse import Table, Lifecycle
from pydantic import BaseModel
class Event(BaseModel):
id: int
name: str
# Managed table (default) - full ORM control
managed_table = Table(model=Event, lifecycle=Lifecycle.managed)
# Protected table - ORM can create/read but not drop/modify
protected_table = Table(
model=Event,
lifecycle=Lifecycle.protected,
name="shared_events"
)
# External/unmanaged table - read-only from ORM perspective
external_table = Table(
model=Event,
lifecycle=Lifecycle.external,
name="legacy_events"
)
Checking Lifecycle
table = Table(model=Event)
lifecycle = table.get_lifecycle()
print(lifecycle) # Lifecycle.managed
if table.get_lifecycle() == Lifecycle.managed:
print("ORM has full control over this table")
elif table.get_lifecycle() == Lifecycle.protected:
print("ORM can create/read but not modify")
elif table.get_lifecycle() == Lifecycle.external:
print("Table is external/read-only")
Table Registry
Tables are automatically registered with the default global registry when created. This enables batch operations like creating, migrating, or dropping multiple tables together.
Auto-Registration
When you create a table, it's automatically registered:
from pyclickhouse import Table
from pydantic import BaseModel
class User(BaseModel):
id: int
name: str
# Automatically registered with default_registry
users = Table(model=User)
# Access from default registry
from pyclickhouse.registry import default_registry
same_table = default_registry.get_table("user")
Custom Registry
Create a custom registry if you want to manage tables separately:
from pyclickhouse import Table, Registry
from pydantic import BaseModel
class Event(BaseModel):
id: int
user_id: int
# Create custom registry (tables won't auto-register to default)
custom_registry = Registry()
# Manually register with custom registry
events = Table(model=Event, registry=custom_registry)
# List tables in custom registry
for table in custom_registry.list_tables():
print(f"Table: {table.get_name()}")
Bulk Operations with Registry
Since tables are automatically registered, you can perform bulk operations on all registered tables:
from pydantic import BaseModel
from pyclickhouse import Admin, Client, Table, default_registry
class User(BaseModel):
id: int
name: str
class Event(BaseModel):
event_id: int
user_id: int
event_name: str
# Both tables are automatically registered with default_registry
users = Table(model=User)
events = Table(model=Event)
async def setup_database():
client = Client()
admin = Admin(client)
async with client:
# Create all registered tables at once
await admin.create_all(default_registry)
# Migrate all registered tables
await admin.migrate_all(default_registry)
# Drop all registered tables
await admin.drop_all(default_registry)
Disable Auto-Registration
To prevent a table from being automatically registered, use registry=None:
from pyclickhouse import Table
from pydantic import BaseModel
class TempTable(BaseModel):
id: int
data: str
# This table won't be registered with default_registry
temp = Table(model=TempTable, registry=None)
# Still usable locally, just not included in bulk operations
Creating and Managing Tables
Creating Tables
Use the Admin class to create tables in ClickHouse:
from pyclickhouse import Client, Admin, Table, engines
from pydantic import BaseModel
class Event(BaseModel):
timestamp: str
event_id: int
event_name: str
user_id: int
value: float
async def create_tables():
client = Client()
admin = Admin(client)
async with client:
# Create a simple table
events = Table(
model=Event,
name="events",
engine=engines.MergeTree(
order_by="timestamp",
partition_by="toYYYYMM(timestamp)"
),
comment="Application events"
)
await admin.create_table(events)
print(f"Created table: {events.get_name()}")
Creating Tables with Conditional Logic
async def setup_tables():
client = Client()
admin = Admin(client)
async with client:
events = Table(model=Event, name="events")
# Create only if table doesn't exist (default)
await admin.create_table(events, if_not_exists=True)
# Create in a specific database
await admin.create_table(
events,
if_not_exists=True,
database="analytics"
)
# Create on a cluster
await admin.create_table(
events,
database="analytics",
cluster="cluster_name"
)
Dropping Tables
async def drop_tables():
client = Client()
admin = Admin(client)
async with client:
events = Table(model=Event, name="events")
# Drop table (safe - won't error if it doesn't exist)
await admin.drop_table(events, if_exists=True)
# Drop only if empty
await admin.drop_table(events, if_empty=True)
# Force drop protected/external tables
await admin.drop_table(
events,
if_exists=True,
force=True
)
Truncating Tables
async def truncate_table():
client = Client()
admin = Admin(client)
async with client:
events = Table(model=Event, name="events")
# Clear all data from the table
await admin.truncate_table(events)
Table Reflection and Migration
Reflecting Existing Tables
Inspect an existing ClickHouse table and create a Pydantic model from its definition:
from pyclickhouse import Client, Admin
async def reflect_table():
client = Client()
admin = Admin(client)
async with client:
# Get table definition from ClickHouse
table = await admin.get_table("events", database="default")
# Now you have a Table object with columns
print(f"Table name: {table.get_name()}")
print(f"Engine: {table.get_engine()}")
# Get the generated Pydantic model
model = table.get_model()
# Use the model to work with the data
for col_name, col in table.get_columns().items():
print(f" {col_name}: {col.type}")
Creating Models from Query Results
Generate a Pydantic model from query results:
from pyclickhouse import Client, Admin, Query
async def create_model_from_query():
client = Client()
admin = Admin(client)
async with client:
# Create a model from a query result shape
query = "SELECT user_id, COUNT(*) as count FROM events GROUP BY user_id"
model = await admin.create_model(query, name="EventStats")
# Use the model
print(f"Model fields: {model.model_fields.keys()}")
Migrating Tables
Automatically update table schemas to match their Pydantic model definitions:
from pyclickhouse import Client, Admin, Table, Lifecycle
from pydantic import BaseModel
class Event(BaseModel):
id: int
name: str
description: str # New field
created_at: str
async def migrate_table():
client = Client()
admin = Admin(client)
async with client:
# Define the table with updated schema
events = Table(
model=Event,
name="events",
lifecycle=Lifecycle.managed
)
# Migrate table to match new schema
# - Adds new columns (description, created_at)
# - Keeps existing columns
migrated = await admin.migrate_table(events)
if migrated:
print("Table migrated successfully")
else:
print("Table migration skipped")
Migration Behavior by Lifecycle
Migration behavior depends on the table's lifecycle:
from pyclickhouse import Admin, Table, Lifecycle
admin = Admin(client)
# Managed tables: migrate everything (add, modify, drop columns)
managed_table = Table(model=Event, lifecycle=Lifecycle.managed)
await admin.migrate_table(managed_table)
# Protected tables: only add columns (no drop/modify)
protected_table = Table(model=Event, lifecycle=Lifecycle.protected)
await admin.migrate_table(protected_table)
# External tables: no migration
external_table = Table(model=Event, lifecycle=Lifecycle.external)
# Migration is skipped unless force=True
await admin.migrate_table(external_table, force=True)
Bulk Migrations
Migrate all tables in a registry:
from pyclickhouse import Client, Admin, default_registry
async def migrate_all_tables():
client = Client()
admin = Admin(client)
async with client:
# Migrate all tables from the default registry
await admin.migrate_all(
default_registry,
database="analytics",
force=False
)
Column-Level Modifications
Perform specific column operations:
async def modify_columns():
client = Client()
admin = Admin(client)
async with client:
events = Table(model=Event, name="events")
# Add a new column
from pyclickhouse import Column
new_col = Column(
type="String",
name="source",
comment="Event source"
)
await admin.add_column(events, new_col)
# Drop a column
await admin.drop_column(events, "deprecated_field")
# Modify a column
modified_col = Column(
type="Nullable(String)",
name="description",
comment="Updated description"
)
await admin.modify_column(events, modified_col)
Type Mapping and Conversion
PyClickHouse automatically converts between Python and ClickHouse types. This section shows all supported type mappings and conversions.
Python to ClickHouse Type Mapping
Primitive Types
| Python Type | ClickHouse Type | Example |
|---|---|---|
| int | Int64 | age: int |
| float | Float64 | temperature: float |
| str | String | name: str |
| bool | Bool | is_active: bool |
| date | Date | birth_date: date |
| datetime | DateTime | created_at: datetime |
| Decimal | Decimal | amount: Decimal |
| UUID | UUID | id: UUID |
| IPv4Address | IPv4 | ip: IPv4Address |
| IPv6Address | IPv6 | ip: IPv6Address |
Collection Types
| Python Type | ClickHouse Type | Example |
|---|---|---|
| list[T] | Array(T) | tags: list[str] → Array(String) |
| tuple[T1, T2, ...] | Tuple(T1, T2, ...) | coords: tuple[float, float] → Tuple(Float64, Float64) |
| dict[K, V] | Map(K, V) | metadata: dict[str, str] → Map(String, String) |
Special Types
| Python Type | ClickHouse Type | Example |
|---|---|---|
| Optional[T] or T \| None | Nullable(T) | nickname: Optional[str] → Nullable(String) |
| Enum | Enum(values) | status: Status → Enum('ACTIVE', 'INACTIVE') |
| Literal[...] | Enum(values) | priority: Literal['low', 'high'] → Enum('low', 'high') |
| dict (untyped) | JSON | metadata: dict → JSON |
| Union[T1, T2, ...] | Variant(T1, T2, ...) | data: Union[int, str] → Variant(Int64, String) |
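A model sketch exercising these special forms:
from typing import Literal, Optional, Union
from pydantic import BaseModel
from pyclickhouse import Table

class Record(BaseModel):
    nickname: Optional[str] = None    # Nullable(String)
    priority: Literal["low", "high"]  # Enum('low', 'high')
    payload: Union[int, str]          # Variant(Int64, String)
    extra: dict                       # JSON (untyped dict)

records = Table(model=Record)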
Nested Generic Types
| Python Expression | ClickHouse Type |
|---|---|
| list[Optional[int]] | Array(Nullable(Int64)) |
| Optional[list[str]] | Nullable(Array(String)) |
| dict[str, list[int]] | Map(String, Array(Int64)) |
| tuple[str, list[int], Optional[float]] | Tuple(String, Array(Int64), Nullable(Float64)) |
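And one exercising the nested forms:
from typing import Optional
from pydantic import BaseModel
from pyclickhouse import Table

class Payload(BaseModel):
    values: list[Optional[int]]         # Array(Nullable(Int64))
    labels: Optional[list[str]] = None  # Nullable(Array(String))
    series: dict[str, list[int]]        # Map(String, Array(Int64))

payloads = Table(model=Payload)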
ClickHouse to Python Type Mapping
Converting Query Results
When querying data, ClickHouse types are automatically converted to Python types:
| ClickHouse Type | Python Type | Notes |
|---|---|---|
| Int* (Int8, Int16, ..., Int64) | int | All integer variants map to Python int |
| UInt* (UInt8, UInt16, ..., UInt64) | int | Unsigned integers map to Python int |
| Float32, Float64 | float | All float variants map to Python float |
| Decimal(...) | float | Decimal is converted to Python float |
| String, FixedString(...) | str | All string types map to Python str |
| UUID | str | UUID is returned as a string |
| Bool | bool | ClickHouse Bool maps to Python bool |
| Date | date | ClickHouse Date maps to Python date |
| DateTime | datetime | ClickHouse DateTime maps to Python datetime |
| Array(T) | list | Arrays map to Python lists |
| Map(K, V) | dict | Maps map to Python dicts |
| Tuple(...) | tuple | Tuples map to Python tuples |
| Enum(...) | str | Enum values are returned as strings |
| Nullable(T) | Optional[T] | Nullable values can be None or T |
| LowCardinality(T) | T | The LowCardinality wrapper is transparent |
| JSON | Any | JSON is returned with a dynamic type |
| Dynamic | Any | Dynamic maps to Any |
Type Conversion Examples
Converting Integer Variants
from pydantic import BaseModel
from pyclickhouse import Table
class Metrics(BaseModel):
small_count: int # Int64 (all ints use Int64)
big_number: int # Int64
flag: bool # Bool (distinct type)
metrics = Table(model=Metrics)
# All integer fields become Int64 in ClickHouse
# Query results convert back to Python int
Converting Nullable Types
from typing import Optional
from pydantic import BaseModel
from pyclickhouse import Table
class User(BaseModel):
id: int
nickname: Optional[str] = None
age: Optional[int] = None
users = Table(model=User)
# Query result conversions:
# id: int (non-null)
# nickname: Optional[str] (can be None)
# age: Optional[int] (can be None)
Converting Collection Types
from typing import List, Dict, Tuple
from pydantic import BaseModel
from pyclickhouse import Table
class Document(BaseModel):
id: int
tags: List[str]
scores: List[float]
metadata: Dict[str, str]
location: Tuple[float, float]
documents = Table(model=Document)
# Query result conversions:
# tags: list of strings
# scores: list of floats
# metadata: dict with string keys and values
# location: tuple of two floats
Type Conversion Best Practices
Use Explicit Types
# Good: explicit types
class Event(BaseModel):
id: int
timestamp: str
value: float
tags: list[str]
# Avoid: using Any
from typing import Any
class WeakEvent(BaseModel):
data: Any # Results in Dynamic type
Use Optional for Nullable Columns
from typing import Optional
# Good: explicitly nullable
class Product(BaseModel):
id: int
name: str
description: Optional[str] = None
# Columns will be: id Int64, name String, description Nullable(String)
Use Proper Date/DateTime Types
from datetime import date, datetime
# Good: specific temporal types
class Event(BaseModel):
event_date: date
created_at: datetime
processed_at: str # If you need string format
# Avoid: storing dates as strings unnecessarily
class WeakEvent(BaseModel):
date: str # Better as date type
Define Enums for Limited Values
from enum import StrEnum, auto
# Good: enum for status field
class OrderStatus(StrEnum):
PENDING = auto()
PROCESSING = auto()
COMPLETED = auto()
class Order(BaseModel):
id: int
status: OrderStatus
# Avoid: string for limited values
class WeakOrder(BaseModel):
status: str # Better as Enum
Complete Example
Here's a complete example showing table creation, migration, and usage:
from typing import Annotated, Optional
from pydantic import BaseModel, Field
from pyclickhouse import (
Client, Table, Query, F, Admin, Writer, Reader,
Column, engines, Lifecycle, default_registry
)
# Define a Pydantic model with customized columns
class Event(BaseModel):
timestamp: Annotated[str, Column(
comment="Event timestamp",
ttl_expression="timestamp + INTERVAL 30 DAY"
)] = Field(description="When the event occurred")
event_id: int
event_name: Annotated[str, Column(comment="Name of the event")]
user_id: int
value: Annotated[float, Column(comment="Event value")]
tags: Annotated[list[str], Column(comment="Event tags")]
metadata: Optional[dict] = None
# Create a table with custom configuration
# (automatically registered with default_registry)
events = Table(
model=Event,
name="events",
engine=engines.MergeTree(
order_by="(user_id, timestamp)",
partition_by="toYYYYMM(timestamp)"
),
comment="Application events",
lifecycle=Lifecycle.managed
)
async def main():
client = Client()
async with client:
admin = Admin(client)
# Create the table
await admin.create_table(events)
print(f"Created table: {events.get_name()}")
# Get table info
print(f"Table name: {events.get_name()}")
print(f"Model: {events.get_model()}")
print(f"Engine: {events.get_engine()}")
print(f"Lifecycle: {events.get_lifecycle()}")
# Get columns
print("\nColumns:")
for col_name, col in events.get_columns().items():
print(f" {col_name}: {col.type} - {col.comment}")
# Insert data
print("\nInserting data...")
async with Writer(client, events) as writer:
for i in range(1000):
event = Event(
timestamp="2024-01-15",
event_id=i,
event_name=["click", "view", "purchase"][i % 3],
user_id=i % 100,
value=float(i % 50),
tags=["tag1", "tag2"] if i % 2 == 0 else ["tag3"],
metadata={"source": "api"} if i % 5 == 0 else None
)
await writer.insert(event)
# Query with expressions and filters
print("\nQuerying with filters...")
query = Query(events).filter(
(events.value > 10) &
(events.event_name == "click")
).aggregate(
count=F.count(),
total_value=F.sum(events.value)
)
reader = Reader(client, query)
results = await reader.query()
for result in results:
print(f"Click events: {result['count']}, Total: {result['total_value']}")
# Stream high-value events
print("\nStreaming high-value events...")
stream_query = Query(events).filter(
events.value > 20
).sort(-events.value).take(100)
stream_reader = Reader(client, stream_query, model=Event)
results = await stream_reader.stream()
print("Top high-value events:")
async for event in results:
print(f" {event.event_name}: {event.value}")
# Reflect an existing table
print("\nReflecting table structure...")
reflected_table = await admin.get_table("events")
print(f"Reflected table: {reflected_table.get_name()}")
print(f"Reflected columns: {list(reflected_table.get_columns().keys())}")
# Migrate table (if schema changed)
print("\nMigrating table...")
was_migrated = await admin.migrate_table(events)
print(f"Table migration completed: {was_migrated}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Learn more about the Table API, Query Builder, Admin, Reader, and Writer.