Query

The Query builder provides a fluent, composable API for building ClickHouse queries. It uses PRQL as an intermediate representation, which compiles to SQL.

Queries are built by chaining pipeline operations together. Each operation returns a new Query, allowing for a functional, immutable approach to query construction.
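
Because each call returns a new object, a base query can be shared and extended without side effects. A minimal sketch using only the API shown on this page:

from pyclickhouse import Query

base = Query("events")

# Chaining returns a new Query; `base` itself is never modified
recent = base.take(100)

print(base)    # SELECT * FROM events
print(recent)  # SELECT * FROM events LIMIT 100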


Overview

The Query builder supports the following operations:

  • select — Pick or rename columns
  • derive — Compute new columns from existing ones
  • filter — Filter rows based on conditions
  • aggregate — Summarize rows into aggregates
  • group — Group rows and aggregate by groups
  • sort — Order rows by expressions
  • take — Limit and offset rows
  • join — Join tables based on conditions
  • window — Apply window functions for running totals and analytics
  • exclude — Remove specific columns from results

Creating a Query

A Query starts from either a Table object or a table name string:

from pyclickhouse import Query, Table
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str

users = Table(model=User, name="users")

# Create a query from a Table object
query = Query(users)
print(query)
# SELECT * FROM users

# Or create from a table name string
query = Query("users")
print(query)
# SELECT * FROM users

Building vs Compiling

The Query builder distinguishes between two representations:

  • build() — Show the PRQL pipeline representation
  • compile() or str() — Convert to SQL

query = Query(users).filter(users.id > 10).select(users.name)

print(query.build())
# from users | filter (users.id > 10) | select {name}

print(query.compile())
# SELECT name FROM users WHERE id > 10

print(str(query))  # Same as compile()
# SELECT name FROM users WHERE id > 10

Pipeline Operations

Select

Pick specific columns or rename them.

# Select single column
query = Query(users).select(users.id)
# SELECT id FROM users

# Select multiple columns
query = Query(users).select(users.id, users.name)
# SELECT id, name FROM users

# Select with custom column names
query = Query(users).select(
    user_id=users.id,
    full_name=users.name
)
# SELECT id AS user_id, name AS full_name FROM users

# Mix default and custom names
query = Query(users).select(
    users.email,
    user_id=users.id
)
# SELECT email, id AS user_id FROM users

Derive

Compute new columns based on existing ones. Adds columns to the existing selection.

from pyclickhouse import F

# Add computed columns
query = Query(users).derive(
    name_length=F.length(users.name)
)
# SELECT *, length(name) AS name_length FROM users

# Arithmetic operations
query = Query(products).derive(
    total_price=products.price * products.quantity
)
# SELECT *, price * quantity AS total_price FROM products

# Using functions
query = Query(events).derive(
    hour=F.toStartOfHour(events.timestamp),
    year=F.year(events.timestamp)
)
# SELECT *, toStartOfHour(timestamp) AS hour, year(timestamp) AS year FROM events

Filter

Select rows based on boolean expressions.

# Simple comparison
query = Query(users).filter(users.id > 10)
# SELECT * FROM users WHERE id > 10

# Multiple conditions with AND
query = Query(users).filter(
    (users.id > 10) & (users.name == "Alice")
)
# SELECT * FROM users WHERE id > 10 AND name = 'Alice'

# OR conditions
query = Query(users).filter(
    (users.id == 1) | (users.id == 2)
)
# SELECT * FROM users WHERE id = 1 OR id = 2

# NOT conditions
query = Query(users).filter(~(users.id == 0))
# SELECT * FROM users WHERE NOT id = 0

# IN operator
query = Query(users).filter(users.id.is_in([1, 2, 3]))
# SELECT * FROM users WHERE id IN (1, 2, 3)

# NOT IN operator
query = Query(users).filter(users.name.is_not_in(["admin", "bot"]))
# SELECT * FROM users WHERE NOT name IN ('admin', 'bot')

Sort

Order rows by one or more columns.

# Sort ascending (default)
query = Query(users).sort(users.name)
# SELECT * FROM users ORDER BY name

# Sort descending (using unary minus)
query = Query(users).sort(-users.id)
# SELECT * FROM users ORDER BY id DESC

# Sort by multiple columns
query = Query(events).sort(
    events.date,
    -events.value
)
# SELECT * FROM events ORDER BY date, value DESC

Take

Limit and offset rows for pagination. Row positions are 1-based and inclusive, which is why start=10 compiles to OFFSET 9 in the examples below.

# Take first n rows
query = Query(users).take(10)
# SELECT * FROM users LIMIT 10

# Take a row range (rows 10 through 20, inclusive)
query = Query(users).take(start=10, end=20)
# SELECT * FROM users LIMIT 11 OFFSET 9

# Take from row n onwards
query = Query(users).take(start=100)
# SELECT * FROM users OFFSET 99

# Take up to row n
query = Query(users).take(end=50)
# SELECT * FROM users LIMIT 50
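
Because take() uses 1-based, inclusive row positions, a pagination helper only needs to translate page numbers into row ranges. A hypothetical sketch (page() is not part of the library):

def page(query, page_num, page_size=25):
    # Page 1 covers rows 1..page_size, page 2 the next page_size rows, etc.
    first = (page_num - 1) * page_size + 1
    return query.take(start=first, end=first + page_size - 1)

query = page(Query(users), page_num=3)
# SELECT * FROM users LIMIT 25 OFFSET 50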

Join

Join the current table with another table based on a join condition.

from pyclickhouse import Query, Table
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str

class Order(BaseModel):
    user_id: int
    amount: float

users = Table(model=User, name="users")
orders = Table(model=Order, name="orders")

# Inner join (default)
query = Query(orders).join(users, orders.user_id == users.id)
# SELECT orders.*, users.* FROM orders INNER JOIN users ON orders.user_id = users.id

# Explicit inner join
query = Query(orders).join(users, orders.user_id == users.id, side="inner")
# SELECT orders.*, users.* FROM orders INNER JOIN users ON orders.user_id = users.id

# Left join (all rows from left table)
query = Query(users).join(orders, users.id == orders.user_id, side="left")
# SELECT users.*, orders.* FROM users LEFT OUTER JOIN orders ON users.id = orders.user_id

# Right join (all rows from right table)
query = Query(users).join(orders, users.id == orders.user_id, side="right")
# SELECT users.*, orders.* FROM users RIGHT OUTER JOIN orders ON users.id = orders.user_id

# Full join (all rows from both tables)
query = Query(users).join(orders, users.id == orders.user_id, side="full")
# SELECT users.*, orders.* FROM users FULL JOIN orders ON users.id = orders.user_id

Join Types

  • inner (default) — Only matching rows from both tables
  • left — All rows from left table, matching rows from right table
  • right — All rows from right table, matching rows from left table
  • full — All rows from both tables

Multiple Join Conditions

# Join with AND condition
query = Query(orders).join(
    users,
    (orders.user_id == users.id) & (users.active == 1)
)
# SELECT orders.*, users.* FROM orders INNER JOIN users ON orders.user_id = users.id AND users.active = 1

# Join with OR condition
query = Query(orders).join(
    users,
    (orders.user_id == users.id) | (orders.customer_name == users.name)
)
# SELECT orders.*, users.* FROM orders INNER JOIN users ON orders.user_id = users.id OR orders.customer_name = users.name

Join with Selection and Filtering

# Select specific columns after join
query = Query(orders).join(
    users,
    orders.user_id == users.id,
    side="left"
).select(
    orders.id,
    orders.amount,
    user_name=users.name
)
# SELECT orders.id, orders.amount, users.name AS user_name 
# FROM orders LEFT OUTER JOIN users ON orders.user_id = users.id

# Filter after join
query = Query(orders).join(
    users,
    orders.user_id == users.id
).filter(
    orders.amount > 100
)
# SELECT orders.*, users.* FROM orders 
# INNER JOIN users ON orders.user_id = users.id 
# WHERE orders.amount > 100

Chaining Multiple Joins

class Product(BaseModel):
    id: int
    name: str
    category_id: int

class Category(BaseModel):
    id: int
    name: str

products = Table(model=Product, name="products")
categories = Table(model=Category, name="categories")

# Join orders with users, then with products
query = Query(orders).join(
    users,
    orders.user_id == users.id
).join(
    products,
    orders.product_id == products.id
).join(
    categories,
    products.category_id == categories.id
)
# SELECT orders.*, users.*, products.*, categories.*
# FROM orders
# INNER JOIN users ON orders.user_id = users.id
# INNER JOIN products ON orders.product_id = products.id
# INNER JOIN categories ON products.category_id = categories.id

Join with Aggregation

from pyclickhouse import Aggregate, F

# Aggregate after join
query = Query(orders).join(
    users,
    orders.user_id == users.id
).group(
    users.id,
    total_amount=Aggregate(F.sum(orders.amount)),
    order_count=Aggregate(F.count())
)
# SELECT users.id, sum(orders.amount) AS total_amount, count() AS order_count
# FROM orders INNER JOIN users ON orders.user_id = users.id
# GROUP BY users.id

Aggregate

Summarize the entire result set into aggregate values. Produces a single row with aggregate functions applied to all rows.

Important: Do NOT use the Aggregate() wrapper in the aggregate() method. The wrapper belongs in group() and window(), where it distinguishes aggregate functions from plain columns.

Basic Aggregation

from pyclickhouse import F

# Count all rows
query = Query(users).aggregate(F.count())
# SELECT count() FROM users

# Count with alias
query = Query(users).aggregate(
    total_users=F.count()
)
# SELECT count() AS total_users FROM users

# Sum specific column
query = Query(users).aggregate(
    total_ids=F.sum(users.id)
)
# SELECT sum(id) AS total_ids FROM users

Multiple Aggregates

# Multiple aggregates - NO Aggregate wrapper needed
query = Query(orders).aggregate(
    order_count=F.count(),
    total_amount=F.sum(orders.amount),
    avg_amount=F.avg(orders.amount),
    min_amount=F.min(orders.amount),
    max_amount=F.max(orders.amount)
)
# SELECT count() AS order_count, sum(amount) AS total_amount, avg(amount) AS avg_amount, min(amount) AS min_amount, max(amount) AS max_amount FROM orders

Common Aggregate Functions

# Count rows
F.count()

# Count non-null values
F.count(users.id)

# Unique count
F.uniq(users.id)

# Sum, average, min, max
F.sum(orders.amount)
F.avg(orders.amount)
F.min(metrics.value)
F.max(metrics.value)

# Array aggregation (collect values)
F.groupArray(items.name)

# Conditional aggregation
F.sumIf(orders.amount, orders.status == "completed")
F.countIf(users.id, users.active == 1)

Aggregate on Entire Table

# Get stats for all records
query = Query(products).aggregate(
    product_count=F.count(),
    avg_price=F.avg(products.price),
    min_price=F.min(products.price),
    max_price=F.max(products.price)
)
# SELECT count() AS product_count, avg(price) AS avg_price, min(price) AS min_price, max(price) AS max_price FROM products

Group

Group rows by one or more columns and optionally aggregate. Produces one row per unique grouping value.

Critical: Use the Aggregate() wrapper to distinguish aggregate functions from grouping columns. Without it, the expression is treated as a grouping column.

Basic Grouping

from pyclickhouse import Aggregate, F

# Group by single column
query = Query(users).group(users.name)
# SELECT name FROM users GROUP BY name

# Group with column alias
query = Query(users).group(user_name=users.name)
# SELECT name AS user_name FROM users GROUP BY name

# Group by multiple columns
query = Query(events).group(
    events.user_id,
    events.event_type
)
# SELECT user_id, event_type FROM events GROUP BY user_id, event_type

Grouping with Aggregates

# IMPORTANT: Use Aggregate() wrapper to mark aggregate functions
query = Query(orders).group(
    orders.customer_id,
    Aggregate(F.count())
)
# SELECT customer_id, count() FROM orders GROUP BY customer_id

# Named aggregates (REQUIRES Aggregate wrapper)
query = Query(orders).group(
    orders.customer_id,
    total_orders=Aggregate(F.count()),
    total_amount=Aggregate(F.sum(orders.amount))
)
# SELECT customer_id, count() AS total_orders, sum(amount) AS total_amount FROM orders GROUP BY customer_id

# Multiple group columns with multiple aggregates
query = Query(sales).group(
    sales.date,
    sales.region,
    order_count=Aggregate(F.count()),
    total_sales=Aggregate(F.sum(sales.amount)),
    avg_sale=Aggregate(F.avg(sales.amount)),
    max_sale=Aggregate(F.max(sales.amount))
)
# SELECT date, region, count() AS order_count, sum(amount) AS total_sales, avg(amount) AS avg_sale, max(amount) AS max_sale FROM sales GROUP BY date, region

Grouping with Computed Columns

# Group by derived expressions (functions applied to columns)
query = Query(events).group(
    hour=F.toStartOfHour(events.timestamp),
    event_type=events.type,
    event_count=Aggregate(F.count()),
    unique_users=Aggregate(F.uniq(events.user_id))
)
# SELECT toStartOfHour(timestamp) AS hour, type AS event_type, count() AS event_count, uniq(user_id) AS unique_users
# FROM events GROUP BY toStartOfHour(timestamp), type

Common Aggregate Functions in Group

# Count
Aggregate(F.count())                              # Total rows
Aggregate(F.count(column))                        # Non-null rows

# Distinct count
Aggregate(F.uniq(column))                         # Unique values

# Sum, average, min, max
Aggregate(F.sum(column))
Aggregate(F.avg(column))
Aggregate(F.min(column))
Aggregate(F.max(column))

# Array aggregation
Aggregate(F.groupArray(column))                   # Collect all values

# Conditional aggregation
Aggregate(F.sumIf(column, condition))
Aggregate(F.countIf(column, condition))

The Difference: Aggregate vs Group

# aggregate() - one row for entire table
query = Query(orders).aggregate(
    total_orders=F.count(),
    total_amount=F.sum(orders.amount)
)
# SELECT count() AS total_orders, sum(amount) AS total_amount FROM orders

# group() - one row per group, REQUIRES Aggregate() wrapper
query = Query(orders).group(
    orders.customer_id,
    total_orders=Aggregate(F.count()),
    total_amount=Aggregate(F.sum(orders.amount))
)
# SELECT customer_id, count() AS total_orders, sum(amount) AS total_amount FROM orders GROUP BY customer_id

Group with Window Functions

Apply window functions within a group aggregation using the Window class. Window functions compute aggregate values over a subset of rows defined by a range or row count.

from pyclickhouse import Aggregate, Window, F

# Group with window range specification
query = Query(orders).group(
    orders.customer_id,
    Window(range=(-2, 0)),
    total=Aggregate(F.sum(orders.amount))
)
# SELECT customer_id, sum(amount) OVER (RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) AS total FROM orders GROUP BY customer_id

# Group with window rows specification
query = Query(events).group(
    events.date,
    events.type,
    Window(rows=(-3, 0)),
    count=Aggregate(F.count()),
    moving_avg=Aggregate(F.avg(events.value))
)
# SELECT date, type, count() AS count, avg(value) OVER (ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS moving_avg
# FROM events GROUP BY date, type

# Unbounded window (all rows up to and including the current row)
query = Query(metrics).group(
    metrics.metric_name,
    Window(range=(None, 0)),
    cumulative=Aggregate(F.sum(metrics.value))
)
# SELECT metric_name, sum(value) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative
# FROM metrics GROUP BY metric_name

# Multiple aggregates with window
query = Query(sales).group(
    sales.region,
    sales.date,
    Window(range=(None, 0)),
    daily_sales=Aggregate(F.sum(sales.amount)),
    running_total=Aggregate(F.sum(sales.amount))
)
# SELECT region, date, sum(amount) AS daily_sales, sum(amount) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total
# FROM sales GROUP BY region, date

Window within Group Use Cases:

  • Running totals — Cumulative sum with range=(None, 0)
  • Moving averages — Current row plus the N preceding rows with rows=(-N, 0)
  • Row numbering — Applied within each group
  • Ranking — Within grouped partitions

Window

Create window functions for running totals, moving averages, and row numbering. Window functions compute values based on a subset of rows within a partition.

from pyclickhouse import Window, Aggregate, F

# Window with range specification (default: current row)
query = Query(metrics).window(
    Window(range=(-2, 0)),
    moving_total=Aggregate(F.sum(metrics.value))
)
# SELECT *, sum(value) OVER (RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_total FROM metrics

# Window with rows specification
query = Query(events).window(
    Window(rows=(-3, 0)),
    moving_avg=Aggregate(F.avg(events.amount))
)
# SELECT *, avg(amount) OVER (ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS moving_avg FROM events

# Unbounded window (all rows up to and including the current row)
query = Query(sales).window(
    Window(range=(None, 0)),
    cumulative_sales=Aggregate(F.sum(sales.amount))
)
# SELECT *, sum(amount) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales FROM sales

# Full window (all rows)
query = Query(data).window(
    Window(range=(None, None)),
    total_percent=Aggregate(F.sum(data.value))
)
# SELECT *, sum(value) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS total_percent FROM data

# Window with current row only
query = Query(records).window(
    Window(rows=(0, 0)),
    current_value=Aggregate(F.sum(records.amount))
)
# SELECT *, sum(amount) OVER (ROWS BETWEEN CURRENT ROW AND CURRENT ROW) AS current_value FROM records

Range vs Rows

  • Range — Logical distance based on values (useful for time-based windows)
  • Rows — Physical distance based on row count

# RANGE BETWEEN: Groups rows by value distance
query = Query(metrics).window(
    Window(range=(-10, 10)),  # ±10 value units
    count=Aggregate(F.count())
)

# ROWS BETWEEN: Groups by row count
query = Query(metrics).window(
    Window(rows=(-10, 10)),  # 10 rows before and after
    count=Aggregate(F.count())
)

Window Specifications

Values in window tuples:

  • Negative numbers — PRECEDING (before current row)
  • Positive numbers — FOLLOWING (after current row)
  • Zero — CURRENT ROW
  • None — UNBOUNDED (start/end of partition)

Window(range=(-5, 5))          # 5 before to 5 after
Window(range=(None, 0))        # All previous rows including current
Window(range=(0, None))        # Current row to all following rows
Window(rows=(-10, -1))         # 10 rows before (excluding current)
Window(rows=(1, 10))           # Next 10 rows (excluding current)
Window(rows=(None, None))      # All rows

Exclude

Remove specific columns from the result set. Exclude only works after a select(), derive(), or group() step that explicitly defines which columns to include.

# Exclude with select
query = Query(users).select(
    users.id,
    users.name,
    users.email,
    users.password_hash
).exclude(
    users.password_hash
)
# SELECT id, name, email FROM users

# Exclude multiple columns
query = Query(users).select(
    users.id,
    users.name,
    users.email,
    users.password_hash,
    users.api_key
).exclude(
    users.password_hash,
    users.api_key
)
# SELECT id, name, email FROM users

# Exclude with derive
query = Query(products).select(
    products.id,
    products.name,
    products.price
).derive(
    discounted=products.price * 0.9,
    tax=products.price * 0.1
).exclude(
    products.price
)
# SELECT id, name, discounted, tax FROM products

# Exclude with group
query = Query(orders).group(
    orders.customer_id,
    customer_name=orders.customer_name,
    total_orders=Aggregate(F.count()),
    total_amount=Aggregate(F.sum(orders.amount))
).exclude(
    orders.customer_name
)
# SELECT customer_id, count() AS total_orders, sum(amount) AS total_amount FROM orders GROUP BY customer_id

# Exclude combined with filtering
query = Query(events).select(
    events.user_id,
    events.event_type,
    events.timestamp,
    events.raw_data
).filter(
    events.event_type == "click"
).exclude(
    events.raw_data
)
# SELECT user_id, event_type, timestamp FROM events WHERE event_type = 'click'

Use Cases for Exclude

  • Sensitive Data — Remove password hashes, API keys, tokens
  • Intermediate Columns — Remove columns only needed for filtering or calculations
  • Large Data — Exclude large text or binary fields not needed in results
  • Data Privacy — Filter out personally identifiable information (PII)

# Remove sensitive information
query = Query(users).select(
    users.id,
    users.name,
    users.email,
    users.phone,
    users.ssn,
    users.password_hash
).exclude(
    users.ssn,
    users.password_hash
)
# SELECT id, name, email, phone FROM users

# Select all columns from one table, exclude specific ones
query = Query(products).select(
    products.id,
    products.name,
    products.description,
    products.internal_notes,
    products.supplier_info
).exclude(
    products.internal_notes,
    products.supplier_info
)
# SELECT id, name, description FROM products

Parameterized Queries

Use Param to create queries with runtime-bound parameters.

Using Param

from pyclickhouse import Param

# String parameter (default type)
param = Param("name")
# {name:String}

# Integer parameter
param = Param("user_id", int)
# {user_id:Int64}

# Float parameter
param = Param("min_price", float)
# {min_price:Float64}

Param in Filters

# Filter with parameter
query = Query(users).filter(users.id >= Param("min_id", int))
# SELECT * FROM users WHERE id >= {min_id:Int64}

# Multiple parameters
query = Query(users).filter(
    (users.id >= Param("min_id", int)) &
    (users.name == Param("search_name", str))
)
# SELECT * FROM users WHERE id >= {min_id:Int64} AND name = {search_name:String}

Executing Parameterized Queries

from pyclickhouse import Client, Reader

async def query_with_params():
    client = Client()

    query = Query(users).filter(
        users.id >= Param("min_id", int)
    )

    async with client:
        reader = Reader(client, query)
        results = await reader.query(
            parameters={"min_id": 100}
        )
        return results

Functions with F

The F object provides access to ClickHouse functions.
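
The sections below demonstrate F inside derive() and aggregate contexts. Assuming F expressions compose with the same comparison operators as columns (an assumption; filters on F expressions are not shown elsewhere on this page), they can be used in filter() too:

# Filter on a computed expression
query = Query(users).filter(F.length(users.name) > 10)
# SELECT * FROM users WHERE length(name) > 10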

String Functions

from pyclickhouse import F

# String length
query = Query(users).derive(
    name_length=F.length(users.name)
)

# Case conversion
query = Query(users).derive(
    name_upper=F.upper(users.name),
    name_lower=F.lower(users.name)
)

# Substring
query = Query(users).derive(
    first_char=F.substring(users.name, 1, 1)
)

# Concatenation
query = Query(users).derive(
    full_info=F.concat(users.name, " - ", users.email)
)

# Trim
query = Query(users).derive(
    trimmed_name=F.trim(users.name)
)

Date/Time Functions

# Current date/time
query = Query(events).derive(
    current_time=F.now(),
    current_date=F.today()
)

# Extract date components
query = Query(events).derive(
    year=F.year(events.timestamp),
    month=F.month(events.timestamp),
    day=F.dayOfMonth(events.timestamp),
    hour=F.hour(events.timestamp)
)

# Start of intervals
query = Query(events).derive(
    start_of_day=F.toStartOfDay(events.timestamp),
    start_of_month=F.toStartOfMonth(events.timestamp),
    start_of_hour=F.toStartOfHour(events.timestamp)
)

Math Functions

# Rounding and absolute value
query = Query(metrics).derive(
    rounded=F.round(metrics.value, 2),
    absolute=F.abs(metrics.value),
    square_root=F.sqrt(metrics.value)
)

# Floor and ceiling
query = Query(data).derive(
    floored=F.floor(data.value),
    ceiling=F.ceil(data.value)
)

Aggregate Functions

# Count
F.count()
F.count(users.id)  # Count non-null values

# Sum and average
F.sum(orders.amount)
F.avg(orders.amount)

# Min and max
F.min(metrics.value)
F.max(metrics.value)

# Unique count
F.uniq(users.id)

# Array aggregation
F.groupArray(items.name)

# State functions (for materialized views)
F.sumState(values.amount)
F.avgState(values.amount)
F.countState()

# Merge functions
F.sumMerge(agg_values.amount_state)
F.avgMerge(agg_values.avg_state)
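
State functions produce partial aggregation states rather than final values; the matching Merge functions finalize those states when reading pre-aggregated data back (for example, from a table fed by a materialized view). A hedged sketch, assuming a hypothetical pre-aggregated table daily_agg with a state column:

from pydantic import BaseModel
from pyclickhouse import Query, Table, F, Aggregate

class DailyAgg(BaseModel):
    date: str
    amount_state: bytes  # stores sumState(amount) intermediates; the type is illustrative

daily_agg = Table(model=DailyAgg, name="daily_agg")

# Finalize the partial sums per date with sumMerge
query = Query(daily_agg).group(
    daily_agg.date,
    total_amount=Aggregate(F.sumMerge(daily_agg.amount_state))
)
# SELECT date, sumMerge(amount_state) AS total_amount FROM daily_agg GROUP BY date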

Complete Examples

Example 1: Simple Filtering and Selection

from pyclickhouse import Query, Table
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str

users = Table(model=User, name="users")

query = Query(users).filter(users.id > 10).select(users.name)
print(str(query))
# SELECT name FROM users WHERE id > 10

Example 2: Grouping and Aggregation

from pyclickhouse import Query, Table, F, Aggregate

class Order(BaseModel):
    customer_id: int
    amount: float

orders = Table(model=Order, name="orders")

# Orders per customer with totals
query = Query(orders).group(
    orders.customer_id,
    order_count=Aggregate(F.count()),
    total_spent=Aggregate(F.sum(orders.amount)),
    avg_order=Aggregate(F.avg(orders.amount))
)

print(str(query))
# SELECT customer_id, count() AS order_count, sum(amount) AS total_spent, avg(amount) AS avg_order
# FROM orders GROUP BY customer_id

Example 3: Time Series Aggregation

from pyclickhouse import Query, Table, F, Aggregate

class Metric(BaseModel):
    timestamp: str
    service: str
    cpu_usage: float

metrics = Table(model=Metric, name="metrics")

# Hourly stats by service
query = Query(metrics).group(
    service=metrics.service,
    hour=F.toStartOfHour(metrics.timestamp),
    avg_cpu=Aggregate(F.avg(metrics.cpu_usage)),
    max_cpu=Aggregate(F.max(metrics.cpu_usage))
).sort(metrics.service, -F.toStartOfHour(metrics.timestamp))

print(str(query))
# SELECT service, toStartOfHour(timestamp) AS hour, avg(cpu_usage) AS avg_cpu, max(cpu_usage) AS max_cpu
# FROM metrics GROUP BY service, toStartOfHour(timestamp) ORDER BY service, toStartOfHour(timestamp) DESC

Example 4: Filtering with Aggregation

from pyclickhouse import Query, Table, F

class Event(BaseModel):
    user_id: int
    event_type: str
    value: float

events = Table(model=Event, name="events")

# High-value purchases
query = Query(events).filter(
    (events.event_type == "purchase") & (events.value > 100)
).sort(-events.value).take(10)

print(str(query))
# SELECT * FROM events WHERE event_type = 'purchase' AND value > 100 ORDER BY value DESC LIMIT 10

Example 5: Parameterized Query

from pyclickhouse import Query, Table, Param

users = Table(model=User, name="users")

query = Query(users).filter(
    users.id >= Param("min_id", int)
).filter(
    users.name == Param("search_name", str)
)

print(str(query))
# SELECT * FROM users WHERE id >= {min_id:Int64} AND name = {search_name:String}

Example 6: Derive with Functions

from pyclickhouse import Query, Table, F

class Product(BaseModel):
    name: str
    price: float
    quantity: int

products = Table(model=Product, name="products")

query = Query(products).derive(
    total_value=products.price * products.quantity,
    name_length=F.length(products.name),
    price_rounded=F.round(products.price, 2)
)

print(str(query))
# SELECT *, price * quantity AS total_value, length(name) AS name_length, round(price, 2) AS price_rounded FROM products

Example 7: Window Functions - Running Totals

from pyclickhouse import Query, Table, Window, Aggregate, F

class Transaction(BaseModel):
    date: str
    amount: float

transactions = Table(model=Transaction, name="transactions")

# Running total (cumulative sum) across all rows
query = Query(transactions).window(
    Window(range=(None, 0)),
    cumulative_amount=Aggregate(F.sum(transactions.amount))
).sort(transactions.date)

print(str(query))
# SELECT *, sum(amount) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_amount 
# FROM transactions ORDER BY date

# Moving average (current row plus the 3 preceding rows)
query = Query(transactions).window(
    Window(rows=(-3, 0)),
    moving_avg=Aggregate(F.avg(transactions.amount))
)

print(str(query))
# SELECT *, avg(amount) OVER (ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS moving_avg FROM transactions

Example 8: Group with Window Functions - Per-Group Running Totals

from pyclickhouse import Query, Table, Window, Aggregate, F

class Sale(BaseModel):
    region: str
    date: str
    amount: float

sales = Table(model=Sale, name="sales")

# Running total per region
query = Query(sales).group(
    sales.region,
    sales.date,
    Window(range=(None, 0)),
    daily_sales=Aggregate(F.sum(sales.amount)),
    region_running_total=Aggregate(F.sum(sales.amount))
).sort(sales.region, sales.date)

print(str(query))
# SELECT region, date, sum(amount) AS daily_sales,
#        sum(amount) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS region_running_total
# FROM sales GROUP BY region, date ORDER BY region, date

# Moving average within each region (current day plus 5 preceding)
query = Query(sales).group(
    sales.region,
    sales.date,
    Window(rows=(-5, 0)),
    daily_total=Aggregate(F.sum(sales.amount)),
    moving_avg_region=Aggregate(F.avg(sales.amount))
)

print(str(query))
# SELECT region, date, sum(amount) AS daily_total, avg(amount) OVER (ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS moving_avg_region
# FROM sales GROUP BY region, date

Example 9: Join Operations

from pyclickhouse import Query, Table, F, Aggregate
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str

class Order(BaseModel):
    id: int
    user_id: int
    amount: float
    created_at: str

class Product(BaseModel):
    id: int
    order_id: int
    name: str
    price: float

users = Table(model=User, name="users")
orders = Table(model=Order, name="orders")
products = Table(model=Product, name="products")

# Inner join (only matching records)
query = Query(orders).join(users, orders.user_id == users.id)
print(str(query))
# SELECT orders.*, users.* FROM orders INNER JOIN users ON orders.user_id = users.id

# Left join (keep all orders, even without users)
query = Query(orders).join(users, orders.user_id == users.id, side="left")
print(str(query))
# SELECT orders.*, users.* FROM orders LEFT OUTER JOIN users ON orders.user_id = users.id

# Join with multiple conditions
query = Query(orders).join(
    users,
    (orders.user_id == users.id) & (users.email.is_not_in(["invalid@test.com"]))
)
print(str(query))
# SELECT orders.*, users.* FROM orders INNER JOIN users ON orders.user_id = users.id AND users.email NOT IN ('invalid@test.com')

# Chaining multiple joins
query = Query(orders).join(
    users,
    orders.user_id == users.id
).join(
    products,
    orders.id == products.order_id
).select(
    orders.id,
    orders.amount,
    user_name=users.name,
    product_name=products.name
)
print(str(query))
# SELECT orders.id, orders.amount, users.name AS user_name, products.name AS product_name
# FROM orders
# INNER JOIN users ON orders.user_id = users.id
# INNER JOIN products ON orders.id = products.order_id

# Join with aggregation (order totals with user info)
query = Query(orders).join(
    users,
    orders.user_id == users.id
).group(
    users.id,
    user_name=users.name,
    total_spent=Aggregate(F.sum(orders.amount)),
    order_count=Aggregate(F.count()),
    avg_order_value=Aggregate(F.avg(orders.amount))
).sort(-F.sum(orders.amount)).take(10)

print(str(query))
# SELECT users.id, users.name AS user_name, sum(orders.amount) AS total_spent, count() AS order_count, avg(orders.amount) AS avg_order_value
# FROM orders INNER JOIN users ON orders.user_id = users.id
# GROUP BY users.id, users.name ORDER BY sum(orders.amount) DESC LIMIT 10

Example 10: Exclude Sensitive Columns

from pyclickhouse import Query, Table
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str
    phone: str
    ssn: str
    password_hash: str
    api_key: str
    created_at: str
    last_login: str
    internal_notes: str

users = Table(model=User, name="users")

# Remove sensitive personal information
query = Query(users).select(
    users.id,
    users.name,
    users.email,
    users.phone,
    users.ssn,
    users.password_hash,
    users.api_key
).exclude(
    users.ssn,
    users.password_hash,
    users.api_key
)

print(str(query))
# SELECT id, name, email, phone FROM users

# Exclude intermediate calculation columns
class Product(BaseModel):
    id: int
    name: str
    cost: float
    markup_percent: float
    base_price: float
    tax_rate: float
    final_price: float

products = Table(model=Product, name="products")

query = Query(products).select(
    products.id,
    products.name,
    products.cost,
    products.base_price,
    products.tax_rate,
    products.final_price
).exclude(
    products.cost,
    products.tax_rate
)

print(str(query))
# SELECT id, name, base_price, final_price FROM products

# Exclude combined with filtering
query = Query(users).filter(
    users.email.is_not_in(["test@example.com", "admin@example.com"])
).select(
    users.id,
    users.name,
    users.email,
    users.password_hash,
    users.created_at,
    users.last_login,
    users.internal_notes
).exclude(
    users.password_hash,
    users.internal_notes
)

print(str(query))
# SELECT id, name, email, created_at, last_login FROM users
# WHERE email NOT IN ('test@example.com', 'admin@example.com')

Aggregate vs Group vs Window

Understanding the differences between these three operations is crucial for writing correct queries.

Comparison Table

Feature            aggregate()           group()                window()
Output rows        Single row            One per group          One per input row
Use case           Summarize all data    Summarize per group    Ranking, running totals
Aggregate wrapper  NO wrapper            YES wrapper            YES wrapper
Partitioning       Entire table          By GROUP BY columns    Implicit or explicit
Example            Total sales           Sales per region       Running total per region

Aggregate: Summarize Entire Table

aggregate() produces a single row summarizing all input data. Do NOT use the Aggregate() wrapper.

# Count total orders
query = Query(orders).aggregate(F.count())
# SELECT count() FROM orders
# Output: 1 row with total count

# Multiple aggregate metrics
query = Query(orders).aggregate(
    total_orders=F.count(),
    total_amount=F.sum(orders.amount),
    avg_amount=F.avg(orders.amount),
    max_amount=F.max(orders.amount)
)
# SELECT count() AS total_orders, sum(amount) AS total_amount, avg(amount) AS avg_amount, max(amount) AS max_amount FROM orders
# Output: 1 row with all metrics

Group: Summarize by Categories

group() produces one row per unique group. MUST use the Aggregate() wrapper to mark aggregate functions.

# Group by customer (one row per customer)
query = Query(orders).group(
    orders.customer_id,
    total_orders=Aggregate(F.count()),
    total_amount=Aggregate(F.sum(orders.amount))
)
# SELECT customer_id, count() AS total_orders, sum(amount) AS total_amount FROM orders GROUP BY customer_id
# Output: N rows (one per customer)

# Group by date and region
query = Query(sales).group(
    sales.date,
    sales.region,
    daily_sales=Aggregate(F.sum(sales.amount)),
    transaction_count=Aggregate(F.count())
)
# SELECT date, region, sum(amount) AS daily_sales, count() AS transaction_count 
# FROM sales GROUP BY date, region
# Output: N rows (one per unique date+region combination)

Window: Per-Row Aggregates with Context

window() produces one row per input row with aggregate values computed over a window of rows. MUST use the Aggregate() wrapper.

# Running total for each row (cumulative sum)
query = Query(sales).window(
    Window(range=(None, 0)),
    running_total=Aggregate(F.sum(sales.amount))
)
# SELECT *, sum(amount) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total FROM sales
# Output: Same number of rows as input, with running total

# Moving average (current row plus the 3 preceding rows)
query = Query(metrics).window(
    Window(rows=(-3, 0)),
    moving_avg=Aggregate(F.avg(metrics.value))
)
# SELECT *, avg(value) OVER (ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS moving_avg FROM metrics
# Output: Same number of rows as input, with moving average

Group with Window: Grouped Running Totals

Combine group() with Window() to get running totals per group:

# Running total per customer
query = Query(orders).group(
    orders.customer_id,
    Window(range=(None, 0)),
    order_amount=Aggregate(F.sum(orders.amount)),
    cumulative=Aggregate(F.sum(orders.amount))
)
# SELECT customer_id, sum(amount) AS order_amount,
#        sum(amount) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative
# FROM orders GROUP BY customer_id
# Output: One row per customer, with the customer's total and a running total across customers

# Hourly sales with daily running total
query = Query(sales).group(
    sales.date,
    sales.hour,
    Window(range=(None, 0)),
    hourly_sales=Aggregate(F.sum(sales.amount)),
    daily_running_total=Aggregate(F.sum(sales.amount))
)
# SELECT date, hour, sum(amount) AS hourly_sales,
#        sum(amount) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS daily_running_total
# FROM sales GROUP BY date, hour
# Output: One row per hour, with cumulative total within each day

Side-by-Side Example

from pyclickhouse import Query, Table, F, Aggregate, Window
from pydantic import BaseModel

class Sale(BaseModel):
    date: str
    amount: float

sales = Table(model=Sale, name="sales")

# 1. Aggregate: Total across all days
q1 = Query(sales).aggregate(
    total_sales=F.sum(sales.amount)
)
# Output: | total_sales |
#         | 1000        |

# 2. Group: Total per day
q2 = Query(sales).group(
    sales.date,
    daily_sales=Aggregate(F.sum(sales.amount))
)
# Output: | date       | daily_sales |
#         | 2024-01-01 | 100         |
#         | 2024-01-02 | 150         |
#         | 2024-01-03 | 250         |

# 3. Window: Each row with running total
q3 = Query(sales).window(
    Window(range=(None, 0)),
    cumulative=Aggregate(F.sum(sales.amount))
)
# Output: | date       | amount | cumulative |
#         | 2024-01-01 | 100    | 100        |
#         | 2024-01-02 | 150    | 250        |
#         | 2024-01-03 | 250    | 500        |

# 4. Group + Window: Per-day running total
q4 = Query(sales).group(
    sales.date,
    Aggregate(F.sum(sales.amount)),
    Window(range=(None, 0))
)
# Output: | date       | sum(amount) | sum(...) OVER (...) |
#         | 2024-01-01 | 100         | 100                 |
#         | 2024-01-02 | 150         | 250                 |
#         | 2024-01-03 | 250         | 500                 |
