Datenbankänderungen erkennen und streamen mit Debezium und Apache Kafka (Teil 2) – Ein Beispiel

Im ersten Teil des Blogposts wurde vorgestellt, welche technischen Möglichkeiten man zum Erkennen von Datenbankänderungen hat und wie das Tool Debezium zusammen mit der Plattform Apache Kafka genutzt werden kann, um solche Change-Events zu streamen und anderen Anwendungen zur Verfügung zu stellen.

Jetzt soll dazu Stück für Stück ein kleiner Prototyp erarbeitet werden, welcher die Funktionsweise von Debezium demonstriert. Die Architektur dazu ist folgendermaßen aufgebaut: Auf einer lokalen Instanz eines SQL Servers befindet sich eine Datenbank mit einer einzigen Tabelle, die den Namen CdcDemo trägt. Diese enthält ein paar wenige Datensätze. Dazu wird jeweils eine Instanz von Apache Kafka und Kafka Connect aufgesetzt. In der Kafka-Instanz wird später ein Topic für Change-Events erstellt, während Kafka Connect den SQL-Server-Konnektor von Debezium beinhaltet. Letztendlich werden die Änderungsdaten aus dem Topic von zwei Anwendungen ausgelesen und vereinfacht auf der Konsole dargestellt. Die beiden Consumer sollen zeigen, dass sich die Nachrichten von Debezium auch parallelisiert verarbeiten lassen.

Architektur des Prototypen
Abbildung 1: Architektur des Prototypen

Vorbereitungen im SQL Server

Zuallererst muss die Grundlage dafür gelegt werden, das Change Data Capture demonstrieren zu können, d. h. es muss eine kleine Datenbasis erstellt werden. Dazu wird in einer Datenbank einer lokalen SQL-Server-Instanz eine Tabelle mit dem folgenden Befehl angelegt:

CREATE TABLE CdcDemo (
	Id INT PRIMARY KEY,
	Surname VARCHAR(50) NULL,
	Forename VARCHAR(50) NULL
)

Nun können beliebige Datensätze in die Tabelle eingefügt werden. In diesem Beispiel handelt es sich um die Namen von berühmten Autorinnen und Autoren.

IdSurnameForename
101LindgrenAstrid
102KingStephen
103KästnerErich

Im Anschluss sind die für den Debezium-Konnektor spezifischen Vorkehrungen zu treffen. Das bedeutet im Falle des SQL Servers, dass sowohl die Datenbank als auch die Tabelle für das Change Data Capture aktiviert werden müssen. Das geschieht über die Ausführung der beiden folgenden Systemprozeduren:

EXEC sys.sp_cdc_enable_db

EXEC sys.sp_cdc_enable_table
	@source_schema = N’dbo’,
	@source_name = N’CdcDemo’,
	@role_name = N’CdcRole’

In diesem Beispiel ist dbo das Schema der Tabelle, die CdcDemo heißt. Der Zugriff auf die Änderungsdaten erfolgt über die Rolle CdcRole.

Kafka und Debezium einrichten

Wenn die Vorbereitungen im SQL Server erfolgreich getroffen wurden, kann nun die für Debezium benötigte Infrastruktur eingerichtet werden. Zuvor sollte aber noch sichergestellt werden, dass eine aktuelle Version der Java-Runtime installiert ist.

Jetzt kann die Apache-Kafka-Software von der offiziellen Downloadseite heruntergeladen und in einen beliebigen Ordner entpackt werden. Eine Installation ist nicht erforderlich. Danach muss der SQL-Server-Konnektor von Debezium heruntergeladen und ebenfalls in einen Ordner entpackt werden.

Nach dem erfolgreichen Download von Apache Kafka und Debezium kann nun eine neue Konfiguration für den Konnektor mittels einer Java-Properties-Datei erstellt werden:

name=srcsys1-connector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=123.123.123.123
database.port=1433
database.user=cdc-demo-user
database.password=cdc-demo-password
database.dbname=cdc-demo-db
database.server.name=srcsys1
table.whitelist=dbo.CdcDemo
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.srcsys1

Besonders wichtig ist die Einstellung connector.class. Diese gibt Kafka Connect an, welcher der zuvor heruntergeladenen, ausführbaren Konnektoren genutzt werden soll. Die Klassennamen der Debezium-Konnektoren finden sich in der jeweiligen Dokumentation. Weiterhin bestimmt database.server.name den logischen Namen, den Debezium für die Datenbank verwendet. Dieser ist wichtig für die spätere Benennung des Topics in Kafka. Mithilfe der Konfiguration table.whitelist können alle Tabellen angegeben werden, die der Debezium-Konnektor überwachen soll. Eine Erklärung zu allen weiteren Parametern ist in der Dokumentation von Debezium zu finden.

Als nächstes muss die Konfigurationsdatei von Kafka Connect angepasst werden, welche sich im Ordner config der Kafka-Installation befindet. Da für dieses Beispiel nur eine Instanz gebraucht wird, ist die Datei connect-standalone.properties zu verwenden. An sich können hier die Voreinstellungen beibehalten werden. Nur für die Eigenschaft plugin.path muss der Pfad zum heruntergeladenen Debezium-Konnektor angegeben werden. Wichtig: Damit ist nicht der Pfad zu den JAR-Dateien gemeint, sondern zu dem Ordner in der Hierarchie darüber, da Kafka Connect auch mehrere Konnektoren gleichzeitig ausführen kann, die sich in diesem Ordner befinden.

Für Apache Kafka an sich ist noch eine kleine Änderung in der Konfigurationsdatei server.properties sinnvoll. Da letztendlich zwei Konsumenten die Debezium-Nachricht verarbeiten sollen, macht es auch Sinn, die Anzahl der Partitionen für ein Topic auf zwei zu erhöhen. Damit werden die Change-Events entweder in die erste oder in die zweite Partition geschrieben. Die Partitionen werden dann jeweils einem Consumer zugeordnet, sodass die Nachrichten parallel, aber nicht doppelt verarbeitet werden. Zur Umsetzung davon ist beim Parameter num.partitions die Zahl 2 einzutragen.

Nachdem nun die beteiligten Komponenten konfiguriert wurden, können die Instanzen gestartet werden. Dabei ist die Beachtung der Reihenfolge wichtig.

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
$ ./bin/kafka-server-start.sh config/server.properties
$ ./bin/connect-standalone.sh config/connect-standalone.
	properties <path_to_debezium_config>

Zuerst wird der ZooKeeper gestartet, der für die Verwaltung der Kafka-Instanzen verantwortlich ist. Danach wird ein Kafka-Server ausgeführt, der sich beim ZooKeeper anmeldet. Zum Schluss wird Kafka Connect zusammen mit dem Debezium-Konnektor gestartet.

Einen Consumer implementieren

Mittlerweile steht die Infrastruktur für Debezium vollständig und es fehlt nur noch ein Consumer, der die Nachrichten aus Kafka verarbeiten kann. Dafür wird beispielhaft unter Verwendung der Bibliothek Confluent.Kafka eine .NET-Core-Konsolenanwendung programmiert, die sich am Einführungsbeispiel der Bibliothek auf GitHub orientiert. Zusätzlich dazu gibt es noch eine weitere Methode, die die gelesenen JSON-Nachrichten aus Kafka kurz und knapp auf der Konsole darstellt.

using Confluent.Kafka;
using Newtonsoft.Json.Linq;
using System;
using System.Threading;

namespace StreamKafka
{
    class Program
    {
        static void Main(string[] args)
        {
            var config = new ConsumerConfig
            {
                GroupId = "streamer-group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe("srcsys1.dbo.CdcDemo");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cts.Token);

                            if (consumeResult.Message.Value != null)
                                Console.WriteLine($"[{consumeResult.TopicPartitionOffset}]  " + ProcessMessage(consumeResult.Message.Value));
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    consumer.Close();
                }
            }


        }

        static string ProcessMessage(string jsonString)
        {
            var jsonObject = JObject.Parse(jsonString);
            var payload = jsonObject["payload"];

            string returnString = "";

            char operation = payload["op"].ToString()[0];

            switch (operation)
            {
                case 'c':
                    returnString += "INSERT: ";
                    returnString += $"{payload["after"]["Id"]} | {payload["after"]["Nachname"]} | {payload["after"]["Vorname"]}";
                    break;

                case 'd':
                    returnString += "DELETE: ";
                    returnString += $"{payload["before"]["Id"]} | {payload["before"]["Nachname"]} | {payload["before"]["Vorname"]}";
                    break;

                case 'u':
                    returnString += "UPDATE: ";
                    returnString += $"{payload["before"]["Id"]} | {payload["before"]["Nachname"]} | {payload["before"]["Vorname"]} --> " +
                        $"{payload["after"]["Id"]} | {payload["after"]["Nachname"]} | {payload["after"]["Vorname"]}";
                    break;

                default:
                    returnString += $"{payload["after"]["Id"]} | {payload["after"]["Nachname"]} | {payload["after"]["Vorname"]}";
                    break;
            }

            return returnString;
        }
    }
}

Im Quelltext gibt es einige interessante Stellen: Zunächst wird für den Consumer eine Konfiguration erstellt. Diese beinhaltet eine GroupId, die durch eine Zeichenkette repräsentiert wird. Die Gruppe dient dazu, die Arbeit unter den Konsumenten aufzuteilen, da Applikationen derselben Gruppe keine Nachrichten doppelt verarbeiten. Im Nachfolgenden abonniert der Consumer das Topic srcsys1.dbo.CdcDemo, das zuvor von Debezium automatisch in Kafka eingerichtet wurde. Der Name des Topics ergibt sich dabei aus den in der Debezium-Konfiguration angegebenen Parametern für den Server und die Tabelle. Im Anschluss geht der Consumer in eine Endlosschleife über, in der Nachrichten gelesen, verarbeitet und ausgegeben werden.

Den Prototypen testen

Jetzt sind für diesen Prototyp alle erforderlichen Komponenten installiert, konfiguriert und implementiert. Es wird Zeit, den Prototypen auch einmal auszuprobieren. Dafür ist es ratsam, zunächst zwei Instanzen des implementierten Consumers zu starten und erst im Anschluss Kafka bzw. Debezium wie oben beschrieben auszuführen.

Sind alle Komponenten hochgefahren, führt der Debezium-Konnektor einen Snapshot der Datenbanktabelle durch und schreibt diese Nachrichten an Kafka. Dort warten schon die beiden Consumer. Sie sollten eine Ausgabe produzieren, die der folgenden Abbildung ähnlichsieht.

Abbildung 2: Die Consumer geben den Snapshot der Datenbanktabelle aus

Kurz zur Bedeutung der Ausgabe: Die Informationen in den eckigen Klammern vor dem eigentlichen Datensatz geben Auskunft über das Topic, die Partitionsnummer und die Lognummer der jeweiligen Nachricht. Es wird ersichtlich, dass sich ein Consumer nur um die Nachrichten einer Partition kümmert. Debezium entscheidet durch Hashing und Modulo-Rechnung des Primärschlüssels, welcher Partition ein Datensatz zugeordnet wird.

Nun kann man testen, wie Debezium auf Änderungen an der Tabelle reagiert. Mittels des SQL Server Management Studios lassen sich INSERT-, UPDATE-, und DELETE-Befehle auf der Datenbank ausführen. Nur kurze Zeit, nachdem ein Statement abgesetzt wurde, sollten die Consumer darauf reagieren und eine entsprechende Ausgabe produzieren. Nachdem ein paar DML-Kommandos ausgeführt wurden, könnte die Ausgabe wie folgt aussehen:

Abbildung 3: Konsolenausgabe der Consumer nach einigen Änderungen auf der Tabelle

Eine Frage sollte zum Schluss noch geklärt werden: Ist es durch die Partitionierung der Nachrichten möglich, dass Race Conditions auftreten? Könnten sich also Änderungen am selben Datensatz über die beiden Partitionen gegenseitig „überholen“ und somit in der falschen Reihenfolge verarbeitet werden? Die Antwort ist nein. Daran hat Debezium zum Glück schon gedacht. Da die Change-Events anhand ihres Primärschlüssels wie oben beschrieben den jeweiligen Partitionen zugeordnet werden, landen Änderungsdaten bezüglich desselben Datensatzes auch immer hintereinander in derselben Partition und werden dort nur von einem Consumer in der richtigen Reihenfolge verarbeitet.

Fazit

Das Beispiel hat gezeigt, dass die Nutzung von Debezium zusammen mit Apache Kafka zum Streamen von Datenbankänderungen recht einfach umzusetzen ist. Einfüge-, Änderungs- und Lösch-Befehle können damit in annähernder Echtzeit bearbeitet werden. Zusätzlich zu den in diesem Prototyp gezeigten Beispielen gibt es auch die Möglichkeit, Änderungen am Datenschema zu streamen. Dafür erstellt Debezium ein separates Topic in Kafka.

Wichtig ist es zu beachten, dass der vorgestellte Prototyp nur ein Minimalbeispiel darstellt. Für den produktiven Einsatz von Debezium ist es erforderlich, die entsprechenden Komponenten so zu skalieren, dass ein gewisses Maß an Fehlertoleranz und Ausfallsicherheit gewährleistet ist.

Datenbankänderungen erkennen und streamen mit Debezium und Apache Kafka (Teil 1) – Die Theorie

In nahezu jedem Unternehmensumfeld spielen Datenbanken, insbesondere relationale Datenbanken, eine große Rolle. In ihnen werden sowohl die Stammdaten von Mitarbeitern, Kunden etc. als auch die sich ständig ändernden Bewegungsdaten des Unternehmens verwaltet. Aus den verschiedensten Gründen können sich nun für betreffende Firmen neue Anforderungen ergeben, so dass die Änderungen der Bewegungsdaten in Echtzeit in anderen Prozessen und Applikationen weiterverarbeitet werden müssen – wie bspw. bei Buchungen in einem System. Dafür reicht es nicht mehr aus, dass eine Anwendung durch Polling regelmäßig eine Quelltabelle abfragt. Vielmehr wird es notwendig, dass die Anwendung über Change-Events in einer Tabelle benachrichtigt wird und die Informationen sofort aufbereitet zur Verfügung gestellt bekommt.

Und genau das ist mit dem Einsatz von Debezium und der Streaming-Plattform Apache Kafka möglich.

Datenänderungen erkennen

Zu Beginn sollte einmal kurz geklärt werden, welche grundlegenden Möglichkeiten es zum Erfassen von Datenänderungen in relationalen Datenbanken gibt. In der Fachliteratur finden sich dazu unter dem Begriff Change Data Capture vier Methoden, die im folgenden Abschnitt vorgestellt werden.

Die einfachste Möglichkeit zur Erkennung von Änderungen ist der zeilen- und spaltenweise Vergleich einer Datenbanktabelle mit einer älteren Version von ihr. Es ist offensichtlich, dass dieser Algorithmus gerade für größere Tabellen nicht besonders effizient, dafür aber leicht zu implementieren ist. Eine weitere Idee ist es, nur die geänderten Datensätze anhand einer geschickten SQL-Abfrage auszuwählen. Dafür kommen in der Praxis meist Zeitstempel zur Anwendung, die für jeden Datensatz angeben, wann dieser zuletzt geändert wurde. Sucht man nun nach Datenänderungen in der Tabelle, selektiert man alle Datensätze, deren Zeitstempel jünger sind als der Zeitpunkt des letzten Vergleichs. Ein minimales SQL-Statement könnte so aussehen:

SELECT * FROM [source_table] 
WHERE last_updated < [datetime_of_last_comparison]

Problematisch an diesem Ansatz ist, dass eine solche Zeitstempel-Spalte in der Datenbanktabelle schon vorhanden sein muss, was wohl nicht immer der Fall sein wird. Es werden also Anforderungen an das Datenschema gestellt. Weiterhin lassen sich mit dieser Methode gelöschte Datensätze nur sehr schwer erkennen, da ihre Zeitstempel ebenfalls gelöscht werden.

Eine dritte Möglichkeit, die dieses Problem lösen kann, ist die Implementierung eines Datenbanktriggers, der nach jedem INSERT, UPDATE und DELETE auf der Tabelle ausgelöst wird. Weiterhin wäre es auch denkbar, Änderungen am Datenbankschema mithilfe eines Triggers zu überwachen. Ein solcher Trigger könnte dann die erfassten Datenänderungen in eine extra dafür vorgesehene Tabelle schreiben.

Zu guter Letzt gibt es noch eine vierte Methode, das sogenannte Log-Scanning, welches in diesem Blogpost eine besonders wichtige Rolle spielt. Die meisten Datenbankmanagementsysteme (DBMS) führen ein Transaktionslog, das alle Änderungen, die an der Datenbank vollzogen wurden, aufzeichnet. Dies dient vor allem dazu, die Datenbank nach einem Ausfall, sei es bedingt durch einen Stromausfall oder einen Schaden am physischen Datenträger, wieder zurück in einen konsistenten Zustand zu bringen. Für das Change Data Capture wird nun das Transaktionslog etwas zweckentfremdet: Durch das Auslesen der Logdatei ist es möglich, Änderungen an den Datensätzen einer bestimmten Quelltabelle zu erkennen und diese dann weiter zu verarbeiten.

Der große Vorteil des Log-Scannings ist es, dass durch das Auslesen des Transaktionslogs kein Overhead auf der Datenbank erzeugt wird, wie es beim Stellen von Abfragen oder beim Ausführen eines Triggers nach jeder Änderung der Fall ist. Damit wird die Performance der Datenbank nicht weiter beeinträchtigt. Weiterhin können alle Arten von Datenänderungen erfasst werden: das Einfügen, Aktualisieren und Löschen von Datensätzen und auch das Ändern des Datenschemas. Problematisch ist aber, dass der Aufbau solcher Transaktionslogs nicht genormt ist. Die Logdateien der einzelnen Datenbankhersteller sehen vollkommen unterschiedlich aus und unterscheiden sich teilweise auch zwischen den einzelnen Versionen ein und desselben Datenbankmanagementsystems. Möchte man also Datenänderungen aus mehreren Datenbanken unterschiedlicher Hersteller erfassen, müssen mehrere Algorithmen implementiert werden.

Und genau an dieser Stelle kommt Debezium ins Spiel.

Was ist Debezium?

Debezium ist an sich keine eigenständige Software, sondern eine Plattform für Konnektoren, die das Change Data Capture für verschiedene Datenbanken implementieren. Dabei nutzen die Konnektoren das Log-Scanning, um die Datenänderungen zu erfassen und sie an die Streaming-Plattform Apache Kafka weiterzureichen.

Jeder Konnektor repräsentiert eine Software für ein bestimmtes DBMS, welche in der Lage ist, eine Verbindung zur Datenbank aufzubauen und dort die Änderungsdaten auszulesen. Die genaue Funktionsweise eines Konnektors unterscheidet sich zwischen den DBMS, im Allgemeinen wird aber zunächst ein Snapshot vom bestehenden Datenbestand erzeugt und danach auf Änderungen im Transaktionslog gewartet. Bisher hat Debezium Konnektoren für die Datenbanken MongoDB, MySQL, PostgreSQL und SQL Server entwickelt. Weitere für Oracle, Db2 und Cassandra befinden sich aktuell noch in der Entwicklungsphase, sind aber schon verfügbar.

Die gängigste Variante, Debezium einzusetzen, ist in Kombination mit Apache Kafka und dem dazugehörigen Framework Kafka Connect. Apache Kafka ist eine quelloffene, verteilte Streaming-Plattform, die auf dem Publisher-Subscriber-Modell basiert. Es bietet Applikationen die Möglichkeit, Datenströme in Echtzeit zu verarbeiten und Nachrichten zwischen mehreren Prozessen auszutauschen. Mit Kafka Connect lassen sich passend dazu Konnektoren implementieren, die sowohl Daten nach Kafka schreiben (Quell-Konnektoren) als auch aus Kafka lesen können (Sink-Konnektoren). Debezium stellt, wie man schon vermutet, solche Quell-Konnektoren zur Verfügung, die die ermittelten Change-Events an Kafka streamen. Applikationen, die nun an den Änderungsdaten interessiert sind, können diese aus den Logs von Apache Kafka lesen und verarbeiten.

Architektur-Skizze für den Einsatz von Debezium
Abbildung: Architektur-Skizze für den Einsatz von Debezium 1

Seit kurzem gibt es auch die Möglichkeit, Debezium nicht mehr in Verbindung mit Kafka Connect, sondern auch als Standalone-Applikation zu verwenden. Damit sind Anwender nicht mehr an Kafka gebunden und können Change-Events auch an weitere Streaming-Plattformen, wie z. B. Kinesis, Google Cloud Pub/Sub oder Azure Event Hubs weiterreichen. Die Änderungsdaten, die Debezium aus unterschiedlichen Datenbanken ermittelt, müssen für Applikationen, die diese weiterverarbeiten wollen (Consumer), in einem einheitlichen Format zur Verfügung gestellt werden. Das realisiert Debezium über das JSON-Format. Ein Change-Event hat dabei immer zwei Teile: einen Payload-Teil und ein vorangehendes Schema, das den Aufbau des Payloads beschreibt. Im Payload-Teil befinden sich u. a. die Operation, die auf dem Datensatz ausgeführt wurde bzw. auch die Inhalte der Zeile vor und nach der Änderung. Besonders gekennzeichnet wird auch der Primärschlüssel des Datensatzes, was wichtig für die Zuordnung des Events zu einer Partition in Kafka ist. Doch dazu mehr im zweiten Teil.

Weiterhin legt Debezium beim Change Data Capture einen großen Wert auf Fehlertoleranz: Sollte mal ein Konnektor abstürzen, wird nach seinem Neustart an der letzten verarbeiteten Position im Transaktionslog weitergelesen. Auch bei einer fehlenden Verbindung zu Apache Kafka werden die Änderungsdaten so lange zwischengespeichert, bis wieder eine Verbindung hergestellt werden konnte. Damit wären zunächst einmal die Grundlagen für die Nutzung von Debezium geklärt. Im nächsten Teil des Blogposts wird ein konkretes Beispiel vorgestellt, bei dem Debezium Change-Events aus einer Beispieltabelle im SQL Server streamt.


1 Die Skizze ist in ihrer Art an die Architektur-Skizze in der Dokumentation von Debezium angelehnt.