Design a distributed message broker system that allows multiple producers to send messages concurrently while enabling consumers to retrieve and process them asynchronously.

Functional Requirements
- Message Publishing by Producers: The system should allow multiple producers to send messages to the broker independently and concurrently. Producers should not need to know which consumers will process the messages, which keeps services loosely coupled and scalable.
- Temporary and Durable Message Storage: Consumers may not process messages immediately, so the broker must store messages until they are consumed. This storage layer acts as a buffer between producers and consumers, allowing the system to absorb traffic spikes, slow consumers, and temporary failures without losing data.
- Intelligent Message Routing and Fan-Out: The broker should route messages to the correct queues or consumers based on routing rules. Some messages should be processed by exactly one consumer, while others must be delivered to multiple services simultaneously.
- Efficient Consumer Waiting Mechanism: Consumers should not continuously poll the broker for new messages, because constant polling wastes CPU and network resources. Instead, the broker should support blocking or push-based delivery, so consumers wait idle and process messages as soon as they arrive.
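A minimal way to let consumers wait without polling, assuming a single in-process queue: the consumer thread blocks in `BlockingQueue.take()` and is woken only when a producer inserts a message. The class name `BlockingWaitDemo` and the `STOP` sentinel used to end the demo are hypothetical; a networked broker would get the same effect with long-polling or server push rather than a shared in-memory queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingWaitDemo {
    private static final String STOP = "STOP"; // hypothetical sentinel that ends the demo

    public static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>(); // written only by the consumer thread

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    // take() parks the thread until a message exists: no CPU spent polling
                    String msg = queue.take();
                    if (STOP.equals(msg)) return;
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String m : messages) {
            queue.offer(m); // wakes the blocked consumer; never fails on an unbounded queue
        }
        queue.offer(STOP);
        try {
            consumer.join(); // join() also makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("order.created", "order.paid")));
        // prints "[order.created, order.paid]"
    }
}
```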
Core Domain Models
Let's discuss the key design choices made to satisfy the functional requirements of a message broker system and understand why each component exists in the architecture.
Message Broker
The Message Broker acts as the central coordinator, allowing multiple producers to concurrently publish messages and multiple consumers to subscribe and asynchronously process those messages.
Since consumers may not process messages immediately, the broker must buffer messages temporarily until consumers are ready to consume them. This buffering enables asynchronous communication and decouples producers from consumers.
The simplest way to implement the broker is to maintain a mapping of routing keys to message queues. Each routing key represents a topic or category of messages and can be associated with one or more queues, allowing multiple consumers or services to independently process the same event without interfering with each other.

When a producer publishes a message using a routing key, the broker looks up all queues bound to that routing key and pushes a copy of the message into each queue. Each consumer continuously processes messages from its subscribed queue and removes messages after successful processing.
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class MessageBroker {
    // routingKey -> queues: supports one-to-many message routing (fan-out)
    private final Map<String, List<MessageQueue>> bindings = new ConcurrentHashMap<>();
    // queueName -> queue: fast lookup when consumers subscribe
    private final Map<String, MessageQueue> queues = new ConcurrentHashMap<>();

    // Bind a queue to a routing key
    public void bind(String routingKey, MessageQueue queue) {
        queues.put(queue.getName(), queue);
        bindings.computeIfAbsent(routingKey, k -> new CopyOnWriteArrayList<>()).add(queue);
    }

    // Publish a message to every queue bound to the routing key
    public void publish(String routingKey, Message message) {
        System.out.println("[BROKER] Publishing message : " + message.getPayload());
        List<MessageQueue> boundQueues = bindings.get(routingKey);
        if (boundQueues == null || boundQueues.isEmpty()) {
            System.out.println("[BROKER] No queues bound for routing key : " + routingKey);
            return;
        }
        // Fan-out: each bound queue gets the message (the same instance;
        // copy it if queues track per-delivery state such as retry counts)
        for (MessageQueue queue : boundQueues) {
            queue.enqueue(message);
        }
    }

    // Subscribe a consumer to a named queue
    public void subscribe(String queueName, Consumer consumer) {
        MessageQueue queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("Unknown queue: " + queueName);
        }
        queue.subscribe(consumer);
        System.out.println("[BROKER] Consumer subscribed to : " + queueName);
    }
}
To keep the broker's code clean, move the mapping logic into an Exchange component.
The broker's only job should be to receive a message and hand it to the exchange. The
exchange decides which queues get it. This allows you to swap out strategies (e.g.,
DirectExchange vs. TopicExchange with wildcard support like order.*) without changing
your broker code.
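public class MessageBroker {
    // queueName -> queue: fast lookup
    private final Map<String, MessageQueue> queues = new ConcurrentHashMap<>();
    private final Exchange exchange;

    public MessageBroker(Exchange exchange) {
        this.exchange = exchange;
    }

    // Register the queue by name; the exchange owns the routing binding
    public void bind(String routingKey, MessageQueue queue) {
        queues.putIfAbsent(queue.getName(), queue);
        exchange.bind(routingKey, queue);
    }

    // Publish: routing is fully delegated to the exchange
    public void publish(String routingKey, Message message) {
        exchange.route(routingKey, message);
    }

    // Subscribe a consumer to a named queue
    public void subscribe(String queueName, Consumer consumer) {
        MessageQueue queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("Unknown queue: " + queueName);
        }
        queue.subscribe(consumer);
        System.out.println("[BROKER] Consumer subscribed to : " + queueName);
    }
}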
Exchange
The Exchange is responsible for binding queues to routing keys and routing messages to the correct queues based on the routing key.
interface Exchange {
void bind(String routingKey, MessageQueue queue);
void route(String routingKey, Message message);
}
Since the broker delegates routing responsibility to the Exchange abstraction, different routing behaviors can be introduced without modifying the broker itself.
For example, a DirectExchange performs exact routing key matching and routes messages only
to queues explicitly bound to that routing key.
public class DirectExchange implements Exchange {
    // routingKey -> queues
    private final Map<String, List<MessageQueue>> bindings = new ConcurrentHashMap<>();

    // Bind queue to routing key
    @Override
    public void bind(String routingKey, MessageQueue queue) {
        bindings.computeIfAbsent(routingKey, k -> new CopyOnWriteArrayList<>()).add(queue);
        System.out.println("[DIRECT-EXCHANGE] Bound queue '" + queue.getName()
                + "' to routing key '" + routingKey + "'");
    }

    // Route message to queues
    @Override
    public void route(String routingKey, Message message) {
        List<MessageQueue> queues = bindings.get(routingKey);
        if (queues == null || queues.isEmpty()) {
            System.out.println("[DIRECT-EXCHANGE] No queues found for : " + routingKey);
            return;
        }
        // Route only to queues bound to this exact key
        for (MessageQueue queue : queues) {
            queue.enqueue(message);
        }
    }
}
This strategy is useful when messages should be delivered to very specific consumers.
A TopicExchange supports wildcard-based routing and allows queues to subscribe to
patterns instead of exact routing keys.
public class TopicExchange implements Exchange {
    /*
     * pattern -> queues
     * Example:
     *   order.* -> [Queue1, Queue2]
     */
    private final Map<String, List<MessageQueue>> bindings = new ConcurrentHashMap<>();

    // Bind queue to pattern
    @Override
    public void bind(String pattern, MessageQueue queue) {
        bindings.computeIfAbsent(pattern, k -> new CopyOnWriteArrayList<>()).add(queue);
        System.out.println("[TOPIC-EXCHANGE] Bound queue '" + queue.getName()
                + "' to pattern '" + pattern + "'");
    }

    // Route message to every queue whose pattern matches the routing key
    @Override
    public void route(String routingKey, Message message) {
        for (Map.Entry<String, List<MessageQueue>> entry : bindings.entrySet()) {
            if (matches(entry.getKey(), routingKey)) {
                for (MessageQueue queue : entry.getValue()) {
                    queue.enqueue(message);
                }
            }
        }
    }

    /*
     * Word-by-word matching on '.'-separated keys:
     *   * -> exactly one word
     *   # -> zero or more words (so "order.#" also matches "order")
     * Matching words directly avoids regex pitfalls: unescaped special
     * characters in patterns, and '#' failing to match zero words.
     */
    private boolean matches(String pattern, String routingKey) {
        return matchWords(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private boolean matchWords(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern consumed: the key must be consumed too
        }
        if (p[i].equals("#")) {
            // Let '#' swallow 0..n words, then try to match the rest of the pattern
            for (int next = j; next <= k.length; next++) {
                if (matchWords(p, i + 1, k, next)) return true;
            }
            return false;
        }
        if (j == k.length) {
            return false; // key ran out before the pattern did
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && matchWords(p, i + 1, k, j + 1);
    }

    /*
     * Sample usage (assumes paymentQueue, analyticsQueue and auditQueue already exist):
     *   Exchange exchange = new TopicExchange();
     *   exchange.bind("order.*", paymentQueue);
     *   exchange.bind("*.created", analyticsQueue);
     *   exchange.bind("order.#", auditQueue);
     *   exchange.route("order.created", new Message("Order Created"));     // payment, analytics, audit
     *   exchange.route("order.updated", new Message("Order Updated"));     // payment, audit
     *   exchange.route("payment.created", new Message("Payment Created")); // analytics
     */
}
This strategy allows you to build complex subscription models where consumers can subscribe
to categories of events (e.g., "give me all order-related events" using order.#) without
needing to know the specific routing keys ahead of time.
Message Queue
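In the basic design, each queue has exactly one consumer. Besides storing pending messages, the queue tracks in-flight messages (delivered but not yet acknowledged) so they can be retried if processing fails.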
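import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

// Assumed callback interface, matching consumer.onMessage(message) used in dispatch()
interface Consumer { void onMessage(Message message); }

class Message {
    private final String id = UUID.randomUUID().toString();
    private final String payload;
    private final long createdAt = System.currentTimeMillis();
    private int retryCount;
    Message(String payload) { this.payload = payload; }
    String getId() { return id; }
    String getPayload() { return payload; }
    int getRetryCount() { return retryCount; }
    void incrementRetry() { retryCount++; }
}

class MessageQueue {
    private static final int MAX_RETRIES = 3;                      // assumed retry limit
    private final String name;
    private final Deque<Message> queue = new ArrayDeque<>();       // main storage
    private final Map<String, Message> inFlight = new HashMap<>(); // delivered, not yet acked
    private final List<Consumer> subscribers = new CopyOnWriteArrayList<>();

    MessageQueue(String name) { this.name = name; }
    String getName() { return name; }
    void subscribe(Consumer consumer) { subscribers.add(consumer); }

    public synchronized void enqueue(Message message) {
        queue.offerLast(message);
        notifyAll(); // wake consumers blocked in dequeue()
    }

    // Waits without busy-polling until a message is available, then marks it in-flight
    public synchronized Message dequeue() throws InterruptedException {
        while (queue.isEmpty()) wait();
        Message msg = queue.pollFirst();
        inFlight.put(msg.getId(), msg);
        return msg;
    }

    // Success: the message can be forgotten
    public synchronized void ack(String messageId) {
        inFlight.remove(messageId);
    }

    // Failure: requeue while retries remain; otherwise drop it
    // (a production broker would typically route it to a dead-letter queue)
    public synchronized void nack(String messageId) {
        Message msg = inFlight.remove(messageId);
        if (msg != null && msg.getRetryCount() < MAX_RETRIES) {
            msg.incrementRetry();
            enqueue(msg);
        }
    }

    // Consumer crash: treat every unacknowledged message as failed
    public synchronized void retryUnacked() {
        for (String id : new ArrayList<>(inFlight.keySet())) nack(id);
    }
}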
Producer
public class Producer {
    private final MessageBroker broker;

    public Producer(MessageBroker broker) {
        this.broker = broker;
    }

    // Wraps the payload in a Message and publishes it; the producer never sees queues or consumers
    public void send(String routingKey, String payload) {
        Message message = new Message(payload);
        broker.publish(routingKey, message);
    }
}
This abstraction:
- decouples producer logic from broker internals
- supports multiple producer implementations
- hides networking complexity
Consumer
We want horizontal scalability, so multiple consumers should process messages concurrently. In this sketch, each consumer runs as a separate worker thread that pulls messages from a queue:
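public class ConsumerWorker implements Runnable {
    private final String name;
    private final MessageQueue queue;

    public ConsumerWorker(String name, MessageQueue queue) {
        this.name = name;
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Message message;
            try {
                message = queue.dequeue(); // blocks until a message is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop the worker
                return;
            }
            System.out.println("[" + name + "] Processing: " + message.getPayload());
            try {
                process(message);
                queue.ack(message.getId()); // acknowledge only after successful processing
                System.out.println("[" + name + "] ACK: " + message.getId());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // left in-flight; retryUnacked() can redeliver it
                return;
            } catch (Exception ex) {
                System.out.println("[" + name + "] RETRY: " + message.getPayload());
                queue.nack(message.getId()); // requeue, or drop once retries are exhausted
            }
        }
    }

    private void process(Message message) throws Exception {
        Thread.sleep(1000); // simulate work
        if (Math.random() < 0.2) { // simulate a 20% failure rate
            throw new RuntimeException("Processing failed");
        }
    }
}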
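Running several workers against one queue is what turns a single consumer into a horizontally scalable pool. The sketch below is self-contained and simplified: it replaces `MessageQueue` with a JDK `LinkedBlockingQueue` of strings, sizes a fixed thread pool to the worker count, and stops each worker with a hypothetical `STOP` sentinel. `take()` removes each message atomically, so every message is handled by exactly one worker.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class WorkerPoolDemo {
    private static final String STOP = "STOP"; // hypothetical shutdown sentinel

    // Runs `workers` threads over one shared queue; returns messages handled per worker
    public static Map<String, Integer> run(List<String> messages, int workers) {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>(messages);
        for (int i = 0; i < workers; i++) {
            shared.offer(STOP); // queued after all real messages: one sentinel per worker
        }
        Map<String, Integer> handled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            String name = "worker-" + i;
            pool.submit(() -> {
                try {
                    while (true) {
                        String msg = shared.take(); // atomically removes one message
                        if (STOP.equals(msg)) return;
                        handled.merge(name, 1, Integer::sum);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; running workers drain the queue
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = run(List.of("m1", "m2", "m3", "m4", "m5", "m6"), 3);
        int total = counts.values().stream().mapToInt(Integer::intValue).sum();
        System.out.println(counts + " total=" + total);
    }
}
```

Which worker handles which message varies from run to run; only the total is deterministic.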
Non-Functional Requirements
- Concurrent Message Processing and Horizontal Scalability: The system should support multiple consumers processing messages in parallel to increase throughput and improve scalability. As traffic grows, it should be easy to add consumers that share the workload across multiple machines or instances.
- Fault Tolerance Against Consumer Failures: Consumer failures are common in distributed systems due to crashes, timeouts, or processing errors. The broker should detect failed consumers and safely redeliver unprocessed messages so that failures do not result in message loss.
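One common way to detect failed consumers is an acknowledgment deadline, the approach Amazon SQS calls a visibility timeout: a delivered message is hidden from other consumers, and if it is not acknowledged before its deadline the broker assumes the consumer died and makes the message deliverable again. In this sketch the class `VisibilityTimeoutQueue`, its methods, and the use of plain string IDs are hypothetical, and the caller passes the current time explicitly so the behavior is deterministic.

```java
import java.util.*;

public class VisibilityTimeoutQueue {
    private final long timeoutMillis;
    private final Deque<String> ready = new ArrayDeque<>();           // IDs waiting for delivery
    private final Map<String, Long> inFlight = new LinkedHashMap<>(); // ID -> ack deadline

    public VisibilityTimeoutQueue(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    public synchronized void enqueue(String messageId) {
        ready.addLast(messageId);
    }

    // Hands out the next message and starts its ack deadline; null if nothing is ready
    public synchronized String deliver(long nowMillis) {
        String id = ready.pollFirst();
        if (id != null) {
            inFlight.put(id, nowMillis + timeoutMillis);
        }
        return id;
    }

    // The consumer finished the message, so it must never be redelivered
    public synchronized void ack(String messageId) {
        inFlight.remove(messageId);
    }

    // Requeues messages whose deadline passed without an ack; returns how many were requeued
    public synchronized int redeliverExpired(long nowMillis) {
        int requeued = 0;
        Iterator<Map.Entry<String, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) { // consumer presumed crashed or stuck
                ready.addLast(entry.getKey());
                it.remove();
                requeued++;
            }
        }
        return requeued;
    }

    public static void main(String[] args) {
        VisibilityTimeoutQueue q = new VisibilityTimeoutQueue(30_000);
        q.enqueue("msg-1");
        q.enqueue("msg-2");
        String a = q.deliver(0);             // msg-1, must be acked by t=30000
        String b = q.deliver(0);             // msg-2, must be acked by t=30000
        q.ack(a);                            // msg-1 processed successfully
        int n = q.redeliverExpired(31_000);  // msg-2 was never acked
        System.out.println(b + " requeued: " + n + ", next delivery: " + q.deliver(31_000));
        // prints "msg-2 requeued: 1, next delivery: msg-2"
    }
}
```

Because a slow but still-alive consumer can lose its message to redelivery, this gives at-least-once delivery, so message handlers should be idempotent.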
Bottlenecks and Scaling Issues
The basic design requires one queue per consumer. While this is simple and easy to understand, it does not scale well in real-world distributed systems.
If a particular consumer becomes slow, messages accumulate in its dedicated queue and the backlog grows without bound. Because each consumer owns its queue, the system cannot automatically shift that backlog to other, idle consumers.
Producer
↓
Exchange
↓
-------------------------
| Queue-A → Consumer-A |
| Queue-B → Consumer-B |
| Queue-C → Consumer-C |
-------------------------
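Scaling this design out means adding more queue-consumer pairs. If those queues are bound to the same routing key, fan-out gives every new queue its own copy of each message, so a new consumer duplicates work instead of sharing it. Adding pairs also requires provisioning additional queues, which becomes an infrastructure-management problem: as the number of consumers changes dynamically, managing queues becomes increasingly difficult.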
Better Design: Competing Consumers
To resolve the scalability and infrastructure overhead issues, we can transition from Dedicated Queues to the Competing Consumers Pattern.
Instead of creating a new queue for every new consumer instance, you create one shared queue
(e.g., orders_queue) and allow multiple consumer instances to subscribe to it. To support
this, you need to update your MessageQueue class so it doesn't just hold one list of
subscribers, but manages the dispatching logic so that each message is delivered to only
one available consumer.
- Round-Robin Dispatching: The queue maintains a pointer to the next consumer in its subscriber list. When a message arrives, it sends it to that consumer and moves the pointer.
- Acknowledgment (Ack) Mechanism: This is the most critical missing piece for production stability. A message should not be "removed" from the queue the moment it is dispatched. It should stay in a "pending/unacknowledged" state. Only when the consumer sends an ACK back to the queue should the message be deleted. If a consumer crashes, the message can be requeued.
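// Inside MessageQueue.java (uses its queue, subscribers and inFlight fields)
private final AtomicInteger nextConsumer = new AtomicInteger(0);

private synchronized void dispatch() {
    // Snapshot so a concurrent unsubscribe cannot shift indices mid-selection
    List<Consumer> consumers = List.copyOf(subscribers);
    if (consumers.isEmpty()) return;
    Message message = queue.poll();
    if (message == null) return;
    // Pick one consumer (round robin); floorMod keeps the index non-negative after int overflow
    int index = Math.floorMod(nextConsumer.getAndIncrement(), consumers.size());
    Consumer selected = consumers.get(index);
    // Keep the message pending until the consumer acks it, rather than dropping it on dispatch
    inFlight.put(message.getId(), message);
    // Dispatch asynchronously so one slow consumer does not block the queue
    CompletableFuture.runAsync(() -> selected.onMessage(message));
}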
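The dispatch snippet above covers consumer selection only. The self-contained sketch below adds the other half of the Ack mechanism: a dispatched message stays pending for its consumer until it is acked, and when a consumer is removed (for example, after a crash) its pending messages return to the front of the queue. Consumers and messages are plain strings, delivery is synchronous to keep the example deterministic, and `RoundRobinQueue` with its method names is hypothetical.

```java
import java.util.*;

public class RoundRobinQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> consumers = new ArrayList<>();
    // consumer -> messages delivered to it but not yet acked
    private final Map<String, Set<String>> pending = new HashMap<>();
    private int next = 0; // index of the consumer that receives the next message

    public synchronized void addConsumer(String consumer) {
        consumers.add(consumer);
        pending.put(consumer, new LinkedHashSet<>());
    }

    public synchronized void enqueue(String message) {
        ready.addLast(message);
    }

    // Delivers one ready message in round-robin order; returns the chosen consumer or null
    public synchronized String dispatch() {
        if (consumers.isEmpty() || ready.isEmpty()) return null;
        String consumer = consumers.get(next);
        next = (next + 1) % consumers.size();
        pending.get(consumer).add(ready.pollFirst()); // pending until acked
        return consumer;
    }

    public synchronized void ack(String consumer, String message) {
        Set<String> owned = pending.get(consumer);
        if (owned != null) owned.remove(message);
    }

    // Simulates a crash: the consumer leaves and its unacked messages are requeued
    public synchronized void removeConsumer(String consumer) {
        int idx = consumers.indexOf(consumer);
        if (idx < 0) return;
        consumers.remove(idx);
        List<String> unacked = new ArrayList<>(pending.remove(consumer));
        // Push in reverse so the redelivered messages keep their original order at the front
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(unacked.get(i));
        }
        // Keep the round-robin pointer aimed at the same next consumer
        if (idx < next) next--;
        next = consumers.isEmpty() ? 0 : next % consumers.size();
    }

    public synchronized List<String> pendingFor(String consumer) {
        return new ArrayList<>(pending.getOrDefault(consumer, Set.of()));
    }

    public static void main(String[] args) {
        RoundRobinQueue q = new RoundRobinQueue();
        q.addConsumer("c1");
        q.addConsumer("c2");
        for (String m : List.of("m1", "m2", "m3", "m4")) q.enqueue(m);
        while (q.dispatch() != null) { }        // c1: m1, m3   c2: m2, m4
        q.ack("c1", "m1");
        q.removeConsumer("c2");                 // c2 crashed before acking m2 and m4
        while (q.dispatch() != null) { }        // m2 and m4 are redelivered to c1
        System.out.println(q.pendingFor("c1")); // prints "[m3, m2, m4]"
    }
}
```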
By moving to this model, you lose ordering guarantees (if that matters for your use case). Because multiple consumers process messages in parallel, message B can finish before message A even if A was dispatched first. If strict ordering is required, you would typically use a consistent-hashing exchange so that all messages for a given key, such as an order ID, always go to the same consumer.
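A minimal consistent-hash ring shows why key-based routing preserves per-key ordering: every message with the same key (for example, an order ID) maps to the same consumer, so that consumer sees the key's messages in sequence. The class name, the 100 virtual nodes per consumer, and the hash function (`String.hashCode()` passed through the MurmurHash3 32-bit finalizer) are assumptions for illustration.

```java
import java.util.*;

public class ConsistentHashRouter {
    private final TreeMap<Integer, String> ring = new TreeMap<>(); // ring position -> consumer
    private final int virtualNodes;

    public ConsistentHashRouter(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    // Places each consumer at several ring positions to spread keys more evenly
    public void addConsumer(String consumer) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.putIfAbsent(hash(consumer + "#" + i), consumer); // skip rare position collisions
        }
    }

    // Removes only this consumer's positions; keys owned by others are unaffected
    public void removeConsumer(String consumer) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(consumer + "#" + i), consumer);
        }
    }

    // Walks clockwise from the key's position to the next consumer, wrapping at the end
    public String route(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no consumers registered");
        }
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // String.hashCode() mixed with the MurmurHash3 32-bit finalizer (an illustrative choice)
    private static int hash(String s) {
        int h = s.hashCode();
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        ConsistentHashRouter router = new ConsistentHashRouter(100);
        for (String c : List.of("consumer-A", "consumer-B", "consumer-C")) {
            router.addConsumer(c);
        }
        // Every event for order-42 is routed to the same consumer
        for (String event : List.of("created", "paid", "shipped")) {
            System.out.println("order-42 " + event + " -> " + router.route("order-42"));
        }
    }
}
```

Removing a consumer only remaps the keys that were assigned to it; all other keys keep their consumer, which is the advantage over a plain `hash(key) % consumerCount` scheme. Ordering still holds only while the consumer set is stable, because remapped keys move to a new consumer.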
With all this, the broker becomes heavily stateful, managing acknowledgments, delivery state, retries, and subscriptions. As message volume grows to millions of events per second, these operations become bottlenecks.