Kafka Streams: Stateless and Stateful Processing
The Streams DSL, KStream and KTable, the stream-table duality, state stores and changelogs, windowing, joins, exactly-once, and interactive queries with Spring.
So far you consume records one at a time and manage any state yourself. Kafka Streams is a library for processing and transforming streams declaratively, with state, windows, and joins handled for you and stored durably in Kafka. This is a detailed, advanced module: it builds a running revenue aggregation over the Payment stream and explains the machinery underneath.
Kafka Streams is just a library, not a separate cluster. It runs inside your Spring Boot app, scales by running more instances, and uses Kafka itself for state and fault tolerance.
What you’ll be able to do after this module
- Build a topology with the Streams DSL using KStream and KTable.
- Apply stateless operations like map, filter, and flatMap.
- Aggregate with groupByKey and count or aggregate, backed by a state store.
- Explain the stream-table duality and changelog topics.
- Apply windowing and joins, and enable exactly-once.
- Query state directly with interactive queries.
1. KStream and KTable
The DSL has two core abstractions.
- KStream: an unbounded stream of independent records, each one an event. Reading payments as a KStream gives every PaymentSucceeded event.
- KTable: a changelog view where each key maps to its latest value, like a continuously updated table. Reading a compacted topic as a KTable gives the current value per key.
A KStream answers “what happened”, a KTable answers “what is the current state”. Most topologies mix both.
```mermaid
flowchart LR
    src["payments topic"] --> ks["KStream<br/>each payment event"]
    ks --> agg["aggregate by key"]
    agg --> kt["KTable<br/>revenue per customer"]
    kt --> out["revenue-by-customer topic"]
```
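To make the difference concrete, here is a plain-Java model of reading the same topic both ways. It is not the Kafka Streams API. The stream view keeps every record, and the table view folds the records into the latest value per key. The StreamVsTable class and its Rec record are hypothetical names for this sketch. It also models the KTable rule that a null value (a tombstone) deletes the key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Kafka Streams API) of reading one topic
// as a KStream versus as a KTable. Names are hypothetical.
final class StreamVsTable {

    // One record on a topic: a key and a value.
    record Rec(String key, String value) {}

    // KStream view: every record is an independent event, so nothing is dropped.
    static List<Rec> asStream(List<Rec> topic) {
        return List.copyOf(topic);
    }

    // KTable view: each key maps to its latest value; later records overwrite
    // earlier ones, and a null value (tombstone) deletes the key.
    static Map<String, String> asTable(List<Rec> topic) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Rec r : topic) {
            if (r.value() == null) {
                table.remove(r.key());
            } else {
                table.put(r.key(), r.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> topic = List.of(
            new Rec("c1", "alice@old.example"),
            new Rec("c2", "bob@example.com"),
            new Rec("c1", "alice@new.example"));
        System.out.println(asStream(topic).size()); // 3: three things happened
        System.out.println(asTable(topic));         // {c1=alice@new.example, c2=bob@example.com}
    }
}
```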
2. Stateless operations
Stateless operations transform each record independently, with no memory of past records. They map directly onto the DSL.
```java
KStream<String, PaymentSucceeded> payments =
    builder.stream("payments");

payments
    .filter((key, p) -> p.amount().compareTo(BigDecimal.ZERO) > 0) // drop zero amounts and refunds
    .mapValues(p -> p.amount())                                    // keep only the amount
    .to("payment-amounts");
```
filter, map, mapValues, and flatMap are the workhorses. They need no state store because the output for a record depends only on that record.
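The same filter-then-mapValues pipeline can be modeled with plain java.util.stream, which makes the stateless property easy to see. Each output is computed from one input, and nothing is remembered between records. This is a sketch, not Kafka Streams code. The Payment record is a hypothetical stand-in for PaymentSucceeded, and record keys are modeled as Map.Entry keys.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of filter + mapValues using java.util.stream instead of the
// Kafka Streams DSL. Payment is a hypothetical stand-in for PaymentSucceeded.
final class StatelessModel {

    record Payment(String customerId, BigDecimal amount) {}

    // Each output depends only on its own input record, so no state is kept.
    static List<Map.Entry<String, BigDecimal>> amounts(List<Map.Entry<String, Payment>> payments) {
        return payments.stream()
            .filter(kv -> kv.getValue().amount().compareTo(BigDecimal.ZERO) > 0) // drop zero and negative amounts
            .map(kv -> Map.entry(kv.getKey(), kv.getValue().amount()))           // mapValues: same key, new value
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Payment>> in = List.of(
            Map.entry("p1", new Payment("c1", new BigDecimal("25.00"))),
            Map.entry("p2", new Payment("c1", BigDecimal.ZERO)),
            Map.entry("p3", new Payment("c2", new BigDecimal("-5.00"))));
        System.out.println(amounts(in)); // [p1=25.00]
    }
}
```

mapValues keeps the key. That is why it is preferred over map when only the value changes: a key-changing map marks the stream for repartitioning before any later grouping or join.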
3. Stateful aggregation
Aggregation needs to remember a running result per key, which is state. Group records by key, then aggregate. Kafka Streams keeps the running value in a local state store and backs it up to a changelog topic.
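```java
KStream<String, PaymentSucceeded> payments = builder.stream("payments");

KTable<String, Double> revenueByCustomer = payments
    .groupBy((key, p) -> p.customerId())          // re-key by customer id
    .aggregate(
        () -> 0.0,                                // initial total for a new customer
        (customerId, payment, total) -> total + payment.amount().doubleValue(),
        // the result type (Double) differs from the input type, so give the store a matching serde
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("revenue-by-customer-store")
            .withValueSerde(Serdes.Double()));

revenueByCustomer.toStream()
    .to("revenue-by-customer", Produced.with(Serdes.String(), Serdes.Double()));
```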
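The result is a KTable: the latest running revenue per customer. Every new payment updates the store and emits the new total downstream. With record caching, which is on by default, several updates to one key can be collapsed into a single output. Because groupBy changes the key to customerId, Kafka Streams first sends the records through an internal repartition topic, so that all of a customer's payments reach the same partition and the same store. If payments were already keyed by customer id, groupByKey would skip that step.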
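The per-record work of the aggregation can be modeled in plain Java, with a HashMap standing in for the local state store. This sketch uses a hypothetical RevenueAggregation class. It uses BigDecimal rather than Double so the running totals stay exact. It also ignores record caching, so it emits one record per update.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupBy(customerId).aggregate(...). A HashMap stands in for the
// local state store; record caching is ignored, so every update emits a record.
final class RevenueAggregation {

    record Payment(String paymentId, String customerId, BigDecimal amount) {}

    // The "state store": latest running total per customer.
    private final Map<String, BigDecimal> store = new HashMap<>();

    // Processes one payment and returns the (customerId, newTotal) record sent downstream.
    Map.Entry<String, BigDecimal> process(Payment p) {
        BigDecimal total = store.getOrDefault(p.customerId(), BigDecimal.ZERO); // initializer
        BigDecimal updated = total.add(p.amount());                              // aggregator
        store.put(p.customerId(), updated);
        return Map.entry(p.customerId(), updated);
    }

    // Current total, or null if the customer has no payments yet.
    BigDecimal totalFor(String customerId) {
        return store.get(customerId);
    }

    public static void main(String[] args) {
        RevenueAggregation agg = new RevenueAggregation();
        List<Map.Entry<String, BigDecimal>> emitted = new ArrayList<>();
        emitted.add(agg.process(new Payment("p1", "c1", new BigDecimal("10.00"))));
        emitted.add(agg.process(new Payment("p2", "c2", new BigDecimal("4.50"))));
        emitted.add(agg.process(new Payment("p3", "c1", new BigDecimal("2.25"))));
        System.out.println(emitted); // [c1=10.00, c2=4.50, c1=12.25]
    }
}
```

Each emitted record carries the new total, not the increment. That is why the output reads naturally as a table of current values.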
4. The stream-table duality and changelogs
The two abstractions are two views of the same data. A stream of updates can be folded into a table (the latest value per key), and a table’s changes can be read out as a stream. This is the stream-table duality.
State stores rely on it for fault tolerance. Every update to a store is also written to a compacted changelog topic in Kafka. If an instance dies, another instance rebuilds the store by replaying the changelog, so state survives failures without an external database.
```mermaid
flowchart TD
    upd["record update"] --> store["local state store<br/>(RocksDB)"]
    store --> clog["changelog topic<br/>(compacted)"]
    clog -.->|on failure, replay to rebuild| store2["restored store<br/>on another instance"]
```
This is exactly the log-compaction use case from Storage Internals: the changelog keeps the latest value per key so it can rebuild the table.
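A plain-Java model of the store-plus-changelog arrangement shows why replaying the changelog is enough to rebuild state. The names ChangelogModel and Change are hypothetical. Real stores use RocksDB, and the broker compacts the changelog asynchronously, so a live changelog may still hold superseded entries. Replay gives the same result either way, because later entries overwrite earlier ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a state store backed by a compacted changelog.
// It mirrors the idea only; names are hypothetical.
final class ChangelogModel {

    record Change(String key, Double value) {}

    final Map<String, Double> store = new HashMap<>();
    final List<Change> changelog = new ArrayList<>();

    // Every store update is also appended to the changelog.
    void put(String key, double value) {
        store.put(key, value);
        changelog.add(new Change(key, value));
    }

    // Compaction keeps only the latest change per key (moved to the end, like a newer offset).
    static List<Change> compact(List<Change> log) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : log) {
            latest.remove(c.key());
            latest.put(c.key(), c);
        }
        return new ArrayList<>(latest.values());
    }

    // A new instance rebuilds the store by replaying the changelog from the start.
    static Map<String, Double> restore(List<Change> log) {
        Map<String, Double> rebuilt = new HashMap<>();
        for (Change c : log) {
            rebuilt.put(c.key(), c.value());
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        ChangelogModel m = new ChangelogModel();
        m.put("c1", 10.0);
        m.put("c2", 4.5);
        m.put("c1", 12.25);
        List<Change> compacted = compact(m.changelog);
        System.out.println(m.changelog.size() + " -> " + compacted.size()); // 3 -> 2
        System.out.println(restore(compacted).equals(m.store));             // true
    }
}
```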
5. Windowing
Aggregating over all time is often not what you want. Windows bucket records by time so you can compute, for example, revenue per hour.
| Window | Behavior |
|---|---|
| Tumbling | Fixed-size, non-overlapping (every 1 hour) |
| Hopping | Fixed-size, overlapping (1 hour, advancing every 15 min) |
| Session | Dynamic, grouped by gaps of inactivity |
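```java
KTable<Windowed<String>, Double> hourlyRevenue = payments
    .groupBy((key, p) -> p.customerId())
    // 1-hour tumbling windows; with no grace, records arriving after a window ends are dropped
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .aggregate(
        () -> 0.0,
        (customerId, payment, total) -> total + payment.amount().doubleValue(),
        Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("hourly-revenue-store")
            .withValueSerde(Serdes.Double()));
```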
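The window types in the table differ only in how a record's timestamp maps to windows. The sketch below models that mapping in plain Java, using a hypothetical WindowModel class. Timestamps are plain longs (minutes here), and each time window is the half-open range [start, start + size). Hopping window starts are aligned to multiples of the advance, and a tumbling window is the special case where the advance equals the size. The session logic is simplified: it sorts timestamps and splits wherever the gap exceeds the inactivity threshold.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of window assignment (not the Kafka Streams API).
// Time windows are [start, start + size); timestamps are plain longs.
final class WindowModel {

    // Hopping windows of `size`, advancing by `advance`: start of every window containing ts.
    static List<Long> hoppingStarts(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // earliest aligned start whose window [start, start + size) still contains ts
        long start = (Math.max(0, ts - size + advance) / advance) * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // Tumbling windows are hopping windows with advance == size: exactly one window per record.
    static long tumblingStart(long ts, long size) {
        return hoppingStarts(ts, size, size).get(0);
    }

    // Session windows (simplified): sort timestamps and open a new session whenever the
    // gap since the previous record exceeds `inactivityGap`. Returns [start, end] pairs.
    static List<long[]> sessions(long[] timestamps, long inactivityGap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts - current[1] <= inactivityGap) {
                current[1] = ts;                  // extend the open session
            } else {
                result.add(new long[] {ts, ts});  // gap too large: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(70, 60));     // 60
        System.out.println(hoppingStarts(70, 60, 15)); // [15, 30, 45, 60]
        for (long[] s : sessions(new long[] {0, 3, 20, 22}, 5)) {
            System.out.println(s[0] + ".." + s[1]);    // 0..3, then 20..22
        }
    }
}
```

With a 60-minute size and a 15-minute advance, every record lands in four overlapping windows. This is why hopping aggregations cost more state than tumbling ones.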
6. Joins
Streams can be joined to combine related data.
- Stream-stream join: correlate two event streams within a time window, for example matching OrderCreated with PaymentSucceeded for the same order id.
- Stream-table join: enrich each event with the latest state from a table, for example adding customer details from a customers KTable to each payment.
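Stream-stream joins are windowed because two related events rarely arrive at the same instant. Kafka Streams buffers each side in a state store, and the window bounds how long a record waits for its match. Stream-table joins are not windowed, because the table always holds a current value to enrich with. Only new stream records trigger output, and both sides must be keyed and partitioned the same way (payments keyed by customer id to join the customers table).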
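Both join kinds can be modeled in plain Java to show what triggers their output. In this sketch (a hypothetical JoinModel class), the stream-table join walks inputs in arrival order. It enriches each payment with whatever customer value the table holds at that moment, so in an inner join a payment that arrives before its customer row produces nothing. The stream-stream join matches an order and a payment with the same order id whose timestamps are at most a window apart. This is the symmetric window that JoinWindows gives by default. The model uses nested loops for clarity, whereas Kafka Streams buffers each side in a windowed state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java models of the two join kinds (not the Kafka Streams API). Names are hypothetical.
final class JoinModel {

    // Stream-table join input: either a table update (customer details) or a stream event (payment).
    record Input(boolean isTableUpdate, String customerId, String value) {}

    static List<String> streamTableJoin(List<Input> inputsInOrder) {
        Map<String, String> customers = new HashMap<>(); // the KTable's current state
        List<String> joined = new ArrayList<>();
        for (Input in : inputsInOrder) {
            if (in.isTableUpdate()) {
                customers.put(in.customerId(), in.value()); // table updates produce no join output
            } else {
                String customer = customers.get(in.customerId());
                if (customer != null) {                     // inner join: no table row, no output
                    joined.add(in.value() + " by " + customer);
                }
            }
        }
        return joined;
    }

    // Stream-stream join input: an event with an order id and a timestamp.
    record Event(String orderId, long timestamp) {}

    // Match orders and payments with the same id whose timestamps are at most `window` apart.
    static List<String> streamStreamJoin(List<Event> orders, List<Event> payments, long window) {
        List<String> matches = new ArrayList<>();
        for (Event o : orders) {
            for (Event p : payments) {
                if (o.orderId().equals(p.orderId())
                        && Math.abs(o.timestamp() - p.timestamp()) <= window) {
                    matches.add(o.orderId());
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Input> inputs = List.of(
            new Input(false, "c1", "payment p0"),   // before any customer row: dropped
            new Input(true, "c1", "Alice"),
            new Input(false, "c1", "payment p1"),
            new Input(true, "c1", "Alice (gold)"),
            new Input(false, "c1", "payment p2"));
        System.out.println(streamTableJoin(inputs)); // [payment p1 by Alice, payment p2 by Alice (gold)]

        List<Event> orders = List.of(new Event("o1", 0), new Event("o2", 0));
        List<Event> payments = List.of(new Event("o1", 3), new Event("o2", 30));
        System.out.println(streamStreamJoin(orders, payments, 5)); // [o1]
    }
}
```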
7. Exactly-once and interactive queries
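Kafka Streams builds on the transactions from Transactions and Exactly-Once Semantics. Turn on exactly-once with one setting. Streams then commits the consumed input offsets, the state-store changelog writes, and the output records in a single Kafka transaction, so a failure never leaves them half-applied. exactly_once_v2 requires brokers on version 2.5 or newer.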
```yaml
spring:
  kafka:
    streams:
      properties:
        processing.guarantee: exactly_once_v2
```
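Interactive queries let you read a state store directly from the application, turning your stream processor into a queryable service without an external database. Each instance holds only the store partitions assigned to it. With more than one instance, a key may therefore live elsewhere, and KafkaStreams.queryMetadataForKey tells you which host owns it.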
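```java
// Only works while the app is running; during startup or a rebalance this throws InvalidStateStoreException
ReadOnlyKeyValueStore<String, Double> store = streamsBuilderFactoryBean
    .getKafkaStreams()
    .store(StoreQueryParameters.fromNameAndType(
        "revenue-by-customer-store", QueryableStoreTypes.keyValueStore()));

Double revenue = store.get(customerId); // null if this instance holds no value for the key
```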
8. Spring integration
Spring for Apache Kafka wires Streams through spring-kafka plus the kafka-streams dependency. Annotate a config class with @EnableKafkaStreams and Spring provides a managed StreamsBuilderFactoryBean. You define the topology by injecting a StreamsBuilder.
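```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class RevenueTopology {

    @Bean
    KStream<String, PaymentSucceeded> revenueStream(StreamsBuilder builder) {
        // PaymentSucceeded values are read with the app's default value serde
        KStream<String, PaymentSucceeded> payments = builder.stream("payments");
        payments
            .groupBy((key, p) -> p.customerId())   // re-key by customer id
            .aggregate(
                () -> 0.0,
                (id, p, total) -> total + p.amount().doubleValue(),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("revenue-by-customer-store")
                    .withValueSerde(Serdes.Double()))  // running total is a Double, not a PaymentSucceeded
            .toStream()
            .to("revenue-by-customer", Produced.with(Serdes.String(), Serdes.Double()));
        return payments;
    }
}
```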
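Set the application id and bootstrap servers in application.yml. Default serdes, which the topology relies on for its keys and PaymentSucceeded values, can be added under spring.kafka.streams.properties as default.key.serde and default.value.serde: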
```yaml
spring:
  kafka:
    streams:
      application-id: revenue-processor
      bootstrap-servers: localhost:9092
```
The application-id is also the consumer group and the prefix for internal changelog and repartition topics, so it must be unique and stable per processor.
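Internal topic names derive from the application id. For a named store, the changelog topic is named <application.id>-<store name>-changelog. The helper below is a hypothetical changelogTopic function, not part of any API. It just spells out that convention for this module's names, which is handy when you look for the changelog topic during the practical.

```java
// Spells out the Kafka Streams naming convention for a named store's changelog topic:
// "<application.id>-<store name>-changelog". Hypothetical helper, not a library API.
final class InternalTopicNames {

    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        System.out.println(changelogTopic("revenue-processor", "revenue-by-customer-store"));
        // revenue-processor-revenue-by-customer-store-changelog
    }
}
```

Because the name embeds the application id, changing application-id points the app at a new consumer group and new, empty changelog topics, so it starts without its previous state.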
9. Guided practical
Run this against the local lab.
- Add the kafka-streams dependency and an @EnableKafkaStreams config.
- Build the revenue-by-customer aggregation topology above.
- Produce several PaymentSucceeded records for two customers and confirm running totals appear on revenue-by-customer.
- Add an interactive query endpoint and read a customer's current revenue over HTTP.
- Enable exactly_once_v2 and confirm the topology still runs.
- Restart the app and confirm the state store rebuilds from its changelog. To force a full rebuild rather than reusing the local RocksDB files, stop the app and delete its local state directory (the state.dir setting) before restarting.
Next: Kafka Connect and the Wider Ecosystem, where you move data in and out of Kafka without writing producer or consumer code.