Skip to content

Neo4j Client

The Neo4j client provides async access to the Neo4j graph database with connection pooling and CRUD operations.

Overview

The Neo4j client is a singleton that maintains a connection pool to the Neo4j database. It provides high-level CRUD operations and low-level query execution capabilities through cypherantic integration.

Basic Usage

from imbi_common import neo4j, models

# Initialize the client (creates indexes/constraints)
await neo4j.initialize()

# Create a node; the returned model is rebuilt from the stored properties
org = models.Organization(name="My Org", slug="my-org")
created = await neo4j.create_node(org)

# Fetch a single node (the result is None when nothing matches)
org = await neo4j.fetch_node(
    models.Organization, {"slug": "my-org"}
)

# Fetch multiple nodes (an async generator, so iterate with `async for`)
async for team in neo4j.fetch_nodes(
    models.Team, order_by="name"
):
    print(team.name)

# Upsert a node, matching on the constraint properties; returns the elementId
await neo4j.upsert(org, constraint={"slug": org.slug})

# Create a relationship (`team` is the last Team yielded by the loop above)
await neo4j.create_relationship(
    from_node=team,
    to_node=org,
    rel_type="BELONGS_TO"
)

# Delete a node (returns True if a node was deleted, False if none matched)
await neo4j.delete_node(
    models.Organization, {"slug": "my-org"}
)

# Close the connection
await neo4j.aclose()

API Reference

Lifecycle

initialize async

initialize() -> None

Initialize the Neo4j connection, setting up the indexes if necessary.

Source code in src/imbi_common/neo4j/__init__.py
async def initialize() -> None:
    """Initialize the Neo4j connection, setting up the indexes if necessary.

    Delegates to ``initialize()`` on the instance returned by
    ``client.Neo4j.get_instance()``.

    """
    instance = client.Neo4j.get_instance()
    await instance.initialize()

aclose async

aclose() -> None

Close the Neo4j connection

Source code in src/imbi_common/neo4j/__init__.py
async def aclose() -> None:
    """Close the Neo4j connection.

    Delegates to ``aclose()`` on the instance returned by
    ``client.Neo4j.get_instance()``.

    """
    instance = client.Neo4j.get_instance()
    await instance.aclose()

session async

session() -> typing.AsyncGenerator[
    cypherantic.SessionType, None
]

Return a Neo4j AsyncSession for use in queries

Source code in src/imbi_common/neo4j/__init__.py
@contextlib.asynccontextmanager
async def session() -> typing.AsyncGenerator[cypherantic.SessionType, None]:
    """Yield a Neo4j AsyncSession for use in queries.

    This is an async context manager: the session is obtained from the
    instance returned by ``client.Neo4j.get_instance()`` and stays open
    for the duration of the ``async with`` body.

    """
    async with client.Neo4j.get_instance().session() as active_session:
        yield active_session

CRUD Operations

create_node async

create_node(model: ModelType) -> ModelType

Create a node in the graph.

This method uses cypherantic to create a node with:

  • Labels extracted from the model's cypherantic_config or class name
  • Automatic unique constraints for fields marked with Field(unique=True)
  • Properties from the model's fields (excluding relationship fields)

:param model: Pydantic model instance to create as a node
:returns: The created node as a Pydantic model with round-trip values

Source code in src/imbi_common/neo4j/__init__.py
async def create_node(model: ModelType) -> ModelType:
    """Create a node in the graph from a Pydantic model instance.

    The node is created through cypherantic, which provides:

    - Labels extracted from model's ``cypherantic_config`` or class name
    - Automatic unique constraints for fields marked with
      ``Field(unique=True)``
    - Properties from the model's fields (excluding relationship fields)

    :param model: Pydantic model instance to create as a node
    :returns: A new instance of the same model class, validated from the
        properties of the node that was created

    """
    model_cls = type(model)
    async with session() as sess:
        created = await cypherantic.create_node(sess, model)
        properties = convert_neo4j_types(dict(created))
        # Going through model_validate makes the field validators run on
        # the round-tripped data (e.g. json_schema stored as string)
        payload = _prepare_node_data(model_cls, properties)
        return model_cls.model_validate(payload)

fetch_node async

fetch_node(
    model: type[ModelType], parameters: dict[str, Any]
) -> ModelType | None

Fetch a single node from the graph by its unique key fields

Source code in src/imbi_common/neo4j/__init__.py
async def fetch_node(
    model: type[ModelType],
    parameters: dict[str, typing.Any],
) -> ModelType | None:
    """Fetch a single node from the graph by its unique key fields.

    :param model: Pydantic model class used to build the query and to
        validate the node that comes back
    :param parameters: Property values to match; also passed to the
        query as Cypher parameters
    :returns: The validated model instance, or ``None`` when the query
        produced no record

    """
    query = _build_fetch_query(model, parameters)
    LOGGER.debug('Running Query: %s', query)
    async with run(query, **parameters) as result:
        record = await result.single()
    # The record is processed after the session has been released
    if not record:
        return None
    raw_node = convert_neo4j_types(record.data()['node'])
    return model.model_validate(_prepare_node_data(model, raw_node))

fetch_nodes async

fetch_nodes(
    model: type[ModelType],
    parameters: dict[str, Any] | None = None,
    order_by: str | list[str] | None = None,
) -> typing.AsyncGenerator[ModelType, None]

Fetch nodes from the graph, optionally filtered by parameters

Source code in src/imbi_common/neo4j/__init__.py
async def fetch_nodes(
    model: type[ModelType],
    parameters: dict[str, typing.Any] | None = None,
    order_by: str | list[str] | None = None,
) -> typing.AsyncGenerator[ModelType, None]:
    """Fetch nodes from the graph, optionally filtered by parameters.

    This is an async generator: each record of the result is converted
    and validated as it is consumed.

    :param model: Pydantic model class used to build the query and to
        validate every node
    :param parameters: Optional property values to match; also passed
        to the query as Cypher parameters
    :param order_by: Optional field name or list of field names handed
        to the query builder for ordering
    :returns: Async generator of validated model instances

    """
    query = _build_fetch_query(model, parameters, order_by)
    LOGGER.debug('Running Query: %s', query)
    query_kwargs = parameters or {}
    async with run(query, **query_kwargs) as result:
        async for record in result:
            raw_node = convert_neo4j_types(record.data()['node'])
            yield model.model_validate(_prepare_node_data(model, raw_node))

upsert async

upsert(
    node: BaseModel,
    constraint: dict[str, Any],
    auto_increment: list[str] | None = None,
) -> str

Save a node to the graph, returning the elementId.

:param node: Pydantic model instance to upsert
:param constraint: Dict of properties for the MERGE match
:param auto_increment: Optional list of field names (not aliases) to increment atomically server-side using coalesce(node.field, 0) + 1 instead of client values. The model instance is updated in-place with the new values.
:returns: The elementId of the upserted node

Source code in src/imbi_common/neo4j/__init__.py
async def upsert(
    node: pydantic.BaseModel,
    constraint: dict[str, typing.Any],
    auto_increment: list[str] | None = None,
) -> str:
    """Save a node to the graph, returning the elementId.

    The node is written with a single ``MERGE`` on the model's class name
    as label and the ``constraint`` properties as match pattern; every
    dumped model property is then set on both create and match.

    :param node: Pydantic model instance to upsert
    :param constraint: Dict of properties for the MERGE match
    :param auto_increment: Optional list of field names (not aliases)
        to increment atomically server-side using
        ``coalesce(node.field, 0) + 1`` instead of client values.
        The model instance is updated in-place with the new values.
    :returns: The elementId of the upserted node
    :raises ValueError: If a namespaced constraint parameter collides
        with a model property name, or if the query returns no record

    """
    auto_increment_fields = set(auto_increment or [])

    # Build field-name-to-alias mapping for alias-safe handling. The field
    # metadata is read from the model class: accessing ``model_fields`` on
    # an instance is deprecated in recent Pydantic 2 releases.
    field_to_alias = {
        name: info.alias or name
        for name, info in type(node).model_fields.items()
    }
    alias_to_field = {v: k for k, v in field_to_alias.items()}
    auto_increment_aliases = {
        field_to_alias.get(f, f) for f in auto_increment_fields
    }

    properties = node.model_dump(by_alias=True)
    label = node.__class__.__name__
    # Auto-increment properties are computed server-side; everything else
    # is bound to a query parameter named after the property.
    assignment = [
        f'node.{key} = coalesce(node.{key}, 0) + 1'
        if key in auto_increment_aliases
        else f'node.{key} = ${key}'
        for key in properties
    ]

    # Namespace constraint params to avoid collisions with
    # model properties (e.g. when renaming a slug)
    constraint_params = {f'_c_{k}': v for k, v in constraint.items()}
    model_params = {
        k: v for k, v in properties.items() if k not in auto_increment_aliases
    }
    collisions = set(constraint_params).intersection(model_params)
    if collisions:
        raise ValueError(
            f'Parameter namespace collision in upsert: {sorted(collisions)}'
        )
    parameters = {**constraint_params, **model_params}

    where_props = ', '.join(f'{k}: $_c_{k}' for k in constraint)

    return_fields = 'elementId(node) AS nodeId'
    if auto_increment_aliases:
        return_fields += ', ' + ', '.join(
            f'node.{key} AS {key}' for key in sorted(auto_increment_aliases)
        )

    query = (
        f'         MERGE (node:{label} {{{where_props}}})'
        f' ON CREATE SET {", ".join(assignment)}'
        f'  ON MATCH SET {", ".join(assignment)}'
        f'        RETURN {return_fields}'
    ).strip()
    LOGGER.debug('Upsert query: %s', query)
    LOGGER.debug('Upsert parameters: %r', parameters)
    # Upsert the node
    async with run(query, **parameters) as result:
        record = await result.single()
        if record is None:
            raise ValueError('Upsert query returned no results')
        # Update model in-place with server-computed values. Membership is
        # tested on ``record.keys()``: a driver record is tuple-based and
        # iterates its values, so ``alias in record`` would compare the
        # alias against the returned values rather than the column names.
        returned_keys = record.keys()
        for alias in sorted(auto_increment_aliases):
            if alias in returned_keys:
                field_name = alias_to_field.get(alias, alias)
                setattr(node, field_name, record[alias])
        return str(record['nodeId'])

delete_node async

delete_node(
    model: type[BaseModel], parameters: dict[str, Any]
) -> bool

Delete a node from the graph by matching parameters.

This method deletes a node that matches the given parameters. The node label is extracted from the model class name.

:param model: Pydantic model class (label extracted from class name)
:param parameters: Dict of properties to match (e.g., {'slug': 'my-slug', 'type': 'Project'})
:returns: True if node was deleted, False if not found

Example::

from imbi_common import models, neo4j

# Delete a blueprint by slug and type
deleted = await neo4j.delete_node(
    models.Blueprint,
    {'slug': 'my-blueprint', 'type': 'Project'}
)
if deleted:
    print('Blueprint deleted successfully')
else:
    print('Blueprint not found')
Source code in src/imbi_common/neo4j/__init__.py
async def delete_node(
    model: type[pydantic.BaseModel],
    parameters: dict[str, typing.Any],
) -> bool:
    """Delete a node from the graph by matching parameters.

    This method deletes a node that matches the given parameters.
    The node label is extracted from the model class name.

    :param model: Pydantic model class (label extracted from class name)
    :param parameters: Non-empty dict of properties to match
        (e.g., ``{'slug': 'my-slug', 'type': 'Project'}``)
    :returns: True if node was deleted, False if not found
    :raises ValueError: If ``parameters`` is empty

    Example::

        from imbi_common import models, neo4j

        # Delete a blueprint by slug and type
        deleted = await neo4j.delete_node(
            models.Blueprint,
            {'slug': 'my-blueprint', 'type': 'Project'}
        )
        if deleted:
            print('Blueprint deleted successfully')
        else:
            print('Blueprint not found')

    """
    # Without at least one property the WHERE clause below would be empty,
    # which is not valid Cypher; fail early with a clear message instead of
    # sending a broken query to the server.
    if not parameters:
        raise ValueError(
            'delete_node requires at least one property in parameters'
        )

    label = model.__name__
    where_clause = ' AND '.join(f'node.{key} = ${key}' for key in parameters)

    query = f"""
     MATCH (node:{label})
     WHERE {where_clause}
    DETACH DELETE node
    RETURN count(node) as deleted
    """

    LOGGER.debug('Delete query: %s', query)
    LOGGER.debug('Delete parameters: %r', parameters)

    async with run(query, **parameters) as result:
        record = await result.single()
        return record is not None and record['deleted'] > 0

Relationships

create_relationship async

create_relationship(
    from_node: SourceNode,
    to_node: TargetNode,
    rel_props: RelationshipProperties,
) -> neo4j.graph.Relationship
create_relationship(
    from_node: SourceNode,
    to_node: TargetNode,
    *,
    rel_type: str,
) -> neo4j.graph.Relationship
create_relationship(
    from_node: SourceNode,
    to_node: TargetNode,
    rel_props: RelationshipProperties | None = None,
    *,
    rel_type: str | None = None,
) -> neo4j.graph.Relationship

Create a relationship between two nodes.

This method creates a relationship by:

  • Matching nodes by their unique key fields (Field(unique=True) metadata)
  • Using relationship type from rel_props.cypherantic_config or rel_type parameter
  • Attaching properties from rel_props model if provided

Either rel_props (a Pydantic model with relationship properties and config) or rel_type (a string relationship type) must be provided.

:param from_node: Source node model instance
:param to_node: Target node model instance
:param rel_props: Optional Pydantic model containing relationship properties
:param rel_type: Optional explicit relationship type name
:returns: The created Neo4j relationship object

Source code in src/imbi_common/neo4j/__init__.py
async def create_relationship(
    from_node: SourceNode,
    to_node: TargetNode,
    rel_props: RelationshipProperties | None = None,
    *,
    rel_type: str | None = None,
) -> neo4j.graph.Relationship:
    """Create a relationship between two nodes.

    This method creates a relationship by:

    - Matching nodes by their unique key fields
      (``Field(unique=True)`` metadata)
    - Using relationship type from ``rel_props.cypherantic_config``
      or ``rel_type`` parameter
    - Attaching properties from ``rel_props`` model if provided

    Either ``rel_props`` (a Pydantic model with relationship properties
    and config) or ``rel_type`` (a string relationship type) must be
    provided. When both are given, ``rel_props`` takes precedence and
    ``rel_type`` is ignored.

    :param from_node: Source node model instance
    :param to_node: Target node model instance
    :param rel_props: Optional Pydantic model containing relationship
        properties
    :param rel_type: Optional explicit relationship type name
    :returns: The created Neo4j relationship object
    :raises ValueError: If neither ``rel_props`` nor ``rel_type`` is
        provided

    """
    if rel_props is None:
        # Validate before opening a session so an invalid call never
        # acquires a database session just to raise.
        if rel_type is None:
            raise ValueError('Either rel_props or rel_type must be provided')
        async with session() as sess:
            return await cypherantic.create_relationship(
                sess, from_node, to_node, rel_type=rel_type
            )

    async with session() as sess:
        return await cypherantic.create_relationship(
            sess, from_node, to_node, rel_props
        )

refresh_relationship async

refresh_relationship(
    model: SourceNode, rel_property: str
) -> None

Lazy-load and populate a relationship property on a model instance.

This is a lazy-loading mechanism that:

  • Fetches related nodes from the graph based on relationship metadata
  • Populates the specified relationship property on the model instance
  • Validates the property is a Sequence with Relationship metadata

The relationship property must be annotated as a sequence type (e.g., list[EdgeType]) and have Relationship metadata that specifies the relationship type and direction.

Use this when you have a model instance and need to populate its relationship properties from the graph database.

:param model: Model instance to populate
:param rel_property: Name of the relationship property to refresh

Example::

from typing import Annotated
from cypherantic import Relationship

class Person(NodeModel):
    name: Annotated[str, Field(unique=True)]
    friends: Annotated[
        list[FriendEdge],
        Relationship(rel_type='FRIENDS_WITH', direction='OUTGOING')
    ] = []

person = Person(name='Alice')
await refresh_relationship(person, 'friends')
# person.friends now contains list of FriendEdge instances
Source code in src/imbi_common/neo4j/__init__.py
async def refresh_relationship(model: SourceNode, rel_property: str) -> None:
    """Populate one relationship property of a model from the graph.

    The work is delegated to ``cypherantic.refresh_relationship`` inside
    a fresh session. As a lazy-loading mechanism it:

    - Fetches related nodes from the graph based on relationship metadata
    - Populates the specified relationship property on the model instance
    - Validates the property is a Sequence with ``Relationship`` metadata

    The relationship property must be annotated as a sequence type
    (e.g., ``list[EdgeType]``) and carry ``Relationship`` metadata giving
    the relationship type and direction.

    Call this when a model instance is already at hand and one of its
    relationship properties needs to be filled from the database.

    :param model: Model instance to populate
    :param rel_property: Name of the relationship property to refresh

    Example::

        from typing import Annotated
        from cypherantic import Relationship

        class Person(NodeModel):
            name: Annotated[str, Field(unique=True)]
            friends: Annotated[
                list[FriendEdge],
                Relationship(rel_type='FRIENDS_WITH', direction='OUTGOING')
            ] = []

        person = Person(name='Alice')
        await refresh_relationship(person, 'friends')
        # person.friends now contains list of FriendEdge instances

    """
    async with session() as graph_session:
        await cypherantic.refresh_relationship(
            graph_session, model, rel_property
        )

retrieve_relationship_edges async

retrieve_relationship_edges(
    model: SourceNode,
    rel_name: str,
    direction: Literal[
        'INCOMING', 'OUTGOING', 'UNDIRECTED'
    ],
    edge_cls: type[EdgeType],
) -> list[EdgeType]

Retrieve relationship edges (nodes + properties) from the graph.

This method fetches related nodes along with their relationship properties, returning them as "edge" instances that contain both the target node and the relationship properties.

The edge_cls must be a type (typically a NamedTuple or dataclass) with:

  • node attribute: The target node Pydantic model class
  • properties attribute: The relationship properties Pydantic model class

:param model: Source node model instance to query from
:param rel_name: Relationship type name to traverse
:param direction: Direction to traverse (INCOMING, OUTGOING, or UNDIRECTED)
:param edge_cls: Edge type class containing node and properties
:returns: List of edge instances, each containing a node and its relationship properties

Example::

from typing import NamedTuple

class FriendEdge(NamedTuple):
    node: Person
    properties: FriendshipProperties

person = Person(name='Alice')
edges = await retrieve_relationship_edges(
    person, 'FRIENDS_WITH', 'OUTGOING', FriendEdge
)
for edge in edges:
    print(f"Friend: {edge.node.name}, since: {edge.properties.since}")
Source code in src/imbi_common/neo4j/__init__.py
async def retrieve_relationship_edges(
    model: SourceNode,
    rel_name: str,
    direction: typing.Literal['INCOMING', 'OUTGOING', 'UNDIRECTED'],
    edge_cls: type[EdgeType],
) -> list[EdgeType]:
    """Retrieve relationship edges (nodes + properties) from the graph.

    Related nodes are fetched together with their relationship
    properties and returned as "edge" instances holding both the target
    node and the relationship properties. The lookup itself is delegated
    to ``cypherantic.retrieve_relationship_edges`` inside a fresh session.

    The ``edge_cls`` must be a type (typically a NamedTuple or dataclass)
    with:

    - ``node`` attribute: The target node Pydantic model class
    - ``properties`` attribute: The relationship properties Pydantic
      model class

    :param model: Source node model instance to query from
    :param rel_name: Relationship type name to traverse
    :param direction: Direction to traverse (INCOMING, OUTGOING, or
        UNDIRECTED)
    :param edge_cls: Edge type class containing node and properties
    :returns: List of edge instances, each containing a node and its
        relationship properties

    Example::

        from typing import NamedTuple

        class FriendEdge(NamedTuple):
            node: Person
            properties: FriendshipProperties

        person = Person(name='Alice')
        edges = await retrieve_relationship_edges(
            person, 'FRIENDS_WITH', 'OUTGOING', FriendEdge
        )
        for edge in edges:
            print(f"Friend: {edge.node.name}, since: {edge.properties.since}")

    """
    async with session() as graph_session:
        edges = await cypherantic.retrieve_relationship_edges(
            graph_session, model, rel_name, direction, edge_cls
        )
        # The cast only informs the type checker; nothing is converted
        return typing.cast(list[EdgeType], edges)

Low-Level

run async

run(
    query: str, **parameters: Any
) -> typing.AsyncGenerator[neo4j.AsyncResult, None]

Run a Cypher query and return the result as an AsyncResult

Source code in src/imbi_common/neo4j/__init__.py
@contextlib.asynccontextmanager
async def run(
    query: str, **parameters: typing.Any
) -> typing.AsyncGenerator[neo4j.AsyncResult, None]:
    """Run a Cypher query and yield the result as an AsyncResult.

    This is an async context manager; the session that executed the
    query stays open for the duration of the ``async with`` body.

    Every run of whitespace in ``query`` is collapsed to a single space
    before execution. The substitution applies to the whole string,
    including any whitespace inside quoted literals.

    :param query: Cypher query text
    :param parameters: Query parameters, forwarded as keyword arguments
        to the session's ``run``

    """
    async with session() as graph_session:
        flattened_query = re.sub(r'\s+', ' ', query)
        yield await graph_session.run(flattened_query, **parameters)

convert_neo4j_types

convert_neo4j_types(data: Any) -> typing.Any

Convert Neo4j-specific types to Python native types.

Parameters:

Name Type Description Default
data Any

Data to convert (can be dict, list, or primitive type)

required

Returns:

Type Description
Any

Data with Neo4j types converted to Python native types

Source code in src/imbi_common/neo4j/__init__.py
def convert_neo4j_types(data: typing.Any) -> typing.Any:
    """Convert Neo4j-specific types to Python native types.

    Dicts and lists are rebuilt recursively. Any other value exposing a
    callable ``to_native`` attribute is replaced by the result of calling
    it; everything else is returned unchanged.

    Args:
        data: Data to convert (can be dict, list, or primitive type)

    Returns:
        Data with Neo4j types converted to Python native types
    """
    if isinstance(data, list):
        return [convert_neo4j_types(element) for element in data]
    if isinstance(data, dict):
        return {
            name: convert_neo4j_types(inner) for name, inner in data.items()
        }
    # Duck-typing: anything with a callable to_native() is converted
    # (covers Neo4j types as well as mocks)
    to_native = getattr(data, 'to_native', None)
    if callable(to_native):
        return to_native()
    return data