How to use consumer groups in Redis Streams

Take advantage of consumer groups in Redis Streams to distribute the processing of a data stream among multiple consumers

Redis Streams is a new data structure, introduced in Redis 5.0, that allows you to create and manage data streams. In a previous article, I showed how to add data to a stream, and how to read the data in multiple ways. In this article, I’ll explain how to use consumer groups in Redis Streams. A consumer group is a way to split a stream of messages among multiple clients to speed up processing or lighten the load for slower consumers.

In a perfect world, both data producers and consumers work at the same pace, and there’s no data loss or data backlog. Unfortunately, that’s not the case in the real world. In nearly all real-time data stream processing use cases, producers and consumers work at different speeds. In addition, there is more than one type of consumer, each with its own requirements and processing pace. Redis Streams addresses this need with a feature set that gravitates heavily towards supporting the consumers. One of its most important features is the consumer group.

When to use a Redis Streams consumer group

The purpose of consumer groups is to scale out your data consumption process. Let’s consider one example — an image processing application. The solution requires three main components:

  1. A producer (one or more cameras, perhaps) that captures and stores images;
  2. A Redis Stream that stores the images in the order they arrive; and
  3. An image processor that processes each image.

Figure 1. Sample image processing solution with a single producer and consumer.

Suppose your producer saves 500 images per second, and the image processor processes only 100 images per second at its full capacity. This rate difference creates a backlog that grows by 400 images every second, so your image processor will never be able to catch up. An easy way to address this problem is to run five image processors (as shown in Figure 2), each processing a mutually exclusive set of images. You can achieve this through a consumer group, which enables you to partition your workloads and route them to different consumers.


Figure 2. Scaling the solution with more consumers to match the production rate.
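
The sizing arithmetic behind this example can be sketched in a few lines of Python. The function names consumers_needed and backlog_after are hypothetical helpers, and the sketch assumes steady rates and equally fast consumers, which real workloads rarely guarantee.

```python
import math

def consumers_needed(produce_rate, consume_rate):
    """Smallest number of equally fast consumers that keeps up with the producer."""
    return math.ceil(produce_rate / consume_rate)

def backlog_after(seconds, produce_rate, consume_rate, consumers=1):
    """Unprocessed items after `seconds`, assuming steady rates and an empty start."""
    return max(0.0, float((produce_rate - consume_rate * consumers) * seconds))

print(consumers_needed(500, 100))       # 5
print(backlog_after(60, 500, 100))      # 24000.0 images waiting after one minute
print(backlog_after(60, 500, 100, 5))   # 0.0 once five consumers share the work
```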

A consumer group does more than data partitioning — it ensures data safety and enables disaster recovery.

How a Redis Streams consumer group works

A consumer group is a data structure within a Redis Stream. As shown in Figure 3, you can think about a consumer group as a collection of lists. Another thing to imagine is a list of items that are not consumed by any consumers — for our discussion, let’s call this an “unconsumed list.” As data arrives in the stream, it is immediately pushed to the unconsumed list.


Figure 3. How a Redis Streams consumer group is structured.

The consumer group maintains a separate list for each consumer, typically with an application attached. In Figure 3, our solution has N identical applications (App 1, App 2, ... App N) that read data via Consumer 1, Consumer 2, ... Consumer N, respectively.

When an app reads data using the XREADGROUP command, specific data entries are removed from the unconsumed list and pushed into the pending entries list that belongs to the respective consumer. Thus, no two consumers will consume the same data.

Finally, when the app acknowledges an entry with the XACK command, Redis removes that entry from the consumer’s pending entries list.
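
To make this lifecycle concrete, here is a minimal in-memory sketch in Python. It is a toy model of the lists described above, not how Redis implements consumer groups internally, and the class name ToyConsumerGroup and its methods are hypothetical.

```python
from collections import deque

class ToyConsumerGroup:
    """In-memory model of the lists described above (not Redis itself)."""

    def __init__(self):
        self.unconsumed = deque()  # entries not yet delivered to any consumer
        self.pending = {}          # consumer name -> entries delivered but not acknowledged

    def add(self, entry):
        """Models new data arriving in the stream."""
        self.unconsumed.append(entry)

    def read(self, consumer, count):
        """Models XREADGROUP with '>': move up to `count` entries to the consumer."""
        batch = [self.unconsumed.popleft()
                 for _ in range(min(count, len(self.unconsumed)))]
        self.pending.setdefault(consumer, []).extend(batch)
        return batch

    def ack(self, consumer, entry):
        """Models XACK: drop an entry from the consumer's pending list."""
        self.pending[consumer].remove(entry)

group = ToyConsumerGroup()
for item in ["img-1", "img-2", "img-3"]:
    group.add(item)

first = group.read("Consumer 1", 2)    # ['img-1', 'img-2']
second = group.read("Consumer 2", 2)   # ['img-3'], never an entry Consumer 1 holds
group.ack("Consumer 1", "img-1")
print(group.pending)  # {'Consumer 1': ['img-2'], 'Consumer 2': ['img-3']}
```

Because read() removes entries from the shared unconsumed list before handing them out, two consumers can never receive the same entry, which is the property the paragraph above describes.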

Now that I’ve explained the basics of consumer groups, let’s dig deeper into how this data lifecycle works.

Creating a Redis Streams consumer group

You can create a new consumer group using the command XGROUP CREATE, as shown below.

XGROUP CREATE mystream mygroup $ MKSTREAM

As with XREAD, a $ sign in the ID position tells the stream to deliver only new data from that point in time forward. The alternative is 0 or the ID of an existing stream entry. With 0, the group delivers all data from the beginning of the stream; with a specific ID, it delivers the entries that follow that ID.

MKSTREAM creates a new stream, mystream in this case, if it does not already exist.
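
The effect of the starting ID can be illustrated with a small Python sketch. It only models which entries already in the stream a newly created group would hand out; parse_id and deliverable are hypothetical helper names, and the IDs are sample values.

```python
def parse_id(entry_id):
    """Split an ID of the form '<milliseconds>-<sequence>' into comparable integers."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)

def deliverable(existing_ids, start):
    """Entries already in the stream that a group created at `start` would deliver."""
    if start == "$":
        return []  # only entries added after the group is created
    threshold = parse_id(start)
    return [i for i in existing_ids if parse_id(i) > threshold]

existing = ["1526569411111-0", "1526569411112-0", "1526569411113-0"]
print(deliverable(existing, "$"))                # []
print(deliverable(existing, "0"))                # all three entries
print(deliverable(existing, "1526569411111-0"))  # the last two entries
```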

Reading and managing Redis Stream data

Assume you have a Redis Stream (mystream), and you have already created a consumer group (mygroup) as shown above. You can now add items with names a, b, c, d, e as in the following example.

XADD mystream * name a

Running this command for names a through e will populate the Redis Stream, mystream, and the unconsumed list of the consumer group mygroup. This is illustrated in Figure 4.


Figure 4. A new Redis Streams consumer group called mygroup.
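
The * in the XADD command asks Redis to generate the entry ID, which has the form milliseconds-sequence. The following Python sketch models how such IDs stay strictly increasing even when several entries arrive in the same millisecond. ToyIdGenerator is a hypothetical name, and the timestamps are passed in by hand so the result is repeatable.

```python
import time

class ToyIdGenerator:
    """Models auto-generated stream IDs of the form '<milliseconds>-<sequence>'."""

    def __init__(self):
        self.last_ms = 0
        self.last_seq = 0

    def next_id(self, now_ms=None):
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        if now_ms > self.last_ms:
            self.last_ms, self.last_seq = now_ms, 0
        else:
            # Same millisecond, or the clock moved backward: keep the ID increasing.
            self.last_seq += 1
        return f"{self.last_ms}-{self.last_seq}"

gen = ToyIdGenerator()
ids = [gen.next_id(now_ms=t)
       for t in (1526569411111, 1526569411111, 1526569411112, 1526569411100)]
print(ids)
# ['1526569411111-0', '1526569411111-1', '1526569411112-0', '1526569411112-1']
```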

Here you can see that consumers Alice and Bob haven’t started their jobs yet. App A consumes data through the consumer Alice, while App B consumes data through Bob.

Consuming Redis Streams data

The command to read data from a group is XREADGROUP. In our example, when App A starts processing data, it calls the consumer (Alice) to fetch data, as in:

XREADGROUP GROUP mygroup Alice COUNT 2 STREAMS mystream >

Similarly, App B reads the data via Bob, as follows:

XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >

The special character > at the end tells Redis Streams to fetch only data entries that have not yet been delivered to any consumer in the group. Because no two consumers receive the same entry this way, the reads move data from the unconsumed list to Alice and Bob as shown in Figure 5.


Figure 5. Reading data to the consumer in a Redis Streams consumer group.
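
Argument order matters in XREADGROUP: Redis expects the consumer name immediately after the group name, followed by the optional COUNT and then the STREAMS clause. A small Python sketch of a command builder shows the order; xreadgroup_args is a hypothetical helper, not part of any Redis client library.

```python
def xreadgroup_args(group, consumer, streams, count=None):
    """Build the XREADGROUP token list; `streams` maps stream key -> ID ('>' for new data)."""
    args = ["XREADGROUP", "GROUP", group, consumer]
    if count is not None:
        args += ["COUNT", str(count)]
    args.append("STREAMS")
    args += list(streams.keys())     # all stream keys first...
    args += list(streams.values())   # ...then one ID per key, in the same order
    return args

print(" ".join(xreadgroup_args("mygroup", "Alice", {"mystream": ">"}, count=2)))
# XREADGROUP GROUP mygroup Alice COUNT 2 STREAMS mystream >
```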

Removing processed messages from pending entries lists

The data in the pending entries lists of your consumers will remain there until App A and App B acknowledge to Redis Streams that they have successfully consumed the data. This is done using the command XACK. For example, App A would acknowledge as follows after consuming d and e, which have the IDs 1526569411111-0 and 1526569411112-0.

XACK mystream mygroup 1526569411111-0 1526569411112-0

The combination of XREADGROUP and XACK is analogous to starting a transaction and committing it, which ensures data safety. 
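
The transaction analogy can be sketched as a read, process, acknowledge loop. The Python below is a toy model that uses a plain dictionary as Alice’s pending entries list; process_batch and handler are hypothetical names, and the failure on entry e is simulated to show what happens when no acknowledgment is sent.

```python
def process_batch(pending, batch, handler):
    """Record a delivered batch as pending, then acknowledge each entry the handler completes."""
    pending.update(batch)              # delivery: the entries are now pending
    for entry_id, fields in batch.items():
        try:
            handler(fields)
        except Exception:
            continue                   # no ack: the entry stays pending for a retry
        del pending[entry_id]          # ack: the "commit" step
    return pending

def handler(fields):
    if fields["name"] == "e":
        raise RuntimeError("simulated processing failure")

alice_pending = {}
batch = {"1526569411111-0": {"name": "d"}, "1526569411112-0": {"name": "e"}}
process_batch(alice_pending, batch, handler)
print(alice_pending)   # {'1526569411112-0': {'name': 'e'}}
```

Only the entry whose processing failed remains pending, so it can be retried or claimed later without being lost.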

After running XACK, let’s assume App A executed XREADGROUP as shown below. Now the data structure looks like Figure 6.

XREADGROUP GROUP mygroup Alice COUNT 2 STREAMS mystream >

Figure 6. The Redis Streams consumer group after XACK and XREADGROUP by App A.

Recovering from failures

If App B terminated due to failure while processing b and c, then the data structure would look like Figure 7.


Figure 7. App B hasn’t acknowledged with the XACK command.

Now you are left with two options:

1. Restart App B and reload the data from the consumer (Bob).

In this case, App B must read data from your consumer (Bob) using the XREADGROUP command, but with one difference. Instead of > at the end, App B would pass 0 (or an ID lower than that of the first pending entry it needs to reprocess). Remember that > sends new data from the unconsumed list to the consumer.

XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream 0

The above command will retrieve data entries that are already stored in the pending entries list for consumer Bob. It will not fetch new data from the unconsumed list. App B could iterate through all of Bob’s pending data before fetching new data.
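
A restarted App B could drain Bob’s pending entries along these lines. The Python sketch below models reading with an explicit ID against an in-memory pending list; read_history and parse_id are hypothetical helpers, and the IDs are sample values chosen for illustration.

```python
def parse_id(entry_id):
    """Split an ID of the form '<milliseconds>-<sequence>' into comparable integers."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)

def read_history(pending, after_id="0", count=None):
    """Pending entry IDs for one consumer that are greater than `after_id`, oldest first."""
    threshold = parse_id(after_id)
    ids = sorted((i for i in pending if parse_id(i) > threshold), key=parse_id)
    return ids[:count]

bob_pending = {"1526569411113-0": {"name": "b"}, "1526569411114-0": {"name": "c"}}

last_id = "0"
replayed = []
while True:
    batch = read_history(bob_pending, after_id=last_id, count=1)
    if not batch:
        break                # history drained; the app can switch back to ">"
    replayed.extend(batch)   # reprocess and acknowledge here
    last_id = batch[-1]

print(replayed)   # ['1526569411113-0', '1526569411114-0']
```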

2. Force Alice to claim all the data from Bob and process it via App A.

This is particularly helpful if you cannot recover App B due to node, disk, or network failure. In such cases, any other consumer (such as Alice) can claim Bob’s data and continue processing that data, thus preventing service downtime. To claim Bob’s data, you must run two commands:

XPENDING mystream mygroup - + 10 Bob

This will fetch up to 10 pending data entries for Bob. The options - and + cover the entire range of IDs. If b and c had the IDs 1526569411113-0 and 1526569411114-0 respectively, the command that will move Bob’s data to Alice is as follows:

XCLAIM mystream mygroup Alice 0 1526569411113-0 1526569411114-0

Consumer groups keep track of how long each entry has been sitting in a pending entries list. For example, when App B reads b, the clock starts and keeps running until App B acknowledges b with XACK. With the minimum idle time argument of the XCLAIM command (the 0 in the example above), you can tell the consumer group to move only data that has been idle longer than a specified number of milliseconds. Passing 0 claims the entries regardless of how long they have been idle. The result of these commands is illustrated in Figure 8. XCLAIM also comes in handy when one of your consumer processors is slow, resulting in a backlog of unprocessed data.


Figure 8. Alice claimed all of the data from Bob.
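
The idle-time check behind XCLAIM can be sketched as follows. This Python toy model keeps one dictionary of pending entries with their owner and delivery time. The claim function and its field names are hypothetical, and it ignores edge cases that real Redis handles.

```python
def claim(pending, new_owner, min_idle_ms, entry_ids, now_ms):
    """Reassign the listed entries to `new_owner` if idle for at least `min_idle_ms`.

    `pending` maps entry ID -> {"consumer": name, "delivered_ms": timestamp}.
    """
    claimed = []
    for entry_id in entry_ids:
        info = pending.get(entry_id)
        if info is None:
            continue  # already acknowledged, so there is nothing to claim
        if now_ms - info["delivered_ms"] >= min_idle_ms:
            info["consumer"] = new_owner
            info["delivered_ms"] = now_ms   # the idle clock restarts for the new owner
            claimed.append(entry_id)
    return claimed

pending = {
    "1526569411113-0": {"consumer": "Bob", "delivered_ms": 1000},
    "1526569411114-0": {"consumer": "Bob", "delivered_ms": 9000},
}
ids = list(pending)
slow_only = claim(pending, "Alice", 5000, ids, now_ms=10000)
print(slow_only)    # ['1526569411113-0'], the only entry idle for 5 seconds or more
everything = claim(pending, "Alice", 0, ids, now_ms=10000)
print(everything)   # both IDs, because a minimum idle time of 0 skips the check
```

A nonzero minimum idle time is a reasonable guard when several consumers might try to rescue the same entries, since an entry that was just claimed no longer looks idle to the others.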

In the previous article, we covered the basics of how to use Redis Streams. We went a bit deeper in this article and explained when to use consumer groups and how they work. Consumer groups in Redis Streams reduce your burden when it comes to managing data partitions, their lifecycles, and data safety. Plus, the scale-out capabilities of consumer groups can benefit many real-time applications.

In a forthcoming third article on Redis Streams, I will demonstrate how to develop a real-time classification application using Redis Streams and Lettuce, a Java-based open source library for Redis. Meanwhile, you can learn more by working through the Redis Streams tutorial on the Redis project website. 

Roshan Kumar is a senior product manager at Redis Labs. He has extensive experience in software development and technology marketing. Roshan has worked at Hewlett-Packard and many successful Silicon Valley startups including ZillionTV, Salorix, Alopa, and ActiveVideo. As an enthusiastic programmer, he designed and developed mindzeal.com, an online platform hosting computer programming courses for young students. Roshan holds a bachelor’s degree in computer science and an MBA from Santa Clara University.

New Tech Forum provides a venue to explore and discuss emerging enterprise technology in unprecedented depth and breadth. The selection is subjective, based on our pick of the technologies we believe to be important and of greatest interest to InfoWorld readers. InfoWorld does not accept marketing collateral for publication and reserves the right to edit all contributed content. Send all inquiries to newtechforum@infoworld.com.

Copyright © 2018 IDG Communications, Inc.