Batch Operations Examples
Batches let you execute multiple statements together, either atomically (logged batches) or with lower overhead and no atomicity guarantee (unlogged batches).
Basic Batch
from rsylla import Batch
# Build a batch of type "logged" and queue two INSERT statements on it.
batch = Batch("logged")
batch.append_statement("INSERT INTO users (id, name) VALUES (?, ?)")
batch.append_statement("INSERT INTO users (id, name) VALUES (?, ?)")
# Execute the batch, passing one dict of values per queued statement.
# NOTE(review): presumably the i-th dict is bound to the i-th statement —
# confirm against the rsylla API reference.
await session.batch(batch, [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"}
])
Batch Types
Logged Batch (Atomic)
# All statements succeed or all fail
# NOTE(review): `balance = balance - ?` is counter-style arithmetic. In CQL
# this form is normally only valid on counter columns, and counter updates
# are normally rejected inside logged batches — confirm this example runs
# against a real schema before relying on it.
batch = Batch("logged")
batch.append_statement("UPDATE accounts SET balance = balance - ? WHERE id = ?")
batch.append_statement("UPDATE accounts SET balance = balance + ? WHERE id = ?")
# One dict of values per queued statement: debit first, then credit.
await session.batch(batch, [
{"balance": 100, "id": "from_account"},
{"balance": 100, "id": "to_account"}
])
Unlogged Batch (Performance)
# No atomicity guarantee, but faster
batch = Batch("unlogged")
for _ in range(100):
batch.append_statement("INSERT INTO logs (id, msg, ts) VALUES (?, ?, ?)")
await session.batch(batch, log_entries)
Counter Batch
# Build a batch of type "counter" and queue two increments on `stats`.
batch = Batch("counter")
batch.append_statement("UPDATE stats SET views = views + ? WHERE page = ?")
batch.append_statement("UPDATE stats SET clicks = clicks + ? WHERE page = ?")
# Both value sets target the same page ("home"), adding 1 to each column.
await session.batch(batch, [
{"views": 1, "page": "home"},
{"clicks": 1, "page": "home"}
])
With Prepared Statements
# Prepare the INSERT once so the same statement can be queued repeatedly.
insert_stmt = await session.prepare(
"INSERT INTO events (user_id, ts, type) VALUES (?, ?, ?)"
)
batch = Batch("unlogged")
# Queue the prepared statement three times, once per event below.
batch.append_prepared(insert_stmt)
batch.append_prepared(insert_stmt)
batch.append_prepared(insert_stmt)
# `now` is not defined in this snippet; it must be set by the surrounding
# code before this call.
await session.batch(batch, [
{"user_id": 1, "ts": now, "type": "login"},
{"user_id": 1, "ts": now, "type": "view"},
{"user_id": 1, "ts": now, "type": "click"}
])
Batch Configuration
# Configure the batch through chained `with_*` calls; the result of the
# whole chain is what gets assigned to `batch`.
# `custom_timestamp` is not defined in this snippet; it must be supplied by
# the surrounding code.
batch = (
Batch("logged")
.with_consistency("QUORUM")
.with_timestamp(custom_timestamp)
.with_tracing(True)
)
# Idempotence is set with a separate call rather than in the chain above.
batch.set_idempotent(True)
Multi-Table Atomic Update
async def create_order(session, order_id, user_id, items):
    """Create an order and decrement inventory in one logged batch.

    Queues one INSERT into ``orders``, then one INSERT into ``order_items``
    per item, then one UPDATE of ``inventory`` per item, and executes them
    all with a single ``session.batch`` call.

    Args:
        session: Object exposing an awaitable ``batch(batch, values)`` method.
        order_id: Bound to ``orders.id`` and ``order_items.order_id``.
        user_id: Bound to ``orders.user_id``.
        items: Iterable of mappings, each with ``"id"`` and ``"qty"`` keys.
    """
    import time

    # `items` is walked several times below; materialize it so one-shot
    # iterables (generators) work too.
    items = list(items)

    # The example originally referenced an undefined `now`.
    # NOTE(review): uses milliseconds since the Unix epoch for `created` —
    # confirm this is the representation the driver expects for the column.
    now = int(time.time() * 1000)

    batch = Batch("logged")

    # Insert order
    batch.append_statement(
        "INSERT INTO orders (id, user_id, status, created) VALUES (?, ?, ?, ?)"
    )

    # Insert order items
    for _ in items:
        batch.append_statement(
            "INSERT INTO order_items (order_id, item_id, qty) VALUES (?, ?, ?)"
        )

    # Update inventory
    # NOTE(review): `quantity = quantity - ?` is counter-style arithmetic;
    # confirm the schema permits it inside a logged batch.
    for _ in items:
        batch.append_statement(
            "UPDATE inventory SET quantity = quantity - ? WHERE item_id = ?"
        )

    # Value sets are listed in the same order the statements were queued:
    # the order row, then every order item, then every inventory update.
    values = [
        {"id": order_id, "user_id": user_id, "status": "pending", "created": now}
    ]
    values.extend(
        {"order_id": order_id, "item_id": item["id"], "qty": item["qty"]}
        for item in items
    )
    values.extend(
        {"quantity": item["qty"], "item_id": item["id"]} for item in items
    )

    await session.batch(batch, values)
Bulk Insert Pattern
async def bulk_insert(session, table, rows, batch_size=100):
    """Insert ``rows`` into ``table`` as JSON, one unlogged batch per chunk.

    The rows are split into consecutive chunks of at most ``batch_size``.
    Each chunk is sent as one unlogged batch of the same prepared
    ``INSERT ... JSON ?`` statement, with every row serialized by
    ``json.dumps``. Prints the total row count when done.

    Args:
        session: Object exposing awaitable ``prepare`` and ``batch`` methods.
        table: Table name, interpolated directly into the CQL text.
        rows: Sliceable sequence of JSON-serializable row objects.
        batch_size: Maximum number of rows per batch; must be positive.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    import json  # local import: the example used `json` without importing it

    # A negative step would make the loop below run zero times and still
    # report success; reject it up front (zero already fails inside range()).
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # NOTE(review): `table` is formatted into the statement text, so it must
    # come from trusted code, never from user input.
    stmt = await session.prepare(f"INSERT INTO {table} JSON ?")

    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        batch = Batch("unlogged")

        # Queue the prepared statement once per row in this chunk.
        for _ in chunk:
            batch.append_prepared(stmt)

        await session.batch(batch, [{"json": json.dumps(row)} for row in chunk])

    print(f"Inserted {len(rows)} rows")
Best Practices
- Batch within the same partition - Batches are most efficient when all statements target the same partition
- Keep batches small - Large batches (>100 statements) can time out
- Use unlogged batches for performance - When atomicity isn't required
- Don't batch across many partitions - It defeats the purpose
- Use prepared statements - For better performance in batches