
RabbitMQ

Synopsis

Creates a consumer that connects to RabbitMQ servers and consumes messages from specified exchanges and queues. Supports plain and amqplain authentication, direct, fanout, topic, and custom exchange types, TLS encryption, and multi-worker consumption with automatic message acknowledgment.

Schema

- id: <numeric>
  name: <string>
  description: <string>
  type: rabbitmq
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    username: <string>
    password: <string>
    authentication: <string>
    exchange:
      name: <string>
      type: <string>
    queue:
      name: <string>
      key: <string>
    reuse: <boolean>
    workers: <numeric>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>

Configuration

Device

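| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique numeric identifier |
| name | Y | - | Device name |
| description | N | - | Optional description |
| type | Y | - | Must be rabbitmq |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |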

Connection

| Field | Required | Default | Description |
|---|---|---|---|
| address | N | "0.0.0.0" | RabbitMQ server address |
| port | Y | - | RabbitMQ server port |
| username | N | - | Authentication username |
| password | N | - | Authentication password |
| authentication | Y | - | Auth type (plain, amqplain) |
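For orientation, the connection fields above line up with the parts of a standard AMQP URI. The sketch below is only an illustration of that mapping: the helper name `amqp_uri` is hypothetical, and it is an assumption that a URI is a useful way to picture these settings, not a statement about how the service connects internally.

```python
from urllib.parse import quote


def amqp_uri(props: dict) -> str:
    """Build an AMQP URI from a device's `properties` mapping (illustrative only)."""
    # amqps is the TLS variant of the scheme; chosen here from tls.status.
    tls_enabled = (props.get("tls") or {}).get("status", False)
    scheme = "amqps" if tls_enabled else "amqp"

    host = props.get("address", "0.0.0.0")  # documented default
    port = props["port"]                    # documented as required

    # Credentials are optional; percent-encode them so characters such as
    # "@" or "/" cannot be confused with URI delimiters.
    userinfo = ""
    if props.get("username") is not None:
        userinfo = quote(props["username"], safe="")
        if props.get("password") is not None:
            userinfo += ":" + quote(props["password"], safe="")
        userinfo += "@"

    # No trailing path: the broker's default virtual host is implied.
    return f"{scheme}://{userinfo}{host}:{port}"


print(amqp_uri({"address": "rabbitmq.example.com", "port": 5672}))
```

The virtual host is left out because the schema above does not define a field for it.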

Exchange

| Field | Required | Default | Description |
|---|---|---|---|
| exchange.name | Y | - | Exchange name |
| exchange.type | Y | - | Exchange type (direct, fanout, topic, x-custom) |

Supported exchange types:

| Exchange Type | Description |
|---|---|
| direct | Exact routing key match |
| fanout | Broadcast to all bound queues |
| topic | Pattern-based routing using wildcards (*, #) |
| x-custom | Custom exchange plugin registered on the broker |
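The routing rules in the table above can be sketched in a few lines of Python. This is a simplified model of standard AMQP exchange behavior, not code taken from the service or the broker; the function names `topic_matches` and `is_delivered` are hypothetical. Custom exchange types are left out because their behavior depends on the broker plugin.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots; "*" stands for exactly one word and
    "#" stands for zero or more words.
    """
    p = pattern.split(".") if pattern else []
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)          # pattern used up: key must be too
        if p[i] == "#":
            # Let "#" absorb 0, 1, 2, ... of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False                # key used up, pattern still needs a word
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def is_delivered(exchange_type: str, binding_key: str, routing_key: str) -> bool:
    """Decide whether a message reaches a queue bound with binding_key."""
    if exchange_type == "fanout":
        return True                              # routing key is ignored
    if exchange_type == "direct":
        return binding_key == routing_key        # literal comparison only
    if exchange_type == "topic":
        return topic_matches(binding_key, routing_key)
    raise ValueError(f"routing for {exchange_type!r} is defined by its plugin")


print(is_delivered("topic", "app.*.error", "app.web.error"))
```

Note that on a direct exchange a key such as `app.*` is compared literally, so wildcards only have an effect on topic exchanges.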

Queue

| Field | Required | Default | Description |
|---|---|---|---|
| queue.name | Y | - | Queue name |
| queue.key | Y | - | Routing key pattern |

TLS

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS encryption |
| tls.cert_name | Y* | - | TLS certificate file name |
| tls.key_name | Y* | - | TLS private key file name |

* Required when tls.status is true.

Note: The TLS certificate and key files must be placed in the service root directory.
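The conditional requirement on the TLS fields can be expressed as a small pre-flight check. The sketch below is a hypothetical helper (`validate_tls` is not part of the product) that takes a device's `properties` mapping as a plain dictionary and reports which TLS fields are missing; it does not check that the files exist in the service root directory.

```python
def validate_tls(props: dict) -> list:
    """Return a list of problems with the tls block (empty list means OK)."""
    tls = props.get("tls") or {}
    errors = []

    # tls.status defaults to false, in which case nothing else is required.
    if tls.get("status", False):
        for field in ("cert_name", "key_name"):
            if not tls.get(field):
                errors.append(f"tls.{field} is required when tls.status is true")
    return errors


print(validate_tls({"tls": {"status": True, "cert_name": "rabbitmq.crt"}}))
```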

Performance

| Field | Required | Default | Description |
|---|---|---|---|
| reuse | N | true | Enable multi-worker mode |
| workers | N | 4 | Number of worker processes when reuse is enabled (capped at the number of available CPU cores) |
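To make the cap concrete, the following sketch computes the worker count that would result from a given configuration. It rests on two assumptions that the table does not state: that disabling reuse means a single worker, and that the core count is what `os.cpu_count()` reports. The function name `effective_workers` is hypothetical.

```python
import os


def effective_workers(reuse=True, workers=4, cpu_cores=None):
    """Estimate how many workers run for the given settings (illustrative)."""
    if cpu_cores is None:
        cpu_cores = os.cpu_count() or 1   # cpu_count() may return None

    if not reuse:
        return 1                          # assumption: single worker without reuse

    # The configured value is capped at the number of available cores.
    return max(1, min(workers, cpu_cores))


print(effective_workers(workers=16, cpu_cores=8))
```

Under these assumptions, requesting more workers than there are cores has no additional effect.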

Examples

Basic

Creating a simple RabbitMQ consumer with a direct exchange...

- id: 1
  name: basic_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"
    port: 5672
    authentication: "plain"
    exchange:
      name: "logs"
      type: "direct"
    queue:
      name: "app_logs"
      key: "app"

Secure

Connecting with authentication and TLS encryption...

- id: 2
  name: secure_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"
    port: 5671
    username: "consumer"
    password: "secret"
    authentication: "plain"
    exchange:
      name: "secure_logs"
      type: "topic"
    queue:
      name: "secure_app_logs"
      key: "secure.app.#"
    tls:
      status: true
      cert_name: "rabbitmq.crt"
      key_name: "rabbitmq.key"

High-Volume

Optimizing for high throughput with multi-worker mode...

- id: 3
  name: performant_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"
    port: 5672
    authentication: "plain"
    exchange:
      name: "high_volume"
      type: "direct"
    queue:
      name: "high_volume_logs"
      key: "logs"
    reuse: true
    workers: 4

Topic Exchange

Pattern-based routing on a topic exchange...

- id: 4
  name: topic_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"
    port: 5672
    authentication: "plain"
    exchange:
      name: "logs"
      type: "topic"
    queue:
      name: "filtered_logs"
      key: "app.*.error"

Tip: In topic routing keys, words are separated by dots; * matches exactly one word and # matches zero or more words.

Pipelines

Applying custom processing to messages...

- id: 5
  name: pipeline_rabbitmq
  type: rabbitmq
  pipelines:
    - json_parser
    - field_extractor
  properties:
    address: "rabbitmq.example.com"
    port: 5672
    authentication: "plain"
    exchange:
      name: "raw_logs"
      type: "direct"
    queue:
      name: "processed_logs"
      key: "logs"

Note: Pipelines are processed sequentially and can modify or drop messages before ingestion.
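Sequential processing with the option to drop a message can be pictured as a simple chain of functions. The sketch below is a conceptual model only: `run_pipelines`, `parse_json_body`, and `drop_debug` are hypothetical stand-ins and do not reflect how the `json_parser` or `field_extractor` pipelines are implemented. It assumes stages run in the order they are listed and that a dropped message skips all remaining stages.

```python
import json
from typing import Callable, List, Optional

Message = dict
Stage = Callable[[Message], Optional[Message]]


def run_pipelines(message: Message, stages: List[Stage]) -> Optional[Message]:
    """Pass a message through each stage in order; None means it was dropped."""
    for stage in stages:
        message = stage(message)
        if message is None:
            return None                 # dropped: later stages never see it
    return message


def parse_json_body(msg: Message) -> Message:
    """Hypothetical stage: merge fields parsed from the JSON body into the message."""
    return {**msg, **json.loads(msg["body"])}


def drop_debug(msg: Message) -> Optional[Message]:
    """Hypothetical stage: discard debug-level messages."""
    return None if msg.get("level") == "debug" else msg


raw = {"body": '{"level": "error", "text": "disk full"}'}
print(run_pipelines(raw, [parse_json_body, drop_debug]))
```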