The async-cassandra wrapper leverages the connection pooling provided by the Cassandra Python driver. Understanding how connection pooling works is crucial for optimizing performance and setting appropriate expectations.
When using Cassandra protocol version 3 or higher (default for Cassandra 2.1+), the Python driver maintains exactly one TCP connection per host. This is a fundamental limitation that differs from other Cassandra drivers (Java, C++, etc.).
- Python's Global Interpreter Lock (GIL): Due to Python's GIL, the driver cannot effectively utilize multiple connections per host as it would in multi-threaded environments like Java.
- Protocol Version 3+ Efficiency: Modern protocol versions support up to 32,768 concurrent requests per connection (vs. 128 in protocol v2), making multiple connections unnecessary for most workloads.
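The two request limits follow from the width of the stream ID field in the native protocol frame header: a signed single byte in v1/v2 and a signed two-byte value in v3+, with negative IDs reserved for server-initiated events. A minimal sketch of that arithmetic (the helper name is illustrative and not part of any driver):

```python
def max_streams_per_connection(protocol_version: int) -> int:
    """Upper bound on concurrent requests (stream IDs) per connection.

    Assumes the native protocol frame layout: a signed 1-byte stream ID
    in v1/v2 and a signed 2-byte stream ID in v3+.
    """
    stream_id_bits = 8 if protocol_version < 3 else 16
    # The sign bit is not available to clients, so only half the range is usable.
    return 2 ** (stream_id_bits - 1)


for version in (2, 3, 4):
    print(f"protocol v{version}: {max_streams_per_connection(version)} streams")
```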
According to the official Python driver API documentation:
"If protocol_version is set to 3 or higher, this is not supported (there is always one connection per host, unless the host is remote and connect_to_remote_hosts is False)"
The source code confirms that attempting to configure multiple connections per host with protocol v3+ results in an UnsupportedOperation exception.
Protocol v1/v2:
- Supports configurable connection pooling
- Default: 2-8 connections for LOCAL hosts, 1-2 for REMOTE hosts
- Maximum 128 concurrent requests per connection
- Can be configured using `set_core_connections_per_host()` and `set_max_connections_per_host()`

Protocol v3+:
- Fixed at one connection per host
- Supports up to 32,768 concurrent requests per connection
- Connection pooling configuration methods raise `UnsupportedOperation`
- Better performance due to reduced pooling overhead and lock contention
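To put the two configurations side by side, the per-host ceiling on in-flight requests can be worked out from the figures listed above. This is plain arithmetic on those numbers, not a measurement, and the constant names are made up for the example:

```python
# Figures taken from the comparison above.
V2_MAX_LOCAL_CONNECTIONS = 8        # upper end of the default LOCAL pool
V2_REQUESTS_PER_CONNECTION = 128
V3_CONNECTIONS_PER_HOST = 1
V3_REQUESTS_PER_CONNECTION = 32_768

# Maximum requests that can be in flight to a single host.
v2_ceiling = V2_MAX_LOCAL_CONNECTIONS * V2_REQUESTS_PER_CONNECTION
v3_ceiling = V3_CONNECTIONS_PER_HOST * V3_REQUESTS_PER_CONNECTION

print(f"v2, full LOCAL pool: {v2_ceiling} in-flight requests per host")
print(f"v3+, one connection: {v3_ceiling} in-flight requests per host")
print(f"ratio: {v3_ceiling / v2_ceiling:.0f}x")
```

Even a fully grown v2 pool tops out well below what a single v3+ connection can carry, which is why the fixed single connection is rarely the bottleneck.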
Despite the single connection limitation, the async-cassandra wrapper provides significant performance benefits:
- High Concurrency: Each connection can handle thousands of concurrent requests
- Async I/O: Non-blocking operations allow efficient use of the single connection
- Reduced Overhead: No connection pool management overhead
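The effect of non-blocking I/O can be illustrated without a cluster. The sketch below replaces the real session with a hypothetical `fake_execute` coroutine that only sleeps, so it shows the scheduling behaviour rather than actual Cassandra performance:

```python
import asyncio
import time


async def fake_execute(query: str, latency: float = 0.05) -> str:
    """Hypothetical stand-in for session.execute(); sleeps instead of doing I/O."""
    await asyncio.sleep(latency)
    return query


async def run_sequential(n: int) -> float:
    """Await each request before starting the next one."""
    start = time.perf_counter()
    for i in range(n):
        await fake_execute(f"query {i}")
    return time.perf_counter() - start


async def run_concurrent(n: int) -> float:
    """Start all requests at once and wait for them together."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_execute(f"query {i}") for i in range(n)))
    return time.perf_counter() - start


async def compare(n: int = 20) -> tuple[float, float]:
    return await run_sequential(n), await run_concurrent(n)


if __name__ == "__main__":
    sequential, concurrent = asyncio.run(compare())
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run pays the simulated latency once per request, while the concurrent run overlaps the waits, which is the same reason one multiplexed connection can serve many requests at a time.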
From our FastAPI example tests:
- 10 requests: 10.24x faster than sync (878.5 requests/second)
- 50 requests: 25.60x faster than sync (2,207.9 requests/second)
- 100 requests: 20.99x faster than sync (1,826.2 requests/second)
- Concurrent operations: 825 users/second creation rate
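As a quick consistency check on these figures, the implied synchronous baseline can be derived by dividing each async throughput by its speedup. This assumes the speedup is a plain throughput ratio, which the test results above do not state explicitly:

```python
# (concurrent requests, speedup over sync, async requests/second) from the list above
results = [
    (10, 10.24, 878.5),
    (50, 25.60, 2207.9),
    (100, 20.99, 1826.2),
]

# Assumption: speedup = async throughput / sync throughput.
implied_sync_rps = {n: rps / speedup for n, speedup, rps in results}

for n, rate in implied_sync_rps.items():
    print(f"{n:>3} requests: implied sync baseline of {rate:.1f} requests/second")
```

Under that assumption, all three rows point to a synchronous baseline of roughly 86 requests/second, so the numbers are at least internally consistent.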
Pre-establish connections at startup to avoid latency on first requests:
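```python
async def warmup_connections(cluster):
    """Force connections to all nodes before serving traffic."""
    session = await cluster.connect()

    # Run one lightweight query per known host. The load-balancing policy
    # picks the coordinator, so this spreads queries across the nodes
    # rather than guaranteeing that every node is hit exactly once.
    for _host in cluster.metadata.all_hosts():
        try:
            await session.execute("SELECT now() FROM system.local")
        except Exception:
            pass  # Node might be down; continue with the others

    # Return the warmed-up session so the caller can reuse it
    return session
```

Since you only have one connection per host, monitoring is crucial: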
```python
async def check_connection_health(session):
    """Check the health of the connection to every known host."""
    healthy_hosts = []
    unhealthy_hosts = []
    query = "SELECT now() FROM system.local"

    for host in session.cluster.metadata.all_hosts():
        try:
            # Target this host explicitly. A routing_key only feeds
            # token-aware routing and cannot pin a query to a node; the
            # driver's `host` argument can. This assumes the wrapper
            # forwards `host`, as the monitoring utility in this document does.
            await session.execute(query, host=host)
            healthy_hosts.append(host)
        except Exception as e:
            unhealthy_hosts.append((host, str(e)))

    return {
        "healthy": len(healthy_hosts),
        "unhealthy": len(unhealthy_hosts),
        "details": unhealthy_hosts,
    }
```

Prevent overwhelming the single connection per host:
```python
from asyncio import Semaphore


class RateLimitedSession:
    """Wrapper to limit concurrent requests per session."""

    def __init__(self, session, max_concurrent=1000):
        self.session = session
        self.semaphore = Semaphore(max_concurrent)

    async def execute(self, query, parameters=None, **kwargs):
        # Wait for a free slot so at most max_concurrent requests are in flight
        async with self.semaphore:
            return await self.session.execute(query, parameters, **kwargs)
```

Optimize connection settings for your workload:
```python
from async_cassandra import AsyncCluster

cluster = AsyncCluster(
    contact_points=['localhost'],
    # Increase I/O threads for better concurrency
    executor_threads=4,  # Default is 2
    # Keep connections alive
    idle_heartbeat_interval=30,  # Default is 30
    # Connection timeout
    connect_timeout=10,  # Default is 5
    # Control query timeout
    request_timeout=10,  # Default is 10
)
```

Here's a complete monitoring utility for async-cassandra:
```python
import asyncio
import time
from datetime import datetime, timezone
from typing import Any, Dict


class ConnectionMonitor:
    """Monitor async-cassandra connection health and metrics."""

    def __init__(self, session):
        self.session = session
        self.metrics = {
            "requests_sent": 0,
            "requests_completed": 0,
            "requests_failed": 0,
            "last_health_check": None,
        }

    async def get_connection_stats(self) -> Dict[str, Any]:
        """Get detailed connection statistics."""
        stats = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "cluster_name": self.session.cluster.metadata.cluster_name,
            "protocol_version": self.session.cluster.protocol_version,
            "hosts": [],
        }

        for host in self.session.cluster.metadata.all_hosts():
            host_info = {
                "address": str(host.address),
                "datacenter": host.datacenter,
                "rack": host.rack,
                "is_up": host.is_up,
                "release_version": host.release_version,
                "connection_count": 1 if host.is_up else 0,  # Always 1 for protocol v3+
            }

            # Test connection latency
            if host.is_up:
                try:
                    start = time.perf_counter()
                    # The driver's `host` argument expects the Host object,
                    # not its address
                    await self.session.execute(
                        "SELECT now() FROM system.local",
                        host=host,
                    )
                    host_info["latency_ms"] = (time.perf_counter() - start) * 1000
                except Exception as e:
                    host_info["error"] = str(e)
                    host_info["latency_ms"] = None

            stats["hosts"].append(host_info)

        stats["total_connections"] = sum(1 for h in stats["hosts"] if h.get("is_up"))
        stats["app_metrics"] = self.metrics.copy()
        return stats

    async def continuous_monitoring(self, interval: int = 60):
        """Run continuous monitoring."""
        while True:
            try:
                stats = await self.get_connection_stats()
                self.metrics["last_health_check"] = stats["timestamp"]

                # Log or send to monitoring system
                print(f"Connection Stats: {stats}")

                # Alert on issues
                down_hosts = [h for h in stats["hosts"] if not h.get("is_up")]
                if down_hosts:
                    print(f"WARNING: {len(down_hosts)} hosts are down")
            except Exception as e:
                print(f"Monitoring error: {e}")

            # Sleep after every pass, whether it succeeded or failed
            await asyncio.sleep(interval)
```

- Don't try to configure connection pool size with protocol v3+:
  ```python
  from cassandra.policies import HostDistance

  # This will raise UnsupportedOperation
  cluster.set_core_connections_per_host(HostDistance.LOCAL, 5)
  ```
- Don't assume multiple connections will improve performance:
  - The single connection can handle thousands of concurrent requests
  - Adding application-level parallelism is more effective
- Don't ignore connection health:
  - With only one connection per host, a connection failure impacts all requests to that host
  - Implement proper monitoring and alerting
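Application-level parallelism and a cap on in-flight requests can be combined in one helper. The following is a minimal sketch that runs without a cluster: `FakeSession` is a hypothetical stand-in for a real session, and `run_bounded` is an illustrative name rather than part of async-cassandra.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a session; records peak in-flight requests."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak_in_flight = 0

    async def execute(self, query: str) -> str:
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        try:
            await asyncio.sleep(0.01)  # simulated network round trip
            return query
        finally:
            self.in_flight -= 1


async def run_bounded(session: FakeSession, queries: list[str], limit: int) -> list[str]:
    """Run all queries concurrently with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(query: str) -> str:
        async with semaphore:
            return await session.execute(query)

    # gather() preserves input order in its result list
    return await asyncio.gather(*(guarded(q) for q in queries))


async def demo() -> tuple[int, int]:
    session = FakeSession()
    results = await run_bounded(session, [f"q{i}" for i in range(100)], limit=10)
    return len(results), session.peak_in_flight


if __name__ == "__main__":
    completed, peak = asyncio.run(demo())
    print(f"completed={completed}, peak in flight={peak}")
```

All 100 queries are submitted up front, but the semaphore keeps the number of simultaneous requests at or below the chosen limit, so the single connection is used efficiently without being flooded.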
The "one connection per host" limitation in the Python driver is a design decision that works well with Python's architecture. The async-cassandra wrapper maximizes the efficiency of these connections through async I/O, providing excellent performance for most workloads.
For extremely high-throughput scenarios (>10,000 requests/second per host), consider:
- Scaling horizontally with more application instances
- Using a Cassandra cluster with more nodes to distribute load
- Implementing application-level sharding if needed
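For a rough sense of scale, the node count needed to stay under that per-host rate can be estimated as follows. This sketch assumes requests are spread evenly across hosts, which real load balancing and replication only approximate, and the function name is illustrative:

```python
import math

PER_HOST_THRESHOLD_RPS = 10_000  # per-host rate quoted above


def min_hosts_for(total_rps: float, per_host_limit: float = PER_HOST_THRESHOLD_RPS) -> int:
    """Smallest node count keeping the per-host request rate at or below the limit.

    Assumes an even spread of requests across hosts (a simplification).
    """
    if total_rps <= 0:
        return 1
    return max(1, math.ceil(total_rps / per_host_limit))


for target in (8_000, 45_000, 120_000):
    print(f"{target} requests/second -> at least {min_hosts_for(target)} host(s)")
```

Treat the result as a lower bound for planning, since uneven partitions or hot keys can push individual hosts well above the average.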