15. Коннектор CDC
15.1. Настройка коннектора CDC
Коннектор CDC позволяет отслеживать изменения на сервере РЕД Базы Данных в таблицах на уровне строк и генерировать соответствующие события вставки, обновления и удаления.
События для каждой таблицы записываются в отдельную тему Kafka, где они могут быть использованы приложениями и сервисами.
Коннектор должен быть включен локально на каждом сервере РЕД Базы Данных, где требуется отслеживание изменений в таблицах.
После включения коннектор начинает ослеживать изменения на сервере и публиковать изменения в отдельных темах Kafka.
Коннектор CDC для РЕД Базы Данных отличается от других коннекторов Debezium - он не реализуется поверх механизма Kafka Connect.
Коннектор представляет собой встроенный в процесс сервера JVM-процесс, публикующий события в Kafka с помощью продюсера Kafka.
Коннектор реализован на основе механизма репликации сервера РЕД Базы Данных, что позволяет отслеживать изменения без создания дополнительных нагрузок на сервер.
Для работы коннектора необходимо настроить механизм репликации на сервере РЕД Базы Данных.
Коннектор реализуется в виде плагина репликации Debezium_Connector,
использующем JAVA библиотеки платформы Debezium с открытым исходным кодом для сбора измененных данных.
Плагин реализован в виде нативной dll/so библиотеки, используемой для инициализации плагина, и java-библиотеки, реализующей работу коннектора.
Для включения коннектора плагин должен быть доступен в конфигурации плагинов plugins.conf,
а также в конфигурации репликации replication.conf в секции database в настройке plugin должен быть указан Debezium_Connector.
Параметры коннектора задаются в конфигурационном файле reddatabase.connector.yaml.
Для работы коннектора в конфигурационном файле плагинов plugins.conf необходимо добавить секции плагина коннектора:
Plugin = Debezium_Connector {
Module = $(dir_plugins)/debezium_connector
Config = Debezium_Connector_config
}
Config = Debezium_Connector_config {
JarDirs = $(this)/debezium
}
Plugin- Плагин, необходимый для работы коннектора (Debezium_Connector).Module- Определяет путь к нативной библиотеке коннектора. По умолчанию она находится в каталогеpluginsсервера и имеет имяlibdebezium_connector.so/debezium_connector.ddl. Если нативная библиотека имеет имя по умолчанию, то значением параметра можно оставить$(dir_plugins)/debezium_connector.Config- Определяет секцию параметров плагина в формате<имя секции>_config. Имя секции может быть любым. По умолчаниюDebezium_Connector_config.JarDirs- Путь к каталогу сJava-плагиномотносительно каталога сервера. По умолчанию имеет значение$(this)/debezium.
Если сервер РЕД Базы Данных установлен инсталятором, то для включения коннектора достаточно
раскомментировать секции Plugin = Debezium_Connector и Config = Debezium_Connector_config.
Настройка репликации осуществляется в файле replication.conf, находящегося в каталоге сервера РЕД Базы Данных.
Необходимые настройки репликации для базы или баз данных, в которых требуется отслеживать изменения:
database
{
plugin = Debezium_Connector
}
В конфгирукции репликации по умолчанию включена секция database, но в ней не настроены никакие параметры репликации.
Секция database применяется ко всем базам данных.
Для включения коннеткора необходимо в секции database в параметре plugin указать Debezium_Connector.
Если требуется отслеживание определенной базы данных (или нескольких баз), то для неё должна быть указана секция database с указанием пути к базе:
database = /your/db.fdb
{
plugin = Debezium_Connector
}
database = /var/db/rdb.fdb
{
plugin = Debezium_Connector
}
Предупреждение
При настройке параметров на уровне базы данных в разделе database должен быть указан полный путь к базе данных.
Псевдонимы и подстановочные знаки не допускаются.
После настройки сервера необходимо его перезапустить.
После настройки репликации в файле replication.conf необходимо в каждой базе данных,
для которой требуется отслеживание изменений, включить репликацию и добавить таблицы для отслеживания в публикации.
Включение репликации в базе данных с помощью DDL команды:
ALTER DATABASE ENABLE PUBLICATION;
Включение таблицы в список реплицируемых осуществляется следующей DDL командой:
ALTER DATABASE INCLUDE TABLE <имя таблицы> TO PUBLICATION
Можно добавить все доступные таблицы (в том числе и те, которые будут созданы в процессе) в список реплицируемых:
ALTER DATABASE INCLUDE ALL TO PUBLICATION
Подробнее о фильтрации таблиц в репликации см. раздел Фильтрация таблиц и столбцов.
Последним этапом настройки нужно указать конфигурацию коннектора РЕД Базы Данных в файле jvm.args,
если не используется конфигурационный файл по умолчанию.
Для этого в jvm.args необходимо добавить строку с парамтером -Dfirebird.connector.config=:
-Dfirebird.connector.config=/var/config/custom.rdb.connector.yaml
Конфигурация коннектора может располагаться в любом каталоге, к которому есть права доступа у сервера РЕД Базы Данных. Доступные параметры коннектора описаны в разделе ниже.
15.2. Параметры коннектора
В конфигурационном файле reddatabase.connector.yaml настраиваются следующие параметры коннектора и продюсера Kafka:
connector.name- Имя коннектора. Обязательный параметр.Тип: stringЗначение по умолчанию: ""connector.journal.directory- Каталог для записи журналов репликации. Обязательный параметр.Тип: stringЗначение по умолчанию: ""connector.journal.check.interval.ms- Период просмотра наличия журналов транзакций в миллисекундах. Обязательный параметр.Тип: longЗначение по умолчанию: 0Допустимые значения: [0,...9223372036854775807]kafka.producer.bootstrap.servers- Список пар хост/порт, используемых для установления начального соединения с кластеромKafka. Клиенты используют этот список для загрузки и обнаружения полного набора брокеровKafka. Хотя порядок серверов в списке не имеет значения, рекомендуется указывать несколько серверов, чтобы обеспечить устойчивость в случае падения какого-либо сервера. Этот список не обязательно должен содержать весь набор брокеров, так как клиентыKafkaавтоматически управляют и обновляют соединения с кластером. Cписок имет вид:host1:port1,host2:port2,...... Поскольку эти сервера используются только для начального подключения для обнаружения полного состава кластера, который может динамически меняться, список не обязательно должен содержать полный набор серверов.Тип: listЗначение по умолчанию: ""kafka.producer.retries- При значении больше нуля клиент будет повторно отправлять любую запись, отправка которой не удалась из-за случайной ошибки. Обратите внимание, что такая повторная отправка ничем не отличается от того, если бы клиент повторно отправил запись после получения ошибки. Разрешение повторных попыток без установкиmax.in.flight.requests.per.connectionравным 1 потенциально изменит порядок записей, поскольку если две партии отправляются в один раздел, и первая не удалась и была повторно отправлена, а вторая удалась, то записи из второй партии могут появиться первыми. Кроме того, запросы продюсера будут отклонены до того, как будет исчерпано количество повторных попыток, если таймаут, настроенный в параметреdelivery.timeout.ms, истечет до успешного подтверждения. Обычно предпочитают вместо него использоватьdelivery.timeout.msдля управления поведением повторных попыток.Тип: intЗначение по умолчанию: 2147483647Допустимые значения: [0,...,2147483647]kafka.producer.retry.backoff.ms- Время ожидания перед попыткой повторного запроса к данному тематическому разделу. Это позволяет избежать многократной отправки запросов в замкнутом цикле при некоторых сценариях отказа.Тип: longЗначение по умолчанию: 100Допустимые значения: [0,...,9223372036854775807]kafka.producer.transactional.id- Используется для доставки транзакций. Обеспечивает семантику надежности, охватывающую несколько сессийProducer, поскольку позволяет клиенту гарантировать, что транзакции, использующие тот жеtransactional.id, были завершены до начала любых новых транзакций. Еслиtransactional.idне указан, то продюсер ограничивается идемпотентной доставкой. Еслиtransactional.idнастроен, то подразумеваетсяenable.idempotence. По умолчаниюtransactional.idне настроен, что означает, что транзакции не могут быть использованы.Тип: stringЗначение по умолчанию: nullkafka.producer.max.block.ms- Время, в течение которого будут блокироваться методыsend(),partitionsFor(),initTransactions(),sendOffsetsToTransaction(),commitTransaction()иabortTransaction()KafkaProducer. Дляsend()этот таймаут ограничивает общее время ожидания выборки метаданных и выделения буфера (блокировка в пользовательских сериализаторах или разделителе не учитывается в этом таймауте). ДляpartitionsFor()этот таймаут ограничивает время ожидания метаданных, если они недоступны. Методы, связанные с транзакциями, всегда блокируются, но могут прерваться, если координатор транзакций не может быть обнаружен или не отвечает в течение таймаутаТип: longЗначение по умолчанию: 60000 (1 минута)Допустимые значения: [0,...,9223372036854775807]kafka.producer.delivery.timeout.ms- Верхняя граница времени сообщения об успехе или неудаче после возврата вызоваsend(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (если ожидается), а также время, допустимое для повторных попыток отправки. Продюсер может сообщить о неудачной отправке записи раньше указанного значения, если возникла неисправимая ошибка, исчерпаны все повторные попытки, или запись добавлена в пакет, у которого истекает срок доставки раньше. Значение этого параметра должно быть больше суммыrequest.timeout.msиlinger.msили равно ей.Тип: intЗначение по умолчанию: 120000 (2 минуты)Допустимые значения: [0,...,2147483647]kafka.producer.request.timeout.ms- Параметр контролирует максимальное время, в течение которого клиент будет ожидать ответа на запрос. Если ответ не получен до истечения таймаута, клиент повторно отправит запрос, если это необходимо, или отклонит запрос, если количество повторных попыток исчерпано. Это значение должно быть больше, чемreplica.lag.time.max.ms(конфигурация брокера), чтобы уменьшить вероятность дублирования сообщений из-за ненужных повторных попыток продюсера.Тип: intЗначение по умолчанию: 30000 (30 секунд)Допустимые значения: [0,...,2147483647]kafka.producer.close.timeout.ms- Время в миллисекундах, в течение которого продюсер завершает отправку запросов в темы kafka, по истечении указанного времени продюсер будет закрыт.Тип: LongЗначение по умолчанию: 60000(60 секунд)Допустимые значения: [0,...,9223372036854775807]kafka.topic.prefix- Логическое имя, которое идентифицирует и обеспечивает пространство имен для конкретного сервера РЕД Базы Данных, на котором фиксируются изменения. Логическое имя должно быть уникальным для всех других коннекторов, поскольку оно используется в качестве префикса для всех имен темKafka, которые получают события, испускаемые этим коннектором. В префиксе сервера должны использоваться только буквы, цифры, дефисы, точки и символы подчеркивания.Тип: stringЗначение по умолчанию: ""key.converter- Конвертер ключей.Тип: classЗначение по умолчанию: nullДопустимые значения: подкласс org.apache.kafka.connect.storage.Converter, класс с публичным конструктором без аргументаvalue.converter- Конвертер значений.Тип: classЗначение по умолчанию: nullДопустимые значения: подкласс org.apache.kafka.connect.storage.Converter, класс с публичным конструктором без аргумента
Пример конфигурации:
connector.name: reddatabase
connector.journal.directory: /tmp/journal
connector.journal.check.interval.ms: 0
kafka.producer.bootstrap.servers: localhost:9092
kafka.producer.retries: 3
kafka.producer.retry.backoff.ms: 1000
kafka.producer.max.block.ms: 1000
kafka.producer.timeout.ms: 1000
kafka.producer.transactional.id: reddatabase-transaction-id
kafka.topic.prefix: reddatabase_topic
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
Логирование действий коннектора осуществляется с помощью библиотеки Logback. Пример конфигурации логирования в файл:
<configuration>
<appender name="FILE"class="ch.qos.logback.core.FileAppender">
<file>/tmp/debezium-connector-reddatabase.log</file>
<encoder>
<pattern>%date %level [%thread] %logger50 [%file:%line] %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="FILE"/>
</root>
</configuration>
Для логирования работы коннектора в jvm.args необходимо добавить строку с паратером -Dlogback.configurationFile,
в котором указыватся путь к файлу с конфигурацией логирования:
-Dlogback.configurationFile=<путь>/logback.xml
15.3. Наименование тем
По умолчанию коннектор записывает события изменений для всех операций INSERT, UPDATE и DELETE, происходящих в таблице, в одну тему Apache Kafka,
определённую для этой таблицы. Коннектор использует следующий формат для наименования тем событий изменений, который задаётся в файле reddatabase.connector.yaml:
<kafka.topic.prefix>.<путь к бд>.<имя таблицы>
Символы "\", "/" и ":", указанные в пути к базе данных, будут заменены на символ "_".
15.4. Фильтрация таблиц и столбцов
Коннектор отслеживает изменения для всех таблиц, для которых включена репликация.
Список реплицируемых таблиц является вариантом реализации списка отслеживаемых таблиц (так называемый whitelist/includelist).
Включение таблицы в список реплицируемых осуществляется командой ALTER DATABASE INCLUDE TABLE <ТАБЛИЦА> TO PUBLICATION:
ALTER DATABASE INCLUDE TABLE TEST_TABLE TO PUBLICATION
Возможно включение нескольких таблиц в список реплицируемых:
ALTER DATABASE INCLUDE TABLE T1, T2, T3 TO PUBLICATION
При желании можно добавить все доступные таблицы (в том числе и те, которые будут созданы в процессе) в список реплицируемых:
ALTER DATABASE INCLUDE ALL TO PUBLICATION
Исключить конкретные таблицы из списка реплицируемых:
ALTER DATABASE EXCLUDE TABLE T1, T2, T3 FROM PUBLICATION;
Исключить все таблицы из списка реплицируемых:
ALTER DATABASE EXCLUDE ALL FROM PUBLICATION;
Кроме инструментов РЕД Базы Данных коннектор имеет встроенные механизмы фильтрации таблиц и столбцов.
Настройка фильтрации таблиц и столбцов осуществляется в файле reddatabase.connector.yaml следующими параметрами:
table.include.list- Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют идентификаторам таблиц, изменения которых необходимо отслеживать. Коннектор не фиксирует изменения в таблицах, невключенных вtable.include.list. Каждый идентификатор имеет вид:<Имя_БД>.<Имя_Таблицы>. По умолчанию коннектор фиксирует изменения в каждой несистемной таблице, добавленной в список реплицируемых. Несовместим с параметромtable.exclude.list;table.exclude.list- Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют идентификаторам таблиц, изменения которых не отслеживаются. Коннектор фиксирует изменения в любой таблице, добавленной в список репликации, но не указанной вtable.exclude.list. Каждый идентификатор имеет вид<Имя_БД>.<Имя_Таблицы>. Несовместим с параметромtable.include.list;column.include.list- Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют именам столбцов для включения в событие изменения. Полные имена столбцов имеют вид:<Имя_БД>.<Имя_Таблицы>.<Имя_Столбца>. Несовместим с параметромcolumn.exclude.list;column.exclude.list- Необязательный список регулярных выражений, разделенных запятыми, которые соответствуют именам столбцов для исключения из события изменения. Полные имена столбцов имеют вид:<Имя_БД>.<Имя_Таблицы>.<Имя_Столбца>. Несовместим с параметромcolumn.include.list.
Если в настройках указаны оба параметра table.include.list и table.exclude.list, то приоритет будет за table.exclude.list.
Такое же поведение касается параметров column.include.list и column.exclude.list - приоритет у column.exclude.list.
Включение таблиц в список реплицируемых (ALTER DATABASE INCLUDE TABLE <ТАБЛИЦА> TO PUBLICATION) является обязательным условием отслеживания изменений таблиц.
Без включения репликации для таблиц использование парамтеров table.include.list, table.exclude.list, column.include.list, column.exclude.list не имеет смысла.
Примеры:
# Включение таблицы TABLE1 из БД /opt/databases/employee.fdb в список для
отслеживания:
table.include.list: /opt/databases/employee.fdb.table1
# Включение таблицы TABLE1 из ЛЮБОЙ БД в список для отслеживания:
table.include.list: .*table1
# Включение таблицы TABLE1 из ЛЮБОЙ БД и таблицы TEST_TABLE из любой БД с именем
data.fdb в список для отслеживания:
table.include.list: .*table1, .*data.fdb.test_table
# Включение таблицы TABLE1 из ЛЮБОЙ employee.fdb в список для отслеживания:
table.include.list: .*employee.fdb.table1
# Исключение таблицы TABLE1 базы данных /opt/databases/employee.fdb из списка
отслеживаемых:
table.include.list: /opt/databases/employee.fdb.table1
# Включение столбца ID таблицы TABLE1 из БД /opt/databases/employee.fdb в сообщение
изменения:
column.include.list: /opt/databases/employee.fdb.table1.id
# Включение столбца ID таблицы TABLE1 из ЛЮБОЙ БД в сообщение изменения:
column.include.list: .*table1.id
# Включение столбцов ID и NAME ЛЮБОЙ таблицы ЛЮБОЙ БД в сообщение изменения:
column.include.list: .*id, *.name
# Исключение столбца F_VCHAR ЛЮБОЙ таблицы ЛЮБОЙ БД из сообщения изменения:
column.exclude.list: .*f_vchar
15.5. События изменения данных
Каждое событие изменения данных, которое формирует коннектор РЕД Базы Данных, имеет ключ и значение.
Структура ключа и значения зависит от таблицы, из которой исходят события изменения.
Коннектор, как и Kafka Connect, разработан с учетом непрерывных потоков сообщений о событиях.
Каждый ключ и значение сообщения состоит из двух частей: схемы и полезной нагрузки.
Схема описывает структуру полезной нагрузки, а полезная нагрузка содержит фактические данные.
Для каждой измененной таблицы ключ события изменения формируется из полей в первичном ключе (или уникальном ключевом ограничении) таблицы на момент создания события.
Как и ключ сообщения, значение сообщения о событии изменения имеет раздел схемы и раздел полезной нагрузки. Раздел полезной нагрузки каждого значения события изменения, создаваемого коннектором, имеет структуру конверта со следующими полями:
op- обязательное поле, содержащее строковое значение, описывающее тип операции:CREATE- создание (или вставка),UPDATE- обновление,DELETE- удаление;before- необязательное поле, которое, если присутствует, содержит состояние строки до наступления события;after- необязательное поле, которое, если присутствует, содержит состояние строки после произошедшего события. Структура описывается той же схемой, которая использовалась ранее;source- обязательное поле, содержащее структуру, описывающую исходные метаданные для события, которая в случае РЕД Базы Данных содержит такие поля: версия коннектора, имя коннектора, имя сервера, номер транзакции, время (по системным часам вJVM, выполняющей задачуKafka Connect), в которое коннектор обработал событие, имя базы данных, имя таблицы.
Пример формирования события изменения данных для операции INSERT:
INSERT INTO TABLE11 VALUES(42,'NEW VALUE');
Когда в кластер Kafka попадает сообщение об удалении, то соответствующий ключ удаляется из хранилища состояний, освобождая таким образом место.
Предупреждение
Если используется Jdbc Sink Connector, то включение tombstone records необходимо для генерации DELETE запросов для результирующей БД.
Отключить генерацию tombstone можно настройкой tombstones.on.delete:
tombstones.on.delete: false
15.6. События изменения схем
Коннектор формирует события изменения схемы, которые формируются из выполняемых DDL операций на сервере.
Коннектор отправляет сообщение каждый раз, когда происходит изменение структуры базы данных. Сообщения, которые коннектор отправляет в тему изменения схемы,
содержат полезную нагрузку и также содержат схему о событии изменения. Полезная нагрузка сообщения о событии изменения схемы включает следующие элементы:
databaseName\schemaName- Идентифицирует базу данных и схему, содержащую изменение.ddl- Это поле содержитDDL, который отвечает за изменение схемы.tableChanges- Массив из одного или нескольких элементов, которые содержат изменения схемы, созданные командойDDL.type- Описывает вид изменения. Значением может быть одно из следующих:CREATE- таблица создана;ALTER- таблица изменена;DROP- таблица удалена.id- Полный идентификатор таблицы, которая была создана, изменена или удалена.table- Представляет метаданные таблицы после примененного изменения.primaryKeyColumnNames- Список столбцов, составляющих первичный ключ таблицы.columns- Метаданные для каждого столбца в измененной таблице.
Пример формирования события изменения схемы при выполнение операции CREATE TABLE:
create table table24(f_id integer primary key, f_vchar varchar(100));
15.7. Журналы транзакций
Коннектор для каждой репликационной транзакции создает журнал транзакции.
Журнал транзакции используется для сохранения данных транзакции (DDL и DML событий) и отправки в кластер Kafka.
Также журналы используются для повторной отправки, если кластер был недоступен или возникли другие ошибки
при первичной отправке сообщений транзакции.
Для включения записи журналов транзакций необходимо указать путь к каталогу журналов в опции
connector.journal.directory, например:
connector.journal.directory: /tmp/debezium-connector-reddatabase
В случае возникновени ошибок при отправке сообщений журналы транзакции повторно вычитываются,
и при первой доступности кластера Kafka сообщения отправляются снова.
При успешном отправлении всех сообщений репликационной транзакции журнал транзакции удаляется.
Период просмотра наличия журналов транзакций и их повторной отправки по умолчанию равен 0.
Период можно изменить через опцию connector.journal.check.interval.ms, которая принимает положительное число в миллисекундах.
Например, задать время просмотра наличия журналов транзакций и их отправки равное 10 минутам:
connector.journal.check.interval.ms: 600000
Имена журналов транзакций имеют следующий формат: <Случайный UUID>.log[.live].
Журнал незафиксированной транзакции имеет расширение .live, журналы зафиксированных транзакций представлены с расширением .log.
Коннектор отправляет сообщения только из журналов зафиксированных транзакций (с расширением .log).
Если серверная транзакция была отменена (rollback), журнал репликационной транзакции удаляется в любом случае.