Introduction
In this comprehensive guide, we'll build a real-time Change Data Capture (CDC) system that automatically tracks every change made to a PostgreSQL database and creates an audit log. By the end, you'll understand how databases, message brokers, and applications work together to create event-driven architectures.
What We're Building
Imagine this scenario: Every time someone creates, updates, or deletes a person record in your database, you want to automatically log that change in an audit table. No manual triggers, no application-level tracking—just pure, automated database change tracking.
The Flow:
Database Change → Debezium Captures It → Kafka Stores It → Spring Boot Processes It → Audit Log Created
Why This Matters
Traditional applications require you to write code to track changes:
// Traditional way - you have to remember to do this!
person.save();
auditLog.create("Person created", person.id); // Easy to forget!
With CDC:
// CDC way - automatic!
person.save(); // That's it! CDC handles the rest automatically
Real-world use cases:
- Compliance & Auditing: Track who changed what and when
- Data Replication: Keep multiple databases in sync
- Event-Driven Architecture: Trigger actions based on data changes
- Cache Invalidation: Update caches when data changes
- Analytics: Feed data warehouses in real-time
- Microservices Communication: Notify other services about changes
Understanding the Architecture
The Components
1. PostgreSQL with Debezium Image
PostgreSQL is our source database, but not just any PostgreSQL: we use Debezium's pre-built image, which ships with the Write-Ahead Log (WAL) configured for logical decoding (wal_level=logical).
What is WAL? PostgreSQL keeps a log of every change before applying it. This is called the Write-Ahead Log. It's like a journal where PostgreSQL writes "I'm about to insert this record" before actually doing it. This ensures data safety if the server crashes.
Why do we need it for CDC? Debezium reads this WAL to see what changed. It's like reading someone's journal to know what they did.
2. Zookeeper
Zookeeper is Kafka's coordinator. It's like a manager that keeps track of:
- Which Kafka brokers are alive
- Which topics exist
- Leader election for partitions
- Configuration management
Why do we need it? The Kafka release we're using (Confluent Platform 7.3.0) still relies on Zookeeper to coordinate distributed operations (newer releases can run Zookeeper-less in KRaft mode). Think of it as the "brain" that helps Kafka brokers coordinate with each other.
3. Kafka
Kafka is a distributed message broker—a super-fast, reliable post office for data.
How it works:
- Topics: Like mail folders (e.g., "dbserver.public.person")
- Producers: Applications that send messages (Debezium)
- Consumers: Applications that read messages (Spring Boot)
- Partitions: Divide topics for parallelism
- Offsets: Keep track of which messages you've read
Why Kafka and not a database queue?
- Handles millions of messages per second
- Keeps messages even after they're read (replay capability)
- Distributed and fault-tolerant
- Scales horizontally
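To make topics, consumer groups, and offsets concrete before Spring hides the plumbing, here is a minimal sketch of a consumer written against the plain kafka-clients API. The topic and group names are the same ones we'll use later in this guide; treat it as an illustration, not code you need in the project.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RawConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "audit-group");              // consumer group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        // start from the beginning if no offset exists

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("dbserver.public.person"));             // the CDC topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // partition + offset together identify exactly one message
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}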
4. Kafka Connect (Debezium)
Kafka Connect is a framework for moving data in and out of Kafka. Debezium is a connector plugin that specializes in CDC.
What Debezium does:
- Connects to PostgreSQL
- Reads the Write-Ahead Log (WAL)
- Converts database changes to JSON messages
- Publishes them to Kafka topics
The magic: All of this happens automatically in real-time!
5. Spring Boot Application
Our Java application that:
- Listens to Kafka topics
- Processes CDC events
- Creates audit log entries
- Provides REST API for testing
Part 1: Setting Up the Infrastructure with Docker Compose
Why Docker Compose?
Instead of installing PostgreSQL, Kafka, Zookeeper, and Kafka Connect separately on your machine, Docker Compose lets us define all services in one file and start them with a single command.
Benefits:
- Consistent environment (works the same everywhere)
- Easy to start/stop everything
- Isolated from your system
- Easy to reset and start fresh
The Complete docker-compose.yml
Let's break down each service:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ["CMD", "echo", "ruok", "|", "nc", "localhost", "2181"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 30s
Explanation:
- image: Uses Confluent's Zookeeper (production-ready distribution)
- ports: Maps container port 2181 to host port 2181 (the default Zookeeper port)
- ZOOKEEPER_CLIENT_PORT: The port clients use to connect
- ZOOKEEPER_TICK_TIME: Basic time unit (milliseconds) for heartbeats
- healthcheck: Docker checks if Zookeeper is ready by sending the "ruok" (are you ok?) command
- start_period: 30s: Gives Zookeeper 30 seconds to start before health checks count against it
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "nc -z localhost 9092 || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 60s
Critical Kafka Configuration Explained:
Why Two Listeners? This is the trickiest part! Kafka needs to be accessible from two places:
- Inside the Docker network: other containers (like Debezium) use kafka:29092
- Outside Docker (your Windows machine): Spring Boot uses localhost:9092
The Listener Configuration:
KAFKA_LISTENERS: Where Kafka actually listens (0.0.0.0 means all interfaces)
- PLAINTEXT://0.0.0.0:29092: Internal Docker communication
- PLAINTEXT_HOST://0.0.0.0:9092: External host communication
KAFKA_ADVERTISED_LISTENERS: What Kafka tells clients to use
- PLAINTEXT://kafka:29092: Containers connect here
- PLAINTEXT_HOST://localhost:9092: The host machine connects here
Why This Matters:
When Debezium connects on the internal listener (kafka:29092), Kafka advertises kafka:29092 back to it. When your Spring Boot app connects on the external listener (localhost:9092), Kafka advertises localhost:9092. Each client is handed an address it can actually reach, which is why your Spring Boot app on Windows can connect.
Other Settings:
- KAFKA_BROKER_ID: 1: Unique ID for this Kafka instance (important in clusters)
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1: For a single-node setup (would be 3 in production)
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0: Start consuming immediately
- healthcheck: Verifies Kafka port 9092 is accepting connections
  postgres:
    image: debezium/postgres:13
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: mydb
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
Why debezium/postgres and not regular postgres? The Debezium PostgreSQL image comes pre-configured with:
- WAL level set to logical: Regular PostgreSQL uses "replica" level, but CDC needs "logical"
- Logical decoding plugins ready to use: we'll use the built-in pgoutput plugin later (older setups used wal2json to convert the WAL to JSON)
- Proper permissions: Allows Debezium to create replication slots
What's a Replication Slot? A replication slot is like a bookmark in the WAL. It tells PostgreSQL "keep the log from this point onwards because someone (Debezium) is reading it."
  connect:
    image: debezium/connect:2.1
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      kafka:
        condition: service_healthy
      postgres:
        condition: service_healthy
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8083/"]
      interval: 30s
      timeout: 10s
      retries: 5
Kafka Connect Configuration:
- BOOTSTRAP_SERVERS: kafka:29092: Connects to Kafka over the internal Docker network
- GROUP_ID: 1: Identifies this Connect cluster
- CONFIG_STORAGE_TOPIC: Stores connector configurations in Kafka
- OFFSET_STORAGE_TOPIC: Stores the position in the WAL (so Debezium can resume after a restart)
- STATUS_STORAGE_TOPIC: Stores connector health status
Why port 8083? Kafka Connect provides a REST API on port 8083 for managing connectors (create, delete, update, status check).
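We'll call this REST API from PowerShell below, but any HTTP client works. As a small illustration, here are the same checks from Java using the JDK's built-in HttpClient; /connectors and /connectors/{name}/status are standard Kafka Connect endpoints, and the connector itself is only created in Part 3.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectRestCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // List all connectors registered on this Connect cluster
        HttpRequest list = HttpRequest.newBuilder(
                URI.create("http://localhost:8083/connectors")).GET().build();
        System.out.println(client.send(list, HttpResponse.BodyHandlers.ofString()).body());

        // Status of one connector (we create "postgres-connector" in Part 3)
        HttpRequest status = HttpRequest.newBuilder(
                URI.create("http://localhost:8083/connectors/postgres-connector/status")).GET().build();
        System.out.println(client.send(status, HttpResponse.BodyHandlers.ofString()).body());
    }
}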
Starting the Infrastructure
# Stop and remove any existing containers (clean slate)
docker compose down -v
# Start all services in detached mode (background)
docker compose up -d
# Wait for services to become healthy
Start-Sleep -Seconds 120
# Check status
docker compose ps
What -v does:
Removes volumes (stored data). This ensures you start completely fresh—useful for testing!
Why wait 2 minutes?
- Zookeeper needs ~10-15 seconds to start
- Kafka needs ~30-45 seconds (waits for Zookeeper, then initializes)
- Postgres needs ~10-15 seconds
- Kafka Connect needs ~45-60 seconds (waits for Kafka, loads plugins)
Verifying Health: All containers should show "Up (healthy)". The healthcheck ensures:
- Zookeeper is accepting connections
- Kafka broker is ready
- Postgres is accepting queries
- Kafka Connect REST API is responding
Part 2: Creating the Database Schema
CREATE TABLE person (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE audit_log (
id SERIAL PRIMARY KEY,
operation VARCHAR(10),
person_id BIGINT,
person_name VARCHAR(100),
person_age INT,
event_time TIMESTAMP,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
The person Table: This is our source table—the one we'll monitor for changes.
- id SERIAL PRIMARY KEY: Auto-incrementing ID (essential for CDC to identify records)
- name VARCHAR(100) NOT NULL: Required field
- age INT: Optional field
- created_at TIMESTAMP: Tracks when the record was created
The audit_log Table: This stores our audit trail.
- operation: Type of change (CREATE, UPDATE, DELETE)
- person_id: Which person was affected
- person_name, person_age: Snapshot of the data at the time of the change
- event_time: When the database change occurred
- processed_at: When our Spring Boot app processed it (useful for debugging lag)
Run it:
docker compose exec postgres psql -U postgres -d mydb -c "
CREATE TABLE IF NOT EXISTS person (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS audit_log (
id SERIAL PRIMARY KEY,
operation VARCHAR(10),
person_id BIGINT,
person_name VARCHAR(100),
person_age INT,
event_time TIMESTAMP,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"
Part 3: Configuring Debezium Connector
The connector is the "bridge" between PostgreSQL and Kafka. We configure it via REST API.
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "topic.prefix": "dbserver",
    "table.include.list": "public.person",
    "plugin.name": "pgoutput",
    "slot.name": "debezium",
    "publication.name": "dbz_publication",
    "publication.autocreate.mode": "all_tables"
  }
}
Configuration Breakdown:
Basic Connection:
- connector.class: Tells Kafka Connect this is a PostgreSQL connector
- database.hostname: The Docker service name (not "localhost"!)
- database.port: Standard PostgreSQL port
- database.user / database.password: Credentials
Topic Configuration:
- topic.prefix: "dbserver" → Creates topics like dbserver.public.person
- Format: {prefix}.{schema}.{table}
- Why? Helps organize topics, especially with multiple databases
Table Selection:
- table.include.list: "public.person" → Only monitor the person table
- You could monitor multiple tables: "public.person,public.orders"
- Or use patterns: "public.*" for all tables in the public schema
PostgreSQL-Specific Settings:
- plugin.name: "pgoutput" → The logical decoding plugin
  - Built into PostgreSQL 10+
  - Alternative: "wal2json" (older setups)
- slot.name: "debezium" → Name of the replication slot
  - PostgreSQL creates a slot with this name
  - Prevents the WAL from being deleted while Debezium is still reading it
- publication.name: "dbz_publication" → Name of the PostgreSQL publication
  - Publications define which tables to replicate
- publication.autocreate.mode: "all_tables"
  - Debezium automatically creates the publication
  - Alternative: "filtered" (only the specified tables)
Create the Connector:
Invoke-RestMethod -Method Post `
-Uri http://localhost:8083/connectors `
-Headers @{ "Content-Type" = "application/json" } `
-Body '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "mydb",
"topic.prefix": "dbserver",
"table.include.list": "public.person",
"plugin.name": "pgoutput",
"slot.name": "debezium",
"publication.name": "dbz_publication",
"publication.autocreate.mode": "all_tables"
}
}'
Verify It's Running:
Invoke-RestMethod -Uri http://localhost:8083/connectors/postgres-connector/status
You should see:
{
  "name": "postgres-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.19.0.5:8083"
  },
  "tasks": [{
    "id": 0,
    "state": "RUNNING",
    "worker_id": "172.19.0.5:8083"
  }]
}
What "RUNNING" means:
- Debezium connected to PostgreSQL successfully
- Created the replication slot
- Started reading the WAL
- Publishing changes to Kafka
Part 4: Understanding the CDC Message Format
When you insert a person, Debezium creates a JSON message like this:
{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "John Doe",
      "age": 25,
      "created_at": 1770479813360635
    },
    "source": {
      "version": "2.1.4.Final",
      "connector": "postgresql",
      "name": "dbserver",
      "ts_ms": 1770479813373,
      "snapshot": "false",
      "db": "mydb",
      "schema": "public",
      "table": "person",
      "txId": 490,
      "lsn": 23880744
    },
    "op": "c",
    "ts_ms": 1770479813840
  }
}
Breaking It Down:
The payload object:
- before: The row data BEFORE the change
  - null for INSERT (nothing existed before)
  - Contains the old values for UPDATE
  - Contains the deleted values for DELETE
- after: The row data AFTER the change
  - Contains the new values for INSERT and UPDATE
  - null for DELETE (nothing exists after)
- source: Metadata about where this came from
  - ts_ms: When the database change occurred (epoch milliseconds)
  - db, schema, table: Identify the exact table
  - txId: Transaction ID (useful for grouping related changes)
  - lsn: Log Sequence Number (position in the WAL)
- op: Operation type
  - "c": CREATE (INSERT)
  - "u": UPDATE
  - "d": DELETE
  - "r": READ (initial snapshot)
Why two timestamps?
- source.ts_ms: When PostgreSQL committed the transaction
- payload.ts_ms: When Debezium processed it
- The difference is the capture lag (should be milliseconds in a healthy real-time system)
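As a tiny illustration, this is how you could measure that lag with Jackson once a message arrives; the field paths match the payload shown above.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CdcLag {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns how long Debezium took to pick up the change, in milliseconds
    static long debeziumLagMs(String message) throws Exception {
        JsonNode payload = MAPPER.readTree(message).path("payload");
        long committedAt = payload.path("source").path("ts_ms").asLong(); // DB commit time
        long capturedAt  = payload.path("ts_ms").asLong();                // Debezium processing time
        return capturedAt - committedAt;
    }
}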
Example Messages:
INSERT:
{
"before": null,
"after": {"id": 1, "name": "John", "age": 30},
"op": "c"
}
UPDATE:
{
"before": {"id": 1, "name": "John", "age": 30},
"after": {"id": 1, "name": "John", "age": 31},
"op": "u"
}
DELETE:
{
"before": {"id": 1, "name": "John", "age": 31},
"after": null,
"op": "d"
}
Part 5: Building the Spring Boot Application
Project Setup
Dependencies (build.gradle):
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.kafka:spring-kafka'
    runtimeOnly 'org.postgresql:postgresql'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation 'com.fasterxml.jackson.core:jackson-databind'
}
Why each dependency?
- spring-boot-starter-web: REST API support
- spring-boot-starter-data-jpa: Database access with JPA/Hibernate
- spring-kafka: Kafka consumer support
- postgresql: PostgreSQL JDBC driver
- lombok: Reduces boilerplate (getters, setters, constructors)
- jackson-databind: JSON parsing (already pulled in transitively, but declaring it explicitly is fine)
Application Configuration (application.yml)
spring:
  application:
    name: spring-cdc-kafka
  datasource:
    url: jdbc:postgresql://localhost:5432/mydb
    username: postgres
    password: postgres
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: audit-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true
server:
  port: 8080
logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: INFO
    com.thejavaengineer.springcdckafka: DEBUG
Configuration Explained:
DataSource:
- url: jdbc:postgresql://localhost:5432/mydb: Connects to Postgres on the host machine
  - Why localhost? Spring Boot runs on Windows, not in Docker
  - Docker exposes Postgres port 5432 to the host
JPA/Hibernate:
- ddl-auto: update: Auto-creates/updates tables (convenient for development)
  - Other options: create, create-drop, validate, none
  - Production: Use validate or none plus Flyway/Liquibase for migrations
- show-sql: true: Logs SQL queries (great for debugging)
Kafka Consumer:
- bootstrap-servers: localhost:9092: Connects to Kafka on the host machine
  - Remember: Kafka advertises localhost:9092 for external clients
- group-id: audit-group: Consumer group name
  - Multiple consumers with the same group-id share the workload
  - Each message is delivered to only one consumer in the group
  - Different groups all receive the same messages (pub-sub pattern)
- auto-offset-reset: earliest: What to do when no committed offset exists
  - earliest: Read from the beginning of the topic
  - latest: Read only new messages
  - none: Throw an error if no offset is found
- enable-auto-commit: true: Automatically mark messages as processed
  - Alternative: Manual commit for more control (at-least-once vs at-most-once)
Deserializers:
- Convert bytes from Kafka into Java objects
- StringDeserializer: Treats messages as UTF-8 strings
- We'll parse the JSON manually using Jackson
Entity Classes
AuditLog.java:
package com.thejavaengineer.springcdckafka;

import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;

@Entity
@Table(name = "audit_log")
@Data
public class AuditLog {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String operation;
    private Long personId;
    private String personName;
    private Integer personAge;
    private LocalDateTime eventTime;

    @Column(name = "processed_at")
    private LocalDateTime processedAt = LocalDateTime.now();
}
Annotations Explained:
- @Entity: Marks this as a JPA entity (database table)
- @Table(name = "audit_log"): Maps to the "audit_log" table
- @Data: Lombok generates getters, setters, toString, equals, and hashCode
- @Id: Primary key field
- @GeneratedValue(strategy = GenerationType.IDENTITY): The database auto-generates the ID
- @Column(name = "processed_at"): Maps to the "processed_at" column (Java uses camelCase)
Person.java:
package com.thejavaengineer.springcdckafka;

import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;

@Entity
@Table(name = "person")
@Data
public class Person {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;
    private Integer age;

    @Column(name = "created_at")
    private LocalDateTime createdAt;
}
Repository Interfaces
package com.thejavaengineer.springcdckafka;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

// AuditLogRepository.java
@Repository
public interface AuditLogRepository extends JpaRepository<AuditLog, Long> {
}

// PersonRepository.java (a separate file with the same package and imports)
@Repository
public interface PersonRepository extends JpaRepository<Person, Long> {
}
What JpaRepository provides:
- save(entity): Insert or update
- findById(id): Find by primary key
- findAll(): Get all records
- deleteById(id): Delete by primary key
- count(): Count records
- And 20+ more methods!
Spring Data JPA magic: You can add custom methods like:
List<AuditLog> findByOperation(String operation);
List<AuditLog> findByPersonIdOrderByEventTimeDesc(Long personId);
Spring automatically implements them based on method names!
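For example, assuming you add those two derived methods to AuditLogRepository, you could expose a per-person change history with just a few lines. This extra endpoint is only an illustration and is not part of the controller we build below.
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class AuditQueryController {

    private final AuditLogRepository auditLogRepository;

    // Full change history for one person, newest first
    // (relies on findByPersonIdOrderByEventTimeDesc from the example above)
    @GetMapping("/persons/{id}/audit-logs")
    public List<AuditLog> historyFor(@PathVariable Long id) {
        return auditLogRepository.findByPersonIdOrderByEventTimeDesc(id);
    }
}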
Kafka Configuration
package com.thejavaengineer.springcdckafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Why this configuration?
@Configuration: Tells Spring this class defines beans (components)
@EnableKafka: Enables Kafka listener annotations (@KafkaListener)
ConsumerFactory: Factory pattern for creating Kafka consumers
- Encapsulates all configuration
- Allows easy customization per consumer
ConcurrentKafkaListenerContainerFactory: Creates listener containers
- "Concurrent" means multiple threads can process messages
- Each thread handles messages from different partitions
- Default: Number of partitions = number of threads
Why separate factory and config?
- Flexibility: Can have multiple consumer factories with different configs
- Testing: Easy to mock
- Spring best practices: Separation of concerns
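Tying this back to the concurrency point above: if the topic had, say, three partitions, a sketch like the following (replacing the factory bean in KafkaConfig) would run three listener threads. We keep the default single-threaded factory in this tutorial.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // One listener thread per partition; threads beyond the partition count would sit idle
    factory.setConcurrency(3);
    return factory;
}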
The Kafka Consumer (The Heart of the Application)
package com.thejavaengineer.springcdckafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

@Component
@RequiredArgsConstructor
@Slf4j
public class DebeziumConsumer {

    private final AuditLogRepository auditLogRepository;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "dbserver.public.person", groupId = "audit-group")
    public void consume(String message) {
        try {
            log.info("🔔 Received CDC event!");

            // Parse the JSON message
            JsonNode root = objectMapper.readTree(message);
            JsonNode payload = root.path("payload");

            // Extract operation type
            String operation = payload.path("op").asText();

            // Extract before/after states
            JsonNode after = payload.path("after");
            JsonNode before = payload.path("before");

            // Extract timestamp
            long timestamp = payload.path("ts_ms").asLong();

            // Create audit log entry
            AuditLog auditLog = new AuditLog();
            auditLog.setOperation(mapOperation(operation));
            auditLog.setEventTime(LocalDateTime.ofInstant(
                    Instant.ofEpochMilli(timestamp),
                    ZoneId.systemDefault()
            ));

            // Populate data based on operation
            if (!after.isNull()) {
                // INSERT or UPDATE: Use "after" state
                auditLog.setPersonId(after.path("id").asLong());
                auditLog.setPersonName(after.path("name").asText());
                auditLog.setPersonAge(after.path("age").asInt(0));
            } else if (!before.isNull()) {
                // DELETE: Use "before" state
                auditLog.setPersonId(before.path("id").asLong());
                auditLog.setPersonName(before.path("name").asText());
                auditLog.setPersonAge(before.path("age").asInt(0));
            }

            // Save to database
            auditLogRepository.save(auditLog);

            log.info("✅ AUDITED: {} → Person ID={}, Name={}",
                    auditLog.getOperation(),
                    auditLog.getPersonId(),
                    auditLog.getPersonName()
            );
        } catch (Exception e) {
            log.error("❌ Error processing CDC event", e);
            e.printStackTrace();
        }
    }

    private String mapOperation(String op) {
        return switch (op) {
            case "c" -> "CREATE";
            case "u" -> "UPDATE";
            case "d" -> "DELETE";
            case "r" -> "READ";
            default -> op;
        };
    }
}
Deep Dive into the Consumer:
@Component: Spring manages this as a bean (singleton by default)
@RequiredArgsConstructor: Lombok generates constructor for final fields
- Enables constructor-based dependency injection (recommended)
@Slf4j: Lombok generates a logger field: private static final Logger log = ...
@KafkaListener:
- topics: Which Kafka topics to listen to (can be an array: {"topic1", "topic2"})
- groupId: Consumer group (shares load with other consumers)
- Spring automatically calls this method for each message
JSON Parsing:
JsonNode root = objectMapper.readTree(message);
JsonNode payload = root.path("payload");
- readTree: Parses the JSON string into a tree structure
- path: Navigates the JSON safely (returns a "missing node" instead of null)
Operation Mapping:
private String mapOperation(String op) {
    return switch (op) {
        case "c" -> "CREATE";
        case "u" -> "UPDATE";
        case "d" -> "DELETE";
        case "r" -> "READ";
        default -> op;
    };
}
- Converts Debezium's single-letter codes to readable strings
- "r" is for initial snapshot (when connector first starts)
Timestamp Conversion:
auditLog.setEventTime(LocalDateTime.ofInstant(
        Instant.ofEpochMilli(timestamp),
        ZoneId.systemDefault()
));
- Debezium sends the timestamp as epoch milliseconds (e.g., 1770479813373)
- We convert it to LocalDateTime for easy database storage and querying
Handling Different Operations:
if (!after.isNull()) {
    // For INSERT and UPDATE
    auditLog.setPersonId(after.path("id").asLong());
    // ...
} else if (!before.isNull()) {
    // For DELETE
    auditLog.setPersonId(before.path("id").asLong());
    // ...
}
- INSERT: before is null, after has data
- UPDATE: Both before and after have data (we use after for the current state)
- DELETE: after is null, before has data
Error Handling:
try {
    // Process message
} catch (Exception e) {
    log.error("❌ Error processing CDC event", e);
    e.printStackTrace();
}
- In production, you'd want more sophisticated error handling:
- Dead letter queue (DLQ) for failed messages
- Retry logic with exponential backoff
- Alerting on repeated failures
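As one possible shape for that, Spring Kafka's DefaultErrorHandler can retry a few times and then hand the failed record to a DeadLetterPublishingRecoverer. The sketch below relies on Spring Boot's auto-configured KafkaTemplate (which we never use elsewhere in this tutorial) and a hypothetical "<topic>.DLT" dead-letter topic name; it is an illustration of the idea, not part of the project we build.
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // After three failed attempts (one second apart), publish the record to <topic>.DLT
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
    }
}
You would then register the handler on the listener container factory with factory.setCommonErrorHandler(...); none of this is wired up in the tutorial project.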
REST Controller (For Testing)
package com.thejavaengineer.springcdckafka;

import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class PersonController {

    private final PersonRepository personRepository;
    private final AuditLogRepository auditLogRepository;

    @PostMapping("/persons")
    public ResponseEntity<Person> createPerson(@RequestBody Person person) {
        Person saved = personRepository.save(person);
        return ResponseEntity.ok(saved);
    }

    @GetMapping("/persons")
    public ResponseEntity<List<Person>> getAllPersons() {
        return ResponseEntity.ok(personRepository.findAll());
    }

    @PutMapping("/persons/{id}")
    public ResponseEntity<Person> updatePerson(@PathVariable Long id, @RequestBody Person person) {
        return personRepository.findById(id)
                .map(existing -> {
                    existing.setName(person.getName());
                    existing.setAge(person.getAge());
                    return ResponseEntity.ok(personRepository.save(existing));
                })
                .orElse(ResponseEntity.notFound().build());
    }

    @DeleteMapping("/persons/{id}")
    public ResponseEntity<Void> deletePerson(@PathVariable Long id) {
        personRepository.deleteById(id);
        return ResponseEntity.ok().build();
    }

    @GetMapping("/audit-logs")
    public ResponseEntity<List<AuditLog>> getAuditLogs() {
        return ResponseEntity.ok(auditLogRepository.findAll());
    }
}
REST API Endpoints:
- POST /api/persons: Create a person
- GET /api/persons: List all persons
- PUT /api/persons/{id}: Update a person
- DELETE /api/persons/{id}: Delete a person
- GET /api/audit-logs: View all audit logs
Why @RestController?
- Combines @Controller + @ResponseBody
- Perfect for REST APIs
Why Optional.map()?
return personRepository.findById(id)
        .map(existing -> {
            // Update and save
            return ResponseEntity.ok(personRepository.save(existing));
        })
        .orElse(ResponseEntity.notFound().build());
- Functional programming style
- Handles "not found" elegantly
- Alternative to if-else null checks
Part 6: Testing the Complete System
Test 1: Direct Database Insert
docker compose exec postgres psql -U postgres -d mydb -c "INSERT INTO person (name, age) VALUES ('John Doe', 30);"
What happens:
1. PostgreSQL inserts the row
2. PostgreSQL writes to the WAL: "INSERT INTO person..."
3. Debezium reads the WAL entry
4. Debezium creates a JSON message
5. Debezium publishes it to the Kafka topic dbserver.public.person
6. The Spring Boot consumer receives the message
7. Spring Boot parses the JSON
8. Spring Boot inserts a row into the audit_log table
9. You see: ✅ AUDITED: CREATE → Person ID=1, Name=John Doe
Timing: This entire process takes milliseconds! Typical lag: 10-50ms.
Test 2: REST API Insert
Invoke-RestMethod -Method Post `
-Uri http://localhost:8080/api/persons `
-Headers @{ "Content-Type" = "application/json" } `
-Body '{"name": "Jane Smith", "age": 25}'
What happens:
1. The REST request hits Spring Boot
2. Spring Boot saves to the person table
3. PostgreSQL writes to the WAL
4-9. Same as steps 4-9 in Test 1
Key insight: Doesn't matter HOW the data changes—SQL, REST API, admin tool, migration script—CDC captures it all!
Test 3: Update
Invoke-RestMethod -Method Put `
-Uri http://localhost:8080/api/persons/1 `
-Headers @{ "Content-Type" = "application/json" } `
-Body '{"name": "John Updated", "age": 31}'
Debezium message:
{
"before": {"id": 1, "name": "John Doe", "age": 30, "created_at": 1770479813360635},
"after": {"id": 1, "name": "John Updated", "age": 31, "created_at": 1770479813360635},
"op": "u"
}
Audit log entry:
operation: UPDATE
person_id: 1
person_name: John Updated
person_age: 31
Test 4: Delete
Invoke-RestMethod -Method Delete -Uri http://localhost:8080/api/persons/1
Debezium message:
{
"before": {"id": 1, "name": "John Updated", "age": 31, "created_at": 1770479813360635},
"after": null,
"op": "d"
}
Audit log entry:
operation: DELETE
person_id: 1
person_name: John Updated
person_age: 31
Verifying Audit Logs
# Via REST API
Invoke-RestMethod -Uri http://localhost:8080/api/audit-logs
# Via Database
docker compose exec postgres psql -U postgres -d mydb -c "SELECT * FROM audit_log ORDER BY processed_at DESC;"
Part 7: Advanced Concepts
Consumer Groups and Scalability
Single Consumer:
Topic: dbserver.public.person (3 partitions)
Consumer Group: audit-group
- Consumer 1: Reads partitions 0, 1, 2
Multiple Consumers (Same Group):
Topic: dbserver.public.person (3 partitions)
Consumer Group: audit-group
- Consumer 1: Reads partition 0
- Consumer 2: Reads partition 1
- Consumer 3: Reads partition 2
Result: 3x throughput! Each consumer processes 1/3 of messages.
Multiple Consumer Groups:
Topic: dbserver.public.person
Consumer Group: audit-group
- Consumer A: Creates audit logs
Consumer Group: analytics-group
- Consumer B: Updates analytics dashboard
Consumer Group: cache-group
- Consumer C: Invalidates cache
Result: Pub-sub pattern! All groups receive all messages.
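In Spring terms, that pub-sub behaviour is just a matter of the groupId: two listeners with different groups each receive their own copy of every message. A hypothetical second consumer for analytics might look like this (AnalyticsService logic is only hinted at and is not part of this tutorial's project).
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class AnalyticsConsumer {

    // Different group-id than "audit-group", so it gets its own copy of every event
    @KafkaListener(topics = "dbserver.public.person", groupId = "analytics-group")
    public void consume(String message) {
        log.info("Analytics copy of CDC event ({} chars)", message.length());
        // ... update dashboards, counters, etc. here
    }
}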
Offset Management
What's an offset? Like a bookmark—tells Kafka "I've read up to here."
Commit strategies:
Auto-commit (what we use):
enable-auto-commit: true
auto-commit-interval: 1000 # Every 1 second
- Pros: Simple, automatic
- Cons: Possible message loss on a crash (up to a 1-second window)
Manual commit:
@KafkaListener(topics = "dbserver.public.person")
public void consume(String message, Acknowledgment ack) {
    processMessage(message);
    ack.acknowledge(); // Commit after successful processing
}
- Pros: At-least-once delivery guarantee
- Cons: More code, possible duplicates on a crash
Transactional:
@Transactional
public void consume(String message) {
    processMessage(message);
    // Kafka offset commits together with the database transaction
}
- Pros: Exactly-once semantics
- Cons: More complex, requires Kafka transactions
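If you go the manual-commit route, the listener container also has to be told to expect manual acknowledgments (and enable-auto-commit must be set to false in the consumer properties). A rough sketch of the extra factory, on top of the KafkaConfig from Part 5:
// extra import needed: org.springframework.kafka.listener.ContainerProperties
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Listener methods now receive an Acknowledgment and must call ack.acknowledge() themselves
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
A listener would then opt in with @KafkaListener(..., containerFactory = "manualAckFactory").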
Monitoring and Troubleshooting
Check Kafka Topics:
docker compose exec kafka kafka-topics --bootstrap-server kafka:29092 --list
View Messages:
docker compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic dbserver.public.person --from-beginning
Check Consumer Group Lag:
docker compose exec kafka kafka-consumer-groups --bootstrap-server kafka:29092 --group audit-group --describe
Output:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
dbserver.public.person 0 150 150 0
LAG = 0: Consumer is caught up! ✅
LAG > 0: Consumer is falling behind and might need scaling!
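The same lag check can also be done programmatically with Kafka's AdminClient, which is handy if you ever want to alert on lag from code. A rough sketch, using the bootstrap address configured earlier:
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the audit-group has committed so far
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("audit-group")
                         .partitionsToOffsetAndMetadata().get();

            // Latest offsets actually written to those partitions
            Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                    admin.listOffsets(latestSpec).all().get();

            committed.forEach((tp, offset) -> {
                long lag = latest.get(tp).offset() - offset.offset();
                System.out.printf("%s lag=%d%n", tp, lag);
            });
        }
    }
}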
Check Debezium Status:
Invoke-RestMethod -Uri http://localhost:8083/connectors/postgres-connector/status
Check Replication Slot:
docker compose exec postgres psql -U postgres -d mydb -c "SELECT * FROM pg_replication_slots;"
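If you prefer to inspect the slot from code, a small JDBC sketch like this works with the same PostgreSQL driver the Spring Boot app already uses; the connection details match the docker-compose setup, and once the connector from Part 3 is registered you should see a slot named debezium.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReplicationSlotCheck {
    public static void main(String[] args) throws Exception {
        // Same credentials as the docker-compose postgres service
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "postgres", "postgres");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT slot_name, plugin, active, restart_lsn FROM pg_replication_slots")) {
            while (rs.next()) {
                System.out.printf("slot=%s plugin=%s active=%s restart_lsn=%s%n",
                        rs.getString("slot_name"), rs.getString("plugin"),
                        rs.getBoolean("active"), rs.getString("restart_lsn"));
            }
        }
    }
}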
Error Scenarios and Solutions
Problem 1: Consumer can't connect to Kafka
Error: java.net.UnknownHostException: kafka
Solution: Check Kafka listeners configuration. External clients use localhost:9092.
Problem 2: Debezium connector fails
"state": "FAILED",
"trace": "org.postgresql.util.PSQLException: ERROR: permission denied for table person"
Solution: Grant permissions:
GRANT SELECT ON TABLE person TO postgres;
ALTER TABLE person REPLICA IDENTITY FULL;
Problem 3: Messages in Kafka but consumer not receiving
# Check if consumer group is active
docker compose exec kafka kafka-consumer-groups --bootstrap-server kafka:29092 --group audit-group --describe
Solution: Restart Spring Boot application. Check logs for connection errors.
Problem 4: Lag keeps increasing
Causes:
- Consumer processing too slow
- Database writes too slow
- Network issues
Solutions:
- Scale horizontally (more consumers)
- Optimize database queries (add indexes)
- Batch inserts in audit log
- Increase partition count
Part 8: Production Considerations
What We Didn't Cover (But Should for Production)
1. Security:
- Kafka: Enable SSL/TLS, SASL authentication
- PostgreSQL: SSL connections, restricted users
- Kafka Connect: Secrets management (not hardcoded passwords)
2. High Availability:
- Kafka: 3+ brokers with replication factor 3
- Zookeeper: 3+ nodes (or migrate to KRaft mode—Zookeeper-less Kafka)
- PostgreSQL: Primary-replica setup
- Spring Boot: Multiple instances behind load balancer
3. Monitoring:
- Kafka: JMX metrics, Prometheus + Grafana
- Debezium: Connector health, lag metrics
- Spring Boot: Actuator endpoints, APM tools (New Relic, Datadog)
4. Data Retention:
- Kafka: Configure topic retention (default: 7 days)
retention.ms=604800000      # 7 days
retention.bytes=1073741824  # 1 GB
- Audit log: Archival strategy (move old records to cold storage)
5. Schema Evolution:
- Avro/Protobuf: Use schema registry instead of JSON
- Backward/forward compatibility
- Smaller message size
- Type safety
6. Disaster Recovery:
- Backup strategies: Regular PostgreSQL backups
- Kafka offset reset: Ability to replay from specific point
- Idempotency: Handle duplicate messages gracefully
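One way to get that idempotency would be to derive a natural key from the event, for example the source LSN, and skip anything you've already stored. The sourceLsn column and existsBySourceLsn method below are hypothetical additions to the AuditLog entity and repository, not part of the schema built earlier; this is a sketch of the idea, not the tutorial's implementation.
// Hypothetical idempotent variant of DebeziumConsumer.consume():
// assumes AuditLog has an extra, uniquely indexed sourceLsn column and the
// repository declares boolean existsBySourceLsn(Long sourceLsn).
public void consumeIdempotently(String message) throws Exception {
    JsonNode payload = objectMapper.readTree(message).path("payload");
    long lsn = payload.path("source").path("lsn").asLong();

    if (auditLogRepository.existsBySourceLsn(lsn)) {
        log.info("Duplicate CDC event for LSN {}, skipping", lsn);
        return; // replaying the topic no longer creates duplicate audit rows
    }

    AuditLog auditLog = new AuditLog();
    auditLog.setSourceLsn(lsn);
    // ... populate operation, person fields, and eventTime exactly as before ...
    auditLogRepository.save(auditLog);
}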
7. Performance Tuning:
- Kafka:
batch.size=16384         # Batch size for producer
linger.ms=10             # Wait time before sending batch
compression.type=snappy  # Compress messages
- PostgreSQL:
-- Optimize replication slot lag
ALTER SYSTEM SET wal_sender_timeout = '60s';
ALTER SYSTEM SET max_replication_slots = 10;
- Spring Boot:
spring.kafka.listener:
  concurrency: 3 # Multiple consumer threads
Cost Optimization
Resource Usage:
- Zookeeper: ~512MB RAM
- Kafka: ~2GB RAM minimum (more for high throughput)
- Postgres: ~1GB RAM minimum
- Kafka Connect: ~1GB RAM
- Spring Boot: ~512MB RAM
Cloud Deployment:
- AWS: MSK (Managed Kafka), RDS (Postgres), ECS/EKS (Spring Boot)
- GCP: Confluent Cloud, Cloud SQL, GKE
- Azure: Event Hubs (Kafka-compatible), Azure Database for PostgreSQL, AKS
Part 9: Common Patterns and Use Cases
Pattern 1: Multi-Table CDC
Monitor multiple tables:
{
"table.include.list": "public.person,public.order,public.payment"
}
Results in topics:
- dbserver.public.person
- dbserver.public.order
- dbserver.public.payment
Each topic needs its own consumer, or you can route by topic in a single listener:
@KafkaListener(topics = {"dbserver.public.person", "dbserver.public.order"})
public void consumeMultiple(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    if (topic.endsWith("person")) {
        handlePerson(message);
    } else if (topic.endsWith("order")) {
        handleOrder(message);
    }
}
Pattern 2: Aggregate Events
Combine related changes:
@KafkaListener(topics = "dbserver.public.order")
public void consumeOrder(String message) {
    JsonNode payload = parse(message);
    Long orderId = payload.path("after").path("id").asLong();
    // Load the order and its related items
    Order order = orderRepository.findById(orderId).orElseThrow();
    List<OrderItem> items = orderItemRepository.findByOrderId(orderId);
    // Create an enriched event
    OrderCreatedEvent event = new OrderCreatedEvent(order, items);
    eventPublisher.publish(event);
}
Pattern 3: Data Synchronization
Keep two databases in sync:
Source DB (Postgres) → Debezium → Kafka → Consumer → Target DB (MySQL)
@KafkaListener(topics = "dbserver.public.person")
public void syncToMySQL(String message) {
    JsonNode payload = parse(message).path("payload");
    JsonNode after = payload.path("after");
    String op = payload.path("op").asText();

    if ("c".equals(op) || "u".equals(op)) {
        mysqlPersonRepository.save(mapToPerson(after));
    } else if ("d".equals(op)) {
        // On DELETE, "after" is null, so take the id from "before"
        Long id = payload.path("before").path("id").asLong();
        mysqlPersonRepository.deleteById(id);
    }
}
Pattern 4: Event Sourcing
Store all events in event store:
@KafkaListener(topics = "dbserver.public.person")
public void storeEvent(String message) {
    PersonEvent event = new PersonEvent();
    event.setRawPayload(message);
    event.setTimestamp(extractTimestamp(message));
    event.setOperation(extractOperation(message));
    eventStore.save(event); // Never update, only insert
}

// Later, rebuild state from events
public Person rebuildPersonState(Long personId) {
    List<PersonEvent> events = eventStore.findByPersonId(personId);
    return events.stream()
            .reduce(new Person(),
                    (person, event) -> applyEvent(person, event),
                    (p1, p2) -> p2); // combiner is only used for parallel streams
}
Pattern 5: CQRS (Command Query Responsibility Segregation)
Separate write and read models:
Write Model (Postgres) → CDC → Kafka → Read Model (Elasticsearch/MongoDB)
@KafkaListener(topics = "dbserver.public.person")
public void updateReadModel(String message) {
    JsonNode after = parse(message).path("payload").path("after");

    PersonDocument doc = new PersonDocument();
    doc.setId(after.path("id").asLong());
    doc.setName(after.path("name").asText());
    doc.setAge(after.path("age").asInt());
    doc.setSearchableText(doc.getName().toLowerCase());

    elasticsearchRepository.save(doc); // Optimized for search
}
Conclusion
You've just built a complete, working Change Data Capture pipeline (Part 8 covers what it still needs to be production-ready). Let's recap what you learned:
Key Concepts
- CDC: Captures database changes automatically
- Debezium: Reads database logs and publishes to Kafka
- Kafka: Reliable, scalable message broker
- Spring Boot: Consumes events and takes action
The Power of CDC
Before CDC:
// Manual tracking—error-prone!
person.save();
auditLog.create("Person created"); // What if you forget?
cache.invalidate(person.id);
analytics.update(person);
With CDC:
// Automatic—never miss a change!
person.save();
// CDC handles: audit logs, cache, analytics, notifications, etc.
When to Use CDC
✅ Great for:
- Audit logging
- Data replication
- Cache invalidation
- Event-driven microservices
- Real-time analytics
- Data warehousing (ETL)
❌ Not ideal for:
- Simple CRUD apps (overkill)
- High-latency tolerant systems (CDC adds complexity)
- Applications with no change tracking needs
Next Steps
Learn More:
- Kafka Streams (process events in real-time)
- Schema Registry (manage message schemas)
- Kafka Connect ecosystem (100+ connectors)
Experiment:
- Add filtering (only track certain changes)
- Add transformations (modify messages in flight)
- Multiple consumers (fan-out pattern)
Production Deployment:
- Add monitoring (Prometheus, Grafana)
- Implement error handling (DLQ, retries)
- Add security (SSL, authentication)
Resources
- Debezium Documentation: https://debezium.io/documentation/
- Kafka Documentation: https://kafka.apache.org/documentation/
- Spring Kafka: https://spring.io/projects/spring-kafka
- My GitHub: thejavaengineer/spring-cdc-kafka
Final Thoughts
Change Data Capture transforms how we build data-driven applications. Instead of polling databases or writing complex triggers, we let the database tell us what changed. This pattern scales to millions of events per second and forms the backbone of modern data architectures.
You now have the knowledge to build event-driven systems that react to data changes in real-time. Whether it's compliance, analytics, or microservices communication, CDC is your superpower.
Happy coding! 🚀
Questions? Found this helpful? Leave a comment below or reach out on [Twitter/LinkedIn]. If this tutorial helped you, please star the repository and share it with others!
Tags: #CDC #Debezium #Kafka #SpringBoot #EventDriven #Microservices #Java #PostgreSQL #Tutorial
