15. Коннектор CDC

15.1. Настройка коннектора CDC

Коннектор CDC позволяет отслеживать изменения на сервере РЕД Базы Данных в таблицах на уровне строк и генерировать соответствующие события вставки, обновления и удаления. События для каждой таблицы записываются в отдельную тему Kafka, где они могут быть использованы приложениями и сервисами. Коннектор должен быть включен локально на каждом сервере РЕД Базы Данных, где требуется отслеживание изменений в таблицах. После включения коннектор начинает ослеживать изменения на сервере и публиковать изменения в отдельных темах Kafka. Коннектор CDC для РЕД Базы Данных отличается от других коннекторов Debezium - он не реализуется поверх механизма Kafka Connect. Коннектор представляет собой встроенный в процесс сервера JVM-процесс, публикующий события в Kafka с помощью продюсера Kafka. Коннектор реализован на основе механизма репликации сервера РЕД Базы Данных, что позволяет отслеживать изменения без создания дополнительных нагрузок на сервер. Для работы коннектора необходимо настроить механизм репликации на сервере РЕД Базы Данных.

Коннектор реализуется в виде плагина репликации Debezium_Connector, использующем JAVA библиотеки платформы Debezium с открытым исходным кодом для сбора измененных данных. Плагин реализован в виде нативной dll/so библиотеки, используемой для инициализации плагина, и java-библиотеки, реализующей работу коннектора. Для включения коннектора плагин должен быть доступен в конфигурации плагинов plugins.conf, а также в конфигурации репликации replication.conf в секции database в настройке plugin должен быть указан Debezium_Connector. Параметры коннектора задаются в конфигурационном файле reddatabase.connector.yaml.

Для работы коннектора в конфигурационном файле плагинов plugins.conf необходимо добавить секции плагина коннектора:

Plugin = Debezium_Connector {
        Module = $(dir_plugins)/debezium_connector
        Config = Debezium_Connector_config
}

Config = Debezium_Connector_config {
        JarDirs = $(this)/debezium
}
  • Plugin - Плагин, необходимый для работы коннектора (Debezium_Connector).

  • Module - Определяет путь к нативной библиотеке коннектора. По умолчанию она находится в каталоге plugins сервера и имеет имя libdebezium_connector.so/debezium_connector.ddl. Если нативная библиотека имеет имя по умолчанию, то значением параметра можно оставить $(dir_plugins)/debezium_connector.

  • Config - Определяет секцию параметров плагина в формате <имя секции>_config. Имя секции может быть любым. По умолчанию Debezium_Connector_config.

  • JarDirs - Путь к каталогу с Java-плагином относительно каталога сервера. По умолчанию имеет значение $(this)/debezium.

Если сервер РЕД Базы Данных установлен инсталятором, то для включения коннектора достаточно раскомментировать секции Plugin = Debezium_Connector и Config = Debezium_Connector_config.

Настройка репликации осуществляется в файле replication.conf, находящегося в каталоге сервера РЕД Базы Данных. Необходимые настройки репликации для базы или баз данных, в которых требуется отслеживать изменения:

database
{
        plugin = Debezium_Connector
}

В конфгирукции репликации по умолчанию включена секция database, но в ней не настроены никакие параметры репликации. Секция database применяется ко всем базам данных. Для включения коннеткора необходимо в секции database в параметре plugin указать Debezium_Connector.

Если требуется отслеживание определенной базы данных (или нескольких баз), то для неё должна быть указана секция database с указанием пути к базе:

database = /your/db.fdb
{
        plugin = Debezium_Connector
}

database = /var/db/rdb.fdb
{
        plugin = Debezium_Connector
}

Предупреждение

При настройке параметров на уровне базы данных в разделе database должен быть указан полный путь к базе данных. Псевдонимы и подстановочные знаки не допускаются. После настройки сервера необходимо его перезапустить.

После настройки репликации в файле replication.conf необходимо в каждой базе данных, для которой требуется отслеживание изменений, включить репликацию и добавить таблицы для отслеживания в публикации. Включение репликации в базе данных с помощью DDL команды:

ALTER DATABASE ENABLE PUBLICATION;

Включение таблицы в список реплицируемых осуществляется следующей DDL командой:

ALTER DATABASE INCLUDE TABLE <имя таблицы> TO PUBLICATION

Можно добавить все доступные таблицы (в том числе и те, которые будут созданы в процессе) в список реплицируемых:

ALTER DATABASE INCLUDE ALL TO PUBLICATION

Подробнее о фильтрации таблиц в репликации см. раздел Фильтрация таблиц и столбцов.

Последним этапом настройки нужно указать конфигурацию коннектора РЕД Базы Данных в файле jvm.args, если не используется конфигурационный файл по умолчанию. Для этого в jvm.args необходимо добавить строку с парамтером -Dfirebird.connector.config=:

-Dfirebird.connector.config=/var/config/custom.rdb.connector.yaml

Конфигурация коннектора может располагаться в любом каталоге, к которому есть права доступа у сервера РЕД Базы Данных. Доступные параметры коннектора описаны в разделе ниже.

15.2. Параметры коннектора

В конфигурационном файле reddatabase.connector.yaml настраиваются следующие параметры коннектора и продюсера Kafka:

  • connector.name - Имя коннектора. Обязательный параметр.

    Тип: string

    Значение по умолчанию: ""

  • connector.journal.directory - Каталог для записи журналов репликации. Обязательный параметр.

    Тип: string

    Значение по умолчанию: ""

  • connector.journal.check.interval.ms - Период просмотра наличия журналов транзакций в миллисекундах. Обязательный параметр.

    Тип: long

    Значение по умолчанию: 0

    Допустимые значения: [0,...9223372036854775807]

  • kafka.producer.bootstrap.servers - Список пар хост/порт, используемых для установления начального соединения с кластером Kafka. Клиенты используют этот список для загрузки и обнаружения полного набора брокеров Kafka. Хотя порядок серверов в списке не имеет значения, рекомендуется указывать несколько серверов, чтобы обеспечить устойчивость в случае падения какого-либо сервера. Этот список не обязательно должен содержать весь набор брокеров, так как клиенты Kafka автоматически управляют и обновляют соединения с кластером. Cписок имет вид: host1:port1,host2:port2,...... Поскольку эти сервера используются только для начального подключения для обнаружения полного состава кластера, который может динамически меняться, список не обязательно должен содержать полный набор серверов.

    Тип: list

    Значение по умолчанию: ""

  • kafka.producer.retries - При значении больше нуля клиент будет повторно отправлять любую запись, отправка которой не удалась из-за случайной ошибки. Обратите внимание, что такая повторная отправка ничем не отличается от того, если бы клиент повторно отправил запись после получения ошибки. Разрешение повторных попыток без установки max.in.flight.requests.per.connection равным 1 потенциально изменит порядок записей, поскольку если две партии отправляются в один раздел, и первая не удалась и была повторно отправлена, а вторая удалась, то записи из второй партии могут появиться первыми. Кроме того, запросы продюсера будут отклонены до того, как будет исчерпано количество повторных попыток, если таймаут, настроенный в параметре delivery.timeout.ms, истечет до успешного подтверждения. Обычно предпочитают вместо него использовать delivery.timeout.ms для управления поведением повторных попыток.

    Тип: int

    Значение по умолчанию: 2147483647

    Допустимые значения: [0,...,2147483647]

  • kafka.producer.retry.backoff.ms - Время ожидания перед попыткой повторного запроса к данному тематическому разделу. Это позволяет избежать многократной отправки запросов в замкнутом цикле при некоторых сценариях отказа.

    Тип: long

    Значение по умолчанию: 100

    Допустимые значения: [0,...,9223372036854775807]

  • kafka.producer.transactional.id - Используется для доставки транзакций. Обеспечивает семантику надежности, охватывающую несколько сессий Producer, поскольку позволяет клиенту гарантировать, что транзакции, использующие тот же transactional.id, были завершены до начала любых новых транзакций. Если transactional.id не указан, то продюсер ограничивается идемпотентной доставкой. Если transactional.id настроен, то подразумевается enable.idempotence. По умолчанию transactional.id не настроен, что означает, что транзакции не могут быть использованы.

    Тип: string

    Значение по умолчанию: null

  • kafka.producer.max.block.ms - Время, в течение которого будут блокироваться методы send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() и abortTransaction() KafkaProducer. Для send() этот таймаут ограничивает общее время ожидания выборки метаданных и выделения буфера (блокировка в пользовательских сериализаторах или разделителе не учитывается в этом таймауте). Для partitionsFor() этот таймаут ограничивает время ожидания метаданных, если они недоступны. Методы, связанные с транзакциями, всегда блокируются, но могут прерваться, если координатор транзакций не может быть обнаружен или не отвечает в течение таймаута

    Тип: long

    Значение по умолчанию: 60000 (1 минута)

    Допустимые значения: [0,...,9223372036854775807]

  • kafka.producer.delivery.timeout.ms - Верхняя граница времени сообщения об успехе или неудаче после возврата вызова send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (если ожидается), а также время, допустимое для повторных попыток отправки. Продюсер может сообщить о неудачной отправке записи раньше указанного значения, если возникла неисправимая ошибка, исчерпаны все повторные попытки, или запись добавлена в пакет, у которого истекает срок доставки раньше. Значение этого параметра должно быть больше суммы request.timeout.ms и linger.ms или равно ей.

    Тип: int

    Значение по умолчанию: 120000 (2 минуты)

    Допустимые значения: [0,...,2147483647]

  • kafka.producer.request.timeout.ms - Параметр контролирует максимальное время, в течение которого клиент будет ожидать ответа на запрос. Если ответ не получен до истечения таймаута, клиент повторно отправит запрос, если это необходимо, или отклонит запрос, если количество повторных попыток исчерпано. Это значение должно быть больше, чем replica.lag.time.max.ms (конфигурация брокера), чтобы уменьшить вероятность дублирования сообщений из-за ненужных повторных попыток продюсера.

    Тип: int

    Значение по умолчанию: 30000 (30 секунд)

    Допустимые значения: [0,...,2147483647]

  • kafka.producer.close.timeout.ms - Время в миллисекундах, в течение которого продюсер завершает отправку запросов в темы kafka, по истечении указанного времени продюсер будет закрыт.

    Тип: Long

    Значение по умолчанию: 60000(60 секунд)

    Допустимые значения: [0,...,9223372036854775807]

  • kafka.topic.prefix - Логическое имя, которое идентифицирует и обеспечивает пространство имен для конкретного сервера РЕД Базы Данных, на котором фиксируются изменения. Логическое имя должно быть уникальным для всех других коннекторов, поскольку оно используется в качестве префикса для всех имен тем Kafka, которые получают события, испускаемые этим коннектором. В префиксе сервера должны использоваться только буквы, цифры, дефисы, точки и символы подчеркивания.

    Тип: string

    Значение по умолчанию: ""

  • key.converter - Конвертер ключей.

    Тип: class

    Значение по умолчанию: null

    Допустимые значения: подкласс org.apache.kafka.connect.storage.Converter, класс с публичным конструктором без аргумента

  • value.converter - Конвертер значений.

    Тип: class

    Значение по умолчанию: null

    Допустимые значения: подкласс org.apache.kafka.connect.storage.Converter, класс с публичным конструктором без аргумента

Пример конфигурации:

connector.name: reddatabase
connector.journal.directory: /tmp/journal
connector.journal.check.interval.ms: 0

kafka.producer.bootstrap.servers: localhost:9092
kafka.producer.retries: 3
kafka.producer.retry.backoff.ms: 1000
kafka.producer.max.block.ms: 1000
kafka.producer.timeout.ms: 1000
kafka.producer.transactional.id: reddatabase-transaction-id
kafka.topic.prefix: reddatabase_topic

key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter

Логирование действий коннектора осуществляется с помощью библиотеки Logback. Пример конфигурации логирования в файл:

<configuration>
        <appender name="FILE"class="ch.qos.logback.core.FileAppender">
                <file>/tmp/debezium-connector-reddatabase.log</file>

                <encoder>
                        <pattern>%date %level [%thread] %logger50 [%file:%line] %msg%n</pattern>
                </encoder>
        </appender>
        <root level="debug">
                <appender-ref ref="FILE"/>
        </root>
</configuration>

Для логирования работы коннектора в jvm.args необходимо добавить строку с паратером -Dlogback.configurationFile, в котором указыватся путь к файлу с конфигурацией логирования:

-Dlogback.configurationFile=<путь>/logback.xml

15.3. Наименование тем

По умолчанию коннектор записывает события изменений для всех операций INSERT, UPDATE и DELETE, происходящих в таблице, в одну тему Apache Kafka, определённую для этой таблицы. Коннектор использует следующий формат для наименования тем событий изменений, который задаётся в файле reddatabase.connector.yaml:

<kafka.topic.prefix>.<путь к бд>.<имя таблицы>

Символы "\", "/" и ":", указанные в пути к базе данных, будут заменены на символ "_".

15.4. Фильтрация таблиц и столбцов

Коннектор отслеживает изменения для всех таблиц, для которых включена репликация. Список реплицируемых таблиц является вариантом реализации списка отслеживаемых таблиц (так называемый whitelist/includelist). Включение таблицы в список реплицируемых осуществляется командой ALTER DATABASE INCLUDE TABLE <ТАБЛИЦА> TO PUBLICATION:

ALTER DATABASE INCLUDE TABLE TEST_TABLE TO PUBLICATION

Возможно включение нескольких таблиц в список реплицируемых:

ALTER DATABASE INCLUDE TABLE T1, T2, T3 TO PUBLICATION

При желании можно добавить все доступные таблицы (в том числе и те, которые будут созданы в процессе) в список реплицируемых:

ALTER DATABASE INCLUDE ALL TO PUBLICATION

Исключить конкретные таблицы из списка реплицируемых:

ALTER DATABASE EXCLUDE TABLE T1, T2, T3 FROM PUBLICATION;

Исключить все таблицы из списка реплицируемых:

ALTER DATABASE EXCLUDE ALL FROM PUBLICATION;

Кроме инструментов РЕД Базы Данных коннектор имеет встроенные механизмы фильтрации таблиц и столбцов. Настройка фильтрации таблиц и столбцов осуществляется в файле reddatabase.connector.yaml следующими параметрами:

  • table.include.list - Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют идентификаторам таблиц, изменения которых необходимо отслеживать. Коннектор не фиксирует изменения в таблицах, невключенных в table.include.list. Каждый идентификатор имеет вид: <Имя_БД>.<Имя_Таблицы>. По умолчанию коннектор фиксирует изменения в каждой несистемной таблице, добавленной в список реплицируемых. Несовместим с параметром table.exclude.list;

  • table.exclude.list - Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют идентификаторам таблиц, изменения которых не отслеживаются. Коннектор фиксирует изменения в любой таблице, добавленной в список репликации, но не указанной в table.exclude.list. Каждый идентификатор имеет вид <Имя_БД>.<Имя_Таблицы>. Несовместим с параметром table.include.list;

  • column.include.list - Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют именам столбцов для включения в событие изменения. Полные имена столбцов имеют вид: <Имя_БД>.<Имя_Таблицы>.<Имя_Столбца>. Несовместим с параметром column.exclude.list;

  • column.exclude.list - Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют именам столбцов для исключения из события изменения. Полные имена столбцов имеют вид: <Имя_БД>.<Имя_Таблицы>.<Имя_Столбца>. Несовместим с параметром column.include.list.

Если в настройках указаны оба параметра table.include.list и table.exclude.list, то приоритет будет за table.exclude.list. Такое же поведение касается параметров column.include.list и column.exclude.list - приоритет у column.exclude.list. Включение таблиц в список реплицируемых (ALTER DATABASE INCLUDE TABLE <ТАБЛИЦА> TO PUBLICATION) является обязательным условием отслеживания изменений таблиц. Без включения репликации для таблиц использование парамтеров table.include.list, table.exclude.list, column.include.list, column.exclude.list не имеет смысла.

Примеры:

# Включение таблицы TABLE1 из БД /opt/databases/employee.fdb в список для
отслеживания:
table.include.list: /opt/databases/employee.fdb.table1

# Включение таблицы TABLE1 из ЛЮБОЙ БД в список для отслеживания:
table.include.list: .*table1

# Включение таблицы TABLE1 из ЛЮБОЙ БД и таблицы TEST_TABLE из любой БД с именем
data.fdb в список для отслеживания:
table.include.list: .*table1, .*data.fdb.test_table

# Включение таблицы TABLE1 из ЛЮБОЙ employee.fdb в список для отслеживания:
table.include.list: .*employee.fdb.table1

# Исключение таблицы TABLE1 базы данных /opt/databases/employee.fdb из списка
отслеживаемых:
table.include.list: /opt/databases/employee.fdb.table1

# Включение столбца ID таблицы TABLE1 из БД /opt/databases/employee.fdb в сообщение
изменения:
column.include.list: /opt/databases/employee.fdb.table1.id

# Включение столбца ID таблицы TABLE1 из ЛЮБОЙ БД в сообщение изменения:
column.include.list: .*table1.id

# Включение столбцов ID и NAME ЛЮБОЙ таблицы ЛЮБОЙ БД в сообщение изменения:
column.include.list: .*id, *.name

# Исключение столбца F_VCHAR ЛЮБОЙ таблицы ЛЮБОЙ БД из сообщения изменения:
column.exclude.list: .*f_vchar

15.5. События изменения данных

Каждое событие изменения данных, которое формирует коннектор РЕД Базы Данных, имеет ключ и значение. Структура ключа и значения зависит от таблицы, из которой исходят события изменения. Коннектор, как и Kafka Connect, разработан с учетом непрерывных потоков сообщений о событиях. Каждый ключ и значение сообщения состоит из двух частей: схемы и полезной нагрузки. Схема описывает структуру полезной нагрузки, а полезная нагрузка содержит фактические данные. Для каждой измененной таблицы ключ события изменения формируется из полей в первичном ключе (или уникальном ключевом ограничении) таблицы на момент создания события.

Как и ключ сообщения, значение сообщения о событии изменения имеет раздел схемы и раздел полезной нагрузки. Раздел полезной нагрузки каждого значения события изменения, создаваемого коннектором, имеет структуру конверта со следующими полями:

  • op - обязательное поле, содержащее строковое значение, описывающее тип операции: CREATE - создание (или вставка), UPDATE - обновление, DELETE - удаление;

  • before - необязательное поле, которое, если присутствует, содержит состояние строки до наступления события;

  • after - необязательное поле, которое, если присутствует, содержит состояние строки после произошедшего события. Структура описывается той же схемой, которая использовалась ранее;

  • source - обязательное поле, содержащее структуру, описывающую исходные метаданные для события, которая в случае РЕД Базы Данных содержит такие поля: версия коннектора, имя коннектора, имя сервера, номер транзакции, время (по системным часам в JVM, выполняющей задачу Kafka Connect), в которое коннектор обработал событие, имя базы данных, имя таблицы.

Пример формирования события изменения данных для операции INSERT:

INSERT INTO TABLE11 VALUES(42,'NEW VALUE');

Когда в кластер Kafka попадает сообщение об удалении, то соответствующий ключ удаляется из хранилища состояний, освобождая таким образом место.

Предупреждение

Если используется Jdbc Sink Connector, то включение tombstone records необходимо для генерации DELETE запросов для результирующей БД.

Отключить генерацию tombstone можно настройкой tombstones.on.delete:

tombstones.on.delete: false

15.6. События изменения схем

Коннектор формирует события изменения схемы, которые формируются из выполняемых DDL операций на сервере. Коннектор отправляет сообщение каждый раз, когда происходит изменение структуры базы данных. Сообщения, которые коннектор отправляет в тему изменения схемы, содержат полезную нагрузку и также содержат схему о событии изменения. Полезная нагрузка сообщения о событии изменения схемы включает следующие элементы:

  • databaseName\schemaName - Идентифицирует базу данных и схему, содержащую изменение.

  • ddl - Это поле содержит DDL, который отвечает за изменение схемы.

  • tableChanges - Массив из одного или нескольких элементов, которые содержат изменения схемы, созданные командой DDL.

  • type - Описывает вид изменения. Значением может быть одно из следующих: CREATE - таблица создана; ALTER - таблица изменена; DROP - таблица удалена.

  • id - Полный идентификатор таблицы, которая была создана, изменена или удалена.

  • table - Представляет метаданные таблицы после примененного изменения.

  • primaryKeyColumnNames - Список столбцов, составляющих первичный ключ таблицы.

  • columns - Метаданные для каждого столбца в измененной таблице.

Пример формирования события изменения схемы при выполнение операции CREATE TABLE:

create table table24(f_id integer primary key, f_vchar varchar(100));

15.7. Журналы транзакций

Коннектор для каждой репликационной транзакции создает журнал транзакции. Журнал транзакции используется для сохранения данных транзакции (DDL и DML событий) и отправки в кластер Kafka. Также журналы используются для повторной отправки, если кластер был недоступен или возникли другие ошибки при первичной отправке сообщений транзакции. Для включения записи журналов транзакций необходимо указать путь к каталогу журналов в опции connector.journal.directory, например:

connector.journal.directory: /tmp/debezium-connector-reddatabase

В случае возникновени ошибок при отправке сообщений журналы транзакции повторно вычитываются, и при первой доступности кластера Kafka сообщения отправляются снова. При успешном отправлении всех сообщений репликационной транзакции журнал транзакции удаляется. Период просмотра наличия журналов транзакций и их повторной отправки по умолчанию равен 0. Период можно изменить через опцию connector.journal.check.interval.ms, которая принимает положительное число в миллисекундах. Например, задать время просмотра наличия журналов транзакций и их отправки равное 10 минутам:

connector.journal.check.interval.ms: 600000

Имена журналов транзакций имеют следующий формат: <Случайный UUID>.log[.live]. Журнал незафиксированной транзакции имеет расширение .live, журналы зафиксированных транзакций представлены с расширением .log. Коннектор отправляет сообщения только из журналов зафиксированных транзакций (с расширением .log). Если серверная транзакция была отменена (rollback), журнал репликационной транзакции удаляется в любом случае.