Core Messaging Patterns
Work queues, publish/subscribe, routing, topics, and request/reply (RPC) implemented with Spring AMQP.
Prerequisite: Consumer Acknowledgements and Prefetch
You’ll need: The Spring AMQP setup from this section and a broker running locally.
You now know how to route, store, publish, consume, and acknowledge. This module assembles those pieces into the five patterns that cover the vast majority of real systems.
What you’ll be able to do after this module
- Distribute work across competing consumers with a work queue.
- Broadcast events with publish/subscribe.
- Route selectively with direct and topic exchanges.
- Implement request/reply (RPC) over RabbitMQ with Spring AMQP.
1. Work queues (competing consumers)
Multiple instances of the same consumer read from one queue. The broker hands each message to exactly one of them, spreading load. This is how you scale a service horizontally.
flowchart LR
P["Order Service"]
Q["inventory.queue"]
C1["Inventory instance 1"]
C2["Inventory instance 2"]
C3["Inventory instance 3"]
P --> Q
Q --> C1
Q --> C2
Q --> C3
Every instance declares the same listener on the same queue. Run more instances to add capacity. Prefetch (from the previous module) controls how fairly work is spread.
@RabbitListener(queues = "inventory.queue")
public void reserve(OrderCreated event) {
    // Every instance runs this same listener; the broker gives each message to exactly one of them.
    inventoryService.reserve(event);
}
Note: Each message goes to only one consumer here. To have several different services each receive a copy, use publish/subscribe below.
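To make the "exactly one consumer" rule concrete, here is a minimal in-memory sketch in plain Java. It is not Spring AMQP and not a real broker: the `WorkQueueSketch` class and its methods are hypothetical, and it assumes simple round-robin dispatch with no prefetch limit and no acknowledgements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one queue with competing consumers.
// Assumes plain round-robin dispatch; a real broker also applies prefetch and acks.
class WorkQueueSketch {
    private final List<List<String>> consumers = new ArrayList<>();
    private int next = 0;

    // Register a consumer instance and return its index.
    int addConsumer() {
        consumers.add(new ArrayList<>());
        return consumers.size() - 1;
    }

    // Deliver the message to exactly one consumer, then advance the cursor.
    void publish(String message) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers registered");
        }
        consumers.get(next).add(message);
        next = (next + 1) % consumers.size();
    }

    // Messages received so far by the given consumer.
    List<String> received(int consumer) {
        return List.copyOf(consumers.get(consumer));
    }
}
```

With three consumers and six published messages, each consumer in this model ends up with two messages and no message appears twice. Adding a fourth consumer would lower each share without any change to the publisher.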
2. Publish/subscribe (fanout)
One event, many independent consumers. A fanout exchange delivers a copy to every bound queue, and each service has its own queue.
flowchart LR
P["Order Service"]
FX["orders.fanout"]
QI["inventory.queue"]
QN["notification.queue"]
QA["analytics.queue"]
P --> FX
FX --> QI
FX --> QN
FX --> QA
@Bean
FanoutExchange ordersFanout() {
    return new FanoutExchange("orders.fanout");
}

@Bean
Binding notificationBinding(Queue notificationQueue, FanoutExchange ordersFanout) {
    return BindingBuilder.bind(notificationQueue).to(ordersFanout);
}
Each service gives its queue a distinct name so all of them receive every event. Routing details are in Exchanges, Bindings and Routing Topologies.
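The role of distinct queue names can be illustrated with a small in-memory model in plain Java. This is only a sketch: `FanoutSketch` and its methods are hypothetical names, and the model ignores routing keys, durability, and consumers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange and its bound queues.
class FanoutSketch {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binding the same queue name twice still yields a single queue.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copy the message into every bound queue.
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> contents(String queueName) {
        return List.copyOf(queues.getOrDefault(queueName, List.of()));
    }

    int queueCount() {
        return queues.size();
    }
}
```

In this model, two services that bind under the same queue name share one queue and therefore one copy of each event. That is the work-queue behaviour from section 1, not a broadcast.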
3. Routing (direct exchange)
When consumers want only some events, use a direct exchange and bind each queue to the routing keys it cares about.
flowchart LR
P["Order Service"]
DX["orders.direct"]
QC["fulfilment.queue"]
QX["cancellation.queue"]
P -->|"order.created"| DX
P -->|"order.cancelled"| DX
DX -->|"order.created"| QC
DX -->|"order.cancelled"| QX
rabbitTemplate.convertAndSend("orders.direct", "order.cancelled", event);
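The exact-match rule of a direct exchange can be sketched as a lookup table from routing key to bound queues. The plain-Java model below is illustrative only; `DirectExchangeSketch` and its methods are hypothetical and stand in for what the broker does internally.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange: routing is an exact key lookup.
class DirectExchangeSketch {
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Bind a queue to one routing key; a queue may be bound to several keys.
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new HashSet<>()).add(queueName);
    }

    // Queues that receive a message published with this key (empty if none match).
    Set<String> route(String routingKey) {
        return Set.copyOf(bindings.getOrDefault(routingKey, Set.of()));
    }
}
```

With `fulfilment.queue` bound to `order.created` and `cancellation.queue` bound to `order.cancelled`, routing `order.cancelled` returns only the cancellation queue. A key with no binding, such as `order.shipped`, returns an empty set, which models a message that reaches no queue.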
4. Topics (pattern routing)
A topic exchange routes on dotted patterns, so consumers subscribe to slices of the event space without the producer knowing who listens.
flowchart LR
P["Order Service"]
TX["orders.topic"]
P -->|"order.eu.created"| TX
TX -->|"order.eu.*"| QEU["eu.orders.queue"]
TX -->|"order.#"| QAUD["audit.queue"]
@Bean
Binding euBinding(Queue euOrdersQueue, TopicExchange ordersTopic) {
    // "order.eu.*" matches order.eu.created but not order.us.created.
    return BindingBuilder.bind(euOrdersQueue).to(ordersTopic).with("order.eu.*");
}
Wildcard rules (* and #) are detailed in the routing module.
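As a quick reminder of those rules: routing keys are split into words on dots, `*` matches exactly one word, and `#` matches zero or more words. The matcher below is a minimal plain-Java sketch of that behaviour, not the broker's implementation. `TopicMatcher` is a hypothetical name, and edge cases such as empty routing keys are not handled.

```java
// Hypothetical sketch of topic-exchange pattern matching.
// '*' matches exactly one dot-separated word; '#' matches zero or more words.
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int skip = j; skip <= key.length; skip++) {
                if (match(pattern, i + 1, key, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(key[j]);
        return wordMatches && match(pattern, i + 1, key, j + 1);
    }
}
```

Under these rules, `order.eu.created` matches both bindings in the diagram above (`order.eu.*` and `order.#`), while `order.us.created` matches only `order.#`.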
5. Request/reply (RPC)
Sometimes you want a response over messaging. In request/reply, the client sends a request, includes a reply-to queue and a correlation-id, and blocks for the matching reply. Spring AMQP handles the plumbing through convertSendAndReceive.
sequenceDiagram
participant Client
participant Broker as RabbitMQ
participant Server as Pricing Service
Client->>Broker: publish request (reply-to, correlation-id)
Broker->>Server: deliver request
Server->>Server: compute price
Server->>Broker: publish reply (same correlation-id)
Broker->>Client: deliver reply
Note over Client: unblocks with the response
Client side:
// Blocks until the reply arrives; returns null if the reply timeout elapses first.
PriceQuote quote = (PriceQuote) rabbitTemplate.convertSendAndReceive(
        "pricing.exchange", "price.request", new PriceRequest("WIDGET-1", 2));
Server side: return a value from the listener, and Spring publishes it to the request’s reply-to with the correct correlation-id.
@RabbitListener(queues = "price.request.queue")
public PriceQuote quote(PriceRequest request) {
    // The return value becomes the reply message.
    return pricingService.quote(request);
}
Spring uses a fast direct reply-to mechanism by default, so you do not declare a reply queue yourself.
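The correlation-id bookkeeping that Spring hides can be sketched in plain Java. This is an illustrative model only, not how `RabbitTemplate` is implemented. `RpcClientSketch` and its methods are hypothetical, replies are plain strings, and the caller supplies the correlation id, whereas a real client would generate it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the client side of request/reply:
// each outstanding request is tracked by its correlation id.
class RpcClientSketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Record an outstanding request; the caller waits on the returned future.
    CompletableFuture<String> send(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    // Called when a reply message arrives. Returns false if no request is
    // waiting for this correlation id (unknown, or already answered).
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(body);
        return true;
    }
}
```

Because replies are matched by id rather than by arrival order, a reply for a later request can complete its future while an earlier request is still waiting. A caller would normally wait with a bound, for example `reply.get(5, TimeUnit.SECONDS)`. With `RabbitTemplate`, the equivalent is to check the result of `convertSendAndReceive` for `null`, which is what it returns when no reply arrives within the reply timeout.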
Caution: Request/reply reintroduces synchronous coupling and latency, the very things messaging avoids. Use it only when the caller truly needs the answer inline. For everything else, prefer fire-and-forget events.
Choosing a pattern
| Need | Pattern |
|---|---|
| Scale one consumer horizontally | Work queue |
| One event, many independent reactions | Publish/subscribe |
| Selective delivery by exact key | Routing (direct) |
| Selective delivery by pattern | Topics |
| A response is required inline | Request/reply |
Checkpoint
You should now be able to:
- Scale a consumer with a work queue and competing consumers.
- Broadcast with publish/subscribe.
- Route selectively with direct and topic exchanges.
- Implement request/reply with convertSendAndReceive, and explain its trade-offs.