Skip to content

Authentication

Authentication primitives for JWT tokens and encryption.

Overview

The auth module provides core authentication functionality that can be used by any service in the Imbi ecosystem:

  • JWT Tokens: Access and refresh token creation/verification
  • Token Encryption: Fernet encryption for sensitive data at rest

JWT Tokens

from imbi_common.auth import core

# Create an access token
access_token = core.create_access_token(
    subject="user@example.com",
    extra_claims={"role": "admin"}
)

# Create a refresh token
refresh_token = core.create_refresh_token(
    subject="user@example.com"
)

# Verify and decode a token
try:
    payload = core.verify_token(access_token)
    user_id = payload["sub"]
    role = payload.get("role")
except Exception as e:
    print(f"Invalid token: {e}")

Token Encryption

from imbi_common.auth import encryption

# Encrypt sensitive data (e.g., OAuth tokens)
encrypted = encryption.encrypt_token("sensitive_oauth_token")

# Decrypt
decrypted = encryption.decrypt_token(encrypted)

API Reference

Core Functions

create_access_token

create_access_token(
    subject: str,
    extra_claims: dict[str, Any] | None = None,
    auth_settings: Auth | None = None,
) -> str

Create JWT access token.

Parameters:

Name Type Description Default
subject str

Subject (user identifier) to encode in token

required
extra_claims dict[str, Any] | None

Optional additional claims to include

None
auth_settings Auth | None

Optional auth settings for JWT configuration (uses singleton if not provided)

None

Returns:

Type Description
str

JWT token string

Source code in src/imbi_common/auth/core.py
def create_access_token(
    subject: str,
    extra_claims: dict[str, typing.Any] | None = None,
    auth_settings: settings.Auth | None = None,
) -> str:
    """Create JWT access token.

    Args:
        subject: Subject (user identifier) to encode in token
        extra_claims: Optional additional claims to include
        auth_settings: Optional auth settings for JWT configuration
            (uses singleton if not provided)

    Returns:
        JWT token string

    """
    if auth_settings is None:
        auth_settings = settings.get_auth_settings()

    jti = secrets.token_urlsafe(16)
    now = datetime.datetime.now(datetime.UTC)
    expires = now + datetime.timedelta(
        seconds=auth_settings.access_token_expire_seconds
    )

    claims = {
        'sub': subject,
        'jti': jti,
        'type': 'access',
        'iat': now,
        'exp': expires,
        **(extra_claims or {}),
    }

    token: str = jwt.encode(
        claims, auth_settings.jwt_secret, algorithm=auth_settings.jwt_algorithm
    )
    return token

create_refresh_token

create_refresh_token(
    subject: str,
    extra_claims: dict[str, Any] | None = None,
    auth_settings: Auth | None = None,
) -> str

Create JWT refresh token.

Parameters:

Name Type Description Default
subject str

Subject (user identifier) to encode in token

required
extra_claims dict[str, Any] | None

Optional additional claims to include

None
auth_settings Auth | None

Optional auth settings for JWT configuration (uses singleton if not provided)

None

Returns:

Type Description
str

JWT token string

Source code in src/imbi_common/auth/core.py
def create_refresh_token(
    subject: str,
    extra_claims: dict[str, typing.Any] | None = None,
    auth_settings: settings.Auth | None = None,
) -> str:
    """Create JWT refresh token.

    Args:
        subject: Subject (user identifier) to encode in token
        extra_claims: Optional additional claims to include
        auth_settings: Optional auth settings for JWT configuration
            (uses singleton if not provided)

    Returns:
        JWT token string

    """
    if auth_settings is None:
        auth_settings = settings.get_auth_settings()

    jti = secrets.token_urlsafe(16)
    now = datetime.datetime.now(datetime.UTC)
    expires = now + datetime.timedelta(
        seconds=auth_settings.refresh_token_expire_seconds
    )

    claims = {
        'sub': subject,
        'jti': jti,
        'type': 'refresh',
        'iat': now,
        'exp': expires,
        **(extra_claims or {}),
    }

    token: str = jwt.encode(
        claims, auth_settings.jwt_secret, algorithm=auth_settings.jwt_algorithm
    )
    return token

verify_token

verify_token(
    token: str, auth_settings: Auth | None = None
) -> dict[str, typing.Any]

Decode and validate JWT token.

Parameters:

Name Type Description Default
token str

JWT token string to decode

required
auth_settings Auth | None

Optional auth settings for JWT configuration (uses singleton if not provided)

None

Returns:

Type Description
dict[str, Any]

Decoded token claims

Raises:

Type Description
ExpiredSignatureError

If token has expired

InvalidTokenError

If token is invalid

Source code in src/imbi_common/auth/core.py
def verify_token(
    token: str, auth_settings: settings.Auth | None = None
) -> dict[str, typing.Any]:
    """Decode and validate JWT token.

    Args:
        token: JWT token string to decode
        auth_settings: Optional auth settings for JWT configuration
            (uses singleton if not provided)

    Returns:
        Decoded token claims

    Raises:
        jwt.ExpiredSignatureError: If token has expired
        jwt.InvalidTokenError: If token is invalid

    """
    if auth_settings is None:
        auth_settings = settings.get_auth_settings()

    decoded: dict[str, typing.Any] = jwt.decode(
        token,
        auth_settings.jwt_secret,
        algorithms=[auth_settings.jwt_algorithm],
        options={'require': ['sub', 'jti', 'type', 'exp']},
    )
    return decoded

Encryption Functions

TokenEncryption

TokenEncryption(encryption_key: str)

Handles encryption/decryption of sensitive tokens using Fernet.

This class follows the singleton pattern to ensure a single encryption instance is used throughout the application lifecycle.

Example

encryptor = TokenEncryption.get_instance() encrypted = encryptor.encrypt('my-secret-token') decrypted = encryptor.decrypt(encrypted) assert decrypted == 'my-secret-token'

Initialize with base64-encoded Fernet key.

Parameters:

Name Type Description Default
encryption_key str

Base64-encoded Fernet encryption key

required

Raises:

Type Description
ValueError

If encryption key format is invalid

Source code in src/imbi_common/auth/encryption.py
def __init__(self, encryption_key: str) -> None:
    """Initialize with base64-encoded Fernet key.

    Args:
        encryption_key: Base64-encoded Fernet encryption key

    Raises:
        ValueError: If encryption key format is invalid
    """
    try:
        key_bytes = encryption_key.encode('ascii')
        self._fernet = fernet.Fernet(key_bytes)
    except Exception as err:
        LOGGER.error('Invalid encryption key format: %s', err)
        raise ValueError('Invalid encryption key') from err

decrypt

decrypt(ciphertext: str | None) -> str | None

Decrypt a token string.

Parameters:

Name Type Description Default
ciphertext str | None

Base64-encoded encrypted token, or None

required

Returns:

Type Description
str | None

Decrypted plaintext token, or None if input is None or decryption fails

Note

Returns None instead of raising exception on decryption failure to handle corrupted/invalid ciphertext gracefully. Handles both new format (Fernet base64) and legacy format (double base64 encoding) for backward compatibility.

Source code in src/imbi_common/auth/encryption.py
def decrypt(self, ciphertext: str | None) -> str | None:
    """Decrypt a token string.

    Args:
        ciphertext: Base64-encoded encrypted token, or None

    Returns:
        Decrypted plaintext token, or None if input is None
            or decryption fails

    Note:
        Returns None instead of raising exception on decryption failure
        to handle corrupted/invalid ciphertext gracefully.
        Handles both new format (Fernet base64) and legacy format
        (double base64 encoding) for backward compatibility.
    """
    if ciphertext is None:
        return None

    try:
        # Try new format first (Fernet base64 only)
        encrypted_bytes = ciphertext.encode('ascii')
        plaintext_bytes: bytes = self._fernet.decrypt(encrypted_bytes)
        return plaintext_bytes.decode('utf-8')
    except fernet.InvalidToken:
        # Try legacy format (double base64 encoding)
        try:
            encrypted_bytes = base64.urlsafe_b64decode(
                ciphertext.encode('ascii')
            )
            plaintext_bytes = self._fernet.decrypt(encrypted_bytes)
            return plaintext_bytes.decode('utf-8')
        except (
            fernet.InvalidToken,
            binascii.Error,
            ValueError,
            UnicodeDecodeError,
        ):
            # All decryption attempts failed - invalid or corrupted
            LOGGER.warning(
                'Failed to decrypt token - invalid or corrupted ciphertext'
            )
            return None
    except (binascii.Error, ValueError, UnicodeDecodeError):
        # Handle decryption errors (base64 decode, unicode errors)
        LOGGER.warning(
            'Failed to decrypt token - invalid or corrupted ciphertext'
        )
        return None

encrypt

encrypt(plaintext: str | None) -> str | None

Encrypt a token string.

Parameters:

Name Type Description Default
plaintext str | None

Token string to encrypt, or None

required

Returns:

Type Description
str | None

Base64-encoded encrypted token, or None if input is None

Raises:

Type Description
Exception

If encryption fails

Source code in src/imbi_common/auth/encryption.py
def encrypt(self, plaintext: str | None) -> str | None:
    """Encrypt a token string.

    Args:
        plaintext: Token string to encrypt, or None

    Returns:
        Base64-encoded encrypted token, or None if input is None

    Raises:
        Exception: If encryption fails
    """
    if plaintext is None:
        return None

    try:
        encrypted_bytes: bytes = self._fernet.encrypt(
            plaintext.encode('utf-8')
        )
        # Fernet already returns base64-encoded bytes, just decode to str
        return encrypted_bytes.decode('ascii')
    except Exception as err:
        LOGGER.exception('Encryption failed: %s', err)
        raise

get_instance classmethod

get_instance() -> TokenEncryption

Get singleton instance of TokenEncryption.

Returns:

Name Type Description
TokenEncryption TokenEncryption

The singleton instance

Raises:

Type Description
RuntimeError

If encryption key not configured in settings

Source code in src/imbi_common/auth/encryption.py
@classmethod
def get_instance(cls) -> 'TokenEncryption':
    """Get singleton instance of TokenEncryption.

    Returns:
        TokenEncryption: The singleton instance

    Raises:
        RuntimeError: If encryption key not configured in settings
    """
    if cls._instance is None:
        auth_settings = settings.get_auth_settings()
        if not auth_settings.encryption_key:
            raise RuntimeError('Encryption key not configured')
        cls._instance = cls(auth_settings.encryption_key)
    return cls._instance

reset_instance classmethod

reset_instance() -> None

Reset singleton instance (for testing).

This method is primarily used in test suites to reset the singleton state between tests.

Source code in src/imbi_common/auth/encryption.py
@classmethod
def reset_instance(cls) -> None:
    """Reset singleton instance (for testing).

    This method is primarily used in test suites to reset the singleton
    state between tests.
    """
    cls._instance = None

get_fernet

get_fernet(
    auth_settings: Auth | None = None,
) -> fernet.Fernet

Get Fernet instance for encryption/decryption.

Parameters:

Name Type Description Default
auth_settings Auth | None

Optional auth settings (uses singleton if not provided)

None

Returns:

Type Description
Fernet

Fernet instance configured with encryption key

Source code in src/imbi_common/auth/encryption.py
def get_fernet(auth_settings: settings.Auth | None = None) -> fernet.Fernet:
    """Get Fernet instance for encryption/decryption.

    Args:
        auth_settings: Optional auth settings (uses singleton if not provided)

    Returns:
        Fernet instance configured with encryption key

    """
    if auth_settings is None:
        auth_settings = settings.get_auth_settings()

    if not auth_settings.encryption_key:
        # This should not happen since Auth now auto-generates encryption_key
        auth_settings.encryption_key = fernet.Fernet.generate_key().decode(
            'ascii'
        )

    return fernet.Fernet(auth_settings.encryption_key.encode('ascii'))

encrypt_token

encrypt_token(
    plaintext: str, auth_settings: Auth | None = None
) -> str

Encrypt a token string.

Parameters:

Name Type Description Default
plaintext str

Token string to encrypt

required
auth_settings Auth | None

Optional auth settings (creates default if not provided)

None

Returns:

Type Description
str

Base64-encoded encrypted token

Source code in src/imbi_common/auth/encryption.py
def encrypt_token(
    plaintext: str, auth_settings: settings.Auth | None = None
) -> str:
    """Encrypt a token string.

    Args:
        plaintext: Token string to encrypt
        auth_settings: Optional auth settings (creates default if not provided)

    Returns:
        Base64-encoded encrypted token

    """
    f = get_fernet(auth_settings)
    encrypted_bytes: bytes = f.encrypt(plaintext.encode('utf-8'))
    return encrypted_bytes.decode('ascii')

decrypt_token

decrypt_token(
    ciphertext: str, auth_settings: Auth | None = None
) -> str

Decrypt a token string.

Parameters:

Name Type Description Default
ciphertext str

Base64-encoded encrypted token

required
auth_settings Auth | None

Optional auth settings (creates default if not provided)

None

Returns:

Type Description
str

Decrypted plaintext token

Raises:

Type Description
Exception

If decryption fails

Source code in src/imbi_common/auth/encryption.py
def decrypt_token(
    ciphertext: str, auth_settings: settings.Auth | None = None
) -> str:
    """Decrypt a token string.

    Args:
        ciphertext: Base64-encoded encrypted token
        auth_settings: Optional auth settings (creates default if not provided)

    Returns:
        Decrypted plaintext token

    Raises:
        Exception: If decryption fails

    """
    f = get_fernet(auth_settings)
    plaintext_bytes: bytes = f.decrypt(ciphertext.encode('ascii'))
    return plaintext_bytes.decode('utf-8')