Skip to content

mikhail-1c/onec_librdkafka

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

onec_librdkafka

Внешняя компонента 1с (native) для взаимодействия с Apache Kafka

Под капотом: https://github.com/confluentinc/librdkafka
Статические библиотеки для сборки - включены в проект

librdkafka v2.3.0

Сборка

Сборка под linux:

Окружение:
	GLIBC 2.31
	gcc 9.4.0
	g++ 9.4.0
	cmake 3.16.3

Зависимости:
	uuid-dev
		
Статические библиотеки (включены в проект):
	librdkafka++.a
	librdkafka-static.a
	
	Для Linux - libssl, libcrypto, libcurl, libzstd, zlib - влинкованы в librdkafka-static.a

Сборка:
	X64: 
		mkdir -p build
		cd build
		rm -r *
		cmake ../.
		cmake --build .
		
	X32:
		mkdir -p build
		cd build
		rm -r *
		cmake -DCMAKE_CXX_FLAGS=-m32 -DCMAKE_SHARED_LINKER_FLAGS=-m32 ../.
		cmake --build .

Сборка под windows:

Окружение:
	cl - 19.24.28315
	link - 14.24.28315.0
	msbuild - 16.4.0+e901037fe
	Platform Toolset - v142
			
Статические библиотеки (включены в проект):
	libcrypto.lib
	libssl.lib
	libcurl.lib
	librdkafka.lib
	librdkafkacpp.lib
	libzstd_static.lib
	zlibstat.lib
	
Сборка:
	X64:
		mkdir build
		cd build
		rd . /S /Q
		cmake ../.
		cmake --build . --config Release
		
	X32:
		mkdir build
		cd build
		rd . /S /Q
		cmake ../. -A Win32
		cmake --build . --config Release

API получателя

Initialize(стрСписокБрокеров, стрИмяГруппы);

стрСписокБрокеров	- Адреса в формате "10.0.5.187:9092,10.0.5.85:9092,10.0.5.86:9092"
стрИмяГруппы		- Произвольная строка
                          
Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
                                          
Описание:
	Применяет параметры, заданные функциями "SetGlobalConf", "SetTopicConf"
	Создает соединения и управляющие потоки внутри компоненты

SetGlobalConf(стрКлюч, стрЗначение);

стрКлюч		- Имя параметра
стрЗначение	- Значение параметра

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Устанавлевает глобальные параметры для получаетеля

SetTopicConf(стрКлюч, стрЗначение);

стрКлюч		- Имя параметра
стрЗначение	- Значение параметра

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Устанавлевает параметры для топиков

ConfReset();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Сбрасывает параметры, установленные функциями 
	"SetGlobalConf", "SetTopicConf"

AddTopicToSubscribeList(стрИмяТопика);

стрИмяТопика      - Имя топика

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Добавляет топик в список на подписание 
	(для использования функцией "Subscribe")
	(заполняет внутренний список внутри компоненты)

Subscribe();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Подписывается на топики из списка, заданного
	вызовами функции "AddTopicToSubscribeList".

	Разделы будут назначены автоматически брокером,
	в зависимости от количества подписанных на топик получателей
	с текуей группой(groupID).

Unsubscribe();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Отписывается от всех топиков

ClearSubscribeList();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Очищает список топиков на подписку

AddRecordToTopicPartitionList(стрТопик, чРаздел, чСмещение);

стрТопик	- Имя топика
чРаздел		- Номер раздела топика
чСмещение	- Смещение, с которого будет начато чтение,
	Неопределено - для последнего смещения закоммиченного в брокер

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Добавляет топик-раздел-(смещение) в список на 
	назначение (для использования функцией "Assign")
	(заполняет внутренний список внутри компоненты)

Assign();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Выполняет назначение топиков-рвзделов-(смещений) из списка,
	сформированного вызовами функции "AddRecordToTopicPartitionList".
	При использовании этого метода не происходит ребалансировки
	партиций между получателями одной группы, получатель читает
	только из назначенных функцией партиций.

Unassign();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Сбрасывает назначения разделов, сделанные функцией "Assign"
	Получение останавливается (метод"ConsumePool" -
	всегда завершаются таймаутами) после вызова функции

ClearTopicPartitionList();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Очищает список топик-раздел-(смещение)

ConsumePool(чТаймаут, чКолСообщений, чКолОшибокДляПрерывания);

чТаймаут			- Миллисекунд на чтение батча
чКоличествоСообщений		- Количество сообщений, для чтения
чКоличествоОшибокДляПрерывания	- Количеств ошибок(например таймаутов),
	после которого выполнение будет прервано

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Пытается получить сообщения и в случае успеха,
	сохраняет их во внутренний пул компоненты

GetMessagePoolLength();

Возвращаемые значение:
	Количество в случае успеха
	-1 в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает количество сообщений во внутреннем пуле компоненты

ClearMessagePool();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Очищает список сообщений во внутреннем пуле компоненты

Commit();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Записывает в брокер последние прочитанные смещения для 
	топиков-партиций, из котороых были прочитаны сообщения

ReseiveJSONMessages(булBase64);

булBase64 - Если Истина - данные возвращаются в формате Base64          
	
Возвращаемые значение:
	json ответ в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Формирует JSON из внутреннего пула сообщений компоненты,
	возвращает его в случае успеха. Если сообщения содержат бинарные
	данные - JSON сформировать не удастся. В этом случае нужно 
	формировать JSON c флагом "булBase64"

QueryWatermarkOffsets(стрТопик, чРаздел, чТаймаут);

стрТопик	- Имя топика
чРаздел		- Номер раздела
чТаймаут	- Таймаут

Возвращаемые значение:
	JSON в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает минимальное и максимальное смещения раздела
	Формирует JSON структуру вида {"low": "x", "hight": "y"}
	Значение low - минимально возможное смещение для раздела
	Значение hight - максимально возможное смещение для раздела

CommittedOffset(стрТопик, чРаздел, чТаймаут);

стрТопик    - Имя топика
чРаздел     - Номер раздела
чТаймаут    - Таймаут

Возвращаемые значение:
	Смещение в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает последнее прочитанное и записанное в брокер 
	смещение получателем для текущей группы (текущее смещение
	группы для раздела)

API отправителя

Initialize(стрСписокБрокеров, стрТопик, чРаздел);

стрСписокБрокеров		- Адреса в формате "10.0.5.187:9092,10.0.5.85:9092 10.0.5.86:9092"
стрТопик			- ИмяТопика
чРаздел				- Номер раздела, -1 для автоматического партиционирования по ключу

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Применяет параметры, заданные функциями 
	"SetGlobalConf", "SetTopicConf". Создает соединения и 
	управляющие потоки внутри компоненты

SetGlobalConf(стрКлюч, стрЗначение);

стрКлюч		- Имя параметра
стрЗначение	- Значение параметра

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Устанавлевает глобальные параметры для получаетеля

SetTopicConf(стрКлюч, стрЗначение);

стрКлюч		- Имя параметра
стрЗначение	- Значение параметра

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Устанавлевает параметры для топиков

ConfReset();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Сбрасывает параметры, установленные функциями 
	"SetGlobalConf", "SetTopicConf"

SetJSONMessageList(стрJSON);

стрJSON - Список сообщений в формате JSON

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Устанавливает пул сообщений для отправки
	Формат: 
		[{"Key": "Ключ", "Value": "Сообщение",
		"Headers": [{"Заголовок1": "ЗначениеЗаголовка1"}]}]

ClearMessagePool();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Очищает внутренний список компоненты на отправку

GetMessagePoolLength();

Возвращаемые значение:
	Количество в случае успеха
	-1 в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает количество сообщений во внутреннем пуле компоненты

Produce();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Выполняет отправку сообщений из внутреннего пула компоненты
	Сохраняет резуультат отправки для каждого сообщения во внутренний список
	Результаты отправки можно получить, используя функцию 
	GetJSONDeliveryReport

GetJSONDeliveryReport();

Возвращаемые значение:
	JSON в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает отчет о доставке сообщений в формате JSON

Формат:
	[{
		"Key": "0b2f3ee3-466e-409e-84a8-b6b083582ed9",
		"Topic": "test_topic",
		"Partition": 2,
		"Timestamp": 1695034248357,
		"Offset": 54859,
		"Error": "Success",
		"Status": "Persisted"
	}]

IsDelivered();

Возвращаемые значение:
	Ложь если в отчете на доставку есть сообщение,
	имеющие статус НЕ "Persisted".
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает признак успешности отправки пула сообщений

API клиента администрирования

Initialize(стрСписокБрокеров);

стрСписокБрокеров	- Адреса в формате "10.0.5.187:9092,10.0.5.85:9092,10.0.5.86:9092"
							
Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
											
Описание:
	Применяет параметры, заданные функциями "SetGlobalConf"
	Создает соединения и управляющие потоки внутри компоненты

SetGlobalConf(стрКлюч, стрЗначение);

стрКлюч		- Имя параметра
стрЗначение	- Значение параметра

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Устанавлевает глобальные параметры для клиента администрирования

ConfReset();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Сбрасывает параметры, установленные функциями "SetGlobalConf"

AddRecordToTopicPartitionList(стрТопик, чРаздел, чСмещение);

стрТопик	- Имя топика
чРаздел		- Номер раздела топика
чСмещение	- Смещение, с которого будет начато чтение,
					Неопределено - для автоопределения смещения

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Добавляет топик-раздел-(смещение) в список на назначение
	(заполняет внутренний список внутри компоненты)
	Этот спсиок будет использован функциями:
	DeleteRecordsBefore, AlterGroupOffsets, DeleteGroupOffsets,
	QueryWatermarkOffsets

ClearTopicPartitionList();

Возвращаемые значение:
	Истина в случае успеха
	Ложь  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Очищает список топик-раздел-(смещение)
	(внутренний список компоненты)

DeleteRecordsBefore(чслТаймаут);

чслТаймаут	- Таймаут

Возвращаемые значение:
	json результат в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Удаляет сообщения в партициях, заданных функцией 
	AddRecordToTopicPartitionList. Сообщения удаляются 
	ДО заданного функцией AddRecordToTopicPartitionList смещения
	НЕ включительно

Формат:
	[{
		"Topic": "test_topic",
		"Partition": 0,
		"Offset": 55828,
		"Error": "Success"
	}]

AlterGroupOffsets(стрГруппа, чслТаймаут);

стрГруппа	- Группа получателй, для изменения ее смещений
чслТаймаут	- Таймаут

Возвращаемые значение:
	json результат в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Изменяет текущие смещения группы получателей, 
	на заданные функцией AddRecordToTopicPartitionList

Формат:
	[{
		"Topic": "test_topic",
		"Partition": 0,
		"Offset": 57836,
		"Error": "Success"
	}]

DeleteGroupOffsets(стрГруппа, чслТаймаут);

стрГруппа	- Группа получателй, для удаления ее смещений
чслТаймаут	- Таймаут

Возвращаемые значение:
	json результат в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Удаляет текущие смещения группы получателей, заданные функцией 
	AddRecordToTopicPartitionList

Формат:
	[{
		"Topic": "test_topic",
		"Partition": 0,
		"Offset": -1001,
		"Error": "Success"
	}]

QueryWatermarkOffsets(чслТаймаут);

чслТаймаут	- Таймаут

Возвращаемые значение:
	json результат в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает минимальное и максимальное смещение разделов, заданных
	функцией AddRecordToTopicPartitionList

Формат:
	[{
		"Topic": "test_topic",
		"Partition": 0,
		"Low": 55825,
		"Hight": 58049
	}]

GetGroupOffsets(стрГруппа, чслТаймаут);

стрГруппа	- Группа получателй, для получения ее смещений
чслТаймаут	- Таймаут

Возвращаемые значение:
	json результат в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает текущие смещения для указанной группы получателей.
	Получаются смещения по всем топикам-партициям, которые
	есть по этой группе получателей

Формат:
	[{
		"Topic": "test_topic",
		"Partition": 0,
		"Offset": 2,
		"Error": "Success"
	}]

GetGroupList(чслТаймаут);

чслТаймаут	- Таймаут

Возвращаемые значение:
	json результат в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает список групп получателей, существующих в брокере

Формат:
	[{
		"GroupId": "ccr_dev",
		"State": "Stable",
		"IsSimple": 0
	}]

GetMetadata(чслТаймаут, стрТопик = Неопределено);

чслТаймаут	- Таймаут
стрТопик	- Топик, для получения по нему данных

Возвращаемые значение:
	json результат в случае успеха
	Неопределено  в случае ошибки
	Описнаие ошибки можно получить через свойство "ErrorDescription"
	
Описание:
	Получает метаданные брокера. Если не указан топик - то все метаданные.
	Если топик указан - то метаданные по топику
Формат:
	{
		"Brokers": [{
			"Id": 0,
			"Host": "adm-fko-dev-kfk1.dns-shop.ru",
			"Port": 9092,
			"Controller": true
			}],
		"Topics": [{
			"Topic": "scrapper-jobs-queue-ozon",
			"Partitions": [{
					"Id": 0,
					"Leader": 0,
					"Replicas": [0],
					"Isrs": [0]
				}]}]}

Настройки

Полный список настроек библиотеки есть у вендора:
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

Важные настройки получателя:

Глобальные настройки:
	enable.auto.commit
		Если установлено в true - библиотека будет автоматически 
		фиксировать смещения прочитанных сообщений раз в 
		auto.commit.interval.ms без явного вызова commit()

	queued.min.messages
		Kafka протокол подразумевает чтение/отправку батчами.
		Даже при чтении одного сообщения - физически из брокера
		едет queued.min.messages сообщений

	max.poll.interval.ms
		Максимальное время между вызовами метода "ConsumePool"
		Подразумевается polling модель - т.е. получатель должен
		паостоянно опрашивать брокер. Если получатель ни разу
		не выполнил вызов "ConsumePool" за max.poll.interval.ms
		- он считается отвалившимся, и более не может читать 
		из брокера до переподключения

Важные настройки отправителя:

Глобальные настройки:
	queue.buffering.max.messages
		При вызове метода Produce - сообщения, помещенные в 
		внутренний буфер компоненты - помещаются итерационно в буфер 
		на отправку. queue.buffering.max.messages - размер буфера 
		на отправку. Из буфера на отправку - сообщения раз в 
		queue.buffering.max.ms отправляются в kafka, группируясь в батчи
	
Настройки топика:
	request.required.acks
		Прежде чем отправка будет считаться успешной - лидер партиции,
		может ожидать синхронизации с остальными репликами.
		request.required.acks - количество реплик, с которыми будет
		ожидаться синхронизация. 
		0	- подтверждение не приходит даже от лидера
		1	- подтвержение от одной ноды
		-1	- подтверждение от всех реплик

Общие настройки

Глобальные настройки:

	enable.ssl.certificate.verification
		Включить ли выключить проверку сертификата брокера

	ssl.ca.pem
		Ca сертификат - для проверки открытого ключа брокера
		Можно задавать строкой в формате PEM

	security.protocol
		Протокол взаимодействия с брокером
		plaintext, ssl, sasl_plaintext, sasl_ssl

	sasl.mechanisms
		Механизм аутентификации
		GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER

	sasl.username
		Имя пользователя для аутентификации

	sasl.password
		Пароль для аутентификации

	ssl.endpoint.identification.algorithm
		Проверять что CN в сертификате сервера совпадает
		с именем хоста, к которому подключаемся. Если нет - 
		будет ошибка на этапе ssl.handshake

About

Native API компонента для работы с Kafka из 1С

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages

  • C++ 77.0%
  • C 22.7%
  • CMake 0.3%