This is the final blog in the Kafka Message Ordering series. I would highly recommend reading the previous blogs to ensure that you can follow the discussion here. Here is what we have already covered:
- Factors that can disrupt message ordering at the producer level – Kafka Message Ordering Part 1
- Factors responsible for altering message ordering at the infrastructure layer and on the consumer side – Kafka Message Ordering Part 2
- Architecting a solution for message ordering issues using Apache HBase – Kafka Message Ordering Part 3
In the last blog, we saw how Apache HBase can help alleviate message ordering concerns. However, due to compatibility issues within the technology stack or organizational decisions, Apache HBase might not always be a viable option. In this blog, I cover another alternative for the same message ordering problem using Apache Kudu. In the parlance of the CAP theorem, Apache Kudu is a CP type of storage engine, and it is considered a good citizen on a Hadoop cluster: it stores data on the Linux filesystem, and its tablet servers can be colocated on the same hosts as existing HDFS DataNodes.
In Apache HBase, we leveraged the native capability to order events or messages by their timestamps, and hence always retrieved the latest message for a given key successfully. Apache Kudu, in contrast, provides OLTP-style operational flexibility in the big data world: we can quickly store and clean huge amounts of data, with CRUD operations supported at the record/row level.
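For instance, here is a minimal sketch of record-level CRUD through Impala against a hypothetical Kudu table (the table and column names are illustrative assumptions, not from the original post):

```sql
-- Illustrative only: row-level CRUD on a hypothetical Kudu table via Impala.
-- UPSERT inserts the row if the key does not exist yet, or updates it otherwise.
UPSERT INTO db.customer (id, pincode) VALUES (1, '560001');

-- Individual rows can be modified or removed at record level.
UPDATE db.customer SET pincode = '560002' WHERE id = 1;
DELETE FROM db.customer WHERE id = 1;
```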
Let us now understand the applicability of Apache Kudu here with the help of an example that covers both ends of the spectrum (producers and consumers) to address the message ordering issues.
Producer side: Assume a use case wherein we have to work with a legacy system that is technically incapable of achieving the desired real-time throughput. To compensate for this, events are batched together, making every message a little bulkier but achieving higher throughput on average. Here are the relevant details for this use case and how the producer should create the messages:
- We are targeting a near real-time solution wherein events are grouped together in one message (say 400 events per message) before the message is transmitted to the consumer.
- An event represents a record in the source that has been inserted, updated, or deleted.
- A record at the source can change multiple times within a single message window, i.e., multiple events are generated and wrapped in a single message.
- A single message can hold at most 400 events/source records (i.e., events are batched together in one message).
- Every event has a unique event id, i.e., if a record pk1 changed three times, then three events would be generated with three unique event ids (which could be timestamps, for simplicity).
- The most recently generated event has the greatest eventid (be it a sequence number or a timestamp).
- Every event contains two additional metadata columns: the first is the eventid and the second is the CRUD operation type (insert, update, or delete).
- If 900 events were generated in one message window, then a total of 3 messages would be sent (two with 400 events/records each and one with 100 events).
- Every message window gets a unique message-id, which is retained even if the window has to be split. So, in the case above, where the restriction on events per message forces three messages to be created, all three would still carry the same message-id.
Here is a sample set of events in different messages (which might be produced to different partitions of the topic):
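A representative sample might look like the following (all values, including the pincode column used later in the queries, are purely illustrative):

| messageid | eventid | key | pincode | operation |
|-----------|---------|-----|---------|-----------|
| M1        | 1001    | A   | 560001  | update    |
| M1        | 1002    | B   | 110001  | delete    |
| M2        | 1003    | A   | 560002  | update    |
| M2        | 1004    | C   | 400001  | update    |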
As you can see, the record with primary key A is updated twice, the record with primary key B is deleted, and the record with primary key C is updated once.
Consumer side: Before processing the messages, we land all messages/events in Apache Kudu, where the tables are designed to store multiple events for the same record. This is achieved by using the metadata columns (the message id and the event id) in a composite primary key. Eventids are different for each event, and the messageid helps us avoid eventid collisions across different data sources.
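A minimal sketch of what such a landing table could look like when created through Impala (the table name, column types, and partitioning below are illustrative assumptions):

```sql
-- Illustrative only: a Kudu landing table whose composite primary key
-- (key, messageid, eventid) lets multiple events for the same record coexist.
CREATE TABLE db.landing (
  key       STRING,
  messageid STRING,
  eventid   STRING,   -- e.g., a timestamp rendered as a string
  operation STRING,   -- insert / update / delete
  pincode   STRING,
  PRIMARY KEY (key, messageid, eventid)
)
PARTITION BY HASH (key) PARTITIONS 4
STORED AS KUDU;
```

Essentially, the idea here is that: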
- All the events are stored in corresponding landing tables in Apache Kudu.
- For processing, the latest event is selected for every unique key, based on the highest messageid and, within it, the highest eventid (hence achieving message order). Something like:
```sql
SELECT key, pincode
FROM (
  SELECT t.*,
         ROW_NUMBER() OVER (PARTITION BY key
                            ORDER BY messageid DESC, eventid DESC) AS message_row_number
  FROM db.table t
) tab
WHERE message_row_number = 1;
```
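Run against the illustrative sample above, this query would return exactly one row per key: the state written by eventid 1003 for A, eventid 1002 for B, and eventid 1004 for C, i.e., the latest known state of each record.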
- Finally, since we are inserting every event, we need to keep the data footprint light, and only the latest record for each primary key is relevant to us. We can clean up the older events using:
```sql
DELETE FROM db.table
WHERE CONCAT(key, messageid, eventid) IN (
  SELECT CONCAT(key, messageid, eventid)
  FROM (
    SELECT key, messageid, eventid,
           ROW_NUMBER() OVER (PARTITION BY key
                              ORDER BY messageid DESC, eventid DESC) AS message_row_number
    FROM db.table
  ) tab
  WHERE message_row_number > 1
);
```
As you can see, message ordering is a tricky problem, but it can be handled elegantly with the appropriate system design. I hope you enjoyed reading this blog series on message ordering and will leverage these concepts to solve related problems in your own use cases. Do let me know if you have any doubts, need more information, or have any suggestions on this topic.