Skip to main content

Kafka

Synopsis

Creates a collector that connects to Kafka brokers and consumes messages from specified topics. Supports authentication, TLS encryption, and multiple workers.

Schema

- id: <numeric>
name: <string>
description: <string>
type: kafka
tags: <string[]>
pipelines: <pipeline[]>
status: <boolean>
properties:
address: <string>
port: <numeric>
username: <string>
password: <string>
algorithm: <string>
group: <string>
topic: <string>
balancer: <string>
reuse: <boolean>
workers: <numeric>
tls:
status: <boolean>
cert_name: <string>
key_name: <string>

Configuration

Device

FieldRequiredDefaultDescription
idY-Unique numeric identifier
nameY-Device name
descriptionN-Optional description
typeY-Must be kafka
tagsN-Optional tags
pipelinesN-Optional pre-processor pipelines
statusNtrueEnable/disable the device

Connection

FieldRequiredDefaultDescription
addressN"0.0.0.0"Kafka broker address
portY-Kafka broker port
usernameN-SASL username
passwordN-SASL password
algorithmN-SASL mechanism: plain, scram-sha-256, or scram-sha-512. Leave empty or set to none to disable authentication
groupN"vmetric"Consumer group ID
topicY-Topic to consume from
balancerN"roundrobin"Partition balancing strategy

TLS

FieldRequiredDefaultDescription
tls.statusNfalseEnable TLS encryption
tls.cert_nameY*-TLS certificate file name
tls.key_nameY*-TLS private key file name

* = Conditionally required when tls.status is true.

note

The TLS certificate and key files must be placed in the service root directory.

Performance

FieldRequiredDefaultDescription
reuseNtrueEnable multi-worker mode
workersN4Number of worker processes when reuse enabled (capped at the number of available CPU cores)

Examples

Basic

Creating a simple Kafka consumer...

- id: 1
name: basic_kafka
type: kafka
properties:
address: "kafka.example.com"
port: 9092
topic: "logs"

Secure

Connecting with SASL authentication and TLS encryption...

- id: 2
name: secure_kafka
type: kafka
properties:
address: "kafka.example.com"
port: 9093
username: "consumer"
password: "secret"
algorithm: "scram-sha-256"
topic: "secure-logs"
tls:
status: true
cert_name: "kafka.crt"
key_name: "kafka.key"

High-Volume

Optimizing for throughput with multi-worker mode...

- id: 3
name: performant_kafka
type: kafka
properties:
address: "kafka.example.com"
port: 9092
topic: "high-volume-logs"
group: "high-perf-group"
reuse: true
workers: 4

Consumer Groups

Configuring consumer group behavior...

- id: 4
name: group_kafka
type: kafka
properties:
address: "kafka.example.com"
port: 9092
topic: "shared-logs"
group: "processing-group"
balancer: "roundrobin"
reuse: true
workers: 2
warning

Consumers in the same group must use compatible configuration settings.

Pipelines

Applying custom processing to messages...

- id: 5
name: pipeline_kafka
type: kafka
pipelines:
- json_parser
- field_extractor
properties:
address: "kafka.example.com"
port: 9092
topic: "raw-logs"
group: "processing-group"
note

Pipelines are processed sequentially, and can modify or drop messages before ingestion.