For some use cases, such as those found in ad serving, counters come in really handy for accumulating totals from events streaming into a system, compared to batch aggregates. While the consistency of distributed counters is a well-known problem, Cassandra counters in version 2.1 are claimed to be more accurate than the prior implementation. This post describes the approach and the results of testing Cassandra counters consistency in different failure scenarios such as rolling restarts, abnormal termination of nodes, and network splits.
Here’s the initial blog post by DataStax describing the internals of the new Cassandra counters implementation. There are also really good counters consistency tests during network splits performed with Jepsen and published on Aphyr’s blog. While there is a wide variety of stress test tools, with counters it is worth verifying their behavior for business-specific cases, emulating a production workload. For this purpose, a simple Akka-based application has been implemented that produces a high write rate while using the problem domain’s data model.
The Load Testing Tool
The main idea behind the tool is to provide highly concurrent writes with respect to a data model that reflects a particular business case, plus a simple feedback loop to verify the results. The feedback loop is implemented with a configurable number of retries to wait for eventually consistent writes to propagate to all nodes.
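Here’s a minimal sketch of such a feedback loop, assuming a hypothetical CounterKey describing the full path to a counter in the data model described below and illustrative retry/backoff values; this is not the tool’s actual code:

import com.datastax.driver.core.Session

// Hypothetical full path to a single counter; `table` is the fully qualified keyspace.table name.
case class CounterKey(table: String, id: java.util.UUID, u: String, e: String, d: Int)

object FeedbackLoop {
  // Re-read the counter until it matches the expected value or the retries run out,
  // giving eventually consistent writes a chance to propagate to all replicas.
  def verifyCounter(session: Session, key: CounterKey, expected: Long,
                    maxRetries: Int = 10, backoffMs: Long = 1000): Long = {
    def read(): Long = {
      val row = session.execute(
        s"SELECT c FROM ${key.table} WHERE id = ${key.id} AND u = '${key.u}' AND e = '${key.e}' AND d = ${key.d}").one()
      if (row == null) 0L else row.getLong("c")
    }
    var attempt = 0
    var actual = read()
    while (actual != expected && attempt < maxRetries) {
      Thread.sleep(backoffMs) // wait before the next re-read
      actual = read()
      attempt += 1
    }
    actual // the caller records an over- or undercount if this still differs from `expected`
  }
}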
Actor System Overview
Actor responsibilities and relationships
- Coordinator - controls the interaction between the rest of the actors: waits for initialization, broadcasts the load start command, etc. When all writes have been sent to the cluster, the Coordinator runs the feedback loop to compare actual counter values with the expected result.
- Generators - domain-aware random insert query generators that take into account the number of unique rows and counters per row to generate the full path to each counter. The total number of counters thus equals the number of unique rows times the number of counters per row.
- Processors - actors controlling execution for a specific table and a sub-sum of the total counter value. The total number of processors equals number_of_entities * parallelism_factor. Each processor is responsible for writing a totalSum/parallelism subtotal of the target total value for each counter. The number of processors affects the number of concurrent writes to the same counters.
- Workers - generate increments and coordinate writes to the generated keys. Each worker holds a subset of its Processor’s keys for a specific table, which provides faster iteration over smaller insert batches. The number of workers influences the parallelism of writes to different keys, intermixing them with the writes of other workers; generally, it is a performance optimization for processing a smaller number of keys per write.
- CassandraWriter - a short-lived actor which dies after all its work is done. These actors live on a dedicated dispatcher with a separate thread pool due to datastax-java-driver specifics: it becomes enormously slow on a shared thread pool for some reason (see the sketch after this list).
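As an illustration of that last point, a short-lived writer can be spawned on its own dispatcher roughly like this; the dispatcher name, message shape, and actor body are assumptions, not the tool’s actual code:

import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import com.datastax.driver.core.{Session, Statement}

// Hypothetical message: a batch of bound statements plus the session to run them on.
case class WriteBatch(session: Session, statements: Seq[Statement])

class CassandraWriter extends Actor {
  def receive: Receive = {
    case WriteBatch(session, statements) =>
      statements.foreach(stmt => session.execute(stmt)) // synchronous writes, as in the tool
      self ! PoisonPill                                 // short-lived: stop once the batch is done
  }
}

object WriterFactory {
  // "cassandra-writer-dispatcher" is an assumed name that must be defined in application.conf
  // with its own thread pool, so driver calls never compete with the default dispatcher.
  def spawnWriter(system: ActorSystem) =
    system.actorOf(Props[CassandraWriter].withDispatcher("cassandra-writer-dispatcher"))
}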
Data model
A replication factor of 3 is used in the tests against a 5-node C* cluster:
CREATE KEYSPACE IF NOT EXISTS $keyspaceName WITH replication= {'class':'SimpleStrategy', 'replication_factor':3};
During the tests, data is written independently for 3 different entities (e.g. views, clicks, and campaigns). Each of these entities looks as follows:
CREATE TABLE IF NOT EXISTS $keyspaceName.$name (
id uuid,
u ascii,
e ascii,
d int,
c counter,
PRIMARY KEY((id, u, e), d));
For one of the entities, however, two column families will be created to reflect the case of different read paths (same fields, different row keys).
Test Setup
Cassandra cluster configuration
Tests are performed against a 5-node Cassandra 2.1.9 cluster. Each node has 32 GB of RAM and a 4-core/8-thread 3.70 GHz Xeon CPU.
Cassandra heap size: 8 GB
Test configuration
Here’s an example of tool invocation:
java -jar counters-test.jar -h 10.120.0.252 -k test_keyspace -p 10 -w 100 -s 50 -r 1000 -c 100
Details:
- counter total value 50
- parallelism 10
- 100 workers, keys per worker 1000 (sequentially fed to async writes)
- 1000 rows
- 100 counters per row
The tool is configured to write +1 increments, which results in 5M total writes to every table (views, clicks, 2x campaign). Write throughput after warmup fluctuates around 30K writes/sec.
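The total follows directly from the parameters above: 1,000 rows × 100 counters per row × 50 increments per counter = 5,000,000 single +1 writes per table.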
Network partitions and rolling restarts are implemented with Siafu, a Fabric-based Python tool.
Cassandra client configuration
The client is configured with:
implicit val cluster = Cluster.builder()
.addContactPoint(config.clusterNode)
.withQueryOptions(new QueryOptions()
.setConsistencyLevel(ConsistencyLevel.QUORUM)
).build()
All writes are executed in sync mode, i.e.:
Try(msg.session.execute(statement)) match {
case Failure(ex) => log.error(ex, "ERROR DURING WRITE")
case _ =>
}
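For reference, a counter write in this schema is a CQL UPDATE with c = c + 1 rather than an INSERT; a bound statement along the lines of the sketch below (illustrative names, not the tool’s actual code) is what ends up in the execute(...) call above:

import java.util.UUID
import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session}

// Counter columns can only be modified with UPDATE ... SET c = c + <delta>.
// The statement is prepared once per table and then bound per generated key.
class CounterIncrement(session: Session, keyspace: String, table: String) {
  private val prepared: PreparedStatement = session.prepare(
    s"UPDATE $keyspace.$table SET c = c + 1 WHERE id = ? AND u = ? AND e = ? AND d = ?")

  def forKey(id: UUID, u: String, e: String, d: Int): BoundStatement =
    prepared.bind(id, u, e, java.lang.Integer.valueOf(d))
}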
The Test
Scenarios Overview and Results Interpretation
Counter consistency is evaluated in the following scenarios while writes are executed against the cluster:
- node normally stops
- node normally restarts
- cluster rolling restart
- node dies (kill -9)
- node dies, starts again, and nodetool repair is executed
- node is lost due to a network split and then comes back
Interpretation of results
Results for each entity contain the following set of fields:
- overcounts - the total number of counters holding a value greater than expected
- maxOverCount - the maximum overcount value for a single counter
- totalOverCount - the total number of queries with overcount
- undercounts - the total number of counters holding a value less than expected
- maxUnderCount - the maximum undercount value for a single counter
- totalUnderCount - the total number of queries with undercount
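One plausible way to accumulate these fields in the feedback loop, given pairs of expected and actual counter values; the DeviationStats shape is an assumption matching the field list above, not the tool’s actual code:

// Aggregate deviation statistics from (expected, actual) counter readings.
// Since every write is a +1 increment, the summed per-counter difference equals
// the number of extra or missing increment queries.
case class DeviationStats(overcounts: Int = 0, maxOverCount: Long = 0, totalOverCount: Long = 0,
                          undercounts: Int = 0, maxUnderCount: Long = 0, totalUnderCount: Long = 0)

object DeviationStatsCollector {
  def collect(readings: Seq[(Long, Long)]): DeviationStats =
    readings.foldLeft(DeviationStats()) { case (stats, (expected, actual)) =>
      val diff = actual - expected
      if (diff > 0)
        stats.copy(overcounts = stats.overcounts + 1,
                   maxOverCount = math.max(stats.maxOverCount, diff),
                   totalOverCount = stats.totalOverCount + diff)
      else if (diff < 0)
        stats.copy(undercounts = stats.undercounts + 1,
                   maxUnderCount = math.max(stats.maxUnderCount, -diff),
                   totalUnderCount = stats.totalUnderCount - diff)
      else stats
    }
}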
Load stats
Here’s an example of the throughput stats for a test executed without incidents. The spike at the beginning is due to several minutes of warmup done before the actual writes.
And here’s an example of the throughput deviation while a node fails and comes back:
One really important thing to keep in mind when interpreting the results is that the test is fairly synthetic, and the number of under- or overcounts will vary depending on the throughput and the number of failures per test. But for an initial evaluation of C* counters behavior in the most common failure scenarios, this should be enough to grasp the whole picture.
The Results
Scenario: a normal node stop
DEVIATION STATS:
overcounts: 4
maxOverCount: 1
totalOverCount: 4
undercounts: 0
maxUnderCount: 0
totalUnderCount: 0
DEVIATION STATS:
overcounts: 1
maxOverCount: 1
totalOverCount: 1
undercounts: 0
maxUnderCount: 0
totalUnderCount: 0
DEVIATION STATS:
overcounts: 2
maxOverCount: 1
totalOverCount: 2
undercounts: 0
maxUnderCount: 0
totalUnderCount: 0
Scenario: a normal node restart
DEVIATION STATS:
overcounts: 4
maxOverCount: 1
totalOverCount: 4
undercounts: 11
maxUnderCount: 1
totalUnderCount: 11
DEVIATION STATS:
overcounts: 1
maxOverCount: 1
totalOverCount: 1
undercounts: 2
maxUnderCount: 1
totalUnderCount: 2
DEVIATION STATS:
overcounts: 1
maxOverCount: 1
totalOverCount: 1
undercounts: 6
maxUnderCount: 1
totalUnderCount: 6
Scenario: cluster rolling restart
DEVIATION STATS:
overcounts: 8
maxOverCount: 1
totalOverCount: 8
undercounts: 22
maxUnderCount: 1
totalUnderCount: 22
DEVIATION STATS:
overcounts: 15
maxOverCount: 1
totalOverCount: 15
undercounts: 20
maxUnderCount: 1
totalUnderCount: 20
DEVIATION STATS:
overcounts: 21
maxOverCount: 2
totalOverCount: 22
undercounts: 30
maxUnderCount: 1
totalUnderCount: 30
Normal termination operations conclusion
During normal operations like service stops, restarts, and cluster rolling restarts, the share of failed counters does not exceed 0.000007%, which is a pretty acceptable deviation for most cases. The results may vary depending on load/throughput, but the fact is that even during normal cluster operations some undercounts (and, what’s even worse, overcounts) are registered.
Scenario: node dies (kill -9)
DEVIATION STATS:
overcounts: 0
maxOverCount: 0
totalOverCount: 0
undercounts: 16
maxUnderCount: 1
totalUnderCount: 16
DEVIATION STATS:
overcounts: 1
maxOverCount: 1
totalOverCount: 1
undercounts: 50
maxUnderCount: 2
totalUnderCount: 52
DEVIATION STATS:
overcounts: 0
maxOverCount: 0
totalOverCount: 0
undercounts: 28
maxUnderCount: 1
totalUnderCount: 28
Scenario: network partition
DEVIATION STATS:
overcounts: 0
maxOverCount: 0
totalOverCount: 0
undercounts: 13
maxUnderCount: 1
totalUnderCount: 13
DEVIATION STATS:
overcounts: 0
maxOverCount: 0
totalOverCount: 0
undercounts: 55
maxUnderCount: 1
totalUnderCount: 55
DEVIATION STATS:
overcounts: 0
maxOverCount: 0
totalOverCount: 0
undercounts: 57
maxUnderCount: 1
totalUnderCount: 57
Scenario: 2 new nodes are added to the 5-node cluster while writing
No over- or undercounts were registered.
Scenario: node dies, starts again, and nodetool repair is executed
DEVIATION STATS:
overcounts: 1
maxOverCount: 1
totalOverCount: 1
undercounts: 7019
maxUnderCount: 4
totalUnderCount: 8607
DEVIATION STATS:
overcounts: 0
maxOverCount: 0
totalOverCount: 0
undercounts: 4146
maxUnderCount: 4
totalUnderCount: 5264
DEVIATION STATS:
overcounts: 1
maxOverCount: 1
totalOverCount: 1
undercounts: 4588
maxUnderCount: 4
totalUnderCount: 6027
Abnormal termination observations
All cases except abnormal termination with a subsequent restart (with or without repair) behaved much like normal termination, while a node coming back to the cluster after a failure showed major undercounts.
The interesting part here is that, with the consistency level configured to QUORUM, there’s a repeatable exception appearing in the logs: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write):
2015-08-31 15:02:07,336 ERROR [akka://counter-consistency/user/$b/$m/$S/$b] - ERROR DURING WRITE
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:54) ~[cassandra-tools.jar:1.0]
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:269) ~[cassandra-tools.jar:1.0]
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:183) ~[cassandra-tools.jar:1.0]
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) ~[cassandra-tools.jar:1.0]
...
It looks related to the CASSANDRA-10041 Jira issue (“timeout during write query at consistency ONE” when updating a counter at consistency QUORUM with 2 of 3 nodes alive), which was unresolved at the time these tests were performed.
In the node death-then-restart scenario, the observed behavior is that one can’t just restart the failed node. It’s a bit strange, because this situation can easily happen, and re-bootstrapping the node and wiping out all its data looks cumbersome. But it seems a reasonable tradeoff for the given counter functionality. The next test scenario checks the re-bootstrapping approach.
Scenario: node killed, then rebootstrapped
Actions performed:
- one of the nodes is killed (kill -9)
- the dead node is removed with nodetool removenode <node uid>
- the data directories are wiped out on the failed node’s machine
- JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=<node self address>" is added to cassandra-env.sh on the dead node
- the node is started
The results:
DEVIATION STATS:
overcounts: 2
maxOverCount: 1
totalOverCount: 2
undercounts: 69
maxUnderCount: 2
totalUnderCount: 70
DEVIATION STATS:
overcounts: 1
maxOverCount: 1
totalOverCount: 1
undercounts: 50
maxUnderCount: 1
totalUnderCount: 50
DEVIATION STATS:
overcounts: 0
maxOverCount: 0
totalOverCount: 0
undercounts: 0
maxUnderCount: 0
totalUnderCount: 0
After the total wipeout of the commitlog and data directories, the deviations are of the same magnitude as in the other scenarios.
Conclusion
The results are pretty clear: Cassandra 2.1 counters are not totally consistent. But the deviation and the number of undercounts are really small compared to the total number of writes per test (5M), while the number of overcounts is even smaller (a few dozen per test at most). One thing to mention here again is that the results vary somewhat randomly depending on which node is restarted (due to row key distribution across the nodes) and on the write throughput. One still can’t use these counters in banking, but the number of failed counters is quite acceptable in cases where strict accuracy is not a mandatory requirement (e.g. ad serving). Combined with some sort of aggregates as part of a Lambda Architecture, Cassandra counters can be used without big risks. But there are still two open questions after the tests:
- Why are some counters inconsistent even during normal operations like stops and restarts? This basically means that any Cassandra operation which needs a service restart will lose some increments.
- Why can’t a failed node simply be restarted without major undercounts? Counter operations are completely different from regular ones in Cassandra, and nodetool repair simply doesn’t work as expected for them.