Debezium source connector, add a field to the `after` object

3 min read 04-10-2024

Adding a Custom Field to Your Debezium after Object: A Comprehensive Guide

Problem: You're using Debezium to capture changes in your database and want to include additional, custom data in the after object of your change events. This data might be calculated from existing fields or represent external information relevant to the change.

Rephrased: Imagine you're tracking changes to your customer database with Debezium. You want to include each customer's loyalty points balance in the change event, even though it's not stored directly in the database table.

Scenario and Original Code:

Let's say you're using Debezium to track changes in a customers table:

CREATE TABLE customers (
  id INT PRIMARY KEY,
  name VARCHAR(255),
  email VARCHAR(255)
);

Your Debezium connector configuration currently looks like this:

name: my-connector
connector.class: io.debezium.connector.mysql.MySqlConnector
tasks.max: 1
database.hostname: localhost
database.port: 3306
database.user: user
database.password: password
database.server.name: mydb
database.include.list: mydb
table.include.list: mydb.customers

This captures changes to the customers table but only includes the id, name, and email fields in the after object of the change events.
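To make the goal concrete, the sketch below models the value of a change event with plain Java maps. It is only a model and not the Kafka Connect API, and the real envelope carries more fields (such as source and ts_ms) that are left out here. The names EnvelopeModel, customerRow, insertEvent, and withAfterField are hypothetical, and the value 120 is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of a Debezium change event value; not the Kafka Connect API.
final class EnvelopeModel {

    // Builds the "after" state for one row of the customers table.
    static Map<String, Object> customerRow(int id, String name, String email) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", id);
        row.put("name", name);
        row.put("email", email);
        return row;
    }

    // Builds an insert event: "before" is null, "after" holds the new row, op is "c".
    static Map<String, Object> insertEvent(Map<String, Object> after) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("before", null);
        event.put("after", after);
        event.put("op", "c");
        return event;
    }

    // Returns a copy of the event whose "after" map carries one extra field.
    // The input event is left untouched.
    static Map<String, Object> withAfterField(Map<String, Object> event, String field, Object fieldValue) {
        Map<String, Object> copy = new LinkedHashMap<>(event);
        Object after = event.get("after");
        if (after instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Object> newAfter = new LinkedHashMap<>((Map<String, Object>) after);
            newAfter.put(field, fieldValue);
            copy.put("after", newAfter);
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> event = insertEvent(customerRow(1, "Ada", "ada@example.com"));
        System.out.println(event);
        System.out.println(withAfterField(event, "loyaltyPoints", 120));
    }
}
```

The first line printed corresponds to what the connector emits today (in simplified form); the second is the shape the rest of this article aims for.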

Adding a Custom Field:

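You can extend the after object with a Kafka Connect Single Message Transform (SMT). Debezium connectors run inside Kafka Connect, which applies every configured SMT to each change event before the event is written to Kafka. Here's how:

  1. Create a Custom Transformation:

    • You'll need a Java class that implements the org.apache.kafka.connect.transforms.Transformation interface.
    • This class will be responsible for modifying the change event before it's written to Kafka.
  2. Define the Transformation Logic:

    • Implement the apply method, which receives the change event as an argument and returns the record to publish.
    • Inside the method, read the after struct, calculate or retrieve your custom field value, and build a new after struct that contains it. A Kafka Connect struct rejects fields that are missing from its schema, and schemas are immutable, so the envelope has to be rebuilt with an updated schema.
    • For brevity, the example rebuilds the schemas for every record; a production transformation would cache them.

Example Code:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.transforms.Transformation;

public class CustomerLoyaltyProcessor<R extends ConnectRecord<R>> implements Transformation<R> {
  @Override
  public R apply(R record) {
    // Pass through tombstones, deletes, and schema change events (no "after" state)
    if (!(record.value() instanceof Struct)) return record;
    Struct value = (Struct) record.value();
    if (value.schema().field("after") == null || value.get("after") == null) return record;
    Struct after = value.getStruct("after");
    // Connect schemas are immutable: rebuild "after" with the extra field
    SchemaBuilder afterBuilder = SchemaBuilder.struct().name(after.schema().name()).optional();
    for (Field f : after.schema().fields()) afterBuilder.field(f.name(), f.schema());
    Schema afterSchema = afterBuilder.field("loyaltyPoints", Schema.OPTIONAL_INT32_SCHEMA).build();
    Struct newAfter = new Struct(afterSchema);
    for (Field f : after.schema().fields()) newAfter.put(f.name(), after.get(f));
    // Assuming loyalty points are stored in a separate system
    newAfter.put("loyaltyPoints", getLoyaltyPoints(after.getInt32("id")));

    // Rebuild the envelope so that it references the new "after" schema
    SchemaBuilder envelopeBuilder = SchemaBuilder.struct().name(value.schema().name());
    for (Field f : value.schema().fields())
      envelopeBuilder.field(f.name(), f.name().equals("after") ? afterSchema : f.schema());
    Struct newValue = new Struct(envelopeBuilder.build());
    for (Field f : value.schema().fields())
      newValue.put(f.name(), f.name().equals("after") ? newAfter : value.get(f));
    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
        record.key(), newValue.schema(), newValue, record.timestamp());
  }

  // Placeholder for retrieving loyalty points (replace with your actual logic)
  private Integer getLoyaltyPoints(Integer customerId) {
    // ... your code to fetch loyalty points from a different system
    return null; // null means "unknown"; the field is declared optional
  }

  @Override public ConfigDef config() { return new ConfigDef(); }
  @Override public void configure(Map<String, ?> configs) { }
  @Override public void close() { }
}
  3. Configure the Transformation:
    • In your Debezium connector configuration, list an alias for the transformation under the transforms property and set transforms.<alias>.type to your class name.
    • Package the class as a JAR and place it in a directory on the Kafka Connect worker's plugin.path, then restart the worker.

Enhanced Configuration:

name: my-connector
connector.class: io.debezium.connector.mysql.MySqlConnector
tasks.max: 1
database.hostname: localhost
database.port: 3306
database.user: user
database.password: password
database.server.name: mydb
database.include.list: mydb
table.include.list: mydb.customers
transforms: loyalty
transforms.loyalty.type: com.example.CustomerLoyaltyProcessor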

Now, the after object of your create, update, and snapshot events will include the loyaltyPoints field, allowing you to access and use this information in your downstream applications. Delete events carry no after state, so they have nothing to enrich.
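A Kafka Connect worker accepts connector configurations as JSON through its REST API (POST /connectors, with a body of the form {"name": ..., "config": {...}}). The following is a minimal sketch of rendering a configuration into that body with the JDK only. ConnectorRegistration and its methods are hypothetical helpers, the hand-written escaping covers only quotes and backslashes, and http://localhost:8083 is an assumed worker address.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class ConnectorRegistration {

    // Escapes backslashes and double quotes, which is enough for simple config values.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Renders {"name": ..., "config": {...}}, the body expected by POST /connectors.
    static String toJson(String name, Map<String, String> config) {
        StringJoiner entries = new StringJoiner(",", "{", "}");
        config.forEach((k, v) -> entries.add(quote(k) + ":" + quote(v)));
        return "{\"name\":" + quote(name) + ",\"config\":" + entries + "}";
    }

    // Builds (but does not send) the registration request for a Connect worker.
    static HttpRequest registrationRequest(String workerUrl, String json) {
        return HttpRequest.newBuilder(URI.create(workerUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        config.put("tasks.max", "1");
        config.put("database.hostname", "localhost");
        config.put("database.port", "3306");
        // ... add the remaining properties of your connector configuration here
        String json = toJson("my-connector", config);
        System.out.println(json);
        // Assumed worker address; adjust it to your environment.
        HttpRequest request = registrationRequest("http://localhost:8083", json);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with java.net.http.HttpClient is left out on purpose, since it needs a running worker. A real deployment would more likely use a JSON library than the hand-rolled escaping shown here.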

Further Enhancements:

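  • Error Handling: Implement proper error handling in the getLoyaltyPoints method so that a failed lookup does not stop the stream. By default, an uncaught exception fails the connector task, so decide explicitly whether to emit a null value, retry, or fail.
  • Performance Optimization: The lookup runs synchronously for every change event, so a slow call throttles the whole connector. Cache recent results and avoid unnecessary database calls or API requests.
  • Custom Schemas: For complex custom fields, describe the value with a nested schema built with Kafka Connect's SchemaBuilder. If you serialize events with Avro and a schema registry, declaring new fields as optional typically keeps the evolved schema compatible for existing consumers.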
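
The error-handling and performance points can be combined in a small wrapper around the lookup. The sketch below is one possible approach, not a prescribed one: CachedLoyaltyLookup is a hypothetical class, the remote function stands in for whatever client talks to your loyalty system, and the choice to return null on failure is an assumption that matches an optional loyaltyPoints field.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Wraps a slow or unreliable lookup with a bounded LRU cache and a null fallback.
final class CachedLoyaltyLookup {
    private final IntFunction<Integer> remote;   // hypothetical call to the loyalty system
    private final Map<Integer, Integer> cache;

    CachedLoyaltyLookup(IntFunction<Integer> remote, int maxEntries) {
        this.remote = remote;
        // Access-ordered LinkedHashMap that evicts the least recently used entry.
        this.cache = new LinkedHashMap<Integer, Integer>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns the points for a customer, or null when the lookup fails.
    // Failures are not cached, so the next event for that customer retries.
    synchronized Integer getLoyaltyPoints(int customerId) {
        Integer cached = cache.get(customerId);
        if (cached != null) {
            return cached;
        }
        try {
            Integer points = remote.apply(customerId);
            if (points != null) {
                cache.put(customerId, points);
            }
            return points;
        } catch (RuntimeException e) {
            // Log this in real code; returning null keeps the event flowing.
            return null;
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        CachedLoyaltyLookup lookup =
                new CachedLoyaltyLookup(id -> { calls[0]++; return id * 10; }, 100);
        lookup.getLoyaltyPoints(7);
        lookup.getLoyaltyPoints(7); // served from the cache
        System.out.println("remote calls: " + calls[0]);
    }
}
```

Cached values can go stale, so a real implementation would probably add an expiry time or invalidate entries when the loyalty system reports a change.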

Conclusion:

This article demonstrated how to extend the after object of Debezium change events by creating a custom processor. This allows you to include additional, relevant data not directly available in the database table, enriching your event data and enabling more advanced use cases. By following these steps, you can easily customize your change events to meet your specific requirements.