Skip to content

ClickHouse Client

The ClickHouse client provides async access to the ClickHouse analytics database with GDPR-compliant privacy utilities.

Overview

The ClickHouse client manages connections to ClickHouse and provides query execution, data insertion, and schema management capabilities. It includes utilities for GDPR-compliant data handling.

Basic Usage

from imbi_common import clickhouse

# Initialize the shared client and test the connection (returns a bool)
await clickhouse.initialize()

# Set up database schema from schemata.toml; intended for initial setup,
# not for every application startup
await clickhouse.setup_schema()

# Insert data: a non-empty list of Pydantic models, all of the same type
await clickhouse.insert("table_name", [model_instance])

# Query data; every row is returned as a dict keyed by column name
results = await clickhouse.query(
    "SELECT user_id, COUNT(*) as count "
    "FROM session_activity "
    "GROUP BY user_id"
)

for row in results:
    print(f"{row['user_id']}: {row['count']}")

Privacy Utilities

from imbi_common.clickhouse import privacy

# Truncate IP addresses for GDPR compliance (IPv4 to /24, IPv6 to /48)
ipv4 = privacy.truncate_ip_to_subnet("192.168.1.100")  # "192.168.1.0"
ipv6 = privacy.truncate_ip_to_subnet("2001:0db8::1")   # "2001:db8::"

# Hash IP addresses (returns the SHA256 hex digest as a string)
hashed = privacy.hash_ip_address("192.168.1.100")

API Reference

Initialization

initialize async

initialize() -> bool

Create a new async client and test the connection.

Source code in src/imbi_common/clickhouse/__init__.py
async def initialize() -> bool:
    """Initialize the shared Clickhouse client.

    Delegates to ``initialize()`` on the instance returned by
    ``client.Clickhouse.get_instance()``.

    Returns:
        The boolean reported by the client's ``initialize()`` call.
    """
    instance = client.Clickhouse.get_instance()
    return await instance.initialize()

setup_schema async

setup_schema() -> None

Execute DDL queries from schemata.toml to set up database schema.

Source code in src/imbi_common/clickhouse/__init__.py
async def setup_schema() -> None:
    """Run the schema setup on the shared Clickhouse client.

    Delegates to ``setup_schema()`` on the instance returned by
    ``client.Clickhouse.get_instance()``, which executes the DDL queries
    from schemata.toml.
    """
    instance = client.Clickhouse.get_instance()
    await instance.setup_schema()

Query Operations

query async

query(
    sql: str, parameters: dict[str, Any] | None = None
) -> list[dict[str, typing.Any]]

Query the Clickhouse database and return results as a list of dicts.

Parameters:

Name Type Description Default
sql str

SQL query string, possibly with parameter placeholders

required
parameters dict[str, Any] | None

Optional dictionary of parameter values

None

Returns:

Type Description
list[dict[str, Any]]

List of dictionaries mapping column names to values

Raises:

Type Description
DatabaseError

If there's an error executing the query

Source code in src/imbi_common/clickhouse/__init__.py
async def query(
    sql: str, parameters: dict[str, typing.Any] | None = None
) -> list[dict[str, typing.Any]]:
    """Run a query through the shared Clickhouse client.

    Args:
        sql: SQL query string, possibly with parameter placeholders
        parameters: Optional dictionary of parameter values

    Returns:
        One dictionary per result row, mapping column names to values

    Raises:
        DatabaseError: If there's an error executing the query
    """
    instance = client.Clickhouse.get_instance()
    rows = await instance.query(sql, parameters=parameters)
    return rows

insert async

insert(
    table: str, data: list[BaseModel]
) -> summary.QuerySummary

Insert data into Clickhouse.

Parameters:

Name Type Description Default
table str

The name of the table to insert into

required
data list[BaseModel]

List of Pydantic models to insert (all must be the same type)

required

Returns:

Type Description
QuerySummary

QuerySummary containing information about the insert operation

Raises:

Type Description
ValueError

If data list is empty or models are not all the same type

Source code in src/imbi_common/clickhouse/__init__.py
async def insert(
    table: str, data: list[pydantic.BaseModel]
) -> summary.QuerySummary:
    """Insert data into Clickhouse.

    Every model is serialized exactly once with ``model_dump()``; the
    column names are taken from the keys of the first serialized model
    and each row is the list of values of one serialized model.

    Args:
        table: The name of the table to insert into
        data: List of Pydantic models to insert (all must be the same type)

    Returns:
        QuerySummary containing information about the insert operation

    Raises:
        ValueError: If data list is empty or models are not all the same type
    """
    if not data:
        raise ValueError('Data list cannot be empty')

    # Exact type identity (not isinstance): subclasses may add fields, which
    # would break the shared column list below.
    first_type = type(data[0])
    if any(type(model) is not first_type for model in data):
        raise ValueError(
            f'All models must be of the same type. '
            f'Expected {first_type.__name__}, but found mixed types.'
        )

    # Dump each model once and reuse the result for both the column names
    # and the row values, instead of serializing the first model twice.
    dumped = [model.model_dump() for model in data]
    column_names = list(dumped[0])
    clickhouse = client.Clickhouse.get_instance()
    return await clickhouse.insert(
        table,
        [list(row.values()) for row in dumped],
        column_names,
    )

Client

Clickhouse

Clickhouse()
Source code in src/imbi_common/clickhouse/client.py
def __init__(self) -> None:
    """Create an unconnected client wrapper.

    No connection is opened here; ``_clickhouse`` stays ``None`` until
    ``initialize()`` assigns the result of ``_connect()`` to it.
    """
    self._clickhouse: asyncclient.AsyncClient | None = None
    # Held by initialize() and aclose() while they read or replace
    # self._clickhouse.
    self._lock = asyncio.Lock()
    # NOTE(review): presumably the connection configuration for _connect();
    # confirm against the settings module.
    self._settings = settings.Clickhouse()
    # NOTE(review): looks like a driver-wide read-format override so that
    # IPv* column values are returned as strings; it runs on every
    # construction -- confirm it is meant to be global.
    format.set_read_format('IPv*', 'string')

aclose async

aclose() -> None

Close any open connections to Clickhouse.

Source code in src/imbi_common/clickhouse/client.py
async def aclose(self) -> None:
    """Close the open Clickhouse connection, if any, and forget it.

    Runs under the instance lock. When no connection is held this only
    resets ``_clickhouse`` to ``None``.
    """
    async with self._lock:
        connection = self._clickhouse
        if connection is not None:
            # https://github.com/ClickHouse/clickhouse-connect/issues/567
            await connection.close()  # type: ignore[no-untyped-call]
        self._clickhouse = None

get_instance classmethod

get_instance() -> Clickhouse

Get an instance of the Clickhouse client.

Source code in src/imbi_common/clickhouse/client.py
@classmethod
def get_instance(cls) -> 'Clickhouse':
    """Return the cached client instance, creating it on first use.

    The instance is stored on ``cls._instance``; later calls return the
    same object.
    """
    existing = cls._instance
    if existing is not None:
        return existing
    cls._instance = cls()
    return cls._instance

initialize async

initialize() -> bool

Create a new async client and test the connection.

Source code in src/imbi_common/clickhouse/client.py
async def initialize(self) -> bool:
    """Connect to Clickhouse if no connection is held yet.

    The connection is created with ``_connect()`` while holding the
    instance lock; an existing connection is left untouched.

    Returns:
        True when a connection object is held afterwards, False otherwise.
    """
    LOGGER.debug('Starting Clickhouse')
    async with self._lock:
        needs_connection = self._clickhouse is None
        if needs_connection:
            self._clickhouse = await self._connect()

    connected = self._clickhouse is not None
    return connected

insert async

insert(
    table: str,
    data: list[list[Any]],
    column_names: list[str],
) -> summary.QuerySummary

Insert data into ClickHouse.

Source code in src/imbi_common/clickhouse/client.py
async def insert(
    self, table: str, data: list[list[typing.Any]], column_names: list[str]
) -> summary.QuerySummary:
    """Insert rows into a Clickhouse table.

    The connection is initialized on demand when none is held.

    Args:
        table: The name of the table to insert into
        data: Rows to insert, each a list of column values
        column_names: Column names matching the value order of each row

    Returns:
        The summary returned by the underlying client's insert call

    Raises:
        RuntimeError: If no connection is available after initializing
        DatabaseError: If the underlying client reports a database error
    """
    LOGGER.debug('Clickhouse INSERT: %s (%r)', table, column_names)
    if not self._clickhouse:
        await self.initialize()
    connection = self._clickhouse
    if not connection:
        raise RuntimeError('Failed to initialize ClickHouse client')
    try:
        outcome = await connection.insert(
            table, data, column_names=column_names
        )
    except exceptions.DatabaseError as err:
        LOGGER.error('Error inserting data to %s: %s', table, err)
        # sentry_sdk may be falsy (presumably when the package is not
        # installed); only report when it is available.
        if sentry_sdk:
            sentry_sdk.capture_exception(err)
        raise DatabaseError(str(err)) from err
    return outcome

query async

query(
    statement: str, parameters: dict[str, Any] | None = None
) -> list[dict[str, typing.Any]]

Query the Clickhouse database and return results as a list of dicts.

Parameters:

Name Type Description Default
statement str

SQL query string, possibly with parameter placeholders

required
parameters dict[str, Any] | None

Optional dictionary of parameter values

None

Returns:

Type Description
list[dict[str, Any]]

List of dictionaries mapping column names to values

Raises:

Type Description
DatabaseError

If there's an error executing the query

Source code in src/imbi_common/clickhouse/client.py
async def query(
    self, statement: str, parameters: dict[str, typing.Any] | None = None
) -> list[dict[str, typing.Any]]:
    """Run a statement and return every result row as a dictionary.

    The connection is initialized on demand when none is held.

    Args:
        statement: SQL query string, possibly with parameter placeholders
        parameters: Optional dictionary of parameter values; an empty
            dictionary is sent when omitted

    Returns:
        One dictionary per result row, mapping column names to values

    Raises:
        RuntimeError: If no connection is available after initializing
        DatabaseError: If there's an error executing the query
    """
    if not self._clickhouse:
        await self.initialize()
    connection = self._clickhouse
    if not connection:
        raise RuntimeError('Failed to initialize ClickHouse client')
    LOGGER.debug('Clickhouse QUERY: %s', statement)
    LOGGER.debug('Clickhouse QUERY Parameters: %r', parameters)
    try:
        result = await connection.query(
            statement, parameters=parameters or {}
        )
    except exceptions.DatabaseError as err:
        LOGGER.error('Error querying data: %s', err)
        if sentry_sdk:
            sentry_sdk.capture_exception(err)
        raise DatabaseError(str(err)) from err
    # Pair each row's values with the column names, in column order.
    return [
        dict(zip(result.column_names, row, strict=False))
        for row in result.result_rows
    ]

setup_schema async

setup_schema() -> None

Execute DDL queries from schemata.toml to set up database schema.

This should be called explicitly during initial setup, not on every startup. Loads and executes enabled queries from schemata.toml.

Source code in src/imbi_common/clickhouse/client.py
async def setup_schema(self) -> None:
    """Run the schema DDL via ``_execute_schemata_queries()``.

    Meant to be invoked explicitly during initial setup rather than on
    every startup. The connection is initialized on demand when none is
    held.

    Raises:
        RuntimeError: If no connection is available after initializing
    """
    if not self._clickhouse:
        await self.initialize()
        # Still no connection after an explicit attempt: give up.
        if not self._clickhouse:
            raise RuntimeError('Failed to initialize ClickHouse client')

    await self._execute_schemata_queries()

Privacy Utilities

hash_ip_address

hash_ip_address(ip: str) -> str

Hash IP address using SHA256 for privacy-preserving storage.

Parameters:

Name Type Description Default
ip str

IP address string (IPv4 or IPv6)

required

Returns:

Type Description
str

SHA256 hex digest of the IP address

Example

The digest is always 64 hexadecimal characters, and the same input always produces the same digest:

>>> len(hash_ip_address('192.168.1.1'))
64

Source code in src/imbi_common/clickhouse/privacy.py
def hash_ip_address(ip: str) -> str:
    """Return the SHA256 hex digest of an IP address string.

    The string is hashed exactly as given (UTF-8 encoded); it is not
    parsed or normalized, so different spellings of the same address
    produce different digests.

    NOTE(review): the digest is unsalted. The IPv4 space is small enough
    to enumerate, so this should not be treated as anonymization on its
    own.

    Args:
        ip: IP address string (IPv4 or IPv6)

    Returns:
        SHA256 hex digest of the IP address (64 hexadecimal characters)

    Example:
        >>> len(hash_ip_address('192.168.1.1'))
        64
        >>> hash_ip_address('192.168.1.1') == hash_ip_address('192.168.1.1')
        True

    """
    hasher = hashlib.sha256()
    hasher.update(ip.encode('utf-8'))
    return hasher.hexdigest()

truncate_ip_to_subnet

truncate_ip_to_subnet(ip: str) -> str

Truncate IP address to subnet for privacy-preserving storage.

IPv4 addresses are truncated to /24 subnet (last octet zeroed). IPv6 addresses are truncated to /48 subnet.

Parameters:

Name Type Description Default
ip str

IP address string (IPv4 or IPv6)

required

Returns:

Type Description
str

Truncated IP subnet string

Raises:

Type Description
ValueError

If IP address is invalid

Examples:

>>> truncate_ip_to_subnet('192.168.1.1')
'192.168.1.0'
>>> truncate_ip_to_subnet('2001:0db8:85a3:0000:0000:8a2e:0370:7334')
'2001:db8:85a3::'
Source code in src/imbi_common/clickhouse/privacy.py
def truncate_ip_to_subnet(ip: str) -> str:
    """Reduce an IP address to its subnet's network address.

    IPv4 addresses are cut to their /24 network (last octet zeroed) and
    IPv6 addresses to their /48 network. The result is rendered by the
    ``ipaddress`` module, so IPv6 output is in compressed form.

    Args:
        ip: IP address string (IPv4 or IPv6)

    Returns:
        Truncated IP subnet string

    Raises:
        ValueError: If IP address is invalid

    Examples:
        >>> truncate_ip_to_subnet('192.168.1.1')
        '192.168.1.0'
        >>> truncate_ip_to_subnet('2001:0db8:85a3:0000:0000:8a2e:0370:7334')
        '2001:db8:85a3::'

    """
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError as err:
        raise ValueError(f'Invalid IP address: {ip}') from err

    # strict=False lets the network constructors accept an address with
    # host bits set and mask them off.
    if parsed.version == 4:
        subnet = ipaddress.IPv4Network(f'{ip}/24', strict=False)
        return str(subnet.network_address)
    subnet_v6 = ipaddress.IPv6Network(f'{ip}/48', strict=False)
    return str(subnet_v6.network_address)

parse_user_agent

parse_user_agent(user_agent: str | None) -> tuple[str, str]

Parse user agent string to extract browser family and version.

This function performs basic parsing without external dependencies. For production use, consider using the 'user-agents' package for more accurate parsing.

Parameters:

Name Type Description Default
user_agent str | None

User agent string from HTTP headers

required

Returns:

Type Description
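tuple[str, str]

Tuple of (browser_family, version), where version is 'major.minor'. Returns ('unknown', 'unknown') if the user agent is empty or no known browser is found, and (browser_family, 'unknown') if the browser is recognized but its version cannot be extracted.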

Examples:

>>> parse_user_agent(
...     'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
...     'AppleWebKit/537.36 (KHTML, like Gecko) '
...     'Chrome/91.0.4472.124 Safari/537.36'
... )
('Chrome', '91.0')
Note

This is a simplified parser. For better accuracy, integrate a dedicated user agent parsing library like 'user-agents' or 'ua-parser'.

Source code in src/imbi_common/clickhouse/privacy.py
def parse_user_agent(
    user_agent: str | None,
) -> tuple[str, str]:
    """Parse user agent string to extract browser family and version.

    This function performs basic parsing without external dependencies.
    For production use, consider using the 'user-agents' package for more
    accurate parsing.

    Args:
        user_agent: User agent string from HTTP headers

    Returns:
        Tuple of (browser_family, version) where version is 'major.minor'
        Returns ('unknown', 'unknown') if parsing fails

    Examples:
        >>> parse_user_agent(
        ...     'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        ...     'AppleWebKit/537.36 (KHTML, like Gecko) '
        ...     'Chrome/91.0.4472.124 Safari/537.36'
        ... )
        ('Chrome', '91.0')

    Note:
        This is a simplified parser. For better accuracy, integrate
        a dedicated user agent parsing library like 'user-agents' or
        'ua-parser'.

    """
    if not user_agent:
        return ('unknown', 'unknown')

    ua_lower = user_agent.lower()

    # Simple heuristic-based parsing
    browser_patterns: list[tuple[str, str]] = [
        ('edg/', 'Edge'),
        ('chrome/', 'Chrome'),
        ('firefox/', 'Firefox'),
        ('safari/', 'Safari'),
        ('opera/', 'Opera'),
    ]

    for pattern, family in browser_patterns:
        if pattern in ua_lower:
            try:
                # Extract version after pattern
                start = ua_lower.index(pattern) + len(pattern)
                version_str = user_agent[start:].split()[0]
                # Get major.minor only
                parts = version_str.split('.')
                if len(parts) >= 2:
                    version = f'{parts[0]}.{parts[1]}'
                else:
                    version = parts[0]
                return (family, version)
            except (IndexError, ValueError):
                return (family, 'unknown')

    return ('unknown', 'unknown')

sanitize_metadata

sanitize_metadata(metadata: dict[str, Any]) -> str

Sanitize metadata dictionary to ensure no PII is included.

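Redacts fields that commonly contain PII by replacing their values with '[REDACTED]':

- Fields named email, password, token, api_key, or secret
- Any field whose name contains 'email', 'password', 'token', 'secret', or 'api_key' (case-insensitive)
- Any string value that itself contains one of those keywords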

Parameters:

Name Type Description Default
metadata dict[str, Any]

Dictionary of metadata to sanitize

required

Returns:

Type Description
str

JSON string with PII fields redacted

Example

>>> sanitize_metadata({
...     'endpoint': '/api/users',
...     'email': 'user@example.com',
...     'status': 200
... })
'{"endpoint": "/api/users", "email": "[REDACTED]", "status": 200}'

Source code in src/imbi_common/clickhouse/privacy.py
def sanitize_metadata(metadata: dict[str, typing.Any]) -> str:
    """Sanitize metadata dictionary to ensure no PII is included.

    Redacts values that commonly contain PII, at any nesting depth
    (dictionaries, and dictionaries or strings inside lists/tuples):

    - values whose key contains 'email', 'password', 'token', 'secret'
      or 'api_key' (case-insensitive)
    - string values that themselves contain one of those keywords

    Redacted values are replaced with the string '[REDACTED]'; keys are
    kept. Other values are passed through unchanged and must be JSON
    serializable.

    Args:
        metadata: Dictionary of metadata to sanitize

    Returns:
        JSON string with PII fields redacted

    Raises:
        TypeError: If a remaining value cannot be serialized by ``json``

    Example:
        >>> sanitize_metadata({
        ...     'endpoint': '/api/users',
        ...     'email': 'user@example.com',
        ...     'status': 200
        ... })
        '{"endpoint": "/api/users", "email": "[REDACTED]", "status": 200}'

    """
    import json

    pii_keywords = ('email', 'password', 'token', 'secret', 'api_key')
    redacted = '[REDACTED]'

    def _mentions_pii(text: str) -> bool:
        """Return True if *text* contains any PII keyword."""
        lowered = text.lower()
        return any(keyword in lowered for keyword in pii_keywords)

    def _scrub(value: typing.Any) -> typing.Any:
        """Return *value* with PII redacted, recursing into containers."""
        if isinstance(value, dict):
            # str(key): JSON would stringify non-string keys anyway, so
            # test the name that would end up in the output.
            return {
                key: redacted if _mentions_pii(str(key)) else _scrub(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            # Dicts nested inside sequences must be sanitized too,
            # otherwise e.g. [{'email': ...}] leaks through untouched.
            return [_scrub(item) for item in value]
        if isinstance(value, str) and _mentions_pii(value):
            # Redact string values that look like they contain PII
            return redacted
        return value

    return json.dumps(_scrub(metadata))