|Description||Kafka Queue system running in AWS. Used to transport data between services.|
|Quality||No Sentry, no SONAR|
Kafka sits somewhere between a message bus, a distributed replication system, a queue and a broadcast system. Specifically:
- It has ‘topics’, which can be written to and read from.
- It has ‘producers’ which can write to a topic in sequence.
- It has ‘consumers’ which can read from a topic in sequence.
- It has ‘consumer groups’, which allow groups of consumers to co-ordinate to load-balance.
- It has nodes (brokers), which operate as peers.
- It splits messages on each topic into partitions, assigning each partition to a particular node.
- Nodes co-ordinate to replicate each partitioned topic for durability.
Kafka depends on ZooKeeper for co-ordination. ZooKeeper is an entirely separate service, but we only use it in conjunction with Kafka. Some Kafka admin tasks involve talking to ZooKeeper directly.
A Topic is a named pipe (analogous to a database table). Kafka can have many topics. A topic contains an ordered stream of messages, each with a timestamp and other metadata. Messages are sharded into partitions within a topic.
Each topic has a specific configuration, and can inherit from the default configuration. Pertinent config options include:
- Number of partitions
- Replication factor per partition
The scripts for creating topics contain these parameters. It’s a bad idea to let Kafka auto-create topics, because it will use less-than-helpful defaults.
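As a rough illustration of keeping these parameters explicit, the sketch below builds the argument list for Kafka's `kafka-topics.sh` tool. The helper name, topic name and address are made up for the example and are not taken from our scripts. The `--bootstrap-server` flag assumes a reasonably recent Kafka release; older ZooKeeper-based releases take `--zookeeper` instead.

```python
def topic_create_args(topic, partitions, replication_factor, broker_count,
                      bootstrap_server="localhost:9092"):
    """Build an argv list for `kafka-topics.sh --create` (hypothetical helper)."""
    if partitions < 1:
        raise ValueError("a topic needs at least one partition")
    # Kafka cannot place more replicas of a partition than there are brokers.
    if not 1 <= replication_factor <= broker_count:
        raise ValueError("replication factor must be between 1 and the broker count")
    return [
        "kafka-topics.sh", "--create",
        "--bootstrap-server", bootstrap_server,
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]


# Example: full replication on a three-broker cluster (values are illustrative).
args = topic_create_args("example-topic", partitions=6,
                         replication_factor=3, broker_count=3)
print(" ".join(args))
```

Stating both values in the script means a topic never silently picks up the broker defaults.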
Partitions allow the consumers to work in parallel. They are also the unit of replication.
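This page does not say how a message ends up in a particular partition. A common arrangement, and the Java client's default for messages that carry a key, is to hash the key modulo the partition count. The sketch below assumes that arrangement and uses `zlib.crc32` purely as a stand-in hash (the Java client uses murmur2); the keys and values are invented.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition. crc32 is a stand-in, not Kafka's hash."""
    return zlib.crc32(key) % num_partitions


NUM_PARTITIONS = 3
messages = [(b"user-1", "a"), (b"user-2", "b"), (b"user-1", "c"), (b"user-3", "d")]

# Each partition is an independent, ordered log.
partitions = {p: [] for p in range(NUM_PARTITIONS)}
for key, value in messages:
    partitions[choose_partition(key, NUM_PARTITIONS)].append(value)

for p, log in partitions.items():
    print(p, log)
```

Messages with the same key land in the same partition and keep their relative order there. Ordering across partitions is not guaranteed.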
Producers connect to a broker (any broker) and put messages onto a Topic. Examples of Producers include Agents, which send Input Evidence Records (for later consumption by the Percolator).
A Producer has a configurable ‘ack’ value, so it can be sure that the message was replicated. Out of “none”, “one” and “all”, we choose “one”. If we chose “all”, the outage of one broker would mean that things stall and time out. See ‘acks’ in the documentation.
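For orientation, the three choices correspond to the values `0`, `1` and `all` of the producer's `acks` property. The sketch below builds a plain properties dictionary with the real property names `bootstrap.servers` and `acks`. The helper name and broker addresses are hypothetical, and this is not how our services are necessarily configured.

```python
# Names used on this page mapped to the values of the producer `acks` property.
ACKS = {"none": "0", "one": "1", "all": "all"}


def producer_properties(bootstrap_servers, ack_mode="one"):
    """Return producer properties as a dict (hypothetical helper)."""
    if ack_mode not in ACKS:
        raise ValueError(f"ack_mode must be one of {sorted(ACKS)}")
    return {
        "bootstrap.servers": ",".join(bootstrap_servers),
        "acks": ACKS[ack_mode],
    }


# Broker addresses are placeholders.
props = producer_properties(["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"])
print(props)
```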
Brokers are Kafka servers. These run as peers in a cluster of 3 (minimum size). Brokers receive messages, assign them to partitions, and route to the appropriate server. For each partition, one broker is nominated as the ‘leader’ and a configurable number of replica nodes are chosen. We typically choose full replication for belt-and-braces, so that each node has a complete dataset.
A Broker is essentially a log store. It doesn’t care about consumers. This sets it apart from ActiveMQ, for example, which makes sure each consumer only gets a message once and co-ordinates transactions.
A Consumer connects to a broker to process messages. The internal Kafka API is very flexible, leading to a variety of different usage patterns. See the Event Data Kafka page on how we use it.
The Java client library (for Producers and Consumers) is the canonical implementation. It contains more functionality than a typical client library. A ‘Consumer’ is an instance of a Consumer object in the library. A process can run more than one Consumer in a multi-threaded setup for concurrency, and Consumers can be spread across more than one process. Within a given Consumer Group, Consumers co-ordinate to work out how many of them there are, to assign a portion of the partitions to each one, and to track how far through each partition of the log each Consumer has read.
If a consumer leaves the group, its absence will be detected by a timeout. At this point, other consumers will stop what they are doing, rebalance their partition assignments, and continue. Depending on whether the consumer had committed a checkpoint, this may result in the same messages being consumed again.
This means that the outage of one consumer in a group might cause a rebalance and re-ingestion. If everything appears to stop for a while, give it some time (a few minutes) to see if the consumers can rebalance.
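The rebalance and re-ingestion behaviour can be modelled in a few lines. This is a toy model, not the Kafka protocol: the round-robin assignment, the consumer names and the offsets are all assumptions made for the example, and real assignors (range, round-robin, sticky) distribute partitions differently.

```python
def assign(partitions, consumers):
    """Round-robin partitions over the consumers, sorted by name (toy assignor)."""
    members = sorted(consumers)
    assignment = {c: [] for c in members}
    for i, p in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(p)
    return assignment


partitions = [0, 1, 2, 3, 4, 5]
committed = {p: 100 for p in partitions}  # last committed offset per partition
processed = dict(committed)               # how far each partition was actually read

before = assign(partitions, {"c1", "c2", "c3"})

# c3 reads 20 more messages from each of its partitions, then dies before committing.
for p in before["c3"]:
    processed[p] += 20

# The group notices the timeout and rebalances over the survivors.
after = assign(partitions, {"c1", "c2"})

# New owners resume from the committed offset, so uncommitted messages are read again.
replayed = {p: processed[p] - committed[p] for p in before["c3"]}
print(before)
print(after)
print(replayed)
```

In this model the 20 uncommitted messages on each of the failed consumer's partitions are consumed a second time, so consumers need to tolerate duplicates.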
The Kafka documentation is excellent; reading it is recommended.
See the Event Data Kafka page for illustrations of how it’s used.
TTL and expiry
Every topic has a time-to-live. Because Brokers are simply log storage, and don’t know what’s been consumed by which Consumers, we need to configure how long to keep the data. The following concerns come into play:
- How long we need to keep the data. Once it’s been consumed and archived, there is no use keeping it in Kafka.
- Variation in message volumes. If we get a sudden increase in data throughput, that will mean more storage for a given period of time.
- How much disk storage we have. This is a specific concern on e.g. Hetzner Cloud, where the amount of storage is a hard constraint. When we have full replication on three nodes (i.e. every node has a full replica), each node stores three times the minimum, but this brings the benefit of data resilience.
- Is the data consumed in a timely manner? E.g. streaming consumers tend to process a message immediately, daily archivers do this every 24 hours.
- Variation on consumption patterns. If we come under very heavy load, consumers may slow down and get behind. If the archiver crashes, or the machine is down in a given slot, it may be up to a day or two behind.
- Variations in human maintenance. If we do experience a software problem, can we fix things and get them running again before the expiry happens?
- Replay delay. Consumers such as the heartbeat, which monitors the Evidence Log, will start from the earliest offset when they start up. It can take a while for them to consume all messages and catch up to the present moment, in which time they may register various checks as being bad. Likewise, the Percolator might replay from the earliest offset, which might cause some delay before it gets back to unprocessed records. If most of this replay behaviour is useless, then we might as well not store the data.
- Functional constraints. There may be reasons why we need to keep a particular amount of data around. For example, the heartbeat checks might stipulate that a given event must occur at least once a day. This would mean that messages younger than 1 day should not be deleted or they may trigger a false alarm.
Expiry can be expressed as either a time period or a given volume of data.
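A back-of-the-envelope calculation helps when weighing retention against disk space. The sketch below assumes partitions are spread evenly across brokers and ignores index files and compression; the throughput figures are invented. The corresponding topic settings are `retention.ms` for time and `retention.bytes` for size (the latter applies per partition).

```python
def disk_per_broker_bytes(bytes_per_second, retention_seconds,
                          replication_factor, broker_count):
    """Approximate disk each broker needs to hold one retention window."""
    total = bytes_per_second * retention_seconds * replication_factor
    return total / broker_count


DAY = 24 * 60 * 60
retention_seconds = 7 * DAY
retention_ms = retention_seconds * 1000  # value to use for `retention.ms`

# Illustrative throughput: 1 MB/s normally, 3 MB/s during a burst.
normal = disk_per_broker_bytes(1_000_000, retention_seconds, 3, 3)
burst = disk_per_broker_bytes(3_000_000, retention_seconds, 3, 3)

print(f"retention.ms = {retention_ms}")
print(f"normal load: {normal / 1e9:.1f} GB per broker")
print(f"burst load:  {burst / 1e9:.1f} GB per broker")
```

With full replication on three brokers the replication factor and broker count cancel out, so each broker holds the whole retention window: about 605 GB at 1 MB/s over seven days, and three times that if the burst rate is sustained.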
Best practice and hints
Think about the ‘ack’ configuration in Producers (see above). Single acknowledgement offers the best flexibility in case one node goes down, especially if only three Kafka nodes are provisioned.
When deploying, think about the available disk space, desired data TTL, and the relative importance of data storage. Balance this against the responsiveness of the incident response plan:
- If there is a daily snapshot process, how quickly can we respond to get it up and running? Make sure data won’t expire before we can respond and potentially bugfix.
- If there is a bug or exceptional lag in processing data, how much leeway do we have to scale up or bugfix? Make sure data won’t expire before it is processed.
When Kafka is run in Docker Swarm, it will be automatically restarted if needed.
The throughput of messages being processed is monitored via the Evidence Logs which feed the heartbeats.
- Documentation including all the config options.