Writer
The Writer class provides efficient batch insertion of validated data into ClickHouse tables. It takes already-validated Pydantic model instances, queues them into batches, and inserts them into the database.
Creating a Writer
Create a writer with a client and table definition:
from pydantic import BaseModel
from pyclickhouse import Client, Table, Writer

class User(BaseModel):
    id: int
    name: str
    email: str

users = Table(model=User, name="users")
client = Client()

async with client:
    writer = Writer(client, users)
    # ready to insert data
Basic Usage
Insert Single Records
The Writer accepts already-validated Pydantic model instances:
async with client:
    writer = Writer(client, users)

    # Create a valid User instance
    user = User(id=1, name="Alice", email="alice@example.com")
    await writer.insert(user)
Insert Multiple Records
Insert multiple records in a single call:
async with client:
    writer = Writer(client, users)

    users_data = [
        User(id=1, name="Alice", email="alice@example.com"),
        User(id=2, name="Bob", email="bob@example.com"),
        User(id=3, name="Charlie", email="charlie@example.com"),
    ]

    for user in users_data:
        await writer.insert(user)
Or all at once:
async with client:
    writer = Writer(client, users)

    await writer.insert(
        User(id=1, name="Alice", email="alice@example.com"),
        User(id=2, name="Bob", email="bob@example.com"),
        User(id=3, name="Charlie", email="charlie@example.com"),
    )
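Since insert() accepts any number of instances in a single call (as shown above), an existing list can also be expanded with Python's unpacking operator:

async with client:
    writer = Writer(client, users)

    # Unpack a list of already-validated instances into one insert call
    await writer.insert(*users_data)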
Validation Before Insertion
Validation happens when you create the model instance, not when inserting:
from pydantic import ValidationError

async with client:
    writer = Writer(client, users)

    # Validation happens here, during model creation
    try:
        user = User(id="invalid", name="Alice", email="alice@example.com")
    except ValidationError as e:
        print(f"Invalid user: {e}")

    # Only valid instances reach the writer
    user = User(id=1, name="Alice", email="alice@example.com")
    await writer.insert(user)
Type Checking
The Writer enforces type checking - you must pass instances of the correct model:
async with client:
    writer = Writer(client, users)

    # This works - valid User instance
    user = User(id=1, name="Alice", email="alice@example.com")
    await writer.insert(user)

    # This raises TypeError - a dict is not a User instance
    invalid = {"id": 1, "name": "Bob"}
    await writer.insert(invalid)  # TypeError!
Batching
Automatic Batching (Default)
By default, the writer batches inserts for efficiency:
async with client:
    writer = Writer(client, users, batch=True, batch_size=1000)

    # Records are queued and automatically flushed when batch_size is reached
    for i in range(5000):
        user = User(id=i, name=f"User {i}", email=f"user{i}@example.com")
        await writer.insert(user)

    # Flush any remaining records (or use the writer as a context manager,
    # which flushes automatically on exit - see below)
    await writer.flush()
Configure Batch Size
Adjust the batch size based on your needs:
async with client:
    # Larger batches = better performance but more memory
    writer = Writer(client, users, batch_size=5000)

    for user in users_data:
        await writer.insert(user)

    # Flush anything still below the batch-size threshold
    await writer.flush()
Disable Batching
Insert immediately without batching:
async with client:
    writer = Writer(client, users, batch=False)

    # Each insert is sent immediately to the database
    await writer.insert(user)
Context Manager Usage
Automatic Flush
Use the writer as an async context manager to automatically flush remaining records:
async with client:
    async with Writer(client, users) as writer:
        for user in users_data:
            await writer.insert(user)
    # Remaining records are automatically flushed on exit
Manual Flush
Manually flush the queue when needed:
async with client:
    writer = Writer(client, users)

    for user in users_data:
        await writer.insert(user)

    # Flush remaining records
    await writer.flush()
Progress Tracking
Monitor Written Records
Track how many records have been written:
async with client:
    async with Writer(client, users) as writer:
        for user in users_data:
            await writer.insert(user)

    print(f"Written: {writer.written_rows}")
Check Queue Size
Check how many records are queued but not yet written:
async with client:
    writer = Writer(client, users, batch_size=1000)

    for i, user in enumerate(users_data):
        await writer.insert(user)
        if writer.queue_size == 500:
            print(f"Queue has 500 records after inserting {i+1} users")

    # Flush whatever is still queued
    await writer.flush()
Configuration
Specify Database
Insert into a different database:
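A minimal sketch, assuming the Writer accepts a database keyword argument (hypothetical; check the Writer API reference for the exact parameter name):

async with client:
    # "database" is an assumed keyword argument; the actual parameter name
    # may differ - see the Writer API reference
    writer = Writer(client, users, database="analytics")
    await writer.insert(user)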
Custom Settings
Pass ClickHouse-specific settings for the insert:
async with client:
    writer = Writer(
        client,
        users,
        settings={
            "insert_quorum": 2,
            "insert_quorum_parallel": True
        }
    )

    await writer.insert(user)
Transport Settings
Configure transport behavior:
async with client:
    writer = Writer(
        client,
        users,
        transport_settings={
            "connection_timeout": 10,
            "max_retries": 3
        }
    )

    await writer.insert(user)
Common Patterns
Insert from List
users_list = [
    User(id=1, name="Alice", email="alice@example.com"),
    User(id=2, name="Bob", email="bob@example.com"),
]

async with client:
    async with Writer(client, users) as writer:
        for user in users_list:
            await writer.insert(user)
Insert from Generator
def user_generator():
    for i in range(10000):
        yield User(id=i, name=f"User {i}", email=f"user{i}@example.com")

async with client:
    async with Writer(client, users) as writer:
        for user in user_generator():
            await writer.insert(user)
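The same pattern works when records are produced asynchronously, for example from an async generator:

async def user_stream():
    # Yield validated instances one at a time, as an async stream would
    for i in range(10000):
        yield User(id=i, name=f"User {i}", email=f"user{i}@example.com")

async with client:
    async with Writer(client, users) as writer:
        async for user in user_stream():
            await writer.insert(user)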
Batch Insert with Progress
async with client:
    async with Writer(client, users, batch_size=1000) as writer:
        for i, user in enumerate(users_data):
            await writer.insert(user)
            if (i + 1) % 5000 == 0:
                print(f"Progress: {i + 1}/{len(users_data)} ({writer.written_rows} written)")
Insert into Multiple Tables
async with client:
    users_writer = Writer(client, users)
    events_writer = Writer(client, events)

    async with users_writer, events_writer:
        for user in users_data:
            await users_writer.insert(user)
        for event in events_data:
            await events_writer.insert(event)
Conditional Batching
async with client:
    # Use smaller batches for real-time data
    if is_real_time:
        writer = Writer(client, events, batch_size=100)
    else:
        # Use larger batches for bulk import
        writer = Writer(client, events, batch_size=10000)

    async with writer:
        for event in events_data:
            await writer.insert(event)
Error Handling
Handle Insert Errors
from clickhouse_connect.driver.exceptions import ClickHouseError

async with client:
    async with Writer(client, users) as writer:
        try:
            for user in users_data:
                await writer.insert(user)
        except ClickHouseError as e:
            print(f"Insert failed: {e}")
            # Remaining records in the queue can be retried
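If queued records really do survive a failed insert, a simple retry wrapper around flush() could look like the sketch below. Whether flush() preserves the queue after an error is an assumption to verify against the Writer API.

import asyncio

from clickhouse_connect.driver.exceptions import ClickHouseError

async def flush_with_retry(writer, attempts=3, delay=1.0):
    # Assumes writer.flush() leaves the queue intact when it raises,
    # so a later call retries the same records
    for attempt in range(1, attempts + 1):
        try:
            await writer.flush()
            return
        except ClickHouseError as e:
            print(f"Flush attempt {attempt} failed: {e}")
            await asyncio.sleep(delay)
    raise RuntimeError("Could not flush queued records after retries")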
Handle Type Errors
from pydantic import ValidationError

async with client:
    async with Writer(client, users) as writer:
        # Here users_data is a list of raw dicts, not User instances
        for user_data in users_data:
            try:
                user = User(**user_data)  # Validate when creating the model
                await writer.insert(user)
            except (ValidationError, TypeError) as e:
                print(f"Invalid user {user_data}: {e}")
                continue  # Skip invalid records
Performance Considerations
Batch Size Impact
# Small batches - more database round trips, slower
writer = Writer(client, users, batch_size=100)

# Medium batches - balanced
writer = Writer(client, users, batch_size=1000)

# Large batches - fewer round trips, more memory
writer = Writer(client, users, batch_size=10000)
Disable Batching for Real-Time Inserts
# Real-time inserts without batching
real_time_writer = Writer(client, events, batch=False)

# Batched inserts for bulk operations
bulk_writer = Writer(client, events, batch=True, batch_size=5000)
Complete Example
Here's a complete example combining multiple Writer features:
from pydantic import BaseModel, ValidationError
from pyclickhouse import Client, Table, Writer
import asyncio


class Event(BaseModel):
    timestamp: str
    event_name: str
    user_id: int
    value: float


class User(BaseModel):
    id: int
    name: str
    email: str


async def bulk_import():
    client = Client()
    events = Table(model=Event, name="events")
    users = Table(model=User, name="users")

    async with client:
        # Insert users with the default batch size
        async with Writer(client, users) as user_writer:
            for i in range(1000):
                user = User(
                    id=i,
                    name=f"User {i}",
                    email=f"user{i}@example.com"
                )
                await user_writer.insert(user)
        print(f"Users written: {user_writer.written_rows}")

        # Insert events with a larger batch size for performance
        async with Writer(client, events, batch_size=5000) as event_writer:
            for i in range(100000):
                event = Event(
                    timestamp=f"2024-01-{(i % 30) + 1:02d}",
                    event_name=["click", "view", "purchase"][i % 3],
                    user_id=i % 1000,
                    value=float(i % 100) / 10
                )
                await event_writer.insert(event)
        print(f"Events written: {event_writer.written_rows}")


async def streaming_insert():
    """Real-time inserts without batching"""
    client = Client()
    events = Table(model=Event, name="events")

    async with client:
        # Real-time mode - insert immediately
        writer = Writer(client, events, batch=False)

        # Simulate an event stream
        for i in range(100):
            event = Event(
                timestamp="2024-01-15",
                event_name="click",
                user_id=i % 10,
                value=float(i)
            )
            await writer.insert(event)


if __name__ == "__main__":
    asyncio.run(bulk_import())
    asyncio.run(streaming_insert())
Learn more about the Writer API, Tables, and Admin.