Внешняя компонента 1с (native) для взаимодействия с Apache Kafka
Под капотом: https://github.com/confluentinc/librdkafka
Статические библиотеки для сборки - включены в проект
librdkafka v2.3.0
Сборка под linux:
Окружение:
GLIBC 2.31
gcc 9.4.0
g++ 9.4.0
cmake 3.16.3
Зависимости:
uuid-dev
Статические библиотеки (включены в проект):
librdkafka++.a
librdkafka-static.a
Для Linux - libssl, libcrypto, libcurl, libzstd, zlib - влинкованы в librdkafka-static.a
Сборка:
X64:
mkdir -p build
cd build
rm -r *
cmake ../.
cmake --build .
X32:
mkdir -p build
cd build
rm -r *
cmake -DCMAKE_CXX_FLAGS=-m32 -DCMAKE_SHARED_LINKER_FLAGS=-m32 ../.
cmake --build .
Сборка под windows:
Окружение:
cl - 19.24.28315
link - 14.24.28315.0
msbuild - 16.4.0+e901037fe
Platform Toolset - v142
Статические библиотеки (включены в проект):
libcrypto.lib
libssl.lib
libcurl.lib
librdkafka.lib
librdkafkacpp.lib
libzstd_static.lib
zlibstat.lib
Сборка:
X64:
mkdir build
cd build
rd . /S /Q
cmake ../.
cmake --build . --config Release
X32:
mkdir build
cd build
rd . /S /Q
cmake ../. -A Win32
cmake --build . --config Release
стрСписокБрокеров - Адреса в формате "10.0.5.187:9092,10.0.5.85:9092,10.0.5.86:9092"
стрИмяГруппы - Произвольная строка
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Применяет параметры, заданные функциями "SetGlobalConf", "SetTopicConf"
Создает соединения и управляющие потоки внутри компоненты
стрКлюч - Имя параметра
стрЗначение - Значение параметра
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Устанавлевает глобальные параметры для получаетеля
стрКлюч - Имя параметра
стрЗначение - Значение параметра
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Устанавлевает параметры для топиков
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Сбрасывает параметры, установленные функциями
"SetGlobalConf", "SetTopicConf"
стрИмяТопика - Имя топика
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Добавляет топик в список на подписание
(для использования функцией "Subscribe")
(заполняет внутренний список внутри компоненты)
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Подписывается на топики из списка, заданного
вызовами функции "AddTopicToSubscribeList".
Разделы будут назначены автоматически брокером,
в зависимости от количества подписанных на топик получателей
с текуей группой(groupID).
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Отписывается от всех топиков
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Очищает список топиков на подписку
стрТопик - Имя топика
чРаздел - Номер раздела топика
чСмещение - Смещение, с которого будет начато чтение,
Неопределено - для последнего смещения закоммиченного в брокер
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Добавляет топик-раздел-(смещение) в список на
назначение (для использования функцией "Assign")
(заполняет внутренний список внутри компоненты)
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Выполняет назначение топиков-рвзделов-(смещений) из списка,
сформированного вызовами функции "AddRecordToTopicPartitionList".
При использовании этого метода не происходит ребалансировки
партиций между получателями одной группы, получатель читает
только из назначенных функцией партиций.
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Сбрасывает назначения разделов, сделанные функцией "Assign"
Получение останавливается (метод"ConsumePool" -
всегда завершаются таймаутами) после вызова функции
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Очищает список топик-раздел-(смещение)
чТаймаут - Миллисекунд на чтение батча
чКоличествоСообщений - Количество сообщений, для чтения
чКоличествоОшибокДляПрерывания - Количеств ошибок(например таймаутов),
после которого выполнение будет прервано
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Пытается получить сообщения и в случае успеха,
сохраняет их во внутренний пул компоненты
Возвращаемые значение:
Количество в случае успеха
-1 в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает количество сообщений во внутреннем пуле компоненты
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Очищает список сообщений во внутреннем пуле компоненты
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Записывает в брокер последние прочитанные смещения для
топиков-партиций, из котороых были прочитаны сообщения
булBase64 - Если Истина - данные возвращаются в формате Base64
Возвращаемые значение:
json ответ в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Формирует JSON из внутреннего пула сообщений компоненты,
возвращает его в случае успеха. Если сообщения содержат бинарные
данные - JSON сформировать не удастся. В этом случае нужно
формировать JSON c флагом "булBase64"
стрТопик - Имя топика
чРаздел - Номер раздела
чТаймаут - Таймаут
Возвращаемые значение:
JSON в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает минимальное и максимальное смещения раздела
Формирует JSON структуру вида {"low": "x", "hight": "y"}
Значение low - минимально возможное смещение для раздела
Значение hight - максимально возможное смещение для раздела
стрТопик - Имя топика
чРаздел - Номер раздела
чТаймаут - Таймаут
Возвращаемые значение:
Смещение в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает последнее прочитанное и записанное в брокер
смещение получателем для текущей группы (текущее смещение
группы для раздела)
стрСписокБрокеров - Адреса в формате "10.0.5.187:9092,10.0.5.85:9092 10.0.5.86:9092"
стрТопик - ИмяТопика
чРаздел - Номер раздела, -1 для автоматического партиционирования по ключу
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Применяет параметры, заданные функциями
"SetGlobalConf", "SetTopicConf". Создает соединения и
управляющие потоки внутри компоненты
стрКлюч - Имя параметра
стрЗначение - Значение параметра
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Устанавлевает глобальные параметры для получаетеля
стрКлюч - Имя параметра
стрЗначение - Значение параметра
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Устанавлевает параметры для топиков
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Сбрасывает параметры, установленные функциями
"SetGlobalConf", "SetTopicConf"
стрJSON - Список сообщений в формате JSON
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Устанавливает пул сообщений для отправки
Формат:
[{"Key": "Ключ", "Value": "Сообщение",
"Headers": [{"Заголовок1": "ЗначениеЗаголовка1"}]}]
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Очищает внутренний список компоненты на отправку
Возвращаемые значение:
Количество в случае успеха
-1 в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает количество сообщений во внутреннем пуле компоненты
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Выполняет отправку сообщений из внутреннего пула компоненты
Сохраняет резуультат отправки для каждого сообщения во внутренний список
Результаты отправки можно получить, используя функцию
GetJSONDeliveryReport
Возвращаемые значение:
JSON в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает отчет о доставке сообщений в формате JSON
Формат:
[{
"Key": "0b2f3ee3-466e-409e-84a8-b6b083582ed9",
"Topic": "test_topic",
"Partition": 2,
"Timestamp": 1695034248357,
"Offset": 54859,
"Error": "Success",
"Status": "Persisted"
}]
Возвращаемые значение:
Ложь если в отчете на доставку есть сообщение,
имеющие статус НЕ "Persisted".
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает признак успешности отправки пула сообщений
стрСписокБрокеров - Адреса в формате "10.0.5.187:9092,10.0.5.85:9092,10.0.5.86:9092"
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Применяет параметры, заданные функциями "SetGlobalConf"
Создает соединения и управляющие потоки внутри компоненты
стрКлюч - Имя параметра
стрЗначение - Значение параметра
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Устанавлевает глобальные параметры для клиента администрирования
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Сбрасывает параметры, установленные функциями "SetGlobalConf"
стрТопик - Имя топика
чРаздел - Номер раздела топика
чСмещение - Смещение, с которого будет начато чтение,
Неопределено - для автоопределения смещения
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Добавляет топик-раздел-(смещение) в список на назначение
(заполняет внутренний список внутри компоненты)
Этот спсиок будет использован функциями:
DeleteRecordsBefore, AlterGroupOffsets, DeleteGroupOffsets,
QueryWatermarkOffsets
Возвращаемые значение:
Истина в случае успеха
Ложь в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Очищает список топик-раздел-(смещение)
(внутренний список компоненты)
чслТаймаут - Таймаут
Возвращаемые значение:
json результат в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Удаляет сообщения в партициях, заданных функцией
AddRecordToTopicPartitionList. Сообщения удаляются
ДО заданного функцией AddRecordToTopicPartitionList смещения
НЕ включительно
Формат:
[{
"Topic": "test_topic",
"Partition": 0,
"Offset": 55828,
"Error": "Success"
}]
стрГруппа - Группа получателй, для изменения ее смещений
чслТаймаут - Таймаут
Возвращаемые значение:
json результат в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Изменяет текущие смещения группы получателей,
на заданные функцией AddRecordToTopicPartitionList
Формат:
[{
"Topic": "test_topic",
"Partition": 0,
"Offset": 57836,
"Error": "Success"
}]
стрГруппа - Группа получателй, для удаления ее смещений
чслТаймаут - Таймаут
Возвращаемые значение:
json результат в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Удаляет текущие смещения группы получателей, заданные функцией
AddRecordToTopicPartitionList
Формат:
[{
"Topic": "test_topic",
"Partition": 0,
"Offset": -1001,
"Error": "Success"
}]
чслТаймаут - Таймаут
Возвращаемые значение:
json результат в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает минимальное и максимальное смещение разделов, заданных
функцией AddRecordToTopicPartitionList
Формат:
[{
"Topic": "test_topic",
"Partition": 0,
"Low": 55825,
"Hight": 58049
}]
стрГруппа - Группа получателй, для получения ее смещений
чслТаймаут - Таймаут
Возвращаемые значение:
json результат в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает текущие смещения для указанной группы получателей.
Получаются смещения по всем топикам-партициям, которые
есть по этой группе получателей
Формат:
[{
"Topic": "test_topic",
"Partition": 0,
"Offset": 2,
"Error": "Success"
}]
чслТаймаут - Таймаут
Возвращаемые значение:
json результат в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает список групп получателей, существующих в брокере
Формат:
[{
"GroupId": "ccr_dev",
"State": "Stable",
"IsSimple": 0
}]
чслТаймаут - Таймаут
стрТопик - Топик, для получения по нему данных
Возвращаемые значение:
json результат в случае успеха
Неопределено в случае ошибки
Описнаие ошибки можно получить через свойство "ErrorDescription"
Описание:
Получает метаданные брокера. Если не указан топик - то все метаданные.
Если топик указан - то метаданные по топику
Формат:
{
"Brokers": [{
"Id": 0,
"Host": "adm-fko-dev-kfk1.dns-shop.ru",
"Port": 9092,
"Controller": true
}],
"Topics": [{
"Topic": "scrapper-jobs-queue-ozon",
"Partitions": [{
"Id": 0,
"Leader": 0,
"Replicas": [0],
"Isrs": [0]
}]}]}
Полный список настроек библиотеки есть у вендора:
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
Глобальные настройки:
enable.auto.commit
Если установлено в true - библиотека будет автоматически
фиксировать смещения прочитанных сообщений раз в
auto.commit.interval.ms без явного вызова commit()
queued.min.messages
Kafka протокол подразумевает чтение/отправку батчами.
Даже при чтении одного сообщения - физически из брокера
едет queued.min.messages сообщений
max.poll.interval.ms
Максимальное время между вызовами метода "ConsumePool"
Подразумевается polling модель - т.е. получатель должен
паостоянно опрашивать брокер. Если получатель ни разу
не выполнил вызов "ConsumePool" за max.poll.interval.ms
- он считается отвалившимся, и более не может читать
из брокера до переподключения
Глобальные настройки:
queue.buffering.max.messages
При вызове метода Produce - сообщения, помещенные в
внутренний буфер компоненты - помещаются итерационно в буфер
на отправку. queue.buffering.max.messages - размер буфера
на отправку. Из буфера на отправку - сообщения раз в
queue.buffering.max.ms отправляются в kafka, группируясь в батчи
Настройки топика:
request.required.acks
Прежде чем отправка будет считаться успешной - лидер партиции,
может ожидать синхронизации с остальными репликами.
request.required.acks - количество реплик, с которыми будет
ожидаться синхронизация.
0 - подтверждение не приходит даже от лидера
1 - подтвержение от одной ноды
-1 - подтверждение от всех реплик
Глобальные настройки:
enable.ssl.certificate.verification
Включить ли выключить проверку сертификата брокера
ssl.ca.pem
Ca сертификат - для проверки открытого ключа брокера
Можно задавать строкой в формате PEM
security.protocol
Протокол взаимодействия с брокером
plaintext, ssl, sasl_plaintext, sasl_ssl
sasl.mechanisms
Механизм аутентификации
GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
sasl.username
Имя пользователя для аутентификации
sasl.password
Пароль для аутентификации
ssl.endpoint.identification.algorithm
Проверять что CN в сертификате сервера совпадает
с именем хоста, к которому подключаемся. Если нет -
будет ошибка на этапе ssl.handshake