Building a Complete Change Data Capture (CDC) System with Debezium, Kafka, and Spring Boot

The Java Engineer

Introduction

In this comprehensive guide, we'll build a real-time Change Data Capture (CDC) system that automatically tracks every change made to a PostgreSQL database and creates an audit log. By the end, you'll understand how databases, message brokers, and applications work together to create event-driven architectures.

What We're Building

Imagine this scenario: Every time someone creates, updates, or deletes a person record in your database, you want to automatically log that change in an audit table. No manual triggers, no application-level tracking—just pure, automated database change tracking.

The Flow:

Database Change → Debezium Captures It → Kafka Stores It → Spring Boot Processes It → Audit Log Created

Why This Matters

Traditional applications require you to write code to track changes:

// Traditional way - you have to remember to do this!
person.save();
auditLog.create("Person created", person.id);  // Easy to forget!

With CDC:

// CDC way - automatic!
person.save();  // That's it! CDC handles the rest automatically

Real-world use cases:

  • Compliance & Auditing: Track who changed what and when
  • Data Replication: Keep multiple databases in sync
  • Event-Driven Architecture: Trigger actions based on data changes
  • Cache Invalidation: Update caches when data changes
  • Analytics: Feed data warehouses in real-time
  • Microservices Communication: Notify other services about changes

Understanding the Architecture

The Components

1. PostgreSQL with Debezium Image

PostgreSQL is our source database, but not just any PostgreSQL—we use a special Debezium-configured version that has Write-Ahead Logging (WAL) enabled.

What is WAL? PostgreSQL keeps a log of every change before applying it. This is called the Write-Ahead Log. It's like a journal where PostgreSQL writes "I'm about to insert this record" before actually doing it. This ensures data safety if the server crashes.

Why do we need it for CDC? Debezium reads this WAL to see what changed. It's like reading someone's journal to know what they did.

2. Zookeeper

Zookeeper is Kafka's coordinator. It's like a manager that keeps track of:

  • Which Kafka brokers are alive
  • Which topics exist
  • Leader election for partitions
  • Configuration management

Why do we need it? Kafka, in the classic deployment we're using here (the Confluent Platform 7.3.0 images), needs Zookeeper to coordinate distributed operations. Think of it as the "brain" that helps Kafka brokers communicate.

3. Kafka

Kafka is a distributed message broker—a super-fast, reliable post office for data.

How it works:

  • Topics: Like mail folders (e.g., "dbserver.public.person")
  • Producers: Applications that send messages (Debezium)
  • Consumers: Applications that read messages (Spring Boot)
  • Partitions: Divide topics for parallelism
  • Offsets: Keep track of which messages you've read

Why Kafka and not a database queue?

  • Handles millions of messages per second
  • Keeps messages even after they're read (replay capability)
  • Distributed and fault-tolerant
  • Scales horizontally
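
To make those terms concrete, here's a minimal standalone Java sketch of a consumer (not part of the project we build below; it assumes Kafka is reachable at localhost:9092 and that the dbserver.public.person topic already exists). It subscribes to one topic and prints each record with its partition and offset:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class TopicPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "peek-group");          // consumer group name
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // start from the oldest retained message
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("dbserver.public.person"));        // the "mail folder" we read from
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r ->
                    System.out.printf("partition=%d offset=%d value=%s%n", r.partition(), r.offset(), r.value()));
        }
    }
}

Spring Kafka (used in Part 5) hides exactly this boilerplate behind the @KafkaListener annotation.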

4. Kafka Connect (Debezium)

Kafka Connect is a framework for moving data in and out of Kafka. Debezium is a connector plugin that specializes in CDC.

What Debezium does:

  1. Connects to PostgreSQL
  2. Reads the Write-Ahead Log (WAL)
  3. Converts database changes to JSON messages
  4. Publishes them to Kafka topics

The magic: All of this happens automatically in real-time!

5. Spring Boot Application

Our Java application that:

  • Listens to Kafka topics
  • Processes CDC events
  • Creates audit log entries
  • Provides REST API for testing

Part 1: Setting Up the Infrastructure with Docker Compose

Why Docker Compose?

Instead of installing PostgreSQL, Kafka, Zookeeper, and Kafka Connect separately on your machine, Docker Compose lets us define all services in one file and start them with a single command.

Benefits:

  • Consistent environment (works the same everywhere)
  • Easy to start/stop everything
  • Isolated from your system
  • Easy to reset and start fresh

The Complete docker-compose.yml

Let's break down each service:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ["CMD", "echo", "ruok", "|", "nc", "localhost", "2181"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 30s

Explanation:

  • image: Uses Confluent's Zookeeper (production-ready distribution)
  • ports: Maps container port 2181 to host port 2181 (default Zookeeper port)
  • ZOOKEEPER_CLIENT_PORT: The port clients use to connect
  • ZOOKEEPER_TICK_TIME: Basic time unit (milliseconds) for heartbeats
  • healthcheck: Docker checks if Zookeeper is ready by sending "ruok" (are you ok?) command
  • start_period: 30s: Gives Zookeeper 30 seconds to start before checking health
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "nc -z localhost 9092 || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 60s

Critical Kafka Configuration Explained:

Why Two Listeners? This is the trickiest part! Kafka needs to be accessible from two places:

  1. Inside Docker network: Other containers (like Debezium) use kafka:29092
  2. Outside Docker (your Windows machine): Spring Boot uses localhost:9092

The Listener Configuration:

  • KAFKA_LISTENERS: Where Kafka actually listens (0.0.0.0 means all interfaces)

    • PLAINTEXT://0.0.0.0:29092: Internal Docker communication
    • PLAINTEXT_HOST://0.0.0.0:9092: External host communication
  • KAFKA_ADVERTISED_LISTENERS: What Kafka tells clients to use

    • PLAINTEXT://kafka:29092: Containers connect here
    • PLAINTEXT_HOST://localhost:9092: Host machine connects here

Why This Matters: Kafka answers each client with the advertised address that matches the listener the client connected on. Debezium, running inside the Docker network, connects to kafka:29092 and keeps using that address, while your Spring Boot app on the host connects to localhost:9092 and is told to keep using localhost:9092. If the advertised listeners are wrong, clients can bootstrap once and then fail on the addresses Kafka hands back.

Other Settings:

  • KAFKA_BROKER_ID: 1: Unique ID for this Kafka instance (important in clusters)
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1: For single-node setup (would be 3 in production)
  • KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0: Start consuming immediately
  • healthcheck: Verifies Kafka port 9092 is accepting connections
  postgres:
    image: debezium/postgres:13
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: mydb
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5

Why debezium/postgres and not regular postgres? The Debezium PostgreSQL image comes pre-configured with:

  • WAL level set to logical: Regular PostgreSQL uses "replica" level, but CDC needs "logical"
  • Logical decoding plugins: Ships with Debezium-supported output plugins for turning the WAL into change events (we use PostgreSQL's built-in pgoutput plugin later)
  • Proper permissions: Allows Debezium to create replication slots

What's a Replication Slot? A replication slot is like a bookmark in the WAL. It tells PostgreSQL "keep the log from this point onwards because someone (Debezium) is reading it."

  connect:
    image: debezium/connect:2.1
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      kafka:
        condition: service_healthy
      postgres:
        condition: service_healthy
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8083/"]
      interval: 30s
      timeout: 10s
      retries: 5

Kafka Connect Configuration:

  • BOOTSTRAP_SERVERS: kafka:29092: Connects to Kafka using internal Docker network
  • GROUP_ID: 1: Identifies this Connect cluster
  • CONFIG_STORAGE_TOPIC: Stores connector configurations in Kafka
  • OFFSET_STORAGE_TOPIC: Stores position in the WAL (so it can resume after restart)
  • STATUS_STORAGE_TOPIC: Stores connector health status

Why port 8083? Kafka Connect provides a REST API on port 8083 for managing connectors (create, delete, update, status check).

Starting the Infrastructure

# Stop and remove any existing containers (clean slate)
docker compose down -v

# Start all services in detached mode (background)
docker compose up -d

# Wait for services to become healthy
Start-Sleep -Seconds 120

# Check status
docker compose ps

What -v does: Removes volumes (stored data). This ensures you start completely fresh—useful for testing!

Why wait 2 minutes?

  • Zookeeper needs ~10-15 seconds to start
  • Kafka needs ~30-45 seconds (waits for Zookeeper, then initializes)
  • Postgres needs ~10-15 seconds
  • Kafka Connect needs ~45-60 seconds (waits for Kafka, loads plugins)

Verifying Health: All containers should show "Up (healthy)". The healthcheck ensures:

  • Zookeeper is accepting connections
  • Kafka broker is ready
  • Postgres is accepting queries
  • Kafka Connect REST API is responding

Part 2: Creating the Database Schema

CREATE TABLE person (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    age INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE audit_log (
    id SERIAL PRIMARY KEY,
    operation VARCHAR(10),
    person_id BIGINT,
    person_name VARCHAR(100),
    person_age INT,
    event_time TIMESTAMP,
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

The person Table: This is our source table—the one we'll monitor for changes.

  • id SERIAL PRIMARY KEY: Auto-incrementing ID (essential for CDC to identify records)
  • name VARCHAR(100) NOT NULL: Required field
  • age INT: Optional field
  • created_at TIMESTAMP: Tracks when record was created

The audit_log Table: This stores our audit trail.

  • operation: Type of change (CREATE, UPDATE, DELETE)
  • person_id: Which person was affected
  • person_name, person_age: Snapshot of data at the time of change
  • event_time: When the database change occurred
  • processed_at: When our Spring Boot app processed it (useful for debugging lag)

Run it:

docker compose exec postgres psql -U postgres -d mydb -c "
CREATE TABLE IF NOT EXISTS person (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    age INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS audit_log (
    id SERIAL PRIMARY KEY,
    operation VARCHAR(10),
    person_id BIGINT,
    person_name VARCHAR(100),
    person_age INT,
    event_time TIMESTAMP,
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"

Part 3: Configuring Debezium Connector

The connector is the "bridge" between PostgreSQL and Kafka. We configure it via REST API.

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "topic.prefix": "dbserver",
    "table.include.list": "public.person",
    "plugin.name": "pgoutput",
    "slot.name": "debezium",
    "publication.name": "dbz_publication",
    "publication.autocreate.mode": "all_tables"
  }
}

Configuration Breakdown:

Basic Connection:

  • connector.class: Tells Kafka Connect this is a PostgreSQL connector
  • database.hostname: Docker service name (not "localhost"!)
  • database.port: Standard PostgreSQL port
  • database.user/password: Credentials

Topic Configuration:

  • topic.prefix: "dbserver" → Creates topics like dbserver.public.person
    • Format: {prefix}.{schema}.{table}
    • Why? Helps organize topics, especially with multiple databases

Table Selection:

  • table.include.list: "public.person" → Only monitor the person table
    • You could monitor multiple: "public.person,public.orders"
    • Or use regular expressions (the list is regex-based): "public\..*" matches every table in the public schema

PostgreSQL-Specific Settings:

  • plugin.name: "pgoutput" → The logical decoding plugin

    • Built into PostgreSQL 10+
    • Alternative: "wal2json" (older setups)
  • slot.name: "debezium" → Name of the replication slot

    • PostgreSQL creates a slot with this name
    • Prevents WAL from being deleted while Debezium is reading
  • publication.name: "dbz_publication" → Name of the PostgreSQL publication

    • Publications define which tables to replicate
  • publication.autocreate.mode: "all_tables"

    • Debezium automatically creates the publication
    • Alternative: "filtered" (only specified tables)

Create the Connector:

Invoke-RestMethod -Method Post `
  -Uri http://localhost:8083/connectors `
  -Headers @{ "Content-Type" = "application/json" } `
  -Body '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "mydb",
      "topic.prefix": "dbserver",
      "table.include.list": "public.person",
      "plugin.name": "pgoutput",
      "slot.name": "debezium",
      "publication.name": "dbz_publication",
      "publication.autocreate.mode": "all_tables"
    }
  }'

Verify It's Running:

Invoke-RestMethod -Uri http://localhost:8083/connectors/postgres-connector/status

You should see:

{
  "name": "postgres-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.19.0.5:8083"
  },
  "tasks": [{
    "id": 0,
    "state": "RUNNING",
    "worker_id": "172.19.0.5:8083"
  }]
}

What "RUNNING" means:

  • Debezium connected to PostgreSQL successfully
  • Created the replication slot
  • Started reading the WAL
  • Publishing changes to Kafka

Part 4: Understanding the CDC Message Format

When you insert a person, Debezium creates a JSON message like this:

{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "John Doe",
      "age": 25,
      "created_at": 1770479813360635
    },
    "source": {
      "version": "2.1.4.Final",
      "connector": "postgresql",
      "name": "dbserver",
      "ts_ms": 1770479813373,
      "snapshot": "false",
      "db": "mydb",
      "schema": "public",
      "table": "person",
      "txId": 490,
      "lsn": 23880744
    },
    "op": "c",
    "ts_ms": 1770479813840
  }
}

Breaking It Down:

The payload object:

  1. before: The row data BEFORE the change

    • null for INSERT (nothing existed before)
    • Contains old values for UPDATE
    • Contains deleted values for DELETE
  2. after: The row data AFTER the change

    • Contains new values for INSERT and UPDATE
    • null for DELETE (nothing exists after)
  3. source: Metadata about where this came from

    • ts_ms: When the database change occurred (epoch milliseconds)
    • db, schema, table: Identifies the exact table
    • txId: Transaction ID (useful for grouping related changes)
    • lsn: Log Sequence Number (position in WAL)
  4. op: Operation type

    • "c": CREATE (INSERT)
    • "u": UPDATE
    • "d": DELETE
    • "r": READ (initial snapshot)

Why two timestamps?

  • source.ts_ms: When PostgreSQL committed the transaction
  • payload.ts_ms: When Debezium processed it
  • Difference = lag (should be milliseconds in real-time systems)
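
Inside the Spring Boot consumer we build in Part 5, you could log that lag with a couple of lines (a sketch; payload is the parsed JsonNode of the CDC message and log is the Lombok logger):

long sourceTsMs = payload.path("source").path("ts_ms").asLong();  // when PostgreSQL committed the change
long debeziumTsMs = payload.path("ts_ms").asLong();               // when Debezium emitted the event
long endToEndLagMs = System.currentTimeMillis() - sourceTsMs;     // lag as seen by the consumer
log.info("CDC lag: {} ms total, {} ms of it in Debezium", endToEndLagMs, debeziumTsMs - sourceTsMs);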

Example Messages:

INSERT:

{
  "before": null,
  "after": {"id": 1, "name": "John", "age": 30},
  "op": "c"
}

UPDATE:

{
  "before": {"id": 1, "name": "John", "age": 30},
  "after": {"id": 1, "name": "John", "age": 31},
  "op": "u"
}

DELETE:

{
  "before": {"id": 1, "name": "John", "age": 31},
  "after": null,
  "op": "d"
}

Part 5: Building the Spring Boot Application

Project Setup

Dependencies (build.gradle):

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.kafka:spring-kafka'
    runtimeOnly 'org.postgresql:postgresql'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation 'com.fasterxml.jackson.core:jackson-databind'
}

Why each dependency?

  • spring-boot-starter-web: REST API support
  • spring-boot-starter-data-jpa: Database access with JPA/Hibernate
  • spring-kafka: Kafka consumer support
  • postgresql: PostgreSQL JDBC driver
  • lombok: Reduces boilerplate (getters, setters, constructors)
  • jackson-databind: JSON parsing (already included, but explicit is good)

Application Configuration (application.yml)

spring:
  application:
    name: spring-cdc-kafka
  datasource:
    url: jdbc:postgresql://localhost:5432/mydb
    username: postgres
    password: postgres
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: audit-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true

server:
  port: 8080

logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: INFO
    com.thejavaengineer.springcdckafka: DEBUG

Configuration Explained:

DataSource:

  • url: jdbc:postgresql://localhost:5432/mydb: Connects to Postgres on host machine
    • Why localhost? Spring Boot runs on Windows, not in Docker
    • Docker exposes Postgres port 5432 to host

JPA/Hibernate:

  • ddl-auto: update: Auto-creates/updates tables (convenient for development)

    • Other options: create, create-drop, validate, none
    • Production: Use validate or none + Flyway/Liquibase for migrations
  • show-sql: true: Logs SQL queries (great for debugging)

Kafka Consumer:

  • bootstrap-servers: localhost:9092: Connects to Kafka on host machine

    • Remember: Kafka advertises localhost:9092 for external clients
  • group-id: audit-group: Consumer group name

    • Multiple consumers with same group-id share the workload
    • Each message delivered to only one consumer in the group
    • Different groups all receive the same messages (pub-sub pattern)
  • auto-offset-reset: earliest: What to do when no offset exists

    • earliest: Read from the beginning of the topic
    • latest: Read only new messages
    • none: Throw error if no offset found
  • enable-auto-commit: true: Automatically mark messages as processed

    • Alternative: Manual commit for more control (at-least-once vs at-most-once)

Deserializers:

  • Convert bytes from Kafka into Java objects
  • StringDeserializer: Treats messages as UTF-8 strings
  • We'll parse JSON manually using Jackson
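
As a quick standalone preview of that parsing (a sketch using the message shape from Part 4), Jackson's tree model lets us walk the payload without defining any DTO classes:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PayloadPeek {
    public static void main(String[] args) throws Exception {
        String cdcJson = """
            {"payload": {"before": null,
                         "after": {"id": 1, "name": "John", "age": 30},
                         "op": "c", "ts_ms": 1770479813840}}""";

        JsonNode payload = new ObjectMapper().readTree(cdcJson).path("payload");
        String op = payload.path("op").asText();   // "c", "u", "d", or "r"
        JsonNode after = payload.path("after");    // row state after the change (JSON null for deletes)

        System.out.println(op + " -> " + after.path("name").asText());  // prints: c -> John
    }
}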

Entity Classes

AuditLog.java:

package com.thejavaengineer.springcdckafka;

import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;

@Entity
@Table(name = "audit_log")
@Data
public class AuditLog {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String operation;
    private Long personId;
    private String personName;
    private Integer personAge;
    private LocalDateTime eventTime;
    
    @Column(name = "processed_at")
    private LocalDateTime processedAt = LocalDateTime.now();
}

Annotations Explained:

  • @Entity: Marks this as a JPA entity (database table)
  • @Table(name = "audit_log"): Maps to "audit_log" table
  • @Data: Lombok generates getters, setters, toString, equals, hashCode
  • @Id: Primary key field
  • @GeneratedValue(strategy = GenerationType.IDENTITY): Database auto-generates ID
  • @Column(name = "processed_at"): Maps to "processed_at" column (Java uses camelCase)

Person.java:

package com.thejavaengineer.springcdckafka;

import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;

@Entity
@Table(name = "person")
@Data
public class Person {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String name;
    private Integer age;
    
    @Column(name = "created_at")
    private LocalDateTime createdAt = LocalDateTime.now(); // set here because JPA would otherwise insert an explicit NULL, bypassing the column default
}

Repository Interfaces

// AuditLogRepository.java
package com.thejavaengineer.springcdckafka;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface AuditLogRepository extends JpaRepository<AuditLog, Long> {
}

// PersonRepository.java (separate file, same package and imports)
@Repository
public interface PersonRepository extends JpaRepository<Person, Long> {
}

What JpaRepository provides:

  • save(entity): Insert or update
  • findById(id): Find by primary key
  • findAll(): Get all records
  • deleteById(id): Delete by primary key
  • count(): Count records
  • And 20+ more methods!

Spring Data JPA magic: You can add custom methods like:

List<AuditLog> findByOperation(String operation);
List<AuditLog> findByPersonIdOrderByEventTimeDesc(Long personId);

Spring automatically implements them based on method names!
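
For example, a hypothetical endpoint (neither the derived method nor the endpoint exists in the project as written) could expose a person's audit trail through such a query:

// In AuditLogRepository (hypothetical addition):
// List<AuditLog> findByPersonIdOrderByEventTimeDesc(Long personId);

// In the REST controller from Part 5 (hypothetical addition):
@GetMapping("/audit-logs/person/{personId}")
public ResponseEntity<List<AuditLog>> auditTrailForPerson(@PathVariable Long personId) {
    // Spring Data derives the query: WHERE person_id = ? ORDER BY event_time DESC
    return ResponseEntity.ok(auditLogRepository.findByPersonIdOrderByEventTimeDesc(personId));
}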

Kafka Configuration

package com.thejavaengineer.springcdckafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Why this configuration?

@Configuration: Tells Spring this class defines beans (components)

@EnableKafka: Enables Kafka listener annotations (@KafkaListener)

ConsumerFactory: Factory pattern for creating Kafka consumers

  • Encapsulates all configuration
  • Allows easy customization per consumer

ConcurrentKafkaListenerContainerFactory: Creates listener containers

  • "Concurrent" means multiple threads can process messages
  • Each thread handles messages from different partitions
  • Default concurrency is 1 thread; you can raise it, but only up to the topic's partition count (see the sketch below)
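
If you wanted more than one listener thread in a single application instance, you could raise the concurrency on the factory (a sketch of a variant of the bean above; the project as written keeps the default of one thread):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);  // up to 3 threads; only useful if the topic has 3 or more partitions
    return factory;
}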

Why separate factory and config?

  • Flexibility: Can have multiple consumer factories with different configs
  • Testing: Easy to mock
  • Spring best practices: Separation of concerns

The Kafka Consumer (The Heart of the Application)

package com.thejavaengineer.springcdckafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

@Component
@RequiredArgsConstructor
@Slf4j
public class DebeziumConsumer {

    private final AuditLogRepository auditLogRepository;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "dbserver.public.person", groupId = "audit-group")
    public void consume(String message) {
        if (message == null) {
            // Debezium sends a tombstone (a null value) after each delete; nothing to parse or audit
            return;
        }

        try {
            log.info("🔔 Received CDC event!");

            // Parse the JSON message
            JsonNode root = objectMapper.readTree(message);
            JsonNode payload = root.path("payload");

            // Extract operation type
            String operation = payload.path("op").asText();
            
            // Extract before/after states
            JsonNode after = payload.path("after");
            JsonNode before = payload.path("before");
            
            // Extract timestamp
            long timestamp = payload.path("ts_ms").asLong();

            // Create audit log entry
            AuditLog auditLog = new AuditLog();
            auditLog.setOperation(mapOperation(operation));
            auditLog.setEventTime(LocalDateTime.ofInstant(
                    Instant.ofEpochMilli(timestamp),
                    ZoneId.systemDefault()
            ));

            // Populate data based on operation
            if (!after.isNull()) {
                // INSERT or UPDATE: Use "after" state
                auditLog.setPersonId(after.path("id").asLong());
                auditLog.setPersonName(after.path("name").asText());
                auditLog.setPersonAge(after.path("age").asInt(0));
            } else if (!before.isNull()) {
                // DELETE: Use "before" state
                auditLog.setPersonId(before.path("id").asLong());
                auditLog.setPersonName(before.path("name").asText());
                auditLog.setPersonAge(before.path("age").asInt(0));
            }

            // Save to database
            auditLogRepository.save(auditLog);

            log.info("✅ AUDITED: {} → Person ID={}, Name={}",
                    auditLog.getOperation(),
                    auditLog.getPersonId(),
                    auditLog.getPersonName()
            );

        } catch (Exception e) {
            log.error("❌ Error processing CDC event", e);
            e.printStackTrace();
        }
    }

    private String mapOperation(String op) {
        return switch (op) {
            case "c" -> "CREATE";
            case "u" -> "UPDATE";
            case "d" -> "DELETE";
            case "r" -> "READ";
            default -> op;
        };
    }
}

Deep Dive into the Consumer:

@Component: Spring manages this as a bean (singleton by default)

@RequiredArgsConstructor: Lombok generates constructor for final fields

  • Enables constructor-based dependency injection (recommended)

@Slf4j: Lombok generates a logger field: private static final Logger log = ...

@KafkaListener:

  • topics: Which Kafka topics to listen to (can be array: {"topic1", "topic2"})
  • groupId: Consumer group (shares load with other consumers)
  • Spring automatically calls this method for each message

JSON Parsing:

JsonNode root = objectMapper.readTree(message);
JsonNode payload = root.path("payload");
  • readTree: Parses JSON string to tree structure
  • path: Navigates JSON safely (returns "missing node" instead of null)

Operation Mapping:

private String mapOperation(String op) {
    return switch (op) {
        case "c" -> "CREATE";
        case "u" -> "UPDATE";
        case "d" -> "DELETE";
        case "r" -> "READ";
        default -> op;
    };
}
  • Converts Debezium's single-letter codes to readable strings
  • "r" is for initial snapshot (when connector first starts)

Timestamp Conversion:

auditLog.setEventTime(LocalDateTime.ofInstant(
    Instant.ofEpochMilli(timestamp),
    ZoneId.systemDefault()
));
  • Debezium sends timestamp as epoch milliseconds (1770479813373)
  • Convert to LocalDateTime for easy database storage and querying

Handling Different Operations:

if (!after.isNull()) {
    // For INSERT and UPDATE
    auditLog.setPersonId(after.path("id").asLong());
    // ...
} else if (!before.isNull()) {
    // For DELETE
    auditLog.setPersonId(before.path("id").asLong());
    // ...
}
  • INSERT: before is null, after has data
  • UPDATE: Both before and after have data (we use after for current state)
  • DELETE: after is null, before has data

Error Handling:

try {
    // Process message
} catch (Exception e) {
    log.error("❌ Error processing CDC event", e);
    e.printStackTrace();
}
  • In production, you'd want more sophisticated error handling:
    • Dead letter queue (DLQ) for failed messages
    • Retry logic with exponential backoff
    • Alerting on repeated failures
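
As one hedged sketch of that kind of handling (assuming spring-kafka 2.8+ and a KafkaTemplate bean, which this consumer-only project doesn't define yet), you could attach a DefaultErrorHandler that retries a few times and then publishes the failing record to a dead-letter topic:

// In KafkaConfig (sketch): retry up to 3 times, 1 second apart, then send the record to a dead-letter topic
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}

Wire it into the listener container with factory.setCommonErrorHandler(errorHandler) in kafkaListenerContainerFactory(); by default the dead-letter topic is the source topic name with a .DLT suffix.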

REST Controller (For Testing)

package com.thejavaengineer.springcdckafka;

import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class PersonController {

    private final PersonRepository personRepository;
    private final AuditLogRepository auditLogRepository;

    @PostMapping("/persons")
    public ResponseEntity<Person> createPerson(@RequestBody Person person) {
        Person saved = personRepository.save(person);
        return ResponseEntity.ok(saved);
    }

    @GetMapping("/persons")
    public ResponseEntity<List<Person>> getAllPersons() {
        return ResponseEntity.ok(personRepository.findAll());
    }

    @PutMapping("/persons/{id}")
    public ResponseEntity<Person> updatePerson(@PathVariable Long id, @RequestBody Person person) {
        return personRepository.findById(id)
            .map(existing -> {
                existing.setName(person.getName());
                existing.setAge(person.getAge());
                return ResponseEntity.ok(personRepository.save(existing));
            })
            .orElse(ResponseEntity.notFound().build());
    }

    @DeleteMapping("/persons/{id}")
    public ResponseEntity<Void> deletePerson(@PathVariable Long id) {
        personRepository.deleteById(id);
        return ResponseEntity.ok().build();
    }

    @GetMapping("/audit-logs")
    public ResponseEntity<List<AuditLog>> getAuditLogs() {
        return ResponseEntity.ok(auditLogRepository.findAll());
    }
}

REST API Endpoints:

  • POST /api/persons: Create a person
  • GET /api/persons: List all persons
  • PUT /api/persons/{id}: Update a person
  • DELETE /api/persons/{id}: Delete a person
  • GET /api/audit-logs: View all audit logs

Why @RestController?

  • Combines @Controller + @ResponseBody
  • Automatically serializes return values to JSON
  • Perfect for REST APIs

Why Optional.map()?

return personRepository.findById(id)
    .map(existing -> {
        // Update and save
        return ResponseEntity.ok(personRepository.save(existing));
    })
    .orElse(ResponseEntity.notFound().build());
  • Functional programming style
  • Handles "not found" elegantly
  • Alternative to if-else null checks

Part 6: Testing the Complete System

Test 1: Direct Database Insert

docker compose exec postgres psql -U postgres -d mydb -c "INSERT INTO person (name, age) VALUES ('John Doe', 30);"

What happens:

  1. PostgreSQL inserts the row
  2. PostgreSQL writes to WAL: "INSERT INTO person..."
  3. Debezium reads the WAL entry
  4. Debezium creates a JSON message
  5. Debezium publishes to Kafka topic dbserver.public.person
  6. Spring Boot consumer receives the message
  7. Spring Boot parses the JSON
  8. Spring Boot inserts into audit_log table
  9. You see: ✅ AUDITED: CREATE → Person ID=1, Name=John Doe

Timing: This entire process takes milliseconds! Typical lag: 10-50ms.

Test 2: REST API Insert

Invoke-RestMethod -Method Post `
  -Uri http://localhost:8080/api/persons `
  -Headers @{ "Content-Type" = "application/json" } `
  -Body '{"name": "Jane Smith", "age": 25}'

What happens:

  1. REST request hits Spring Boot
  2. Spring Boot saves to person table
  3. PostgreSQL writes to WAL
  4-9. Same as Test 1

Key insight: Doesn't matter HOW the data changes—SQL, REST API, admin tool, migration script—CDC captures it all!

Test 3: Update

Invoke-RestMethod -Method Put `
  -Uri http://localhost:8080/api/persons/1 `
  -Headers @{ "Content-Type" = "application/json" } `
  -Body '{"name": "John Updated", "age": 31}'

Debezium message:

{
  "before": {"id": 1, "name": "John Doe", "age": 30, "created_at": 1770479813360635},
  "after": {"id": 1, "name": "John Updated", "age": 31, "created_at": 1770479813360635},
  "op": "u"
}

Audit log entry:

operation: UPDATE
person_id: 1
person_name: John Updated
person_age: 31

Test 4: Delete

Invoke-RestMethod -Method Delete -Uri http://localhost:8080/api/persons/1

Debezium message:

{
  "before": {"id": 1, "name": "John Updated", "age": 31, "created_at": 1770479813360635},
  "after": null,
  "op": "d"
}

Audit log entry:

operation: DELETE
person_id: 1
person_name: John Updated
person_age: 31

Verifying Audit Logs

# Via REST API
Invoke-RestMethod -Uri http://localhost:8080/api/audit-logs

# Via Database
docker compose exec postgres psql -U postgres -d mydb -c "SELECT * FROM audit_log ORDER BY processed_at DESC;"

Part 7: Advanced Concepts

Consumer Groups and Scalability

Single Consumer:

Topic: dbserver.public.person (3 partitions)
Consumer Group: audit-group
  - Consumer 1: Reads partitions 0, 1, 2

Multiple Consumers (Same Group):

Topic: dbserver.public.person (3 partitions)
Consumer Group: audit-group
  - Consumer 1: Reads partition 0
  - Consumer 2: Reads partition 1
  - Consumer 3: Reads partition 2

Result: 3x throughput! Each consumer processes 1/3 of messages.

Multiple Consumer Groups:

Topic: dbserver.public.person
Consumer Group: audit-group
  - Consumer A: Creates audit logs
Consumer Group: analytics-group
  - Consumer B: Updates analytics dashboard
Consumer Group: cache-group
  - Consumer C: Invalidates cache

Result: Pub-sub pattern! All groups receive all messages.
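
A second Spring listener in a different group shows that fan-out with almost no code (a hypothetical component; analytics-group is not part of the project above):

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class AnalyticsConsumer {

    // Same topic, different group-id: this listener receives every event
    // independently of the audit-group consumer.
    @KafkaListener(topics = "dbserver.public.person", groupId = "analytics-group")
    public void consume(String message) {
        log.info("analytics-group received: {}", message);
    }
}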

Offset Management

What's an offset? Like a bookmark—tells Kafka "I've read up to here."

Commit strategies:

  1. Auto-commit (what we use):

    enable-auto-commit: true
    auto-commit-interval: 1000  # Every 1 second
    
    • Pros: Simple, automatic
    • Cons: Possible message loss on crash (1 second window)
  2. Manual commit:

    @KafkaListener(topics = "dbserver.public.person")
    public void consume(String message, Acknowledgment ack) {
        processMessage(message);
        ack.acknowledge();  // Commit after successful processing
    }
    
    • Pros: At-least-once delivery guarantee
    • Cons: More code, possible duplicates on crash (see the wiring sketch after this list)
  3. Transactional:

    @Transactional
    public void consume(String message) {
        processMessage(message);
        // Kafka offset commits with database transaction
    }
    
    • Pros: Exactly-once semantics
    • Cons: More complex, requires Kafka transactions
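
For option 2, the listener container also has to be switched to manual acknowledgment; a minimal wiring sketch (adjusting the existing KafkaConfig, with processAndSaveAuditLog as a hypothetical helper wrapping the parsing logic from Part 5) looks like this:

// In KafkaConfig.kafkaListenerContainerFactory(), before returning the factory
// (also set spring.kafka.consumer.enable-auto-commit: false in application.yml):
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

// In the consumer: acknowledge only after the audit row is saved
@KafkaListener(topics = "dbserver.public.person", groupId = "audit-group")
public void consume(String message, Acknowledgment ack) {
    processAndSaveAuditLog(message);  // hypothetical helper with the Jackson parsing from Part 5
    ack.acknowledge();                // commit the offset only on success
}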

Monitoring and Troubleshooting

Check Kafka Topics:

docker compose exec kafka kafka-topics --bootstrap-server kafka:29092 --list

View Messages:

docker compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic dbserver.public.person --from-beginning

Check Consumer Group Lag:

docker compose exec kafka kafka-consumer-groups --bootstrap-server kafka:29092 --group audit-group --describe

Output:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
dbserver.public.person   0          150             150             0

LAG = 0: Consumer is caught up! ✅
LAG > 0: Consumer is falling behind and might need scaling.

Check Debezium Status:

Invoke-RestMethod -Uri http://localhost:8083/connectors/postgres-connector/status

Check Replication Slot:

docker compose exec postgres psql -U postgres -d mydb -c "SELECT * FROM pg_replication_slots;"

Error Scenarios and Solutions

Problem 1: Consumer can't connect to Kafka

Error: java.net.UnknownHostException: kafka

Solution: Check Kafka listeners configuration. External clients use localhost:9092.

Problem 2: Debezium connector fails

"state": "FAILED",
"trace": "org.postgresql.util.PSQLException: ERROR: permission denied for table person"

Solution: Grant permissions:

GRANT SELECT ON TABLE person TO postgres;
ALTER TABLE person REPLICA IDENTITY FULL;

Problem 3: Messages in Kafka but consumer not receiving

# Check if consumer group is active
docker compose exec kafka kafka-consumer-groups --bootstrap-server kafka:29092 --group audit-group --describe

Solution: Restart Spring Boot application. Check logs for connection errors.

Problem 4: Lag increasing

Causes:

  • Consumer processing too slow
  • Database writes too slow
  • Network issues

Solutions:

  • Scale horizontally (more consumers)
  • Optimize database queries (add indexes)
  • Batch inserts in audit log (see the sketch after this list)
  • Increase partition count
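
One of those options, batching the audit inserts, could look like this sketch (it assumes a second container factory configured with factory.setBatchListener(true); the name batchListenerContainerFactory is made up for the example):

// Batch variant of the consumer: one saveAll() per poll instead of one insert per event
@KafkaListener(topics = "dbserver.public.person", groupId = "audit-group",
               containerFactory = "batchListenerContainerFactory")
public void consumeBatch(List<String> messages) {
    List<AuditLog> entries = messages.stream()
            .map(this::toAuditLog)         // hypothetical helper: same Jackson parsing as consume()
            .toList();
    auditLogRepository.saveAll(entries);   // a single batched write to audit_log
}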

Part 8: Production Considerations

What We Didn't Cover (But Should for Production)

1. Security:

  • Kafka: Enable SSL/TLS, SASL authentication
  • PostgreSQL: SSL connections, restricted users
  • Kafka Connect: Secrets management (not hardcoded passwords)

2. High Availability:

  • Kafka: 3+ brokers with replication factor 3
  • Zookeeper: 3+ nodes (or migrate to KRaft mode—Zookeeper-less Kafka)
  • PostgreSQL: Primary-replica setup
  • Spring Boot: Multiple instances behind load balancer

3. Monitoring:

  • Kafka: JMX metrics, Prometheus + Grafana
  • Debezium: Connector health, lag metrics
  • Spring Boot: Actuator endpoints, APM tools (New Relic, Datadog)

4. Data Retention:

  • Kafka: Configure topic retention (default: 7 days)
    retention.ms=604800000  # 7 days
    retention.bytes=1073741824  # 1GB
    
  • Audit log: Archival strategy (move old records to cold storage)

5. Schema Evolution:

  • Avro/Protobuf: Use schema registry instead of JSON
    • Backward/forward compatibility
    • Smaller message size
    • Type safety

6. Disaster Recovery:

  • Backup strategies: Regular PostgreSQL backups
  • Kafka offset reset: Ability to replay from specific point
  • Idempotency: Handle duplicate messages gracefully
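
An idempotent consumer could be sketched like this (the ProcessedEvent entity, its repository, and processAndSaveAuditLog are hypothetical); every Kafka record is uniquely identified by its topic, partition, and offset:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Inside a consumer component with a processedEventRepository injected (hypothetical)
@KafkaListener(topics = "dbserver.public.person", groupId = "audit-group")
public void consume(ConsumerRecord<String, String> record) {
    String eventKey = record.topic() + "-" + record.partition() + "-" + record.offset();
    if (processedEventRepository.existsById(eventKey)) {
        return;  // duplicate delivery (retry, rebalance, offset replay): already audited, skip it
    }
    processAndSaveAuditLog(record.value());                       // same parsing/audit logic as before
    processedEventRepository.save(new ProcessedEvent(eventKey));  // remember this record was handled
}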

7. Performance Tuning:

  • Kafka:
    batch.size=16384  # Batch size for producer
    linger.ms=10      # Wait time before sending batch
    compression.type=snappy  # Compress messages
    
  • PostgreSQL:
    -- Optimize replication slot lag
    ALTER SYSTEM SET wal_sender_timeout = '60s';
    ALTER SYSTEM SET max_replication_slots = 10;
    
  • Spring Boot:
    spring.kafka.listener:
      concurrency: 3  # Multiple consumer threads
    

Cost Optimization

Resource Usage:

  • Zookeeper: ~512MB RAM
  • Kafka: ~2GB RAM minimum (more for high throughput)
  • Postgres: ~1GB RAM minimum
  • Kafka Connect: ~1GB RAM
  • Spring Boot: ~512MB RAM

Cloud Deployment:

  • AWS: MSK (Managed Kafka), RDS (Postgres), ECS/EKS (Spring Boot)
  • GCP: Confluent Cloud, Cloud SQL, GKE
  • Azure: Event Hubs (Kafka-compatible), Azure Database for PostgreSQL, AKS

Part 9: Common Patterns and Use Cases

Pattern 1: Multi-Table CDC

Monitor multiple tables:

{
  "table.include.list": "public.person,public.order,public.payment"
}

Results in topics:

  • dbserver.public.person
  • dbserver.public.order
  • dbserver.public.payment

Each topic needs its own listener, or you can route by topic inside a single listener:

@KafkaListener(topics = {"dbserver.public.person", "dbserver.public.order"})
public void consumeMultiple(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    if (topic.endsWith("person")) {
        handlePerson(message);
    } else if (topic.endsWith("order")) {
        handleOrder(message);
    }
}

Pattern 2: Aggregate Events

Combine related changes:

@KafkaListener(topics = "dbserver.public.order")
public void consumeOrder(String message) {
    JsonNode payload = parse(message);
    Long orderId = payload.path("after").path("id").asLong();
    
    // Load the order and its related items (orderRepository is another hypothetical repository)
    Order order = orderRepository.findById(orderId).orElseThrow();
    List<OrderItem> items = orderItemRepository.findByOrderId(orderId);
    
    // Create enriched event
    OrderCreatedEvent event = new OrderCreatedEvent(order, items);
    eventPublisher.publish(event);
}

Pattern 3: Data Synchronization

Keep two databases in sync:

Source DB (Postgres) → Debezium → Kafka → Consumer → Target DB (MySQL)

@KafkaListener(topics = "dbserver.public.person")
public void syncToMySQL(String message) {
    JsonNode after = parse(message).path("payload").path("after");
    String op = parse(message).path("payload").path("op").asText();
    
    if ("c".equals(op) || "u".equals(op)) {
        mysqlPersonRepository.save(mapToPerson(after));
    } else if ("d".equals(op)) {
        // "after" is null for deletes, so take the id from the "before" state
        Long id = parse(message).path("payload").path("before").path("id").asLong();
        mysqlPersonRepository.deleteById(id);
    }
}

Pattern 4: Event Sourcing

Store all events in event store:

@KafkaListener(topics = "dbserver.public.person")
public void storeEvent(String message) {
    PersonEvent event = new PersonEvent();
    event.setRawPayload(message);
    event.setTimestamp(extractTimestamp(message));
    event.setOperation(extractOperation(message));
    eventStore.save(event);  // Never update, only insert
}

// Later, rebuild state from events
public Person rebuildPersonState(Long personId) {
    List<PersonEvent> events = eventStore.findByPersonId(personId);
    Person person = new Person();
    for (PersonEvent event : events) {
        person = applyEvent(person, event);  // apply each change in order to rebuild current state
    }
    return person;
}

Pattern 5: CQRS (Command Query Responsibility Segregation)

Separate write and read models:

Write Model (Postgres) → CDC → Kafka → Read Model (Elasticsearch/MongoDB)

@KafkaListener(topics = "dbserver.public.person")
public void updateReadModel(String message) {
    JsonNode after = parse(message).path("payload").path("after");
    
    PersonDocument doc = new PersonDocument();
    doc.setId(after.path("id").asLong());
    doc.setName(after.path("name").asText());
    doc.setAge(after.path("age").asInt());
    doc.setSearchableText(doc.getName().toLowerCase());
    
    elasticsearchRepository.save(doc);  // Optimized for search
}

Conclusion

You've just built a production-ready Change Data Capture system! Let's recap what you learned:

Key Concepts

  1. CDC: Captures database changes automatically
  2. Debezium: Reads database logs and publishes to Kafka
  3. Kafka: Reliable, scalable message broker
  4. Spring Boot: Consumes events and takes action

The Power of CDC

Before CDC:

// Manual tracking—error-prone!
person.save();
auditLog.create("Person created");  // What if you forget?
cache.invalidate(person.id);
analytics.update(person);

With CDC:

// Automatic—never miss a change!
person.save();
// CDC handles: audit logs, cache, analytics, notifications, etc.

When to Use CDC

✅ Great for:

  • Audit logging
  • Data replication
  • Cache invalidation
  • Event-driven microservices
  • Real-time analytics
  • Data warehousing (ETL)

❌ Not ideal for:

  • Simple CRUD apps (overkill)
  • High-latency tolerant systems (CDC adds complexity)
  • Applications with no change tracking needs

Next Steps

  1. Learn More:

    • Kafka Streams (process events in real-time)
    • Schema Registry (manage message schemas)
    • Kafka Connect ecosystem (100+ connectors)
  2. Experiment:

    • Add filtering (only track certain changes)
    • Add transformations (modify messages in flight)
    • Multiple consumers (fan-out pattern)
  3. Production Deployment:

    • Add monitoring (Prometheus, Grafana)
    • Implement error handling (DLQ, retries)
    • Add security (SSL, authentication)


Final Thoughts

Change Data Capture transforms how we build data-driven applications. Instead of polling databases or writing complex triggers, we let the database tell us what changed. This pattern scales to millions of events per second and forms the backbone of modern data architectures.

You now have the knowledge to build event-driven systems that react to data changes in real-time. Whether it's compliance, analytics, or microservices communication, CDC is your superpower.

Happy coding! 🚀


Questions? Found this helpful? Leave a comment below or reach out on [Twitter/LinkedIn]. If this tutorial helped you, please star the repository and share it with others!

Tags: #CDC #Debezium #Kafka #SpringBoot #EventDriven #Microservices #Java #PostgreSQL #Tutorial
