Skip to content

Advanced Patterns

Advanced usage patterns for complex applications.

Multi-Tenant Architecture

from rsylla import Session, SessionBuilder

class MultiTenantDB:
    """Manage per-tenant keyspaces"""

    def __init__(self, session: Session):
        self.session = session
        self._tenant_stmts: dict = {}

    async def create_tenant(self, tenant_id: str) -> None:
        """Create tenant keyspace and tables"""
        await self.session.execute(f"""
            CREATE KEYSPACE IF NOT EXISTS tenant_{tenant_id}
            WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 3}}
        """)

        await self.session.execute(f"""
            CREATE TABLE IF NOT EXISTS tenant_{tenant_id}.users (
                id int PRIMARY KEY,
                name text,
                email text
            )
        """)

    async def get_user(self, tenant_id: str, user_id: int):
        """Get user for specific tenant"""
        key = f"get_user_{tenant_id}"

        if key not in self._tenant_stmts:
            self._tenant_stmts[key] = await self.session.prepare(
                f"SELECT * FROM tenant_{tenant_id}.users WHERE id = ?"
            )

        result = await self.session.execute_prepared(
            self._tenant_stmts[key], {"id": user_id}
        )
        return result.first_row()


# Usage
async def main():
    session = await SessionBuilder().known_node("localhost:9042").build()

    db = MultiTenantDB(session)
    await db.create_tenant("acme")
    user = await db.get_user("acme", 123)

Time Series with Bucketing

import time
from datetime import date, datetime
from rsylla import Session

class TimeSeriesStore:
    """Efficient time series storage with bucketing"""

    def __init__(self, session: Session):
        self.session = session
        self._write_stmt = None
        self._read_stmt = None

    async def _ensure_prepared(self) -> None:
        if self._write_stmt is None:
            self._write_stmt = await self.session.prepare(
                """INSERT INTO metrics (name, bucket, ts, value)
                   VALUES (?, ?, ?, ?) USING TTL 604800"""
            )
            self._read_stmt = await self.session.prepare(
                """SELECT ts, value FROM metrics
                   WHERE name = ? AND bucket = ?
                   AND ts >= ? AND ts <= ?"""
            )

    async def write(self, metric: str, value: float, timestamp: int | None = None) -> None:
        """Write metric with automatic bucketing"""
        await self._ensure_prepared()

        ts = timestamp or int(time.time() * 1000)
        bucket = (date.today() - date(1970, 1, 1)).days

        await self.session.execute_prepared(
            self._write_stmt,
            {"name": metric, "bucket": bucket, "ts": ts, "value": value}
        )

    async def query_range(self, metric: str, start: datetime, end: datetime) -> list:
        """Query metrics across date buckets"""
        await self._ensure_prepared()

        start_bucket = (start.date() - date(1970, 1, 1)).days
        end_bucket = (end.date() - date(1970, 1, 1)).days
        ts_start = int(start.timestamp() * 1000)
        ts_end = int(end.timestamp() * 1000)

        results = []
        for bucket in range(start_bucket, end_bucket + 1):
            result = await self.session.execute_prepared(
                self._read_stmt,
                {"name": metric, "bucket": bucket, "ts_start": ts_start, "ts_end": ts_end}
            )
            for row in result:
                results.append({"ts": row[0], "value": row[1]})

        return results

Event Sourcing

import json
import time
from rsylla import Session

class EventStore:
    """Event sourcing with optimistic locking"""

    def __init__(self, session: Session):
        self.session = session
        self._version_stmt = None
        self._append_stmt = None
        self._read_stmt = None

    async def _ensure_prepared(self) -> None:
        if self._version_stmt is None:
            self._version_stmt = await self.session.prepare(
                "SELECT version FROM events WHERE stream_id = ? ORDER BY version DESC LIMIT 1"
            )
            self._append_stmt = await self.session.prepare(
                """INSERT INTO events (stream_id, version, type, data, ts)
                   VALUES (?, ?, ?, ?, ?) IF NOT EXISTS"""
            )
            self._read_stmt = await self.session.prepare(
                """SELECT version, type, data FROM events
                   WHERE stream_id = ? AND version > ?"""
            )

    async def append(
        self,
        stream_id: str,
        event_type: str,
        data: dict,
        expected_version: int | None = None
    ) -> int:
        """Append event with version check"""
        await self._ensure_prepared()

        # Check current version
        result = await self.session.execute_prepared(
            self._version_stmt, {"stream_id": stream_id}
        )
        row = result.first_row()
        current = row[0] if row else 0

        if expected_version is not None and current != expected_version:
            raise Exception(f"Version conflict: expected {expected_version}, got {current}")

        new_version = current + 1
        ts = int(time.time() * 1000)

        # Use LWT for safe insert
        result = await self.session.execute_prepared(
            self._append_stmt,
            {
                "stream_id": stream_id,
                "version": new_version,
                "type": event_type,
                "data": json.dumps(data),
                "ts": ts
            }
        )

        row = result.first_row()
        if row and not row[0]:  # [applied] = false
            raise Exception("Concurrent modification")

        return new_version

    async def read_stream(self, stream_id: str, from_version: int = 0) -> list:
        """Read all events from stream"""
        await self._ensure_prepared()

        result = await self.session.execute_prepared(
            self._read_stmt,
            {"stream_id": stream_id, "version": from_version}
        )

        return [
            {
                "version": row[0],
                "type": row[1],
                "data": json.loads(row[2])
            }
            for row in result
        ]

Materialized Views Pattern

import time
from rsylla import Session, Batch

class MaterializedViewManager:
    """Maintain denormalized views"""

    def __init__(self, session: Session):
        self.session = session

    async def create_post(
        self,
        post_id: int,
        author_id: int,
        title: str,
        tags: list[str]
    ) -> None:
        """Create post and update all views atomically"""
        ts = int(time.time() * 1000)

        batch = Batch("logged")

        # Main table
        batch.append_statement(
            "INSERT INTO posts (id, author, title, tags, created) VALUES (?, ?, ?, ?, ?)"
        )

        # By author view
        batch.append_statement(
            "INSERT INTO posts_by_author (author, created, id, title) VALUES (?, ?, ?, ?)"
        )

        # By tag views - one statement per tag
        for _ in tags:
            batch.append_statement(
                "INSERT INTO posts_by_tag (tag, created, id, title) VALUES (?, ?, ?, ?)"
            )

        # Build values list - one dict per statement in batch
        values = [
            {"id": post_id, "author": author_id, "title": title, "tags": tags, "created": ts},
            {"author": author_id, "created": ts, "id": post_id, "title": title}
        ]

        for tag in tags:
            values.append({"tag": tag, "created": ts, "id": post_id, "title": title})

        await self.session.batch(batch, values)

Distributed Locking

import uuid
import time
from rsylla import Session

class DistributedLock:
    """Distributed lock using LWT"""

    def __init__(self, session: Session, ttl_seconds: int = 30):
        self.session = session
        self.ttl = ttl_seconds
        self._acquire_stmt = None
        self._release_stmt = None

    async def _ensure_prepared(self) -> None:
        if self._acquire_stmt is None:
            # Note: TTL must be embedded in query as it's not a bind parameter
            self._acquire_stmt = await self.session.prepare(
                f"""INSERT INTO locks (name, holder, acquired_at)
                   VALUES (?, ?, ?)
                   IF NOT EXISTS
                   USING TTL {self.ttl}"""
            )
            self._release_stmt = await self.session.prepare(
                "DELETE FROM locks WHERE name = ? IF holder = ?"
            )

    async def acquire(self, lock_name: str, holder: str | None = None) -> str | None:
        """Try to acquire lock. Returns holder ID if acquired, None otherwise."""
        await self._ensure_prepared()

        holder = holder or str(uuid.uuid4())
        now = int(time.time() * 1000)

        result = await self.session.execute_prepared(
            self._acquire_stmt,
            {"name": lock_name, "holder": holder, "acquired_at": now}
        )

        row = result.first_row()
        if row and row[0]:  # [applied] = true
            return holder
        return None

    async def release(self, lock_name: str, holder: str) -> bool:
        """Release lock if we own it. Returns True if released."""
        await self._ensure_prepared()

        result = await self.session.execute_prepared(
            self._release_stmt,
            {"name": lock_name, "holder": holder}
        )
        row = result.first_row()
        return bool(row and row[0])  # [applied]

    async def with_lock(self, lock_name: str, callback):
        """Execute callback with lock held"""
        holder = await self.acquire(lock_name)
        if not holder:
            raise Exception(f"Could not acquire lock: {lock_name}")

        try:
            return await callback()
        finally:
            await self.release(lock_name, holder)


# Usage
async def example():
    session = await Session.connect(["localhost:9042"])
    lock = DistributedLock(session)

    async def critical_section():
        print("Doing exclusive work...")

    await lock.with_lock("my-resource", critical_section)

Read-Through Cache

import pickle
from rsylla import Session

class CachedRepository:
    """Repository with caching layer"""

    def __init__(self, session: Session, cache_ttl: int = 300):
        self.session = session
        self.cache_ttl = cache_ttl
        self._cache_get_stmt = None
        self._cache_set_stmt = None
        self._user_get_stmt = None
        self._cache_del_stmt = None

    async def _ensure_prepared(self) -> None:
        if self._cache_get_stmt is None:
            self._cache_get_stmt = await self.session.prepare(
                "SELECT value FROM cache WHERE key = ?"
            )
            # Note: TTL embedded in query
            self._cache_set_stmt = await self.session.prepare(
                f"INSERT INTO cache (key, value) VALUES (?, ?) USING TTL {self.cache_ttl}"
            )
            self._user_get_stmt = await self.session.prepare(
                "SELECT id, name, email FROM users WHERE id = ?"
            )
            self._cache_del_stmt = await self.session.prepare(
                "DELETE FROM cache WHERE key = ?"
            )

    async def get_user(self, user_id: int) -> dict | None:
        """Get user with caching"""
        await self._ensure_prepared()

        cache_key = f"user:{user_id}"

        # Try cache first
        result = await self.session.execute_prepared(
            self._cache_get_stmt, {"key": cache_key}
        )
        row = result.first_row()
        if row:
            return pickle.loads(row[0])

        # Cache miss - fetch from source
        result = await self.session.execute_prepared(
            self._user_get_stmt, {"id": user_id}
        )
        row = result.first_row()
        if not row:
            return None

        user = {"id": row[0], "name": row[1], "email": row[2]}

        # Store in cache
        await self.session.execute_prepared(
            self._cache_set_stmt,
            {"key": cache_key, "value": pickle.dumps(user)}
        )

        return user

    async def invalidate(self, user_id: int) -> None:
        """Invalidate cache entry"""
        await self._ensure_prepared()

        await self.session.execute_prepared(
            self._cache_del_stmt,
            {"key": f"user:{user_id}"}
        )

Saga Pattern

from rsylla import Session

class OrderSaga:
    """Distributed transaction using saga pattern"""

    def __init__(self, session: Session):
        self.session = session
        self._reserve_stmt = None
        self._release_stmt = None
        self._create_order_stmt = None
        self._cancel_order_stmt = None
        self._confirm_order_stmt = None

    async def _ensure_prepared(self) -> None:
        if self._reserve_stmt is None:
            self._reserve_stmt = await self.session.prepare(
                "UPDATE inventory SET reserved = reserved + ? WHERE id = ? IF quantity >= ?"
            )
            self._release_stmt = await self.session.prepare(
                "UPDATE inventory SET reserved = reserved - ? WHERE id = ?"
            )
            self._create_order_stmt = await self.session.prepare(
                "INSERT INTO orders (id, user_id, items, status) VALUES (?, ?, ?, 'pending')"
            )
            self._cancel_order_stmt = await self.session.prepare(
                "UPDATE orders SET status = 'cancelled' WHERE id = ?"
            )
            self._confirm_order_stmt = await self.session.prepare(
                "UPDATE orders SET status = 'confirmed' WHERE id = ?"
            )

    async def create_order(
        self,
        order_id: str,
        user_id: int,
        items: list[dict]
    ) -> None:
        """Create order with compensating transactions"""
        await self._ensure_prepared()
        reserved_items = []

        try:
            # Step 1: Reserve inventory for each item
            for item in items:
                await self._reserve_inventory(item["id"], item["qty"])
                reserved_items.append(item)

            # Step 2: Create order
            await self._create_order(order_id, user_id, items)

            # Step 3: Confirm order
            await self._confirm_order(order_id)

        except Exception:
            # Compensate: release reserved inventory
            for item in reserved_items:
                try:
                    await self._release_inventory(item["id"], item["qty"])
                except Exception:
                    pass  # Log and continue with other compensations

            # Compensate: cancel order if it was created
            try:
                await self._cancel_order(order_id)
            except Exception:
                pass

            raise

    async def _reserve_inventory(self, item_id: str, qty: int) -> None:
        result = await self.session.execute_prepared(
            self._reserve_stmt,
            {"qty": qty, "id": item_id, "min_qty": qty}
        )
        row = result.first_row()
        if not row or not row[0]:  # [applied] = false
            raise Exception(f"Insufficient inventory for {item_id}")

    async def _release_inventory(self, item_id: str, qty: int) -> None:
        await self.session.execute_prepared(
            self._release_stmt,
            {"qty": qty, "id": item_id}
        )

    async def _create_order(self, order_id: str, user_id: int, items: list) -> None:
        import json
        await self.session.execute_prepared(
            self._create_order_stmt,
            {"id": order_id, "user_id": user_id, "items": json.dumps(items)}
        )

    async def _cancel_order(self, order_id: str) -> None:
        await self.session.execute_prepared(
            self._cancel_order_stmt,
            {"id": order_id}
        )

    async def _confirm_order(self, order_id: str) -> None:
        await self.session.execute_prepared(
            self._confirm_order_stmt,
            {"id": order_id}
        )

Connection Pool Management

import asyncio
from contextlib import asynccontextmanager
from rsylla import Session, SessionBuilder

class SessionPool:
    """Manage session lifecycle for applications"""

    _session: Session | None = None
    _lock: asyncio.Lock = asyncio.Lock()

    @classmethod
    async def get_session(cls) -> Session:
        """Get or create the shared session"""
        if cls._session is None:
            async with cls._lock:
                if cls._session is None:
                    cls._session = await (
                        SessionBuilder()
                        .known_node("localhost:9042")
                        .use_keyspace("myapp", False)
                        .pool_size(4)
                        .compression("lz4")
                        .build()
                    )
        return cls._session

    @classmethod
    async def close(cls) -> None:
        """Close the session (call on application shutdown)"""
        cls._session = None  # Session cleanup is handled by Rust


@asynccontextmanager
async def get_db():
    """Context manager for database access"""
    session = await SessionPool.get_session()
    yield session


# Usage in an async application
async def handle_request():
    async with get_db() as session:
        result = await session.execute("SELECT * FROM users LIMIT 10")
        return list(result)