From 030d57032a8bcb1c36f4576753274deb99ae4c87 Mon Sep 17 00:00:00 2001 From: mensfeld <392754+mensfeld@users.noreply.github.com> Date: Sun, 28 Jun 2026 09:59:52 +0000 Subject: [PATCH] chore: refresh automatic content --- Changelog/Karafka-Core.md | 3 ++ Changelog/Karafka.md | 2 ++ .../Karafka-Integration-Tests-Catalog.md | 1 + Librdkafka/Configuration.md | 30 +++++++++---------- 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/Changelog/Karafka-Core.md b/Changelog/Karafka-Core.md index d0c98e72..725d4ca2 100644 --- a/Changelog/Karafka-Core.md +++ b/Changelog/Karafka-Core.md @@ -3,6 +3,9 @@ # Karafka Core Changelog +## 2.6.2 (Unreleased) +- [Fix] Make assigning a setting on a frozen `Configurable::Node` atomic. The ivar-backed writer evaluated `@configs_refs[name] = value` before `instance_variable_set`, so a frozen node mutated the canonical store and only then raised `FrozenError`, leaving the store and the ivar-backed reader permanently out of sync. It now raises before touching any state. + ## 2.6.1 (2026-06-15) - [Enhancement] Speed up `Contract#call` by ~1.25x for minimal and ~1.4x for fully populated data: resolve rule paths with a single `Hash#fetch` per level instead of `key?` + `[]`, inline the per-rule type dispatch into the rules loop, and compare the dig sentinel via `#equal?` so `#==` is never dispatched to the validated (user-provided) values. This is the per-message validation path in WaterDrop producers. - [Fix] `Contract#call` with rule paths of 3+ keys no longer raises `NoMethodError` when an intermediate value is not a `Hash` and reports the path as missing instead, consistent with the 2-key path behavior. diff --git a/Changelog/Karafka.md b/Changelog/Karafka.md index df6c0910..815a651d 100644 --- a/Changelog/Karafka.md +++ b/Changelog/Karafka.md @@ -27,6 +27,7 @@ - [Enhancement] Contain non-`StandardError` errors (`ScriptError`, `SystemStackError`, etc.) 
raised in the consumption flow (both the consume and the after-consume phases, where user-extensible DLQ code runs) the same way as standard processing errors: the failure is recorded and the regular retry/pause/DLQ flow engages, instead of the batch being skipped with offsets later committed past it. Process-critical errors (`SystemExit`, `SignalException`, `NoMemoryError`) are not retried in-process and are never dispatched to the DLQ: the failure is recorded, the partition stays paused and a graceful shutdown is initiated by the auto-subscribed `Instrumentation::CriticalErrorsListener` watching the `error.occurred` bus - so critical errors escalate regardless of where in the framework they surface (also from quiet mode) and the failed batch is redelivered after a restart. The critical errors list is configurable via `config.internal.processing.critical_errors` for user-specific fatal error classes. - [Enhancement] Bump swarm supervisor `SHUTDOWN_GRACE_PERIOD` from 1s to 15s to give forked nodes enough time to finish post-`shutdown_timeout` cleanup (at_exit handlers, librdkafka finalization, connection pool close) before the supervisor forcefully terminates them, especially on CI where `sleep` granularity and `waitpid` cost stretch each supervision loop iteration. - [Enhancement] Add `stability_ttl` to `Kubernetes::LivenessListener` and `Pro::Swarm::LivenessListener` to detect consumers frozen in a single non-`"steady"` librdkafka `cgrp.join_state` (e.g. a broker stuck in `CompletingRebalance` due to bugs like KAFKA-19862). Brief non-steady periods are normal during rebalances: the timer resets on every join_state transition, so only a group frozen in the same non-steady state continuously for longer than the threshold is flagged. The pre-join `init` and mid-join `wait-metadata` states are excluded and clear prior tracking; genuine freezes there are covered by `polling_ttl`. 
On breach, `stability_ttl_exceeded: true` is reported in the Kubernetes HTTP body (HTTP 500) and status code `4` by the swarm listener; the flag clears automatically once the group returns to `"steady"`. **Requires `statistics.interval.ms` to be set** - without it this check has no effect. Defaults to twice the maximum `max.poll.interval.ms` across all active subscription groups (root `kafka` config as fallback), derived lazily on first health evaluation. When a connection listener finishes its fetch loop (downscale or shutdown), its subscription group's tracking is cleared to prevent false positives. +- [Fix] `Pro::Iterator#each` reset its EOF/stop tracking (`@stopped_partitions`, `@stopped`) only after the consumer block, so a `break` out of the yielded block (a documented way to stop) skipped the reset. Reusing the same iterator afterwards then saw `done?` immediately true and iterated nothing - silently dropping a fresh backlog until a third run recovered. The reset now runs in an `ensure`, so reuse is consistent regardless of how the previous run ended (normal, `#stop`, or `break`) (Pro). - [Fix] Virtual Partitions `VirtualOffsetManager#markable` raised `KeyError` under the `:exact` offset-metadata strategy when the lowest-offset virtual-partition group in a batch was left unmarked: the materialized real offset falls back to `min - 1`, an offset that was never registered, so `@offsets_metadata.fetch` missed. It now falls back to the current offset metadata instead of raising (which had forced a coordinator failure, collapse and full-batch reprocessing) (Pro). - [Fix] Encryption crashed on tombstone records (a key with a `nil` payload): producing one ran `cipher.encrypt(nil)` in the encryption middleware, and consuming one carrying an `encryption` header ran `cipher.decrypt(nil)` - both raising instead of passing the tombstone through. 
Tombstones are now left untouched on produce and consume, so they remain valid tombstones (for example for log compaction) (Pro). - [Fix] `Pro::Processing::JobsQueue#clear` (run per subscription group on listener recovery) disabled the async-locking fast path globally, so another subscription group's active `lock_async` locks were ignored until the next `lock_async` call - a listener parked by an advanced scheduler could resume polling early, and `empty?` could report a still-locked group as empty during shutdown. The fast-path flag is now recomputed from the remaining groups' locks instead of being forced off (Pro). @@ -52,6 +53,7 @@ - [Maintenance] Use namespaced topic naming format in all integration specs for consistent traceability. - [Maintenance] Add `bin/tests_topics_hashes` script for looking up spec files by their topic name hash prefix. - [Maintenance] Fix flaky `pro/admin/recovery/coordinator_for_spec.rb` by warming up the `__consumer_offsets` internal topic with a produce + `seek_consumer_group` and retrying the first `Recovery.coordinator_for` call with exponential backoff on `MetadataError`. +- [Refactor] Remove the unused `@mutex` from `Pro::Connection::Manager`. It was left dead after #1851 replaced lock-based `@changes` eviction with key preloading; the cross-thread `@changes` access (statistics callback on listener threads vs. `touch`/`stable?` on the Runner thread) is intentionally lock-free - only existing keys are mutated, so there is no insert-during-iteration, and the remaining field-level interleavings are benign and self-correcting. The threading model is now documented so the absence of a lock is explicit (Pro). 
- [Change] Require `karafka-rdkafka` `>=` `0.27.1` to pick up the fix for `poll_batch` and `poll_batch_nb` raising `RdkafkaError` with only the integer error code, which discarded topic/partition/offset context from `e.details` and caused `partition_eof` handling to call `@buffer.eof(nil, nil)`, resulting in a `TopicNotFoundError` crash. ## 2.5.9 (2026-03-30) diff --git a/Development/Karafka-Integration-Tests-Catalog.md b/Development/Karafka-Integration-Tests-Catalog.md index 59d28601..7d24d19d 100644 --- a/Development/Karafka-Integration-Tests-Catalog.md +++ b/Development/Karafka-Integration-Tests-Catalog.md @@ -1161,6 +1161,7 @@ | `pro/iterator/negative_multiple_topics_lookups_spec.rb` | The author retains all right, title, and interest in this software, including all copyrights, patents, and other intellectual property rights. No patent rights are granted under this license. - Reverse engineering, decompilation, or disassembly of this software Receipt, viewing, or possession of this software does not convey or imply any license or right beyond those expressly stated above. We should be able to subscribe to multiple topics with custom per topic negative lookups and they should work on all partitions | | `pro/iterator/negative_topic_multiple_partitions_spec.rb` | The author retains all right, title, and interest in this software, including all copyrights, patents, and other intellectual property rights. No patent rights are granted under this license. - Reverse engineering, decompilation, or disassembly of this software Receipt, viewing, or possession of this software does not convey or imply any license or right beyond those expressly stated above. We should be able to use different negative offsets for different partitions of the same topic | | `pro/iterator/repeated_eof_on_one_partition_spec.rb` | The author retains all right, title, and interest in this software, including all copyrights, patents, and other intellectual property rights. 
No patent rights are granted under this license. - Reverse engineering, decompilation, or disassembly of this software Receipt, viewing, or possession of this software does not convey or imply any license or right beyond those expressly stated above. When one partition reaches EOF more than once (because new data arrived after its first EOF and was consumed down to a second EOF), the iterator must not treat the repeated EOF of the same partition as if another partition finished. Iteration should continue until every partition has actually reached its end and all messages from all partitions were yielded. This guards against a bug where EOF events were counted globally instead of per partition: a single live partition receiving new data could exhaust the EOF budget for the whole subscription and silently terminate iteration while other partitions still had a backlog. | +| `pro/iterator/reuse_after_break_spec.rb` | The author retains all right, title, and interest in this software, including all copyrights, patents, and other intellectual property rights. No patent rights are granted under this license. - Reverse engineering, decompilation, or disassembly of this software Receipt, viewing, or possession of this software does not convey or imply any license or right beyond those expressly stated above. F34: `Pro::Iterator#each` reset `@stopped_partitions`/`@stopped` only after the `with_consumer` block, so a `break` out of the yielded block (a documented way to stop) skipped the reset. The accumulated EOF state then leaked into the next `#each`: `done?` was immediately true and the whole iteration was a silent no-op, dropping a fresh backlog. Reproduced by iterating to EOF, breaking, then reusing the SAME iterator. With `yield_nil` the drained (single-partition) iterator yields a nil once it reaches EOF - i.e. once `@stopped_partitions` is full - giving us a point to `break` on while the state is full. 
The second pass over the same iterator must still re-stream the whole backlog. | | `pro/iterator/stop_start_with_marking_spec.rb` | The author retains all right, title, and interest in this software, including all copyrights, patents, and other intellectual property rights. No patent rights are granted under this license. - Reverse engineering, decompilation, or disassembly of this software Receipt, viewing, or possession of this software does not convey or imply any license or right beyond those expressly stated above. We should be able with proper settings | | `pro/iterator/with_custom_group_id_spec.rb` | The author retains all right, title, and interest in this software, including all copyrights, patents, and other intellectual property rights. No patent rights are granted under this license. - Reverse engineering, decompilation, or disassembly of this software Receipt, viewing, or possession of this software does not convey or imply any license or right beyond those expressly stated above. The iterator should use the group.id specified in the settings hash when provided. By default, the iterator uses its own admin group ID (not the routing-defined consumer group), but this can be overridden via the settings hash. This is useful when you want to: - Track iterator progress for resuming later - Use a specific consumer group for offset management - Share offset state between multiple iterator instances | | `pro/iterator/with_granular_cleaning_spec.rb` | The author retains all right, title, and interest in this software, including all copyrights, patents, and other intellectual property rights. No patent rights are granted under this license. - Reverse engineering, decompilation, or disassembly of this software Receipt, viewing, or possession of this software does not convey or imply any license or right beyond those expressly stated above. 
We should be able to clean messages while keeping the metadata | diff --git a/Librdkafka/Configuration.md b/Librdkafka/Configuration.md index 40a01ef7..9a959685 100644 --- a/Librdkafka/Configuration.md +++ b/Librdkafka/Configuration.md @@ -23,10 +23,10 @@ client.id | * | | rdkafka metadata.broker.list | * | | | high | Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime.
*Type: string* bootstrap.servers | * | | | high | Alias for `metadata.broker.list`: Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime.
*Type: string* message.max.bytes | * | 1000 .. 1000000000 | 1000000 | medium | Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, the broker will enforce the topic's `max.message.bytes` limit (see Apache Kafka documentation).
*Type: integer* -message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | low | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs.
*Type: integer* +message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | low | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs. This property is not supported for share consumers.
*Type: integer* receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. For share consumers, the default value is INT_MAX.
*Type: integer* -max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
*Type: integer* -max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one.
*Type: integer* +max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. This property is ignored for share consumers.
*Type: integer* +max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. This property is ignored for share consumers.
*Type: integer* metadata.recovery.strategy | * | none, rebootstrap | rebootstrap | low | Controls how the client recovers when none of the brokers known to it is available. If set to `none`, the client doesn't re-bootstrap. If set to `rebootstrap`, the client repeats the bootstrap process using `bootstrap.servers` and brokers added through `rd_kafka_brokers_add()`. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously or the client cannot refresh metadata within `metadata.recovery.rebootstrap.trigger.ms` or it's requested in a metadata response.
*Type: enum value* metadata.recovery.rebootstrap.trigger.ms | * | 0 .. 2147483647 | 300000 | low | If a client configured to rebootstrap using `metadata.recovery.strategy=rebootstrap` is unable to obtain metadata from any of the brokers for this interval, client repeats the bootstrap process using `bootstrap.servers` configuration and brokers added through `rd_kafka_brokers_add()`.
*Type: integer* topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s.
*Type: integer* @@ -35,7 +35,7 @@ topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used.
*Type: integer* topic.metadata.refresh.sparse | * | true, false | true | low | Sparse metadata requests (consumes less network bandwidth)
*Type: boolean* topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce().
*Type: integer* -topic.blacklist | * | | | low | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist.
*Type: pattern list* +topic.blacklist | * | | | low | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist. This property is not supported for share consumers.
*Type: pattern list* debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, telemetry, all | | medium | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch
*Type: CSV flags* socket.timeout.ms | * | 10 .. 300000 | 60000 | low | Default timeout for network requests. Producer: ProduceRequests will use the lesser value of `socket.timeout.ms` and remaining `message.timeout.ms` for the first message in the batch. Consumer: FetchRequests will use `fetch.wait.max.ms` + `socket.timeout.ms`. Admin: Admin requests will use `socket.timeout.ms` or explicitly set `rd_kafka_AdminOptions_set_operation_timeout()` value.
*Type: integer* socket.blocking.max.ms | * | 1 .. 60000 | 1000 | low | **DEPRECATED** No longer used.
*Type: integer* @@ -47,7 +47,7 @@ socket.max.fails | * | 0 .. 1000000 | 1 broker.address.ttl | * | 0 .. 86400000 | 1000 | low | How long to cache the broker address resolving results (milliseconds).
*Type: integer* broker.address.family | * | any, v4, v6 | any | low | Allowed broker IP address families: any, v4, v6
*Type: enum value* socket.connection.setup.timeout.ms | * | 1000 .. 2147483647 | 30000 | medium | Maximum time allowed for broker connection setup (TCP connection setup as well SSL and SASL handshake). If the connection to the broker is not fully functional after this the connection will be closed and retried.
*Type: integer* -connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. For share consumers, the default value is 32400 (9 mins).If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info). Actual value can be lower, up to 2s lower, only if `connections.max.idle.ms` >= 4s, as jitter is added to avoid disconnecting all brokers at the same time.
*Type: integer* +connections.max.idle.ms | * | 0 .. 2147483647 | 0 | medium | Close broker connections after the specified time of inactivity. Disable with 0. For share consumers, the default value is 540000 (9 mins). If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info). Actual value can be lower, up to 2s lower, only if `connections.max.idle.ms` >= 4s, as jitter is added to avoid disconnecting all brokers at the same time.
*Type: integer* reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 0 | low | **DEPRECATED** No longer used. See `reconnect.backoff.ms` and `reconnect.backoff.max.ms`.
*Type: integer* reconnect.backoff.ms | * | 0 .. 3600000 | 100 | medium | The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately. For share consumers, the default value is 50.
*Type: integer* reconnect.backoff.max.ms | * | 0 .. 3600000 | 10000 | medium | The maximum time to wait before reconnecting to a broker after the connection has been closed. For share consumers, the default value is 1000.
*Type: integer* @@ -75,7 +75,7 @@ api.version.request | * | true, false | true api.version.request.timeout.ms | * | 1 .. 300000 | 10000 | low | Timeout for broker API version requests.
*Type: integer* api.version.fallback.ms | * | 0 .. 604800000 | 0 | medium | **DEPRECATED** **Post-deprecation actions: remove this configuration property, brokers < 0.10.0 won't be supported anymore in librdkafka 3.x.** Dictates how long the `broker.version.fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade).
*Type: integer* broker.version.fallback | * | | 0.10.0 | medium | **DEPRECATED** **Post-deprecation actions: remove this configuration property, brokers < 0.10.0 won't be supported anymore in librdkafka 3.x.** Older broker versions (before 0.10.0) provide no way for a client to query for supported protocol features (ApiVersionRequest, see `api.version.request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api.version.fallback.ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value >= 0.10, such as 0.10.2.1, enables ApiVersionRequests.
*Type: string* -allow.auto.create.topics | * | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: the default value (true) for the producer is different from the default value (false) for the consumer. Further, the consumer default value is different from the Java consumer (true), and this property is not supported by the Java producer. Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies.
*Type: boolean* +allow.auto.create.topics | * | true, false | false | low | Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: the default value (true) for the producer is different from the default value (false) for the consumer. Further, the consumer default value is different from the Java consumer (true), and this property is not supported by the Java producer. Requires broker version >= 0.11.0.0, for older broker versions only the broker configuration applies. This property is currently not supported for share consumers and will be enabled in the General Availability (GA) release.
*Type: boolean* security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | high | Protocol used to communicate with brokers.
*Type: enum value* ssl.cipher.suites | * | | | low | A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3)`.
*Type: string* ssl.curves.list | * | | | low | The supported-curves extension in the TLS ClientHello message specifies the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client is willing to have the server use. See manual page for `SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required.
*Type: string* @@ -151,22 +151,22 @@ group.remote.assignor | C | | coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
*Type: integer* max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information.
*Type: integer* enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().
*Type: boolean* -auto.commit.interval.ms | C | 0 .. 86400000 | 5000 | medium | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. (0 = disable). This setting is used by the high-level consumer.
*Type: integer* +auto.commit.interval.ms | C | 0 .. 86400000 | 5000 | medium | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. (0 = disable). This setting is used by the high-level consumer. This property is ignored for share consumers.
*Type: integer* enable.auto.offset.store | C | true, false | true | high | Automatically store offset of last message provided to application. The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
*Type: boolean* queued.min.messages | C | 1 .. 10000000 | 100000 | medium | Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue. This property is not supported for share consumers.
*Type: integer* queued.max.messages.kbytes | C | 1 .. 2097151 | 65536 | medium | Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue. If using the high-level consumer this setting applies to the single consumer queue, regardless of the number of partitions. When using the legacy simple consumer or when separate partition queues are used this setting applies per partition. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages. This property is not supported for share consumers.
*Type: integer* fetch.wait.max.ms | C | 0 .. 300000 | 500 | low | Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.
*Type: integer* fetch.queue.backoff.ms | C | 0 .. 300000 | 1000 | medium | How long to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (queued.min.messages or queued.max.messages.kbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization. This property is not supported for share consumers.
*Type: integer* -fetch.message.max.bytes | C | 1 .. 1000000000 | 1048576 | medium | Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. This property is not supported for share consumers.
*Type: integer* -max.partition.fetch.bytes | C | 1 .. 1000000000 | 1048576 | medium | Alias for `fetch.message.max.bytes`: Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. This property is not supported for share consumers.
*Type: integer* -fetch.max.bytes | C | 0 .. 2147483135 | 52428800 | medium | Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker topic config). For regular consumers, `fetch.max.bytes` is automatically adjusted upwards to be at least `message.max.bytes` (consumer config).
*Type: integer* +fetch.message.max.bytes | C | 1 .. 1000000000 | 1048576 | medium | Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. This property is ignored for share consumers.
*Type: integer* +max.partition.fetch.bytes | C | 1 .. 1000000000 | 1048576 | medium | Alias for `fetch.message.max.bytes`: Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. This property is ignored for share consumers.
*Type: integer* +fetch.max.bytes | C | 0 .. 2147483135 | 52428800 | medium | Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (broker topic config). For regular (not share) consumers, `fetch.max.bytes` is automatically adjusted upwards to be at least `message.max.bytes` (consumer config).
*Type: integer* fetch.min.bytes | C | 0 .. 2147483647 | 1 | low | Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting. For regular consumers, this value must be in range 1..100000000.
*Type: integer* fetch.error.backoff.ms | C | 0 .. 300000 | 500 | medium | How long to postpone the next fetch request for a topic+partition in case of a fetch error. This property is not supported for share consumers.
*Type: integer* offset.store.method | C | none, file, broker | broker | low | **DEPRECATED** Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).
*Type: enum value* isolation.level | C | read_uncommitted, read_committed | read_committed | high | Controls how to read messages written transactionally: `read_committed` - only return transactional messages which have been committed. `read_uncommitted` - return all messages, even transactional messages which have been aborted. This property is not supported for share consumers.
*Type: enum value* -consume_cb | C | | | low | Message consume callback (set with rd_kafka_conf_set_consume_cb())
*Type: see dedicated API* -rebalance_cb | C | | | low | Called after consumer group has been rebalanced (set with rd_kafka_conf_set_rebalance_cb())
*Type: see dedicated API* -offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb())
*Type: see dedicated API* +consume_cb | C | | | low | Message consume callback (set with rd_kafka_conf_set_consume_cb()). This property is not supported for share consumers.
*Type: see dedicated API* +rebalance_cb | C | | | low | Called after consumer group has been rebalanced (set with rd_kafka_conf_set_rebalance_cb()). This property is not supported for share consumers.
*Type: see dedicated API* +offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()). This property is not supported for share consumers.
*Type: see dedicated API* enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. This property is not supported for share consumers.
*Type: boolean* check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
*Type: boolean* client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`.
*Type: string* @@ -217,11 +217,11 @@ compression.type | P | none, gzip, snappy, lz4, zstd | compression.level | P | -1 .. 12 | -1 | medium | Compression level parameter for algorithm selected by configuration property `compression.codec`. Higher values will result in better compression at the cost of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
*Type: integer* auto.commit.enable | C | true, false | true | low | **DEPRECATED** [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). Offsets will be written to broker or local file according to offset.store.method.
*Type: boolean* enable.auto.commit | C | true, false | true | low | **DEPRECATED** Alias for `auto.commit.enable`: [**LEGACY PROPERTY:** This property is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `enable.auto.commit` property must be used instead]. If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). Offsets will be written to broker or local file according to offset.store.method.
*Type: boolean* -auto.commit.interval.ms | C | 10 .. 86400000 | 60000 | high | [**LEGACY PROPERTY:** This setting is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `auto.commit.interval.ms` property must be used instead]. The frequency in milliseconds that the consumer offsets are committed (written) to offset storage.
*Type: integer* +auto.commit.interval.ms | C | 10 .. 86400000 | 60000 | high | [**LEGACY PROPERTY:** This setting is used by the simple legacy consumer only. When using the high-level KafkaConsumer, the global `auto.commit.interval.ms` property must be used instead]. The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. This property is ignored for share consumers.
*Type: integer* auto.offset.reset | C | smallest, earliest, beginning, largest, latest, end, error | largest | high | Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
*Type: enum value* offset.store.path | C | | . | low | **DEPRECATED** Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. File-based offset storage will be removed in a future version.
*Type: string* offset.store.sync.interval.ms | C | -1 .. 86400000 | -1 | low | **DEPRECATED** fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write. File-based offset storage will be removed in a future version.
*Type: integer* offset.store.method | C | file, broker | broker | low | **DEPRECATED** Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.).
*Type: enum value* -consume.callback.max.messages | C | 0 .. 1000000 | 0 | low | Maximum number of messages to dispatch in one `rd_kafka_consume_callback*()` call (0 = unlimited)
*Type: integer* +consume.callback.max.messages | C | 0 .. 1000000 | 0 | low | Maximum number of messages to dispatch in one `rd_kafka_consume_callback*()` call (0 = unlimited). This property is not supported for share consumers.
*Type: integer* ### C/P legend: C = Consumer, P = Producer, * = both