
WarpStream

Kafka-Compatible Message Queue

Synopsis

The WarpStream target writes log messages to WarpStream, a serverless Kafka-compatible platform backed by object storage. WarpStream provides Kafka API compatibility with a zero-disk architecture, automatic scaling, and consumption-based pricing. Configuration follows Apache Kafka patterns, with WarpStream-specific endpoints and authentication.

Schema

- name: <string>
  description: <string>
  type: warpstream
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <integer>
    client_id: <string>
    topic: <string>
    algorithm: <string>
    username: <string>
    password: <string>
    compression: <string>
    compression_level: <string>
    acknowledgments: <string>
    allow_auto_topic_creation: <boolean>
    disable_idempotent_write: <boolean>
    max_bytes: <integer>
    max_events: <integer>
    batch_mode: <string>
    batch_separator: <string>
    tls:
      status: <boolean>
      insecure_skip_verify: <boolean>
      min_tls_version: <string>
      max_tls_version: <string>
      cert_name: <string>
      key_name: <string>
      passphrase: <string>
    field_format: <string>
    debug:
      status: <boolean>
      dont_send_logs: <boolean>

Configuration

The following fields are used to define the target:

| Field | Required | Default | Description |
|---|---|---|---|
| name | Y | - | Target name |
| description | N | - | Optional description |
| type | Y | - | Must be warpstream |
| pipelines | N | - | Optional post-processor pipelines |
| status | N | true | Enable/disable the target |

WarpStream Connection

| Field | Required | Default | Description |
|---|---|---|---|
| address | Y | - | WarpStream broker address (provided in console). Supports a single address or a comma-separated list for multiple seed brokers. Each entry may include a port; if omitted, the port field value is appended. |
| port | N | 9092 | WarpStream broker port |
| client_id | N | - | Client identifier for connection tracking |
| topic | Y | - | Kafka topic name for message delivery |
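As a sketch of the comma-separated seed broker form described for the address field, the fragment below lists two brokers: one carries its own port and the other relies on the port field. The host names and target name are hypothetical placeholders; use the addresses shown in your WarpStream console.

```yaml
targets:
  - name: warpstream-multi-seed   # hypothetical target name
    type: warpstream
    properties:
      # Hypothetical hosts. The first entry includes a port; the second
      # omits it, so the port value below is appended to it.
      address: "broker-1.example.com:9092,broker-2.example.com"
      port: 9092
      topic: application-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
```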

Authentication

| Field | Required | Default | Description |
|---|---|---|---|
| algorithm | N | "plain" | Authentication mechanism: none, plain, scram-sha-256, scram-sha-512 |
| username | Y | - | WarpStream API key |
| password | Y | - | WarpStream API secret |

Producer Settings

| Field | Required | Default | Description |
|---|---|---|---|
| compression | N | "none" | Message compression: none, gzip, snappy, lz4, zstd |
| compression_level | N | - | Compression level (algorithm-specific) |
| acknowledgments | N | "leader" | Acknowledgment level: none, leader, all |
| allow_auto_topic_creation | N | false | Allow automatic topic creation if topic doesn't exist |
| disable_idempotent_write | N | false | Disable idempotent producer |
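The producer settings interact with each other. In Apache Kafka's own Java producer, idempotence requires acknowledgments from all in-sync replicas. Whether this target applies the same constraint is not stated here, so the sketch below simply pairs the two settings conservatively; treat the pairing as an assumption and the target name as a placeholder.

```yaml
targets:
  - name: warpstream-durable      # hypothetical target name
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: application-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      # Wait for the strongest acknowledgment level.
      acknowledgments: all
      # Keep the idempotent producer enabled (false is the default).
      disable_idempotent_write: false
      compression: lz4
```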

Batch Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| max_bytes | N | 1048576 | Maximum batch size in bytes (0 = unlimited, max: 104857600) |
| max_events | N | 1000 | Maximum number of events per batch |
| batch_mode | N | individual | Output format: individual (one message per event), json_array (array of JSON objects), jsonl (JSON Lines with separator) |
| batch_separator | N | , | Separator between messages when using jsonl batch mode |
| field_format | N | - | Data normalization format. See applicable Normalization section |
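To illustrate how the batch fields combine, the sketch below switches to jsonl mode and sets a newline separator, the conventional delimiter for JSON Lines. The batch limits are illustrative values rather than recommendations, and the target name is hypothetical.

```yaml
targets:
  - name: warpstream-jsonl        # hypothetical target name
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: application-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      # Join the events of a batch with a separator between them.
      batch_mode: jsonl
      # A double-quoted YAML string, so \n is a real newline character.
      batch_separator: "\n"
      # Illustrative limits: a batch is sent at 500 events or 512 KiB,
      # whichever is reached first.
      max_events: 500
      max_bytes: 524288
```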

TLS Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS/SSL encryption |
| tls.insecure_skip_verify | N | false | Skip certificate verification |
| tls.min_tls_version | N | "tls1.2" | Minimum TLS version: tls1.0, tls1.1, tls1.2, tls1.3 |
| tls.max_tls_version | N | "tls1.2" | Maximum TLS version: tls1.0, tls1.1, tls1.2, tls1.3 |
| tls.cert_name | N | "cert.pem" | Client certificate file name for mTLS |
| tls.key_name | N | "key.pem" | Private key file name for mTLS |
| tls.passphrase | N | - | Passphrase for encrypted private key |
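A minimal sketch of the mTLS fields, assuming your deployment accepts client certificates. The certificate and key file names, the passphrase environment variable, and the target name are hypothetical.

```yaml
targets:
  - name: warpstream-mtls         # hypothetical target name
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: security-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      tls:
        status: true
        # Leave certificate verification on (false is the default).
        insecure_skip_verify: false
        min_tls_version: tls1.2
        max_tls_version: tls1.3
        # Hypothetical client certificate and encrypted private key.
        cert_name: client-cert.pem
        key_name: client-key.pem
        passphrase: "${WARPSTREAM_KEY_PASSPHRASE}"   # hypothetical variable
```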

Scheduling

See Scheduling and Pool Behavior for interval and cron fields shared by all targets.

Debug Options

| Field | Required | Default | Description |
|---|---|---|---|
| debug.status | N | false | Enable debug logging |
| debug.dont_send_logs | N | false | Process logs but don't send to target (testing) |
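The debug options can be combined for a dry run, where events pass through the target's processing but are not delivered. A minimal sketch, with a hypothetical target name:

```yaml
targets:
  - name: warpstream-dry-run      # hypothetical target name
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: application-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      debug:
        # Turn on debug logging for this target.
        status: true
        # Process events but do not send them to WarpStream.
        dont_send_logs: true
```

Set dont_send_logs back to false (or remove the debug block) once the configuration has been validated.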

Note

Batches are sent when either the max_bytes or the max_events limit is reached, whichever comes first.

Details

Authentication

API Key Authentication:

  • Obtain API key and secret from WarpStream console
  • Use username field for API key
  • Use password field for API secret
  • Authentication required for all WarpStream connections

SASL/PLAIN:

  • WarpStream uses SASL/PLAIN authentication mechanism
  • Set algorithm: plain (default for WarpStream)
  • Enable tls.status so that credentials are transmitted over a TLS-encrypted connection; SASL/PLAIN does not encrypt them on its own

WarpStream Credentials

Generate API keys in the WarpStream console. Each key carries its own topic permissions, so ensure the key has write permission for the configured topic.

Connection Endpoints

Broker Addresses:

  • WarpStream provides broker addresses in console
  • Format: <cluster-name>.warpstream.com
  • Default port: 9092 (Kafka protocol)
  • Use provided endpoint exactly as shown in console

Multi-Region Deployment:

  • WarpStream supports multi-region clusters
  • Connect to regional endpoints for optimal latency
  • Cross-region replication handled automatically

Topic Management

Topic Creation:

  • Configure allow_auto_topic_creation: true for automatic topic creation
  • Pre-create topics in WarpStream console for production use
  • Topic configuration managed through WarpStream console

Topic Permissions:

  • API keys grant topic-level permissions
  • Verify API key has write access to configured topics
  • Permission errors result in publish failures

Performance Optimization

Batch Configuration:

  • Larger batches improve throughput and reduce costs
  • Balance batch size against latency requirements
  • WarpStream optimizes object storage writes internally

Compression:

  • Enable compression to reduce bandwidth and storage costs
  • Recommended algorithms: zstd (best compression), snappy (fast)
  • Compression reduces WarpStream consumption-based charges

Connection Pooling:

  • Maintains persistent connection to WarpStream brokers
  • Automatic reconnection on connection loss
  • Configurable client ID for connection tracking

Cost Optimization

WarpStream charges based on data volume processed. Enable compression and tune batch sizes to optimize costs while meeting latency requirements.
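As a rough sketch of this tuning, the fragment below raises both batch limits above their defaults and enables zstd. The numbers are illustrative assumptions, not measured recommendations; larger batches add latency, so adjust them against your own requirements. The target name is hypothetical.

```yaml
targets:
  - name: warpstream-cost-tuned   # hypothetical target name
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: telemetry-events
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      # Compress batches to reduce the bytes sent and stored.
      compression: zstd
      # Illustrative limits above the defaults (1000 events, 1048576 bytes);
      # 5242880 bytes is 5 MiB, well under the documented 104857600 maximum.
      max_events: 5000
      max_bytes: 5242880
```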

Kafka API Compatibility

Supported Features:

  • Kafka Producer API
  • Idempotent writes
  • Batch compression
  • SASL authentication
  • TLS encryption

Differences from Apache Kafka:

  • No ZooKeeper dependency
  • Object storage backend instead of local disks
  • Serverless scaling without brokers
  • Different performance characteristics

Examples

Basic Configuration

Sending logs to WarpStream using API key authentication...

targets:
  - name: warpstream-logs
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: application-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"

With Compression

Enabling Zstd compression for optimal cost and bandwidth efficiency...

targets:
  - name: warpstream-compressed
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: telemetry-events
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      compression: zstd
      compression_level: "3"

High-Volume Configuration

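Optimizing for high-volume ingestion with compression, full acknowledgments, and explicit batch limits (shown at their default values; raise max_events and max_bytes for larger batches)...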

targets:
  - name: warpstream-high-volume
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: metrics-stream
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      compression: snappy
      max_events: 1000
      max_bytes: 1048576
      acknowledgments: all

With Auto Topic Creation

Allowing automatic topic creation for dynamic topic names...

targets:
  - name: warpstream-auto-topic
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: dynamic-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      allow_auto_topic_creation: true

With TLS Encryption

Enabling TLS encryption for secure data transmission...

targets:
  - name: warpstream-secure
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: security-logs
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      tls:
        status: true
        min_tls_version: tls1.2
        max_tls_version: tls1.3

With Normalization

Applying ECS normalization before sending to WarpStream...

targets:
  - name: warpstream-normalized
    type: warpstream
    properties:
      address: my-cluster.warpstream.com
      topic: normalized-events
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      field_format: ECS
      compression: zstd

Production Configuration

Production-ready configuration with compression, batching, TLS, and acknowledgments...

targets:
  - name: warpstream-production
    type: warpstream
    properties:
      address: production-cluster.warpstream.com
      port: 9092
      client_id: datastream-director-01
      topic: production-telemetry
      username: "${WARPSTREAM_API_KEY}"
      password: "${WARPSTREAM_API_SECRET}"
      compression: zstd
      compression_level: "3"
      acknowledgments: all
      max_events: 1000
      max_bytes: 1048576
      field_format: ASIM
      tls:
        status: true
        min_tls_version: tls1.2