Multi-Tier Pipelines

Multi-tier pipelines solve a common challenge in telemetry processing: sending the same data to multiple destinations that require different normalization levels, without duplicating events at each tier.

The Data Duplication Problem

Consider a firewall log that needs to reach three destinations:

  1. Raw archive - Original Syslog format for compliance
  2. SIEM analysis - Common Security Log (CSL) format for correlation
  3. Microsoft Sentinel - ASIM NetworkSession format for advanced analytics

A naive approach routes the event three times, creating three copies flowing through the pipeline. With high-volume sources, this multiplies ingestion costs and processing overhead.
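One way to picture this naive setup, sketched with the same reroute processor used throughout this page (the clone option is introduced under Clone Mode below; destination names are illustrative): each reroute dispatches its own full copy of the event, and nothing sent earlier can ever be replaced by a later, better-normalized version.

processors:
  # Naive fan-out: every reroute emits another full copy of the event.
  # Whatever is dispatched here is final; a later, more-normalized
  # version cannot replace it.
  - reroute:
      destination: "archive"
      clone: true

  - normalize:
      target_format: csl
  - reroute:
      destination: "siem"
      clone: true

  - normalize:
      target_format: asim
  - reroute:
      destination: "sentinel"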

Staged Routes

DataStream solves this with staged routing. Instead of immediately sending events to destinations, the reroute processor can stage them for later commit. Each subsequent stage to the same destination overwrites the previous version, ensuring only the final, most-normalized form is delivered.

processors:
  # Stage 1: stage the raw format for the archive
  - reroute:
      destination: "archive"
      staging: true

  # Stage 2: normalize to CSL and stage for the SIEM
  - normalize:
      target_format: csl
  - reroute:
      destination: "siem"
      staging: true

  # Stage 3: normalize to ASIM and stage for Sentinel
  - normalize:
      target_format: asim
  - reroute:
      destination: "sentinel"
      staging: true

  # Commit all staged routes
  - commit:

The result: three destinations receive appropriately normalized data from a single event traversal.

How Staging Works

When staging: true is set on a reroute processor:

  1. The event is stored in a staging area keyed by destination name
  2. Subsequent staged routes to the same destination overwrite the previous staged version
  3. The commit processor moves all staged events to their final destinations
  4. Without commit, staged events are discarded at pipeline end

This overwrite behavior is the key innovation—progressive normalization naturally produces the best version for each tier.
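As a minimal sketch of that overwrite (destination name and formats are illustrative), two staged routes to the same destination leave only the second, more-normalized version in the staging area when the commit runs:

processors:
  # First stage: CSL version held for "sentinel"
  - normalize:
      target_format: csl
  - reroute:
      destination: "sentinel"
      staging: true

  # Second stage: ASIM version replaces the staged CSL version
  - normalize:
      target_format: asim
  - reroute:
      destination: "sentinel"
      staging: true

  # Delivers only the ASIM version; without this commit,
  # the staged event is discarded at pipeline end
  - commit: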

Configuration Patterns

Progressive Normalization

Transform data through increasing normalization tiers:

processors:
  # Parse raw syslog
  - syslog:
      field: "message"
      target: "parsed"
  - reroute:
      destination: "raw_lake"
      table: "Syslog"
      staging: true

  # Normalize to CSL
  - normalize:
      target_format: csl
  - reroute:
      destination: "sentinel"
      table: "CommonSecurityLog"
      staging: true

  # Normalize to ASIM
  - normalize:
      target_format: asim
  - reroute:
      destination: "sentinel"
      table: "ASimNetworkSession"
      staging: true

  # Final commit
  - commit:

Conditional Tier Selection

Use check_schema validation to determine the highest achievable normalization tier:

processors:
  # Attempt ASIM normalization
  - normalize:
      target_format: asim
  - check_schema:
      schema: "ASimNetworkSessionLogs"
      target_field: "asim_check"
      check_mode: "missing"

  # Stage ASIM if valid
  - reroute:
      if: "asim_check.is_valid == true"
      destination: "sentinel"
      table: "ASimNetworkSession"
      staging: true

  # Fall back to CSL if ASIM validation fails
  - normalize:
      if: "asim_check.is_valid == false"
      target_format: csl
  - reroute:
      if: "asim_check.is_valid == false"
      destination: "sentinel"
      table: "CommonSecurityLog"
      staging: true

  - commit:

Dynamic Staging Control

Use template expressions to conditionally enable staging:

processors:
  - reroute:
      destination: "sentinel"
      table: "{{ .target_table }}"
      staging: "{{ .requires_further_processing }}"

Selective Commit

Commit specific destinations rather than all staged routes:

processors:
  # Multiple staged routes...

  # Commit only the sentinel destination
  - commit:
      destination: "sentinel"

  # Continue processing for other destinations
  - enrich:
      # Additional enrichment...

  # Commit remaining destinations
  - commit:

Destination Metadata

The reroute processor supports destination-specific metadata fields that targets use for routing and organization:

| Field | Description | Common Targets |
| --- | --- | --- |
| table | Database table or collection name | Sentinel, Data Explorer, databases |
| index | Search index name | Elasticsearch, Splunk |
| schema | Schema identifier | ASIM tables, OCSF categories |
| bucket | Storage bucket name | S3, Blob Storage, GCS |
| container | Storage container name | Azure Blob Storage |
| stream | Data stream identifier | Kinesis, Event Hubs |
| topic | Message topic name | Kafka, Pub/Sub |
| log_type | Log type classifier | Chronicle, various SIEMs |
| namespace | Namespace identifier | Kubernetes logs, multi-tenant systems |

Example with multiple metadata fields:

processors:
  - reroute:
      destination: "sentinel"
      table: "ASimNetworkSession"
      schema: "NetworkSession"
      staging: true

Clone Mode

When you need to send an event's current state to one destination and a later, further-processed state to another, use the clone option to dispatch a copy immediately while the original continues through the pipeline:

processors:
  # Send raw to archive immediately (no staging)
  - reroute:
      destination: "archive"
      clone: true

  # Continue processing - original routing preserved
  - normalize:
      target_format: asim
  - reroute:
      destination: "sentinel"

With clone: true, the reroute sends a copy while preserving the original destination metadata for subsequent processors.

Integration with Schema Drift Detection

Multi-tier pipelines pair naturally with schema drift detection to ensure each tier receives valid, properly normalized data:

processors:
  # Stage raw
  - reroute:
      destination: "raw"
      staging: true

  # Normalize and validate CSL
  - normalize:
      target_format: csl
  - check_schema:
      schema: "CommonSecurityLog"
      target_field: "csl_check"
      check_mode: "missing"
      on_missing:
        - slack:
            title: "CSL Schema Drift"
            message: "Missing: {{ .csl_check.missing_required_fields }}"
  - reroute:
      if: "csl_check.is_valid == true"
      destination: "siem"
      staging: true

  # Normalize and validate ASIM
  - normalize:
      target_format: asim
  - check_schema:
      schema: "ASimNetworkSessionLogs"
      target_field: "asim_check"
      check_mode: "missing"
      on_missing:
        - pagerduty:
            summary: "ASIM schema drift detected"
            severity: "warning"
  - reroute:
      if: "asim_check.is_valid == true"
      destination: "sentinel"
      staging: true

  - commit:

This pattern ensures only valid data reaches each destination while alerting on schema drift at any tier.