Session API¶
The Session class is the main entry point for interacting with ScyllaDB. It manages connections, executes queries, and handles cluster operations.
SessionBuilder¶
Use SessionBuilder to create sessions with custom configuration.
Constructor¶
Methods¶
known_node(hostname: str) -> SessionBuilder¶
Add a single known node.
Parameters:
hostname - Node address in the format "host:port"
Returns: Self for method chaining
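Because node addresses are plain "host:port" strings, it can help to validate them before handing them to the builder. The following is a minimal sketch; `parse_node_address` is a hypothetical helper and not part of rsylla, and it assumes hostnames or IPv4 addresses (no bracketed IPv6 literals).

```python
from typing import Tuple


def parse_node_address(address: str) -> Tuple[str, int]:
    """Split a "host:port" string into (host, port) and validate the port.

    Hypothetical helper for illustration; rsylla itself takes the raw string.
    """
    host, sep, port_text = address.rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected 'host:port', got {address!r}")
    if not (port_text.isascii() and port_text.isdigit()):
        raise ValueError(f"port must be numeric, got {port_text!r}")
    port = int(port_text)
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    return host, port


host, port = parse_node_address("node1.example.com:9042")
```

A validated address can then be passed unchanged to `known_node`.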
known_nodes(hostnames: List[str]) -> SessionBuilder¶
Add multiple known nodes.
builder = SessionBuilder().known_nodes([
    "node1.example.com:9042",
    "node2.example.com:9042",
    "node3.example.com:9042"
])
Parameters:
hostnames - List of node addresses
Returns: Self for method chaining
use_keyspace(keyspace_name: str, case_sensitive: bool) -> SessionBuilder¶
Set the default keyspace for the session.
Parameters:
keyspace_name - Name of the keyspace
case_sensitive - Whether the keyspace name is case-sensitive
Returns: Self for method chaining
connection_timeout(duration_ms: int) -> SessionBuilder¶
Set the connection timeout.
Parameters:
duration_ms - Timeout in milliseconds
Returns: Self for method chaining
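Since the timeout is an integer number of milliseconds, passing seconds by mistake is easy. One way to keep units explicit is to start from a `timedelta`. This is only a sketch; `to_millis` is a hypothetical helper, not an rsylla function.

```python
from datetime import timedelta


def to_millis(duration: timedelta) -> int:
    """Convert a timedelta to whole milliseconds, rejecting negative values."""
    if duration < timedelta(0):
        raise ValueError("duration must not be negative")
    # Dividing one timedelta by another with // yields an int.
    return duration // timedelta(milliseconds=1)


timeout_ms = to_millis(timedelta(seconds=10))
```

The resulting integer is what `connection_timeout` (and `tcp_keepalive`) would receive.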
pool_size(size: int) -> SessionBuilder¶
Set the connection pool size per host.
Parameters:
size - Number of connections per host (must be > 0)
Raises: ValueError if size is 0
Returns: Self for method chaining
user(username: str, password: str) -> SessionBuilder¶
Set authentication credentials.
Parameters:
username - Authentication username
password - Authentication password
Returns: Self for method chaining
compression(compression: Optional[str]) -> SessionBuilder¶
Set compression type for network traffic.
Parameters:
compression - One of "lz4", "snappy", or None
Raises: ValueError for invalid compression type
Returns: Self for method chaining
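When the compression setting comes from a configuration file or environment variable, it may arrive as free-form text. The sketch below maps such text onto the three documented values before it reaches the builder. `normalize_compression` and the "none"/"off" spellings are assumptions made for illustration, not rsylla behavior.

```python
from typing import Optional

# The two algorithm names documented for compression().
SUPPORTED_COMPRESSION = ("lz4", "snappy")


def normalize_compression(raw: Optional[str]) -> Optional[str]:
    """Map configuration text to "lz4", "snappy", or None.

    Hypothetical helper: treats "", "none", and "off" as "no compression".
    """
    if raw is None:
        return None
    value = raw.strip().lower()
    if value in ("", "none", "off"):
        return None
    if value not in SUPPORTED_COMPRESSION:
        raise ValueError(f"unsupported compression type: {raw!r}")
    return value


compression = normalize_compression(" LZ4 ")
```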
tcp_nodelay(nodelay: bool) -> SessionBuilder¶
Enable or disable the TCP_NODELAY socket option. Enabling TCP_NODELAY disables Nagle's algorithm.
Parameters:
nodelay - True to disable Nagle's algorithm (lower latency)
Returns: Self for method chaining
tcp_keepalive(keepalive_ms: Optional[int]) -> SessionBuilder¶
Configure TCP keepalive.
Note
TCP keepalive configuration is handled at the OS level in recent versions.
Parameters:
keepalive_ms - Keepalive interval in milliseconds, or None to disable
Returns: Self for method chaining
async build() -> Session¶
Build and connect the session.
Returns: Connected Session instance
Raises: ScyllaError on connection failure
Complete Example¶
from rsylla import SessionBuilder

session = await (
    SessionBuilder()
    .known_nodes(["node1:9042", "node2:9042", "node3:9042"])
    .user("app_user", "secure_password")
    .use_keyspace("production", case_sensitive=False)
    .pool_size(20)
    .connection_timeout(10000)
    .compression("lz4")
    .tcp_nodelay(True)
    .build()
)
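Because every builder method returns the builder, optional settings can also be applied conditionally from a configuration mapping. The sketch below uses only method names documented above. `apply_config`, the configuration keys, and `RecordingBuilder` are hypothetical, and `RecordingBuilder` is a stand-in used so the example runs without a cluster.

```python
from typing import Any, Mapping


def apply_config(builder: Any, config: Mapping[str, Any]) -> Any:
    """Apply settings from `config` to a SessionBuilder-like object."""
    builder = builder.known_nodes(list(config["nodes"]))
    if "username" in config:
        builder = builder.user(config["username"], config["password"])
    if "keyspace" in config:
        builder = builder.use_keyspace(config["keyspace"], False)
    if "pool_size" in config:
        builder = builder.pool_size(config["pool_size"])
    if "connection_timeout_ms" in config:
        builder = builder.connection_timeout(config["connection_timeout_ms"])
    if "compression" in config:
        builder = builder.compression(config["compression"])
    return builder


class RecordingBuilder:
    """Stand-in that records chained calls; NOT the real SessionBuilder."""

    def __init__(self) -> None:
        self.calls = []

    def __getattr__(self, name: str):
        # Any unknown attribute acts as a chainable method that logs its call.
        def method(*args: Any) -> "RecordingBuilder":
            self.calls.append((name, args))
            return self
        return method


recorded = apply_config(
    RecordingBuilder(), {"nodes": ["node1:9042"], "pool_size": 20}
).calls
```

With a real `SessionBuilder()`, the returned builder would still need `await builder.build()` to connect.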
Session¶
The Session class represents an active connection to the ScyllaDB cluster.
Static Methods¶
async connect(nodes: List[str]) -> Session¶
Create a session with default configuration.
Parameters:
nodes - List of node addresses
Returns: Connected Session instance
Raises: ScyllaError on connection failure
Instance Methods¶
async execute(query: str, values: Optional[Dict[str, Any]] = None) -> QueryResult¶
Execute a CQL query.
# Simple query
result = await session.execute("SELECT * FROM users")

# Query with parameters
result = await session.execute(
    "SELECT * FROM users WHERE id = ?",
    {"id": 123}
)
Parameters:
query - CQL query string
values - Optional dictionary of parameter values
Returns: QueryResult containing the results
Raises: ScyllaError on query failure
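A mismatch between the `?` markers in a query and the supplied values is a common source of query failures. The sketch below catches it before the round trip. `check_bind_count` is a hypothetical helper, and it assumes each `?` consumes exactly one dictionary entry. It counts markers naively, so a literal `?` inside a quoted string would be miscounted.

```python
from typing import Any, Dict, Optional


def check_bind_count(query: str, values: Optional[Dict[str, Any]]) -> None:
    """Raise ValueError if the number of '?' markers and values differ.

    Naive by design: does not parse CQL, so '?' inside string literals
    is counted as a marker.
    """
    expected = query.count("?")
    provided = len(values) if values else 0
    if expected != provided:
        raise ValueError(
            f"query has {expected} '?' marker(s) but {provided} value(s) were given"
        )


check_bind_count("SELECT * FROM users WHERE id = ?", {"id": 123})  # passes silently
```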
async query(query: Query, values: Optional[Dict[str, Any]] = None) -> QueryResult¶
Execute a Query object with configuration.
from rsylla import Query

query = (
    Query("SELECT * FROM users WHERE id = ?")
    .with_consistency("QUORUM")
    .with_page_size(1000)
)
result = await session.query(query, {"id": 123})
Parameters:
query - Configured Query object
values - Optional dictionary of parameter values
Returns: QueryResult containing the results
Raises: ScyllaError on query failure
async prepare(query: str) -> PreparedStatement¶
Prepare a statement for repeated execution.
Parameters:
query - CQL query string to prepare
Returns: PreparedStatement ready for execution
Raises: ScyllaError on preparation failure
async execute_prepared(prepared: PreparedStatement, values: Optional[Dict[str, Any]] = None) -> QueryResult¶
Execute a prepared statement.
prepared = await session.prepare("SELECT * FROM users WHERE id = ?")
result = await session.execute_prepared(prepared, {"id": 123})
Parameters:
prepared - PreparedStatement to execute
values - Optional dictionary of parameter values
Returns: QueryResult containing the results
Raises: ScyllaError on execution failure
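Prepared statements pay off when the same CQL string is prepared once and executed many times. One possible way to arrange that is a small cache keyed by query text, sketched below. `StatementCache` is hypothetical and relies only on the `prepare` and `execute_prepared` methods documented above. `FakeSession` is a stand-in so the example runs without a cluster.

```python
import asyncio
from typing import Any, Dict, Optional


class StatementCache:
    """Prepare each distinct CQL string once and reuse the result.

    Create instances inside a running event loop (the lock lives there).
    """

    def __init__(self, session: Any) -> None:
        self._session = session
        self._prepared: Dict[str, Any] = {}
        self._lock = asyncio.Lock()

    async def get(self, query: str) -> Any:
        # The lock keeps concurrent callers from preparing the same text twice.
        async with self._lock:
            if query not in self._prepared:
                self._prepared[query] = await self._session.prepare(query)
            return self._prepared[query]

    async def execute(self, query: str, values: Optional[dict] = None) -> Any:
        prepared = await self.get(query)
        return await self._session.execute_prepared(prepared, values)


class FakeSession:
    """Stand-in for a connected Session; counts prepare() calls."""

    def __init__(self) -> None:
        self.prepare_calls = 0

    async def prepare(self, query: str) -> Any:
        self.prepare_calls += 1
        return ("prepared", query)

    async def execute_prepared(self, prepared: Any, values: Optional[dict] = None) -> Any:
        return (prepared, values)


async def _demo() -> int:
    session = FakeSession()
    cache = StatementCache(session)
    query = "SELECT * FROM users WHERE id = ?"
    await asyncio.gather(*(cache.execute(query, {"id": i}) for i in range(3)))
    return session.prepare_calls


prepare_calls = asyncio.run(_demo())
```

Holding one lock across all preparations is the simplest choice. It serializes preparation of different queries, which is usually acceptable because each query is prepared only once.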
async batch(batch: Batch, values: List[Dict[str, Any]]) -> QueryResult¶
Execute a batch of statements.
from rsylla import Batch

batch = Batch("logged")
batch.append_statement("INSERT INTO users (id, name) VALUES (?, ?)")
batch.append_statement("INSERT INTO users (id, name) VALUES (?, ?)")
result = await session.batch(batch, [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"}
])
Parameters:
batch - Batch object containing statements
values - List of dictionaries, one per statement
Returns: QueryResult (usually empty for write operations)
Raises: ScyllaError on batch failure
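Since `values` must contain exactly one dictionary per appended statement, large inserts are usually split into several smaller batches. A minimal sketch of the splitting step follows. `chunk_rows` is a hypothetical helper, and the chunk size of 2 is chosen only to keep the example short.

```python
from typing import Any, Dict, Iterator, List


def chunk_rows(rows: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of `rows`, each with at most `size` entries."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


rows = [{"id": i, "name": f"user{i}"} for i in range(5)]
chunks = list(chunk_rows(rows, 2))

# Each chunk would then back one Batch: call append_statement() once per
# row in the chunk and pass the chunk itself as the `values` list.
```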
async use_keyspace(keyspace_name: str, case_sensitive: bool) -> None¶
Switch to a different keyspace.
Parameters:
keyspace_name - Name of the keyspace
case_sensitive - Whether the name is case-sensitive
Raises: ScyllaError if keyspace doesn't exist
async await_schema_agreement() -> bool¶
Wait for schema to synchronize across the cluster.
# After creating a table
await session.execute("CREATE TABLE ...")
if await session.await_schema_agreement():
    print("Schema synchronized!")
Returns: True when schema agreement is reached
Raises: ScyllaError on timeout or failure
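When several schema changes run in sequence, waiting for agreement after each one keeps later statements from racing ahead of the schema. The sketch below wraps that pattern. `apply_schema` is a hypothetical helper built on the documented `execute` and `await_schema_agreement` methods, and `FakeSession` is a stand-in so the example runs without a cluster.

```python
import asyncio
from typing import Any, Iterable, List


async def apply_schema(session: Any, statements: Iterable[str]) -> int:
    """Run each DDL statement, waiting for schema agreement after each one."""
    applied = 0
    for statement in statements:
        await session.execute(statement)
        if not await session.await_schema_agreement():
            raise RuntimeError(f"no schema agreement after: {statement}")
        applied += 1
    return applied


class FakeSession:
    """Stand-in for a connected Session; records the order of calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def execute(self, query: str, values: Any = None) -> None:
        self.calls.append("execute")

    async def await_schema_agreement(self) -> bool:
        self.calls.append("agree")
        return True


fake = FakeSession()
applied = asyncio.run(apply_schema(fake, ["CREATE TABLE a ...", "CREATE TABLE b ..."]))
```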
get_cluster_data() -> str¶
Get cluster metadata information.
Returns: String representation of cluster data
get_keyspace() -> Optional[str]¶
Get the current keyspace.
Returns: Current keyspace name or None
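`get_keyspace()` combined with `use_keyspace()` makes it possible to switch keyspaces temporarily and restore the previous one afterwards. The following is a sketch under assumptions: `temporary_keyspace` is hypothetical, it restores the previous name with `case_sensitive=True` on the assumption that `get_keyspace()` returns the exact stored name, and `FakeSession` is a stand-in so the example runs without a cluster.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, List, Optional


@asynccontextmanager
async def temporary_keyspace(session: Any, name: str, case_sensitive: bool = False):
    """Switch to `name` for the duration of the block, then switch back."""
    previous = session.get_keyspace()
    await session.use_keyspace(name, case_sensitive)
    try:
        yield session
    finally:
        # Nothing to restore if no keyspace was set before.
        if previous is not None and previous != name:
            await session.use_keyspace(previous, True)


class FakeSession:
    """Stand-in for a connected Session; tracks keyspace switches."""

    def __init__(self, keyspace: Optional[str] = None) -> None:
        self._keyspace = keyspace
        self.history: List[str] = []

    def get_keyspace(self) -> Optional[str]:
        return self._keyspace

    async def use_keyspace(self, name: str, case_sensitive: bool) -> None:
        self._keyspace = name
        self.history.append(name)


async def _demo():
    session = FakeSession("production")
    async with temporary_keyspace(session, "analytics"):
        inside = session.get_keyspace()
    return inside, session.get_keyspace(), session.history


inside, after, history = asyncio.run(_demo())
```

The keyspace is session-wide state, so this pattern is only safe when no other coroutine uses the same session during the block.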
Usage Examples¶
Basic CRUD Operations¶
from rsylla import Session

async def crud_example():
    session = await Session.connect(["127.0.0.1:9042"])
    await session.use_keyspace("test", False)
    # Create
    await session.execute(
        "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
        {"id": 1, "name": "Alice", "email": "alice@example.com"}
    )
    # Read
    result = await session.execute(
        "SELECT * FROM users WHERE id = ?",
        {"id": 1}
    )
    row = result.first_row()
    if row:
        print(row.columns())
    # Update
    await session.execute(
        "UPDATE users SET email = ? WHERE id = ?",
        {"email": "alice.new@example.com", "id": 1}
    )
    # Delete
    await session.execute(
        "DELETE FROM users WHERE id = ?",
        {"id": 1}
    )
Connection Pool Pattern¶
from rsylla import SessionBuilder

class Database:
    """Singleton database connection"""
    _session = None

    @classmethod
    async def get_session(cls):
        # Build the shared session on first use, then reuse it.
        if cls._session is None:
            cls._session = await (
                SessionBuilder()
                .known_nodes(["node1:9042", "node2:9042"])
                .pool_size(20)
                .compression("lz4")
                .build()
            )
        return cls._session

# Usage (inside a coroutine)
session = await Database.get_session()
result = await session.execute("SELECT * FROM users")
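A lazily created singleton like this can be built twice if two coroutines call `get_session()` before the first build finishes, because the `None` check and the assignment are separated by an `await`. One way to avoid that is to guard creation with an `asyncio.Lock`, sketched below. `LazySession` and `fake_connect` are hypothetical. `fake_connect` stands in for a `SessionBuilder()...build()` call so the example runs without a cluster.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class LazySession:
    """Create one shared session on first use, even under concurrent callers."""

    def __init__(self, factory: Callable[[], Awaitable[Any]]) -> None:
        self._factory = factory
        self._session: Optional[Any] = None
        self._lock: Optional[asyncio.Lock] = None

    async def get(self) -> Any:
        if self._session is not None:
            return self._session
        if self._lock is None:
            # Created lazily so the lock belongs to the running event loop.
            self._lock = asyncio.Lock()
        async with self._lock:
            # Re-check: another caller may have finished while we waited.
            if self._session is None:
                self._session = await self._factory()
        return self._session


connect_calls: List[int] = []


async def fake_connect() -> object:
    """Stand-in for building a session; yields once like real I/O would."""
    await asyncio.sleep(0)
    connect_calls.append(1)
    return object()


async def _demo() -> int:
    holder = LazySession(fake_connect)
    sessions = await asyncio.gather(*(holder.get() for _ in range(5)))
    return len({id(s) for s in sessions})


distinct_sessions = asyncio.run(_demo())
```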
Retry Pattern¶
import asyncio
from rsylla import Session, ScyllaError

async def execute_with_retry(session, query, values=None, max_retries=3):
    """Execute query with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            return await session.execute(query, values)
        except ScyllaError as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s: {e}")
            await asyncio.sleep(wait_time)
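The loop above sleeps `2 ** attempt` seconds after every failed attempt except the last, so the default `max_retries=3` waits 1 s and then 2 s. With larger retry counts the delay grows quickly, and a cap is a common addition. The sketch below computes the schedule. `backoff_delays` and its `base` and `cap` parameters are hypothetical and not part of the pattern above.

```python
from typing import List


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0) -> List[float]:
    """Delays slept between attempts: base * 2**attempt, limited to `cap`.

    There is one fewer delay than attempts, since the final failure is raised.
    """
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries - 1)]


default_schedule = backoff_delays(3)
capped_schedule = backoff_delays(7, cap=10.0)
```

In `execute_with_retry`, `wait_time` could be replaced with `min(cap, base * 2 ** attempt)` to apply the same limit.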