
Kafka Streams: Stateless and Stateful Processing

The Streams DSL, KStream and KTable, the stream-table duality, state stores and changelogs, windowing, joins, exactly-once, and interactive queries with Spring.

So far you have consumed records one at a time and managed any state yourself. Kafka Streams is a library for processing and transforming streams declaratively. It handles state, windows, and joins for you, and it stores that state durably in Kafka. This is a detailed, advanced module: it builds a running revenue aggregation over the Payment stream and explains the machinery underneath.

Kafka Streams is just a library, not a separate cluster. It runs inside your Spring Boot app, scales by running more instances with the same application id, and uses Kafka itself for state and fault tolerance.


What you’ll be able to do after this module

  • Build a topology with the Streams DSL using KStream and KTable.
  • Apply stateless operations like map, filter, and flatMap.
  • Aggregate with groupBy or groupByKey followed by count or aggregate, backed by a state store.
  • Explain the stream-table duality and changelog topics.
  • Apply windowing and joins, and enable exactly-once.
  • Query state directly with interactive queries.

1. KStream and KTable

The DSL has two core abstractions.

  • KStream: an unbounded stream of independent records, each an event. Reading payments as a KStream gives every PaymentSucceeded event.
  • KTable: a changelog view where each key maps to its latest value, like a continuously updated table. Reading a compacted topic as a KTable gives the current value per key.

A KStream answers “what happened”, a KTable answers “what is the current state”. Most topologies mix both.

flowchart LR
    src["payments topic"] --> ks["KStream<br/>each payment event"]
    ks --> agg["aggregate by key"]
    agg --> kt["KTable<br/>revenue per customer"]
    kt --> out["revenue-by-customer topic"]

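To make the stream/table distinction concrete, here is a minimal plain-Java model of reading the same keyed records both ways. It uses no Kafka dependency. The Event record and the customer-tier values are hypothetical. A null value plays the role of a tombstone, which deletes the key from a KTable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of reading one keyed log as a KStream and as a KTable.
// Event and the tier values are hypothetical; no Kafka classes are used.
final class StreamVsTable {

    record Event(String key, String value) {}

    // KStream view: every record is an independent event, so all of them are kept.
    static List<Event> asStream(List<Event> log) {
        return new ArrayList<>(log);
    }

    // KTable view: each record is an update, so only the latest value per key survives.
    // A null value is a tombstone and removes the key.
    static Map<String, String> asTable(List<Event> log) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Event e : log) {
            if (e.value() == null) {
                table.remove(e.key());
            } else {
                table.put(e.key(), e.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> log = List.of(
            new Event("cust-1", "Gold"),
            new Event("cust-2", "Silver"),
            new Event("cust-1", "Platinum"));
        System.out.println(asStream(log).size()); // 3
        System.out.println(asTable(log));         // {cust-1=Platinum, cust-2=Silver}
    }
}
```

Three events go in. The stream view still has three, while the table view has one row per customer.
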
2. Stateless operations

Stateless operations transform each record independently, with no memory of past records. Each one is a single DSL method on KStream.

KStream<String, PaymentSucceeded> payments =
    builder.stream("payments");

payments
    .filter((key, p) -> p.amount().compareTo(BigDecimal.ZERO) > 0)  // drop zero and negative (refund) amounts
    .mapValues(p -> p.amount())                                     // keep the amount
    .to("payment-amounts");

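filter, map, mapValues, and flatMap are the workhorses. They need no state store because the output for a record depends only on that record. Prefer mapValues over map when the key does not change. map may change the key, so Kafka Streams marks the stream for repartitioning before any later grouping or join.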

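As a rough analogue, the same stateless operations can be written with plain java.util.stream. This assumes a simplified, hypothetical Payment record in place of PaymentSucceeded. The Batch type used to illustrate flatMap is also invented for the sketch. Each output depends only on its own input record.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogue of stateless DSL operations using java.util.stream.
// Payment and Batch are simplified, hypothetical stand-ins for the course's types.
final class StatelessOps {

    record Payment(String customerId, BigDecimal amount) {}
    record Batch(List<Payment> payments) {}

    // filter + mapValues: keep positive amounts, then keep only the amount.
    static List<BigDecimal> positiveAmounts(List<Payment> payments) {
        return payments.stream()
            .filter(p -> p.amount().compareTo(BigDecimal.ZERO) > 0)
            .map(Payment::amount)
            .collect(Collectors.toList());
    }

    // flatMap: one input record can produce zero or more output records.
    static List<Payment> flatten(List<Batch> batches) {
        return batches.stream()
            .flatMap(b -> b.payments().stream())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Payment> payments = List.of(
            new Payment("c1", new BigDecimal("10.00")),
            new Payment("c2", BigDecimal.ZERO),
            new Payment("c1", new BigDecimal("-5.00")));
        System.out.println(positiveAmounts(payments)); // [10.00]
    }
}
```
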

3. Stateful aggregation

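Aggregation needs to remember a running result per key, which is state. You group records by the key you aggregate on, then aggregate. Here groupBy re-keys each payment by customer id. Kafka Streams therefore routes the records through an internal repartition topic so that all payments for one customer land in the same partition. It keeps the running value in a local state store and backs that store up to a changelog topic.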

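KStream<String, PaymentSucceeded> payments = builder.stream("payments");

KTable<String, Double> revenueByCustomer = payments
    .groupBy((key, p) -> p.customerId())               // re-key by customer id (triggers a repartition)
    .aggregate(
        () -> 0.0,                                      // initial total for a new customer
        (customerId, payment, total) -> total + payment.amount().doubleValue(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("revenue-by-customer-store")
            .withValueSerde(Serdes.Double()));          // the store holds Doubles, not payments

// The output values are Doubles too, so give the sink explicit serdes.
revenueByCustomer.toStream()
    .to("revenue-by-customer", Produced.with(Serdes.String(), Serdes.Double()));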

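The result is a KTable holding the latest running revenue per customer. Every new payment updates the store and forwards the new total downstream. Record caching is enabled by default, so several quick updates to the same key may be coalesced into one emitted record.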

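The aggregation can be modelled in plain Java to show what the state store holds and what gets emitted. This sketch makes three simplifications:

  • A hypothetical Payment record with a double amount stands in for PaymentSucceeded.
  • A HashMap stands in for the RocksDB-backed store.
  • Every update is forwarded, which matches Kafka Streams with record caching disabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupBy(customerId).aggregate(...): a HashMap stands in for
// the state store, and process() returns the update forwarded downstream.
final class RunningRevenue {

    record Payment(String customerId, double amount) {}
    record Update(String customerId, double total) {}

    // Stands in for revenue-by-customer-store: customer id -> running total.
    private final Map<String, Double> store = new HashMap<>();

    Update process(Payment p) {
        // Initializer (0.0) for a new key, then the adder.
        double total = store.getOrDefault(p.customerId(), 0.0) + p.amount();
        store.put(p.customerId(), total);
        return new Update(p.customerId(), total);
    }

    Double get(String customerId) {
        return store.get(customerId); // null if the customer has no payments yet
    }

    public static void main(String[] args) {
        RunningRevenue agg = new RunningRevenue();
        List<Update> emitted = new ArrayList<>();
        for (Payment p : List.of(new Payment("c1", 10.0), new Payment("c2", 5.0), new Payment("c1", 2.5))) {
            emitted.add(agg.process(p));
        }
        // [Update[customerId=c1, total=10.0], Update[customerId=c2, total=5.0], Update[customerId=c1, total=12.5]]
        System.out.println(emitted);
    }
}
```

The emitted sequence is itself a stream of table updates. This is the duality described in the next section.
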

4. The stream-table duality and changelogs

The two abstractions are two views of the same data. A stream of updates can be folded into a table (the latest value per key), and a table’s changes can be read out as a stream. This is the stream-table duality.

State stores rely on it for fault tolerance. Every update to a store is also written to a compacted changelog topic in Kafka. If an instance dies, another instance rebuilds the store by replaying the changelog, so state survives failures without an external database.

flowchart TD
    upd["record update"] --> store["local state store<br/>(RocksDB)"]
    store --> clog["changelog topic<br/>(compacted)"]
    clog -.on failure, replay to rebuild.-> store2["restored store<br/>on another instance"]

This is exactly the log-compaction use case from the Storage Internals module: the changelog keeps the latest value per key so it can rebuild the table.

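A minimal plain-Java model of the changelog mechanism follows. It relies on three simplifying assumptions:

  • The changelog is an in-memory list.
  • Compaction is modelled as "keep the last entry per key".
  • A null value is a tombstone.

Real compaction runs asynchronously on the broker and eventually drops tombstones too.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a changelog-backed state store: every put is also appended
// to a log, compaction keeps the last entry per key, and restore replays the log.
final class ChangelogDemo {

    record Entry(String key, Double value) {} // null value = tombstone (delete)

    final Map<String, Double> store = new HashMap<>();
    final List<Entry> changelog = new ArrayList<>();

    void put(String key, Double value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
        changelog.add(new Entry(key, value)); // every store update is also logged
    }

    // Keep only the latest entry per key. Tombstones are kept, as they are on the
    // broker until the topic's delete retention expires.
    static List<Entry> compact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.remove(e.key()); // re-insert so order follows the latest write
            latest.put(e.key(), e);
        }
        return new ArrayList<>(latest.values());
    }

    // Rebuild a store from scratch by replaying the log in order.
    static Map<String, Double> restore(List<Entry> log) {
        Map<String, Double> restored = new HashMap<>();
        for (Entry e : log) {
            if (e.value() == null) {
                restored.remove(e.key());
            } else {
                restored.put(e.key(), e.value());
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        ChangelogDemo demo = new ChangelogDemo();
        demo.put("c1", 10.0);
        demo.put("c2", 5.0);
        demo.put("c1", 12.5);
        demo.put("c2", null);
        List<Entry> compacted = compact(demo.changelog);
        System.out.println(compacted.size());                      // 2
        System.out.println(restore(compacted).equals(demo.store)); // true
    }
}
```

Replaying either the full log or its compacted form yields the same store. That is why compaction is safe for changelogs.
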

5. Windowing

Aggregating over all time is often not what you want. Windows bucket records by time so you can compute, for example, revenue per hour.

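Window     Behavior                                                    DSL
Tumbling   Fixed-size, non-overlapping (e.g. every 1 hour)             TimeWindows.ofSizeWithNoGrace(size)
Hopping    Fixed-size, overlapping (1 hour, advancing every 15 min)    TimeWindows.ofSizeWithNoGrace(size).advanceBy(advance)
Session    Dynamic, grouped by gaps of inactivity                      SessionWindows.ofInactivityGapWithNoGrace(gap)
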
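KTable<Windowed<String>, Double> hourlyRevenue = payments
    .groupBy((key, p) -> p.customerId())
    // 1-hour tumbling windows; with no grace period, records arriving after their window closes are dropped
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .aggregate(
        () -> 0.0,
        (customerId, payment, total) -> total + payment.amount().doubleValue(),
        Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("hourly-revenue-store")
            .withValueSerde(Serdes.Double()));   // window store values are Doubles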

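The three window types can be checked with a little arithmetic. This sketch assumes epoch-aligned windows covering [start, start + size), which is how Kafka Streams assigns time windows. It models session windows as sorted timestamps that merge when the gap between them is at most the inactivity gap. It is a model for intuition, not the library's implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Plain-Java window arithmetic. Time windows are epoch-aligned and cover
// [start, start + size); sessions merge sorted timestamps whose gap is <= the inactivity gap.
final class WindowMath {

    // Tumbling: each timestamp falls in exactly one window.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Hopping: every start that is a multiple of advanceMs with start <= ts < start + size.
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latest = timestampMs - (timestampMs % advanceMs); // latest start not after ts
        for (long start = latest; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(0, start); // prepend so the list is in ascending order
        }
        return starts;
    }

    // Session: a gap larger than inactivityGapMs closes the current session.
    // Returns {start, end} pairs, where end is the last timestamp in the session.
    static List<long[]> sessions(List<Long> timestamps, long inactivityGapMs) {
        List<Long> sorted = new ArrayList<>(timestamps);
        sorted.sort(null); // natural ordering
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts - current[1] <= inactivityGapMs) {
                current[1] = ts;                 // extend the open session
            } else {
                result.add(new long[] {ts, ts}); // start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long quarter = Duration.ofMinutes(15).toMillis();
        long ts = hour + Duration.ofMinutes(1).toMillis(); // hypothetical event time: 01:01 after the epoch
        System.out.println(tumblingStart(ts, hour));          // 3600000
        System.out.println(hoppingStarts(ts, hour, quarter)); // [900000, 1800000, 2700000, 3600000]
    }
}
```

With a 1-hour size and a 15-minute advance, every record lands in four overlapping hopping windows. It lands in exactly one tumbling window.
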
6. Joins

Streams can be joined to combine related data.

  • Stream-stream join: correlate two event streams within a time window, for example matching OrderCreated with PaymentSucceeded for the same order id.
  • Stream-table join: enrich each event with the latest state from a table, for example adding customer details from a customers KTable to each payment.

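Stream-stream joins are windowed because two events rarely arrive at the same instant. The window states how far apart in time two events may be and still match. Stream-table joins are not windowed. Each event is enriched with the table's current value for its key at the moment the event is processed, and table updates on their own produce no output. Both sides must be co-partitioned, meaning they have the same key and the same partition count, unless the table is a GlobalKTable.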

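The two join styles reduce to simple lookups in a plain-Java model. The record types, the customer map, and the inner-join behaviour (events without a match are dropped) are assumptions made for illustration. The real stream-stream join buffers both sides in window stores instead of scanning lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the two join styles. All record types are hypothetical.
final class JoinDemo {

    record Payment(String orderId, String customerId, double amount, long ts) {}
    record Order(String orderId, long ts) {}
    record EnrichedPayment(Payment payment, String customerName) {}

    // Stream-table join: look up the table's current value for the event's key.
    // Inner-join semantics: an event with no matching row produces no output.
    static Optional<EnrichedPayment> enrich(Payment p, Map<String, String> customersTable) {
        String name = customersTable.get(p.customerId());
        return name == null ? Optional.empty() : Optional.of(new EnrichedPayment(p, name));
    }

    // Stream-stream join: an order and a payment match when they share an order id
    // and their timestamps differ by at most windowMs.
    static List<String> matchedOrderIds(List<Order> orders, List<Payment> payments, long windowMs) {
        List<String> matched = new ArrayList<>();
        for (Payment p : payments) {
            for (Order o : orders) {
                if (o.orderId().equals(p.orderId()) && Math.abs(o.ts() - p.ts()) <= windowMs) {
                    matched.add(o.orderId());
                }
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Map<String, String> customers = Map.of("c1", "Ada");
        Payment paid = new Payment("o1", "c1", 10.0, 1_000L);
        System.out.println(enrich(paid, customers).map(EnrichedPayment::customerName).orElse("no match")); // Ada
    }
}
```
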

7. Exactly-once and interactive queries

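Kafka Streams builds on the transactions covered in Transactions and Exactly-Once Semantics. You turn on exactly-once with one setting. Streams then commits each task's consumed offsets, state store (changelog) updates, and output records in a single transaction, so they become visible together or not at all. exactly_once_v2 requires brokers on version 2.5 or newer.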

spring:
  kafka:
    streams:
      properties:
        processing.guarantee: exactly_once_v2

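A toy model shows why the output, the state update, and the input offset must commit together. This is not Kafka's transaction protocol. It simply stages all three and makes them visible only at commit. A simulated crash discards everything staged, so the retry reprocesses from the last committed offset without emitting duplicates. The Payment record and the crashAfter parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka's transaction protocol) of committing output, state, and
// input offset together. Payment and crashAfter are hypothetical.
final class AtomicCommitDemo {

    record Payment(String customerId, double amount) {}

    // Committed state: what read_committed consumers and a restarted instance see.
    int committedOffset = 0;
    final Map<String, Double> committedStore = new HashMap<>();
    final List<String> committedOutput = new ArrayList<>();

    // Processes input from the committed offset in one transaction.
    // crashAfter simulates a failure after that many records; pass -1 for no crash.
    void runBatch(List<Payment> input, int crashAfter) {
        Map<String, Double> stagedStore = new HashMap<>(committedStore);
        List<String> stagedOutput = new ArrayList<>();
        int processed = 0;
        for (int i = committedOffset; i < input.size(); i++) {
            if (processed == crashAfter) {
                return; // crash: staged output, state, and offset are all discarded
            }
            Payment p = input.get(i);
            double total = stagedStore.getOrDefault(p.customerId(), 0.0) + p.amount();
            stagedStore.put(p.customerId(), total);
            stagedOutput.add(p.customerId() + "=" + total);
            processed++;
        }
        // Commit: output, state, and offset become visible together.
        committedOutput.addAll(stagedOutput);
        committedStore.clear();
        committedStore.putAll(stagedStore);
        committedOffset += processed;
    }

    public static void main(String[] args) {
        List<Payment> input = List.of(new Payment("c1", 10.0), new Payment("c1", 5.0));
        AtomicCommitDemo app = new AtomicCommitDemo();
        app.runBatch(input, 1);  // crashes after the first record; nothing is committed
        app.runBatch(input, -1); // retry from offset 0
        System.out.println(app.committedOutput); // [c1=10.0, c1=15.0]
    }
}
```

Suppose the output had been published while the offset was not committed. The retry would then emit "c1=10.0" a second time and double-count the first payment.
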
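Interactive queries let you read a state store directly from the application, turning your stream processor into a queryable service without an external database. There are two caveats:

  • The store can be queried only while the KafkaStreams instance is RUNNING. Otherwise store(...) throws an InvalidStateStoreException.
  • Each instance holds only the partitions assigned to it. With several instances, use KafkaStreams#queryMetadataForKey to find the host that owns a key.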

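// getKafkaStreams() returns null until the factory bean has started the topology.
ReadOnlyKeyValueStore<String, Double> store = streamsBuilderFactoryBean
    .getKafkaStreams()
    .store(StoreQueryParameters.fromNameAndType(
        "revenue-by-customer-store", QueryableStoreTypes.keyValueStore()));

Double revenue = store.get(customerId);   // null if this instance holds no value for the key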

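When several instances run, the routing decision behind interactive queries can be modelled in plain Java. The model makes three assumptions:

  • String.hashCode stands in for Kafka's murmur2 key partitioner.
  • The partition-to-host map stands in for the metadata that KafkaStreams#queryMetadataForKey provides.
  • The host names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of routing an interactive query across instances.
// String.hashCode stands in for Kafka's murmur2 key partitioner, and the
// partition-to-host map stands in for the metadata queryMetadataForKey returns.
final class QueryRouter {

    private final int numPartitions;
    private final Map<Integer, String> partitionOwner; // partition -> host:port (hypothetical)
    private final String localHost;
    final Map<String, Double> localStore = new HashMap<>(); // keys of locally owned partitions only

    QueryRouter(int numPartitions, Map<Integer, String> partitionOwner, String localHost) {
        this.numPartitions = numPartitions;
        this.partitionOwner = partitionOwner;
        this.localHost = localHost;
    }

    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Answer from the local store if this instance owns the key's partition;
    // otherwise report which host the request should be forwarded to.
    String query(String key) {
        String owner = partitionOwner.get(partitionFor(key));
        if (localHost.equals(owner)) {
            Double revenue = localStore.get(key);
            return "local:" + (revenue == null ? "none" : revenue);
        }
        return "forward:" + owner;
    }

    public static void main(String[] args) {
        QueryRouter router = new QueryRouter(2, Map.of(0, "app-a:8080", 1, "app-b:8080"), "app-a:8080");
        router.localStore.put("b", 7.5);      // "b".hashCode() = 98 -> partition 0, owned by app-a
        System.out.println(router.query("b")); // local:7.5
        System.out.println(router.query("a")); // forward:app-b:8080
    }
}
```

An HTTP endpoint built on this pattern typically serves local keys directly and proxies or redirects the rest.
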
8. Spring integration

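Spring for Apache Kafka wires Streams through spring-kafka plus the kafka-streams dependency. Annotate a config class with @EnableKafkaStreams and Spring provides a managed StreamsBuilderFactoryBean, which starts and stops the KafkaStreams instance with the application context. In Spring Boot, its configuration comes from the spring.kafka.streams properties. You define the topology by injecting a StreamsBuilder into a @Bean method.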

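import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class RevenueTopology {

    @Bean
    KStream<String, PaymentSucceeded> revenueStream(StreamsBuilder builder) {
        // Input records are read with the default serdes from application.yml.
        KStream<String, PaymentSucceeded> payments = builder.stream("payments");
        payments
            .groupBy((key, p) -> p.customerId())          // re-key by customer id
            .aggregate(
                () -> 0.0,
                (id, p, total) -> total + p.amount().doubleValue(),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("revenue-by-customer-store")
                    .withValueSerde(Serdes.Double()))     // store values are Doubles
            .toStream()
            .to("revenue-by-customer", Produced.with(Serdes.String(), Serdes.Double()));
        return payments;
    }
}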

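Set the application id, bootstrap servers, and default serdes in application.yml:

spring:
  kafka:
    streams:
      application-id: revenue-processor
      bootstrap-servers: localhost:9092
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        # default.value.serde: a serde matching how PaymentSucceeded is encoded on the topic (e.g. JSON or Avro)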

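The application-id is also the consumer group id and the prefix for internal changelog and repartition topics. It must therefore be unique per processor and stable across deployments, because changing it starts a new consumer group with new, empty internal topics.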

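As a small illustration of the prefixing, a store's changelog topic follows the <application.id>-<storeName>-changelog naming convention. This sketch only builds the name. The revenue-processor-v2 id is hypothetical and shows why renaming the application points it at a different, empty changelog.

```java
// Sketch of the application id prefix on internal topics, using the
// <application.id>-<storeName>-changelog naming convention for changelogs.
final class InternalTopicNames {

    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // revenue-processor-revenue-by-customer-store-changelog
        System.out.println(changelogTopic("revenue-processor", "revenue-by-customer-store"));
        // A renamed app (hypothetical id) would read a different, empty changelog:
        // revenue-processor-v2-revenue-by-customer-store-changelog
        System.out.println(changelogTopic("revenue-processor-v2", "revenue-by-customer-store"));
    }
}
```
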

9. Guided practical

Run this against the local lab.

  1. Add the kafka-streams dependency and an @EnableKafkaStreams config.
  2. Build the revenue-by-customer aggregation topology above.
  3. Produce several PaymentSucceeded records for two customers and confirm running totals appear on revenue-by-customer.
  4. Add an interactive query endpoint and read a customer’s current revenue over HTTP.
  5. Enable exactly_once_v2 and confirm the topology still runs.
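  6. Restart the app and confirm the running totals survive. A normal restart typically reuses the local RocksDB state and replays only the newer part of the changelog. To watch a full rebuild from the changelog, delete the local state directory (state.dir) or call KafkaStreams#cleanUp() before starting.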

Next: Kafka Connect and the Wider Ecosystem, where you move data in and out of Kafka without writing producer or consumer code.