Tutorial: Building Your First Application¶
This tutorial walks you through building a complete application with rsylla, covering all essential concepts.
What We'll Build¶
A simple user management system with:
- User CRUD operations
- Session-based authentication tracking
- Activity logging
Prerequisites¶
# Start ScyllaDB
docker run --name scylla -d -p 9042:9042 scylladb/scylla
# Wait for startup (about 30 seconds)
docker logs -f scylla
Step 1: Project Setup¶
Create a new Python file user_app.py:
import asyncio
import time
from rsylla import Session, SessionBuilder, Query, Batch, ScyllaError
# We'll add our code here
Step 2: Database Connection¶
Simple Connection¶
async def connect_simple():
"""Connect using the simple method"""
session = await Session.connect(["127.0.0.1:9042"])
return session
Production Connection¶
async def connect_production():
"""Connect with production settings"""
session = await (
SessionBuilder()
.known_nodes([
"node1.example.com:9042",
"node2.example.com:9042",
"node3.example.com:9042"
])
.user("app_user", "secure_password")
.pool_size(20)
.connection_timeout(10000)
.compression("lz4")
.tcp_nodelay(True)
.build()
)
return session
Step 3: Schema Setup¶
Create keyspace and tables:
async def setup_schema(session):
"""Create keyspace and tables"""
# Create keyspace
await session.execute("""
CREATE KEYSPACE IF NOT EXISTS user_app
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': 1
}
""")
await session.use_keyspace("user_app", case_sensitive=False)
# Users table
await session.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id int PRIMARY KEY,
username text,
email text,
password_hash text,
is_active boolean,
created_at timestamp,
last_login timestamp
)
""")
# User sessions table
await session.execute("""
CREATE TABLE IF NOT EXISTS user_sessions (
session_id text PRIMARY KEY,
user_id int,
created_at timestamp,
expires_at timestamp,
ip_address text
)
""")
# Activity log table (time-series style)
await session.execute("""
CREATE TABLE IF NOT EXISTS activity_log (
user_id int,
activity_date date,
activity_time timestamp,
activity_type text,
details text,
PRIMARY KEY ((user_id, activity_date), activity_time)
) WITH CLUSTERING ORDER BY (activity_time DESC)
""")
# Wait for schema to propagate
await session.await_schema_agreement()
print("Schema created successfully!")
Step 4: User CRUD Operations¶
Create User¶
async def create_user(session, user_id, username, email, password_hash):
"""Create a new user"""
current_time = int(time.time() * 1000)
await session.execute(
"""INSERT INTO users
(user_id, username, email, password_hash, is_active, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
{
"user_id": user_id,
"username": username,
"email": email,
"password_hash": password_hash,
"is_active": True,
"created_at": current_time
}
)
# Log the activity
await log_activity(session, user_id, "USER_CREATED", f"User {username} created")
return user_id
Read User¶
async def get_user(session, user_id):
"""Get user by ID"""
result = await session.execute(
"SELECT * FROM users WHERE user_id = ?",
{"user_id": user_id}
)
row = result.first_row()
if row:
columns = row.columns()
return {
"user_id": columns[0],
"username": columns[1],
"email": columns[2],
"is_active": columns[4],
"created_at": columns[5],
"last_login": columns[6]
}
return None
async def get_all_users(session, limit=100):
"""Get all users with limit"""
query = (
Query("SELECT user_id, username, email, is_active FROM users")
.with_page_size(limit)
)
result = await session.query(query)
users = []
for row in result:
cols = row.columns()
users.append({
"user_id": cols[0],
"username": cols[1],
"email": cols[2],
"is_active": cols[3]
})
return users
Update User¶
async def update_user_email(session, user_id, new_email):
"""Update user email"""
await session.execute(
"UPDATE users SET email = ? WHERE user_id = ?",
{"email": new_email, "user_id": user_id}
)
await log_activity(session, user_id, "EMAIL_UPDATED", f"Email changed to {new_email}")
async def update_last_login(session, user_id):
"""Update last login timestamp"""
current_time = int(time.time() * 1000)
await session.execute(
"UPDATE users SET last_login = ? WHERE user_id = ?",
{"last_login": current_time, "user_id": user_id}
)
Delete User¶
async def delete_user(session, user_id):
"""Delete a user"""
await session.execute(
"DELETE FROM users WHERE user_id = ?",
{"user_id": user_id}
)
print(f"User {user_id} deleted")
Step 5: Using Prepared Statements¶
For high-performance operations:
class UserRepository:
"""Repository with prepared statements"""
def __init__(self, session):
self.session = session
self._statements_prepared = False
async def prepare_statements(self):
"""Prepare statements once"""
if self._statements_prepared:
return
self.insert_user_stmt = await self.session.prepare(
"""INSERT INTO users
(user_id, username, email, password_hash, is_active, created_at)
VALUES (?, ?, ?, ?, ?, ?)"""
)
self.get_user_stmt = await self.session.prepare(
"SELECT * FROM users WHERE user_id = ?"
)
# Mark as idempotent for safe retries
self.get_user_stmt = self.get_user_stmt.set_idempotent(True)
self.update_email_stmt = await self.session.prepare(
"UPDATE users SET email = ? WHERE user_id = ?"
)
self._statements_prepared = True
print("Statements prepared!")
async def create_user(self, user_id, username, email, password_hash):
"""Create user using prepared statement"""
await self.prepare_statements()
await self.session.execute_prepared(
self.insert_user_stmt,
{
"user_id": user_id,
"username": username,
"email": email,
"password_hash": password_hash,
"is_active": True,
"created_at": int(time.time() * 1000)
}
)
return user_id
async def get_user(self, user_id):
"""Get user using prepared statement"""
await self.prepare_statements()
result = await self.session.execute_prepared(
self.get_user_stmt,
{"user_id": user_id}
)
return result.first_row()
async def bulk_create_users(self, users_data):
"""Create multiple users efficiently"""
await self.prepare_statements()
for user in users_data:
await self.session.execute_prepared(self.insert_user_stmt, user)
print(f"Created {len(users_data)} users")
Step 6: Activity Logging¶
Log user activities:
async def log_activity(session, user_id, activity_type, details):
"""Log user activity"""
from datetime import date
current_time = int(time.time() * 1000)
today = (date.today() - date(1970, 1, 1)).days
await session.execute(
"""INSERT INTO activity_log
(user_id, activity_date, activity_time, activity_type, details)
VALUES (?, ?, ?, ?, ?)""",
{
"user_id": user_id,
"activity_date": today,
"activity_time": current_time,
"activity_type": activity_type,
"details": details
}
)
async def get_user_activity(session, user_id, activity_date):
"""Get user activity for a specific day"""
result = await session.execute(
"""SELECT activity_time, activity_type, details
FROM activity_log
WHERE user_id = ? AND activity_date = ?""",
{"user_id": user_id, "activity_date": activity_date}
)
activities = []
for row in result:
cols = row.columns()
activities.append({
"time": cols[0],
"type": cols[1],
"details": cols[2]
})
return activities
Step 7: Batch Operations¶
Use batches for atomic operations:
async def create_user_with_session(session, user_data, session_data):
"""Create user and session atomically"""
batch = Batch("logged") # Atomic batch
# Add user insert
batch.append_statement(
"""INSERT INTO users
(user_id, username, email, password_hash, is_active, created_at)
VALUES (?, ?, ?, ?, ?, ?)"""
)
# Add session insert
batch.append_statement(
"""INSERT INTO user_sessions
(session_id, user_id, created_at, expires_at, ip_address)
VALUES (?, ?, ?, ?, ?)"""
)
values = [user_data, session_data]
await session.batch(batch, values)
print("User and session created atomically!")
Step 8: Error Handling¶
Properly handle errors:
async def safe_get_user(session, user_id):
"""Get user with error handling"""
try:
result = await session.execute(
"SELECT * FROM users WHERE user_id = ?",
{"user_id": user_id}
)
return result.first_row()
except ScyllaError as e:
print(f"Database error: {e}")
return None
async def execute_with_retry(session, query, values, max_retries=3):
"""Execute query with retry logic"""
for attempt in range(max_retries):
try:
return await session.execute(query, values)
except ScyllaError as e:
if attempt == max_retries - 1:
raise
print(f"Retry {attempt + 1}/{max_retries}: {e}")
await asyncio.sleep(2 ** attempt) # Exponential backoff
Step 9: Complete Application¶
Put it all together:
import asyncio
import time
import uuid
from datetime import date
from rsylla import Session, SessionBuilder, Query, Batch, ScyllaError
async def main():
print("=" * 50)
print("rsylla User Management Tutorial")
print("=" * 50)
# Connect
print("\n1. Connecting to ScyllaDB...")
session = await Session.connect(["127.0.0.1:9042"])
print(" Connected!")
# Setup schema
print("\n2. Setting up schema...")
await setup_schema(session)
# Create repository
print("\n3. Preparing statements...")
repo = UserRepository(session)
await repo.prepare_statements()
# Create users
print("\n4. Creating users...")
await repo.create_user(1, "alice", "alice@example.com", "hash123")
await repo.create_user(2, "bob", "bob@example.com", "hash456")
await repo.create_user(3, "charlie", "charlie@example.com", "hash789")
print(" Created 3 users!")
# Query users
print("\n5. Querying users...")
users = await get_all_users(session)
for user in users:
print(f" - {user['username']} ({user['email']})")
# Update user
print("\n6. Updating user email...")
await update_user_email(session, 1, "alice.new@example.com")
user = await get_user(session, 1)
print(f" Updated: {user['username']} -> {user['email']}")
# Log activity
print("\n7. Logging activity...")
await log_activity(session, 1, "LOGIN", "User logged in from 192.168.1.1")
await log_activity(session, 1, "VIEW_PAGE", "Viewed dashboard")
# Get activity
today = (date.today() - date(1970, 1, 1)).days
activities = await get_user_activity(session, 1, today)
print(f" Found {len(activities)} activities")
# Batch operation
print("\n8. Creating user with session (batch)...")
user_data = {
"user_id": 100,
"username": "newuser",
"email": "new@example.com",
"password_hash": "newhash",
"is_active": True,
"created_at": int(time.time() * 1000)
}
session_data = {
"session_id": str(uuid.uuid4()),
"user_id": 100,
"created_at": int(time.time() * 1000),
"expires_at": int(time.time() * 1000) + 3600000,
"ip_address": "192.168.1.100"
}
await create_user_with_session(session, user_data, session_data)
# Delete user
print("\n9. Deleting user...")
await delete_user(session, 3)
# Final count
print("\n10. Final user count...")
users = await get_all_users(session)
print(f" Total users: {len(users)}")
print("\n" + "=" * 50)
print("Tutorial completed!")
print("=" * 50)
if __name__ == "__main__":
asyncio.run(main())
Running the Application¶
Expected output:
==================================================
rsylla User Management Tutorial
==================================================
1. Connecting to ScyllaDB...
Connected!
2. Setting up schema...
Schema created successfully!
3. Preparing statements...
Statements prepared!
4. Creating users...
Created 3 users!
5. Querying users...
- alice (alice@example.com)
- bob (bob@example.com)
- charlie (charlie@example.com)
...
==================================================
Tutorial completed!
==================================================
Key Takeaways¶
- Use
await- All rsylla operations are async - Prepare statements - For repeated queries, prepare once, execute many
- Use batches - For atomic multi-statement operations
- Handle errors - Wrap operations in try/except with ScyllaError
- Design for queries - Create tables that support your query patterns
Next Steps¶
- Data Types Guide - Working with CQL types
- Best Practices - Production patterns
- API Reference - Complete API documentation
- Advanced Patterns - Real-world use cases