Multi-Tier Pipelines
Multi-tier pipelines solve a common challenge in telemetry processing: sending the same data to multiple destinations that require different normalization levels, without duplicating events at each tier.
The Data Duplication Problem
Consider a firewall log that needs to reach three destinations:
- Raw archive - Original Syslog format for compliance
- SIEM analysis - Common Security Log (CSL) format for correlation
- Microsoft Sentinel - ASIM NetworkSession format for advanced analytics
A naive approach routes the event three times, creating three copies flowing through the pipeline. With high-volume sources, this multiplies ingestion costs and processing overhead.
Staged Routes
DataStream solves this with staged routing. Instead of immediately sending events to destinations, the reroute processor can stage them for later commit. Each subsequent stage to the same destination overwrites the previously staged version, so each destination receives only the final, most-normalized form staged for it.
processors:
# Stage 1: Route raw format
- reroute:
destination: "archive"
staging: true
# Stage 2: Normalize to CSL and stage for the SIEM
- normalize:
target_format: csl
- reroute:
destination: "siem"
staging: true
# Stage 3: Normalize to ASIM and stage for Sentinel
- normalize:
target_format: asim
- reroute:
destination: "sentinel"
staging: true
# Commit all staged routes
- commit:
The result: three destinations receive appropriately normalized data from a single event traversal.
How Staging Works
When staging: true is set on a reroute processor:
- The event is stored in a staging area keyed by destination name
- Subsequent staged routes to the same destination overwrite the previous staged version
- The commit processor moves all staged events to their final destinations
- Without commit, staged events are discarded at pipeline end
This overwrite behavior is the key innovation—progressive normalization naturally produces the best version for each tier.
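A minimal sketch of the overwrite behavior, assuming a single illustrative sentinel destination: both reroutes stage to the same destination, so the commit delivers only the second, ASIM-normalized version:

processors:
  # First stage: the raw event is staged for sentinel
  - reroute:
      destination: "sentinel"
      staging: true
  # Second stage: the normalized event overwrites the staged raw version
  - normalize:
      target_format: asim
  - reroute:
      destination: "sentinel"
      staging: true
  # Commit delivers only the ASIM-normalized version
  - commit: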
Configuration Patterns
Progressive Normalization
Transform data through increasing normalization tiers:
processors:
# Parse raw syslog
- syslog:
field: "message"
target: "parsed"
- reroute:
destination: "raw_lake"
table: "Syslog"
staging: true
# Normalize to CSL
- normalize:
target_format: csl
- reroute:
destination: "sentinel"
table: "CommonSecurityLog"
staging: true
# Normalize to ASIM
- normalize:
target_format: asim
- reroute:
destination: "sentinel"
table: "ASimNetworkSession"
staging: true
# Final commit
- commit:
Conditional Tier Selection
Use check_schema validation to determine the highest achievable normalization tier:
processors:
# Attempt ASIM normalization
- normalize:
target_format: asim
- check_schema:
schema: "ASimNetworkSessionLogs"
target_field: "asim_check"
check_mode: "missing"
# Stage ASIM if valid
- reroute:
if: "asim_check.is_valid == true"
destination: "sentinel"
table: "ASimNetworkSession"
staging: true
# Fall back to CSL if ASIM validation fails
- normalize:
if: "asim_check.is_valid == false"
target_format: csl
- reroute:
if: "asim_check.is_valid == false"
destination: "sentinel"
table: "CommonSecurityLog"
staging: true
- commit:
Dynamic Staging Control
Use template expressions to conditionally enable staging:
processors:
- reroute:
destination: "sentinel"
table: "{{ .target_table }}"
staging: "{{ .requires_further_processing }}"
Selective Commit
Commit specific destinations rather than all staged routes:
processors:
# Multiple staged routes...
# Commit only sentinel destination
- commit:
destination: "sentinel"
# Continue processing for other destinations
- enrich:
# Additional enrichment...
# Commit remaining destinations
- commit:
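A fuller sketch of the same flow, assuming an illustrative analytics_lake destination that should receive the enriched version while Sentinel gets the earlier one:

processors:
  # Stage the normalized event for Sentinel
  - normalize:
      target_format: asim
  - reroute:
      destination: "sentinel"
      staging: true
  # Deliver the Sentinel copy before enrichment
  - commit:
      destination: "sentinel"
  # Enrich the event for the remaining tier
  - enrich:
      # Additional enrichment...
  # Stage and deliver the enriched version
  - reroute:
      destination: "analytics_lake"
      staging: true
  - commit: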
Destination Metadata
The reroute processor supports destination-specific metadata fields that targets use for routing and organization:
| Field | Description | Common Targets |
|---|---|---|
| table | Database table or collection name | Sentinel, Data Explorer, databases |
| index | Search index name | Elasticsearch, Splunk |
| schema | Schema identifier | ASIM tables, OCSF categories |
| bucket | Storage bucket name | S3, Blob Storage, GCS |
| container | Storage container name | Azure Blob Storage |
| stream | Data stream identifier | Kinesis, Event Hubs |
| topic | Message topic name | Kafka, Pub/Sub |
| log_type | Log type classifier | Chronicle, various SIEMs |
| namespace | Namespace identifier | Kubernetes logs, multi-tenant systems |
Example with multiple metadata fields:
processors:
- reroute:
destination: "sentinel"
table: "ASimNetworkSession"
schema: "NetworkSession"
staging: true
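The same pattern applies to destinations that organize data by index, topic, or bucket rather than by table. As a sketch, a route to a search destination might use index (the destination and index names here are illustrative):

processors:
  - reroute:
      destination: "search"
      index: "firewall-logs"
      staging: true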
Clone Mode
When you need to send both the current state and a later state of an event to different destinations, use the clone option: the reroute sends a copy immediately while the original event continues through the rest of the pipeline:
processors:
# Send raw to archive immediately (no staging)
- reroute:
destination: "archive"
clone: true
# Continue processing - original routing preserved
- normalize:
target_format: asim
- reroute:
destination: "sentinel"
With clone: true, the reroute sends a copy while preserving the original destination metadata for subsequent processors.
Integration with Schema Drift Detection
Multi-tier pipelines pair naturally with schema drift detection to ensure each tier receives valid, properly normalized data:
processors:
# Stage raw
- reroute:
destination: "raw"
staging: true
# Normalize and validate CSL
- normalize:
target_format: csl
- check_schema:
schema: "CommonSecurityLog"
target_field: "csl_check"
check_mode: "missing"
on_missing:
- slack:
title: "CSL Schema Drift"
message: "Missing: {{ .csl_check.missing_required_fields }}"
- reroute:
if: "csl_check.is_valid == true"
destination: "siem"
staging: true
# Normalize and validate ASIM
- normalize:
target_format: asim
- check_schema:
schema: "ASimNetworkSessionLogs"
target_field: "asim_check"
check_mode: "missing"
on_missing:
- pagerduty:
summary: "ASIM schema drift detected"
severity: "warning"
- reroute:
if: "asim_check.is_valid == true"
destination: "sentinel"
staging: true
- commit:
This pattern ensures only valid data reaches each destination while alerting on schema drift at any tier.