Blog

Cassandra to Kafka Data Pipeline Part 1

Back 08/10/17
B.

Introduction

I’ve wanted to create a system which in its core uses event sourcing for quite a while - actually since I’ve read Martin Kleppmann’s Making Sense of Stream Processing. The book is really amazing, Martin tends to explain all concepts from basic building blocks and in a really simple and understandable way. I recommend it to everyone.

The idea is to have a running Cassandra cluster and to evolve a system with no downtime in such a way that Kafka is the Source of Truth with immutable facts. Every other system (in this case Cassandra cluster) should use these facts and aggregate / transform them for its purpose. Also, since all facts are in Kafka, it should be easy to drop the whole database, index, cache or any other data system and recreate it from scratch again.

The following diagrams should illustrate the system evolution.

Starting system architecture

Starting system architecture

 

Target system architecture

Target system architecture

 

When observing the diagrams, it seems like a pretty straightforward and trivial thing to do, but there’s more to it, especially when you want to do it with no downtime.

Evolution breakdown

I tried to break down the evolution process to a few conceptual steps and this is what I came up with:

  1. Have a mechanism to push each Cassandra change to Kafka with timestamp
  2. Start collecting each Cassandra change to temporary Kafka topic

    I need to start collecting before a snapshot is taken, otherwise there will be a time window in which incoming changes would be lost, and it also needs to go to temporary topic since there is data in the database which should be first in an ordered sequence of events

  3. Take the existing database snapshot

    This one is pretty straightforward

  4. Start reading data from the snapshot into the right Kafka topic

    Since the data from the snapshot was created first, it should be placed first into Kafka

  5. After the snapshot is read, redirect the data from the temporary Kafka topic to the right Kafka topic, but mind the timestamp when the snapshot is taken

    This step is essential to be done correctly, and could be considered as the hardest part. Since change event collecting started before the snapshot, there is a possibility that some events also exist in the snapshot as well and, to avoid inconsistencies, each event should be idempotent and I should try to be as precise as possible when comparing the event timestamp with the snapshot timestamp

  6. Create a new Cassandra cluster/keyspace/table and Kafka stream to read from Kafka and insert into this new Cassandra cluster/keyspace/table

    As a result, the new cassandra cluster should be practically a copy/clone of the existing one

  7. Wait for the temporary Kafka topic to deplete

    If I change the application to read from the new cassandra right away, and Kafka temporary topic still doesn’t catch up with system, there will be significant read delays (performance penalties) in the system. To make sure everything is in order, I think monitoring of time to propagate the change to the new Cassandra cluster will help and if the number is decent (a few milliseconds), I can proceed to the next step

  8. Change the application to read from the new cassandra instead of old and still write to old

    Since everything is done within the no downtime context, the application is actually several instances of application on different nodes, and they won’t be changed simultaneously, that would cause the downtime. I’d need to change one at a time, while others are still having the old software version. For this reason, the application still needs to write to the old cassandra, since other application nodes are still reading from the old cassandra.

  9. When each application instance is updated, change the application to write directly to Kafka right topic

    Now each node, one by one, can be updated with new application version which will write directly to Kafka. In parallel, old nodes will write to the old Cassandra which will propagate to Kafka topic, and new nodes will write directly to the Kafka topic. When the change is complete, all nodes are writing directly to the Kafka topic and we are good to go.

  10. Clean up

    At this point, the system writes to the right Kafka topic, the stream is reading from it and making inserts into the new Cassandra. The old Cassandra and Kafka temporary topic are no longer necessary so it should be safe for me to remove them.

Well, that’s the plan, so we’ll see whether it is doable or not.

There are a few motivating factors why I’ve chosen to evolve an existing system instead of building one the way I want from scratch.

  1. It is more challenging, hence more fun
  2. The need for evolving existing systems is the everyday job of software developers; you don’t get a chance to build a system for a starting set of requirements with guarantee that nothing in it will ever change (except for a college project, perhaps)
  3. When a system needs to change, you can choose two ways, to build a new one from scratch and when ready replace the old or to evolve the existing. I’ve done the former a few times in my life, and it might seem as fun at the beginning, but it takes awfully long, with a lot of bug fixing, often ends up as a catastrophe and is always expensive
  4. Evolving a system takes small changes with more control, instead of placing a totally new system instead of the old.
  5. I’m a fan of Martin Fowler’s blog, Evolutionary Database Design fits particularly nicely in this topic

Since writing about this in a single post would render quite a huge post, I’ve decided to split it into a few, I’m still not sure how many, but I’ll start and see where it takes me. Bear with me.

Data model

I’ll start with data model. Actually, it is just one simple table, but it should be enough to demonstrate the idea. The following CQL code describes the table.

CREATE TABLE IF NOT EXISTS movies_by_genre (
title text,
genre text,
year int,
rating float,
duration int,
director text,
country text,
PRIMARY KEY ((genre, year), rating, duration)
) WITH CLUSTERING ORDER BY (rating DESC, duration ASC)

The use case for this table might not be that common, since the table is actually designed to have a complex primary key with at least two columns as a partition key and at least two clustering columns. The reason for that is it will leverage examples, since handling of a complex primary key might be needed for someone reading this.

In order to satisfy the first item from the Evolution breakdown, I need a way to push each Cassandra change to Kafka with a timestamp. There are a few ways to do it: Cassandra Triggers, Cassandra CDC, Cassandra Custom Secondary Index and possibly some other ways, but I’ll investigate only the three mentioned.

Cassandra Triggers

For this approach I’ll use two Cassandra 3.11.0 nodes, two Kafka 0.10.1.1 nodes and one Zookeeper 3.4.6. Every node will run in a separate Docker container. I decided to use Docker since it keeps my machine clean and it is easy to recreate infrastructure.

To create a trigger in Cassandra, ITrigger interface needs to be implemented. The interface itself is pretty simple:

public interface ITrigger {

public Collection<Mutation> augment(Partition update);
}

And that’s all there is to it. The interface has been changed since Cassandra 3.0. Earlier versions of Cassandra used the following interface:

public interface ITrigger {

public Collection<Mutation> augment(ByteBuffer partitionKey, ColumnFamily update);
}

Before I dive into implementation, let’s discuss the interface a bit more. There are several important points regarding the implementation that need to be honored and those points are explained on the interface’s javadoc:

  1. Implementation of this interface should only have a constructor without parameters
  2. ITrigger implementation can be instantiated multiple times during the server life time. (Depends on the number of times the trigger folder is updated.)
  3. ITrigger implementation should be stateless (avoid dependency on instance variables).

Besides that, augment method is called exactly once per update and Partition object contains all relevant information about the update. You might notice that return type is not void but rather a collection of mutations. This way trigger can be implemented to perform some additional changes when certain criteria are met. But since I just want to propagate data to Kafka, I’ll just read the update information, send it to Kafka and return empty mutation collection. In order not to pollute this article with a huge amount of code, I’ve created maven project which creates a JAR file, and the project can be found here.

I’ll try to explain the code in the project. Firstly, there is a FILE_PATH constant, which points to /etc/cassandra/triggers/KafkaTrigger.yml and this is where YAML configuration for trigger class needs to be. It should contain configuration options for Kafka brokers and for topic name. The file is pretty simple, since the whole file contains just the following two lines:

bootstrap.servers: cluster_kafka_1:9092,cluster_kafka_2:9092
topic.name: trigger-topic

I’ll come to that later when we build our docker images. Next, there is a constructor which initializes the Kafka producer and ThreadPoolExecutor. I could have done it without ThreadPoolExecutor, but the reason for it is that the trigger augment call is on Cassandra’s write path and in that way it impacts Cassandra’s write performances. To minimize that, I’ve moved trigger execution to background thread. This is doable in this case, since I am not making any mutations, I can just start the execution in another thread and return an empty list of mutations immediately. In case when the trigger needs to make a mutation based on partition changes, that would need to happen in the same thread.

Reading data from partition update in augment method is really a mess. Cassandra API is not that intuitive and I went through a real struggle to read all the necessary information. There are a few different ways to update a partition in Cassandra, and these are ones I’ve covered:

  1. Insert
  2. Update
  3. Delete of director column
  4. Delete of title column
  5. Delete of both director and title columns
  6. Delete of row
  7. Delete range of rows for last clustering column (duration between some values)
  8. Delete all rows for specific rating clustering column
  9. Delete range of rows for first clustering column (rating between some values)
  10. Delete whole partition

A simplified algorithm would be:

if (isPartitionDeleted(partition)) {
handle partition delete;
} else {
if (isRowUpdated(partition)) {
if (isRowDeleted(partition)) {
handle row delete;
} else {
if (isCellDeleted(partition)) {
handle cell delete;
} else {
handle upsert;
}
}
} else if (isRangeDelete(partition)) {
handle range delete;
}
}

In each case, JSON is generated and sent to Kafka. Each message contains enough information to recreate Cassandra CQL query from it.

Besides that, there are a few helper methods for reading the YAML configuration and that is all.

In order to test everything, I’ve chosen Docker, as stated earlier. I’m using Cassandra docker image with 3.11.0 tag. But since the JAR file and KafkaTrigger.yml need to be copied into the docker container, there are two options:

  1. Use Cassandra 3.11.0 image and docker cp command to copy the files into the container
  2. Create a new Docker image with files already in it and use that image

The first option is not an option actually, it is not in the spirit of Docker to do such thing so I will go with the second option.

Create a cluster directory somewhere and a cassandra directory within it

mkdir -p cluster/cassandra

cluster directory will be needed for later, now just create KafkaTrigger.yml in cassandra dir with the content I provided earlier. Also, the built JAR file (cassandra-trigger-0.0.1-SNAPSHOT.jar) needs to be copied here. To build all that into Docker, I created a Dockerfile with the following content:

FROM cassandra:3.11.0
COPY KafkaTrigger.yml /etc/cassandra/triggers/KafkaTrigger.yml
COPY cassandra-trigger-0.0.1-SNAPSHOT.jar /etc/cassandra/triggers/trigger.jar
CMD ["cassandra", "-f"]

In console, just position yourself in the cassandra directory and run:

docker build -t trigger-cassandra .

That will create a docker image with name trigger-cassandra.

All that is left is to create a Docker compose file, join all together and test it. The Docker compose file should be placed in the  cluster directory. The reason for that is because Docker compose has a naming convention for containers it creates, it is <present_directory_name>_<service_name>_<order_num>. And I already specified the Kafka domain names in KafkaTrigger.yml as cluster_kafka_1 and cluster_kafka_2, in case the Docker compose is run from another location, container naming would change and KafkaTrigger.yml would need to be updated.

My Docker compose file is located in the cluster directory, it’s named cluster.yml and it looks like this:

version: '3.3'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:0.10.1.1
ports:
- 9092
environment:
HOSTNAME_COMMAND: "ifconfig | awk '/Bcast:.+/{print $$2}' | awk -F\":\" '{print $$2}'"
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
cassandra-seed:
image: trigger-cassandra
ports:
- 7199
- 9042
environment:
CASSANDRA_CLUSTER_NAME: test-cluster
cassandra:
image: trigger-cassandra
ports:
- 7199
- 9042
environment:
CASSANDRA_CLUSTER_NAME: test-cluster
CASSANDRA_SEEDS: cassandra-seed

The cluster contains the definition for Zookeeper, Kafka and Cassandra with the exception that there are two Cassandra services. The reason for that is that one can be standalone, but all others need a seed list. cassandra-seed will serve as seed, and cassandra as scalable service. That way, I can start multiple instances of cassandra. However, to start multiple instances, it takes time, and it is not recommended to have multiple Cassandra nodes in joining state. So, scale should be done one node at a time. That does not apply to Kafka nodes. With the following command, I’ve got a running cluster ready for use:

docker-compose -f cluster.yml up -d --scale kafka=2

After that, I connected to the Cassandra cluster with cqlsh and created the keyspace and table.

To add a trigger to the table, you need to execute the following command:

CREATE TRIGGER kafka_trigger ON movies_by_genre USING 'io.smartcat.cassandra.trigger.KafkaTrigger';

In case you get the following error:

ConfigurationException: Trigger class 'io.smartcat.cassandra.trigger.KafkaTrigger' doesn't exist

There are several things that can be wrong. The JAR file might not be loaded within the Cassandra node; that should happen automatically, but if it doesn’t you can try to load it with:

nodetool reloadTriggers

If the problem persists, it might be that the configuration file is not at a proper location, but that can only happen if you are using a different infrastructure setup and you forgot to copy KafkaTrigger.yml to the proper location. Cassandra will show the same error even if class is found but there is some problem instantiating it or casting it to theITrigger interface. Also, make sure that you implemented the ITrigger interface from the right Cassandra version (versions of cassandra in the JAR file and of the cassandra node should match).

If there are no errors, the trigger is created properly. This can be checked by executing the following CQL commands:

USE system_schema;
SELECT * FROM triggers;

Results

I used kafka-console-consumer to see if messages end up in Kafka, but any other option is good enough. Here are a few things I tried and the results it gave me.

-- insert
INSERT INTO movies_by_genre (genre, year, rating, duration, title, director) VALUES ('drama', 2015, 7.4, 110, 'The Good Lie', 'Philippe Falardeau');

{"rows":[{"cells":[{"name":"director","value":"Philippe Falardeau"},{"name":"title","value":"The Good Lie"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}


-- update
UPDATE movies_by_genre SET title = 'a' WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;

{"rows":[{"cells":[{"name":"title","value":"a"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}


-- delete of director column
DELETE director FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;

{"rows":[{"cells":[{"deleted":true,"name":"director"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}


-- delete of title column
DELETE title FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;

{"rows":[{"cells":[{"deleted":true,"name":"title"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}


-- delete of both director and title columns
DELETE title, director FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;

{"rows":[{"cells":[{"deleted":true,"name":"director"},{"deleted":true,"name":"title"}],"clusteringKey":"7.4, 110"}],"key":"drama:2015"}


-- delete of row
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration = 110;

{"rowDeleted":true,"rows":[{"clusteringKey":"7.4, 110"}],"key":"drama:2015"}


-- delete range of rows for last clustering column (duration between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration > 90;

{"rowRangeDeleted":true,"start":[{"clusteringKey":"7.4"},{"inclusive":false,"clusteringKey":"90"}],"end":[{"inclusive":true,"clusteringKey":"7.4"}],"rows":[],"key":"drama:2015"}


-- delete range of rows for last clustering column (duration between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration < 90;

{"rowRangeDeleted":true,"start":[{"inclusive":true,"clusteringKey":"7.4"}],"end":[{"clusteringKey":"7.4"},{"inclusive":false,"clusteringKey":"90"}],"rows":[],"key":"drama:2015"}


-- delete range of rows for last clustering column (duration between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4 AND duration > 90 AND duration <= 120;

{"rowRangeDeleted":true,"start":[{"clusteringKey":"7.4"},{"inclusive":false,"clusteringKey":"90"}],"end":[{"clusteringKey":"7.4"},{"inclusive":true,"clusteringKey":"120"}],"rows":[],"key":"drama:2015"}


-- delete all rows for specific rating clustering column
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating = 7.4;

{"rowRangeDeleted":true,"start":[{"inclusive":true,"clusteringKey":"7.4"}],"end":[{"inclusive":true,"clusteringKey":"7.4"}],"rows":[],"key":"drama:2015"}


-- delete range of rows for first clustering column (rating between some values)
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015 AND rating >= 7.5 AND rating < 9.0;

{"rowRangeDeleted":true,"start":[{"inclusive":false,"clusteringKey":"9.0"}],"end":[{"inclusive":true,"clusteringKey":"7.5"}],"rows":[],"key":"drama:2015"}


-- delete whole partition
DELETE FROM movies_by_genre WHERE genre = 'drama' AND year = 2015;

{"partitionDeleted":true,"key":"drama:2015"}

For most cases, not all of these mutations are used, usually it’s just insert, update and one kind of delete. Here I intentionally tried several ways since it might come in handy to someone. In case you have a simpler table use case, you might be able to simplify the trigger code as well.

What is also worth noting is that triggers execute only on a coordinator node; they have nothing to do with data ownership nor replication and the JAR file needs to be on every node that can become a coordinator.

Going a step further

This is OK for testing purposes, but for this experiment to have any value, I will simulate the mutations to the cassandra cluster at some rate. This can be accomplished in several ways, writing a custom small application, using cassandra stress or using some other tool. Here at SmartCat, we have developed a tool for such purpose. That is the easiest way for me to create load on a Cassandra cluster. The tool is called Berserker, you can give it a try.

To start with Berserker, I’ve downloaded the latest version (0.0.7 is the latest at the moment of writing) from here. And I’ve created a configuration file named configuration.yml.

load-generator-configuration:
data-source-configuration-name: Ranger
rate-generator-configuration-name: ConstantRateGenerator
worker-configuration-name: Cassandra
metrics-reporter-configuration-name: JMX
thread-count: 10
queue-capacity: 100000

data-source-configuration:
values:
genre: random(['horror', 'comedy', 'action', 'sci-fi', 'drama', 'thriller'])
year: random(1980..2017)
rating: random(float(5.5)..float(9.5))
duration: random(85..150)
title: random(['Jurassic World', 'Toy Story', 'Deadpool', 'Gravity', 'The Matrix'])
director: random(['Philippe Falardeau', 'Martin Scorsese', 'Steven Spielberg', 'Ridley Scott'])
insert: string("INSERT INTO movies_by_genre (genre, year, rating, duration, title, director) VALUES ('{}', {}, {}, {}, '{}', '{}');", $genre, $year, $rating, $duration, $title, $director)
deleteRow: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {} AND rating = {} and duration = {}", $genre, $year, $rating, $duration)
deletePartition: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {}", $genre, $year)
statement:
consistencyLevel: ONE
query: random([$insert, $deleteRow, $deletePartition])
output: $statement

rate-generator-configuration:
rate: 1000

worker-configuration:
connection-points: 0.0.0.0:32779,0.0.0.0:32781
keyspace: custom
async: false
bootstrap-commands:
- "CREATE KEYSPACE IF NOT EXISTS custom WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"
- USE custom;
- CREATE TABLE IF NOT EXISTS movies_by_genre (title text, genre text, year int, rating float, duration int, director text, country text, PRIMARY KEY ((genre, year), rating, duration)) WITH CLUSTERING ORDER BY (rating DESC, duration ASC);

metrics-reporter-configuration:
domain: berserker
filter:

load-generator-configuration section is used to specify all other configurations. There, for every type of the configuration, name is specified in order for the Berserker to know which configuration parser to use in concrete sections. After that, a section for each configuration with parser specific options and format is found. There are following sections available:

  1. data-source-configuration where data source which will generate data for worker is specified
  2. rate-generator-configuration where should be specified how rate generator will be created and it will generate rate. This rate is rate at which worker will execute
  3. worker-configuration, configuration for worker
  4. metrics-reporter-configuration, configuration for metrics reporting, currently only JMX and console reporting is supported

In this case, the data-source-configuration section is actually a Ranger configuration format and can be found here.

An important part for this article is the connection-points property within worker-configration. This will probably be different every time Docker compose creates a cluster. To see your connection points run:

docker ps

It should give you a similar output:

CONTAINER ID        IMAGE                          COMMAND                  CREATED             STATUS              PORTS                                                                       NAMES
f274b40c8dce wurstmeister/kafka:0.10.1.1 "start-kafka.sh" 3 hours ago Up 3 hours 0.0.0.0:32784->9092/tcp cluster_kafka_1
f56b8664e54b wurstmeister/kafka:0.10.1.1 "start-kafka.sh" 3 hours ago Up 3 hours 0.0.0.0:32783->9092/tcp cluster_kafka_2
81a32c580a12 wurstmeister/zookeeper:3.4.6 "/bin/sh -c '/usr/..." 3 hours ago Up 3 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp cluster_zookeeper_1
efdd29b63faa trigger-cassandra "/docker-entrypoin..." 3 hours ago Up 3 hours 7000-7001/tcp, 9160/tcp, 0.0.0.0:32782->7199/tcp, 0.0.0.0:32781->9042/tcp cluster_cassandra-seed_1
0fb1311ba3ee trigger-cassandra "/docker-entrypoin..." 3 hours ago Up 3 hours 7000-7001/tcp, 9160/tcp, 0.0.0.0:32780->7199/tcp, 0.0.0.0:32779->9042/tcp cluster_cassandra_1

There you can find port mapping for cluster_cassandra-seed_1 and cluster_cassandra_1 containers and use it, in this case it is: 0.0.0.0:32779 and 0.0.0.0:32781.

Now that everything is settled, just run:

java -jar berserker-runner-0.0.7.jar -c configuration.yml

Berserker starts spamming the Cassandra cluster and in my terminal where kafka-console-consumer is running, I can see messages appearing, it seems everything is as expected, at least for now.

End

That’s all, next time I’ll talk about Cassandra CDC and maybe custom secondary index. Hopefully, in a few blog posts, I’ll have the whole idea tested and running.

Vladimir Vajda

Technical Consultant

Software developer, engineer. Open to new ideas, critical in reviewing existing solutions. Hungry for knowledge and loves a great argument. Enjoys working in java ecosystem.