NATS

Synopsis

Creates a JetStream consumer that connects to NATS servers and processes messages from specified streams and subjects. Supports authentication, TLS encryption, and multiple workers with automatic message acknowledgment.

Schema

- id: <numeric>
  name: <string>
  description: <string>
  type: nats
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    username: <string>
    password: <string>
    auth_type: <string>
    stream: <string>
    consumer: <string>
    subject: <string>
    max_bytes: <numeric>
    timeout: <numeric>
    batch_size: <numeric>
    reuse: <boolean>
    workers: <numeric>
    secure:
      status: <boolean>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>

Configuration

Device

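| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique numeric identifier |
| name | Y | - | Device name |
| description | N | - | Optional description |
| type | Y | - | Must be nats |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |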

Connection

| Field | Required | Default | Description |
|---|---|---|---|
| address | N | "0.0.0.0" | NATS server address |
| port | Y | 4222 | NATS server port |
| username | N | - | Authentication username (or the JWT for jwt auth, or the NKey identity for nkey auth) |
| password | N | - | Authentication password (or the JWT seed for jwt auth, or the NKey seed for nkey auth) |
| auth_type | N | - | Authentication type: basic, jwt, nkey, or none. When omitted and credentials are present, jwt is inferred if the username contains a dot (.), otherwise basic is used; when no credentials are present, none is used |
| secure.status | N | false | When true, the password is decrypted with the service shared key before use |
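The auth_type inference rule in the Connection table can be read as a small decision function. The following is a minimal sketch of that rule only, not the service's actual implementation; the function name infer_auth_type is hypothetical, and it assumes that "credentials present" means either username or password is set.

```python
def infer_auth_type(username=None, password=None, auth_type=None):
    """Return the effective auth type, following the rule described in the table.

    Hypothetical helper for illustration; not part of the product.
    """
    if auth_type:
        # An explicit auth_type always takes precedence.
        return auth_type
    if not username and not password:
        # Assumption: "no credentials" means both fields are empty.
        return "none"
    # Credentials are present: a dot in the username is treated as a JWT.
    return "jwt" if "." in (username or "") else "basic"


print(infer_auth_type("consumer", "secret"))
print(infer_auth_type("header.payload.signature", "seed"))
```

Under this reading, nkey is never inferred, so NKey authentication needs auth_type set to nkey explicitly.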

Stream

| Field | Required | Default | Description |
|---|---|---|---|
| stream | N | "vmetric" | JetStream stream name |
| consumer | N | "vmetric" | JetStream consumer name |
| subject | Y | - | Subject pattern to subscribe to |
| max_bytes | N | 1048576 | Maximum message size in bytes (1 MiB default) |
| timeout | N | 5 | Fetch timeout in seconds (1-30) |
| batch_size | N | 10000 | Number of messages to fetch per request |

Note: The default stream and consumer (both named vmetric) must already exist on the NATS server before the device can subscribe. Pre-create them with nats stream add and nats consumer add, or override these fields to match an existing stream and consumer.

TLS

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS encryption |
| tls.cert_name | Y* | - | TLS certificate file name |
| tls.key_name | Y* | - | TLS private key file name |

* = Conditionally required when tls.status is true.

Note: The TLS certificate and key files must be placed in the service root directory.
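The conditional requirement on the TLS fields can be checked before deploying a configuration. The sketch below is an illustration under stated assumptions: tls_errors is a hypothetical helper, and it takes the device's properties block as a plain dictionary (for example, after loading the YAML). It does not check that the files exist in the service root directory.

```python
def tls_errors(properties: dict) -> list:
    """List the TLS fields that are missing while tls.status is true.

    Hypothetical validation helper; not part of the product.
    """
    tls = properties.get("tls") or {}
    if not tls.get("status", False):
        # TLS is disabled (the default), so neither file name is needed.
        return []
    return [
        f"tls.{field} is required when tls.status is true"
        for field in ("cert_name", "key_name")
        if not tls.get(field)
    ]


print(tls_errors({"tls": {"status": True, "cert_name": "nats.crt"}}))
```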

Performance

| Field | Required | Default | Description |
|---|---|---|---|
| reuse | N | true | Enable multi-worker mode |
| workers | N | 4 | Number of worker processes when reuse is enabled (capped at the number of available CPU cores) |
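The cap on workers means a configured value larger than the host's core count has no additional effect. As a rough sketch of that arithmetic (effective_workers is a hypothetical name, and treating reuse: false as a single worker is an assumption, since the table only says that reuse enables multi-worker mode):

```python
import os


def effective_workers(workers=4, reuse=True):
    """Estimate how many workers would run for a given configuration.

    Hypothetical helper for illustration; not part of the product.
    """
    if not reuse:
        # Assumption: with multi-worker mode disabled, one worker runs.
        return 1
    cores = os.cpu_count() or 1  # cpu_count() can return None
    return max(1, min(workers, cores))


print(effective_workers(workers=64))
```

On a 4-core host, for example, requesting 64 workers would yield 4 under this reading.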

Examples

Basic

Creating a simple NATS consumer using the default stream and consumer...

- id: 1
  name: basic_nats
  type: nats
  properties:
    address: "nats.example.com"
    port: 4222
    subject: "logs.>"

Secure

Connecting with basic authentication and TLS encryption...

- id: 2
  name: secure_nats
  type: nats
  properties:
    address: "nats.example.com"
    port: 4222
    username: "consumer"
    password: "secret"
    auth_type: "basic"
    stream: "LOGS"
    consumer: "processor"
    subject: "logs.secure.>"
    tls:
      status: true
      cert_name: "nats.crt"
      key_name: "nats.key"

High-Volume

Tuning batch size and timeout for high-throughput consumption...

- id: 3
  name: performant_nats
  type: nats
  properties:
    address: "nats.example.com"
    port: 4222
    stream: "LOGS"
    consumer: "high-perf"
    subject: "logs.>"
    max_bytes: 4194304
    timeout: 10
    batch_size: 50000
    reuse: true
    workers: 4

Subject Filters

Subject-based filtering with wildcards...

- id: 4
  name: filtered_nats
  type: nats
  properties:
    address: "nats.example.com"
    port: 4222
    stream: "LOGS"
    consumer: "filtered"
    subject: "logs.*.error"
    reuse: true
    workers: 2

Tip: NATS subjects support two wildcards. * matches exactly one token, and > matches one or more tokens and is only valid as the last token of the subject.
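To make the wildcard behavior concrete, here is a small matcher that mimics NATS subject matching on dot-separated tokens. It is an illustrative sketch (subject_matches is a hypothetical name), not code from NATS or from this product, and it does not validate the subjects themselves.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a subject matches a pattern using * and > wildcards."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, token in enumerate(p_tokens):
        if token == ">":
            # '>' must be last and needs at least one remaining subject token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # the subject is shorter than the pattern
        if token != "*" and token != s_tokens[i]:
            return False  # literal token mismatch
    # Without '>', both sides must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)


print(subject_matches("logs.*.error", "logs.app.error"))     # True
print(subject_matches("logs.*.error", "logs.app.db.error"))  # False
print(subject_matches("logs.>", "logs.app.db.error"))        # True
print(subject_matches("logs.>", "logs"))                     # False
```

The last case shows why logs.> does not receive messages published to the bare logs subject: > needs at least one token after the prefix.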

Pipelines

Applying custom processing to messages...

- id: 5
  name: pipeline_nats
  type: nats
  pipelines:
    - json_parser
    - field_extractor
  properties:
    address: "nats.example.com"
    port: 4222
    stream: "LOGS"
    consumer: "processed"
    subject: "logs.raw.>"

Note: Pipelines are processed sequentially and can modify or drop messages before ingestion.