First Producer and Consumer with Spring for Apache Kafka

Add spring-kafka, configure the connection, create topics with KafkaAdmin, publish with KafkaTemplate, consume with @KafkaListener, and serialize typed DTOs as JSON.

This is where the concepts become code. You built a mental model, saw the internals, and ran a cluster by hand. Now the Order service publishes an OrderCreated event with a KafkaTemplate, and the Inventory service consumes it with a @KafkaListener, over the same orders topic you produced to from the console.

You start with plain strings to prove the wiring, then switch to typed DTOs serialized as JSON, which is how real services exchange events.


What you’ll be able to do after this module

  • Add and configure Spring for Apache Kafka in a Spring Boot service.
  • Create topics from code with NewTopic and KafkaAdmin instead of the CLI.
  • Publish records with KafkaTemplate and consume them with @KafkaListener.
  • Serialize and deserialize typed DTOs as JSON, with trusted packages configured.
  • Guard against poison records with ErrorHandlingDeserializer.

1. Add the dependency and configure the connection

Add the Spring for Apache Kafka dependency. It is a plain library (spring-kafka), not a Spring Boot starter artifact, but Spring Boot manages its version and auto-configures it.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Point the application at the broker from the local lab in application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092

With spring-kafka on the classpath, Spring Boot auto-configures a ProducerFactory, a ConsumerFactory, a KafkaTemplate, and the listener container infrastructure, and this one property tells them all where to connect. As covered in Core Concepts, the client uses bootstrap-servers only to discover the cluster, then talks to whichever broker leads each partition.


2. The end-to-end round trip

Here is what happens when the Order service publishes and the Inventory service consumes.

sequenceDiagram
    participant Producer as Order Service
    participant Template as KafkaTemplate
    participant Broker as Kafka (orders topic)
    participant Listener as @KafkaListener
    participant Consumer as Inventory Service

    Producer->>Template: send("orders", orderId, OrderCreated)
    Template->>Template: serialize key and value
    Template->>Broker: append to a partition of orders
    Broker->>Listener: deliver record from assigned partition
    Listener->>Listener: deserialize value to OrderCreated
    Listener->>Consumer: invoke handler(OrderCreated)
    Listener->>Broker: commit offset after handler returns

Unlike a queue, the broker does not delete the record when Inventory reads it. The record stays in the log, and Inventory tracks its position with a committed offset, which you meet properly in Consuming Deeper.
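
To make the log-versus-queue distinction concrete, here is a minimal sketch in plain Java. It is not Kafka client code: the class PartitionLogSketch and the billing-service group are hypothetical names used only for illustration, and the model assumes a single partition with one committed offset per group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one partition; not Kafka client code.
class PartitionLogSketch {

    private final List<String> records = new ArrayList<>();
    // Committed position per consumer group: the offset of the next record to read.
    private final Map<String, Integer> committedOffsets = new HashMap<>();

    // Appends a record and returns the offset it was assigned.
    int append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Returns the records past the group's committed position; nothing is removed from the log.
    List<String> poll(String groupId) {
        int position = committedOffsets.getOrDefault(groupId, 0);
        return new ArrayList<>(records.subList(position, records.size()));
    }

    // Marks everything before nextOffset as processed by the group.
    void commit(String groupId, int nextOffset) {
        committedOffsets.put(groupId, nextOffset);
    }

    int size() {
        return records.size();
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        log.append("order-1 created");
        log.append("order-2 created");

        // The inventory group reads both records and commits its position.
        List<String> batch = log.poll("inventory-service");
        log.commit("inventory-service", batch.size());

        // Nothing new for the inventory group, but the records are still in the log,
        // so a second (hypothetical) group reads them from the start.
        System.out.println(log.poll("inventory-service"));
        System.out.println(log.poll("billing-service"));
        System.out.println(log.size());
    }
}
```

Reading changes only the group's committed offset, never the log itself, which is why a second group can replay the same records independently.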


3. Create the topic from code

In the lab you created orders with the CLI. In an application you declare it as a bean, and KafkaAdmin (auto-configured from bootstrap-servers) creates it on startup if it does not already exist.

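import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    public static final String ORDERS_TOPIC = "orders";

    // KafkaAdmin picks up every NewTopic bean and creates the topic if it is missing.
    @Bean
    NewTopic ordersTopic() {
        return TopicBuilder.name(ORDERS_TOPIC)
                .partitions(3)
                .replicas(1) // a single-broker lab cannot host more than one replica
                .build();
    }
}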

4. Publish with KafkaTemplate

Inject KafkaTemplate and send. Start with a string value to prove the wiring end to end before introducing types.

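import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor // Lombok generates the constructor that injects kafkaTemplate
public class OrderController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/orders")
    public String createOrder(@RequestBody String orderJson) {
        // Hard-coded key for the wiring test; a real service would use the order's own id.
        String orderId = "order-1";
        // Arguments: topic, record key, record value.
        kafkaTemplate.send(KafkaTopicConfig.ORDERS_TOPIC, orderId, orderJson);
        return "published";
    }
}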

The second argument is the record key. As you saw in the lab, the key determines the partition: records with the same key land on the same partition, so they keep their relative order. Key selection is covered in depth in Producing Deeper.
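
The key-to-partition mapping can be sketched in a few lines of plain Java. This is an illustration under stated assumptions, not the client's code: the real default partitioner hashes the serialized key bytes with murmur2, while this sketch swaps in Arrays.hashCode for brevity, so the partition numbers it prints will not match what a broker shows. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of key-based partitioning; not the Kafka partitioner.
class KeyPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    // Assumption: Arrays.hashCode stands in for the hash the real client applies to the key bytes.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // matches the orders topic declared in section 3
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Whatever the hash function, the property that matters is determinism: the two "order-1" lines always report the same partition, which is what preserves per-key ordering.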


5. Consume with @KafkaListener

Annotate a method with @KafkaListener. Spring starts a listener container that polls the topic and invokes your method for each record.

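import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class InventoryListener {

    // Called once per record; the String parameter receives the record value.
    @KafkaListener(topics = KafkaTopicConfig.ORDERS_TOPIC, groupId = "inventory-service")
    public void onOrderCreated(String orderJson) {
        System.out.println("Reserving stock for: " + orderJson);
    }
}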

The groupId names the consumer group. All instances of the Inventory service that share this group divide the partitions of orders among themselves, so the work scales out, up to one instance per partition. Send a request and watch it flow:

curl -X POST localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{"orderId":1,"item":"widget","amount":49.90}'

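The listener prints the payload almost immediately. Open the Kafka UI at http://localhost:8080 to see the record on the orders topic and the inventory-service group’s position advance. Note that the curl command above targets the same port: if the lab's Kafka UI is published on 8080, run the application on a different server.port and adjust the curl URL to match.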
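
How a consumer group divides partitions among its instances can be sketched in plain Java. This is a simplified round-robin for illustration only, not the assignor Kafka actually runs, and the instance names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of partition assignment within one consumer group.
class GroupAssignmentSketch {

    // Deals partitions 0..numPartitions-1 out to the instances in turn.
    // Assumption: a plain round-robin, not the assignor Kafka actually runs.
    static Map<String, List<Integer>> assign(List<String> instances, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = instances.get(partition % instances.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One instance owns all three partitions of orders.
        System.out.println(assign(List.of("inventory-1"), 3));
        // Two instances split them.
        System.out.println(assign(List.of("inventory-1", "inventory-2"), 3));
        // With four instances and three partitions, one instance is left with nothing to read.
        System.out.println(assign(
                List.of("inventory-1", "inventory-2", "inventory-3", "inventory-4"), 3));
    }
}
```

Each partition has exactly one owner within the group, so adding instances beyond the partition count adds no throughput.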


6. Serialize typed DTOs as JSON

Passing raw strings is fragile. Configure JSON serialization so producer and consumer exchange a typed DTO. Define the event as a record whose fields match the JSON you produced by hand in the lab.

public record OrderCreated(long orderId, String item, BigDecimal amount) {}

Configure the serializer and deserializer in application.yml. The producer writes JSON, and the consumer rebuilds the typed object.

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.orders"
        spring.json.value.default.type: "com.example.orders.OrderCreated"

Now publish and consume the DTO directly, with no manual JSON handling:

// Producer: KafkaTemplate<String, OrderCreated>
kafkaTemplate.send("orders", String.valueOf(event.orderId()), event);

// Consumer
@KafkaListener(topics = "orders", groupId = "inventory-service")
public void onOrderCreated(OrderCreated event) {
    // event.item() and event.amount() are strongly typed
}

7. Survive a poison record

If a bad record lands on the topic, for example malformed JSON, a raw JsonDeserializer throws while deserializing, before your handler runs. The container cannot advance past it, so the same record is retried forever and the partition stalls.

Wrap the deserializer in ErrorHandlingDeserializer so a deserialization failure becomes a handled error instead of an infinite loop.

spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: "com.example.orders"
        spring.json.value.default.type: "com.example.orders.OrderCreated"

This is the minimum safe default. The full story of retries, backoff, and routing bad records to a dead letter topic lives in Retries, Error Handling, and Dead Letter Topics.
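
The idea behind the wrapper can be sketched without any Kafka classes. The plain-Java model below is an assumption-laden illustration, not the library's implementation: Integer.parseInt stands in for the JSON deserializer, and all names are hypothetical. The guarded delegate turns a thrown exception into data, so the loop can record the bad record and keep advancing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of wrapping a deserializer so failures do not stall consumption.
class PoisonRecordSketch {

    // Result of a guarded deserialization: either a value or the captured failure.
    record Outcome(Integer value, Exception failure) {}

    // Stand-in deserializer that throws on malformed input, as a raw deserializer would.
    static Integer parse(String raw) {
        return Integer.parseInt(raw);
    }

    // Wraps a delegate so a failure is returned as data instead of thrown.
    static Function<String, Outcome> guarded(Function<String, Integer> delegate) {
        return raw -> {
            try {
                return new Outcome(delegate.apply(raw), null);
            } catch (RuntimeException e) {
                return new Outcome(null, e);
            }
        };
    }

    // Walks the partition in offset order; bad records are collected and skipped.
    static List<Integer> consume(List<String> partition, List<String> skipped) {
        Function<String, Outcome> deserializer = guarded(PoisonRecordSketch::parse);
        List<Integer> handled = new ArrayList<>();
        for (String raw : partition) {
            Outcome outcome = deserializer.apply(raw);
            if (outcome.failure() != null) {
                skipped.add(raw); // an error handler would log or reroute this record
                continue;         // the position still advances past it
            }
            handled.add(outcome.value());
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> skipped = new ArrayList<>();
        List<Integer> handled = consume(List.of("1", "not-a-number", "3"), skipped);
        System.out.println("handled=" + handled + " skipped=" + skipped);
    }
}
```

Without the guard, the exception would escape the loop at the same record on every attempt, which is the stall described at the start of this section.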


8. Guided practical

Run this against the single-broker lab from Local Lab.

  1. Start the lab with docker compose up -d and confirm the broker is up.
  2. Create a Spring Boot 3.x app with the spring-kafka dependency and bootstrap-servers: localhost:9092.
  3. Add the NewTopic bean, start the app, and confirm the orders topic appears in the Kafka UI.
  4. Add the KafkaTemplate producer and the @KafkaListener, then curl the endpoint and watch the listener log the payload.
  5. Switch to the OrderCreated DTO with JSON serialization, publish again, and confirm the consumer receives a typed object.
  6. Produce one malformed record from the console (kafka-console-producer.sh), confirm it stalls the raw consumer, then add ErrorHandlingDeserializer and confirm the app no longer loops.

Next: Producing Deeper, where you take control of keys, partitioning, acks, and batching to make the producer fast and correct.