Google Cloud Pub/Sub
Synopsis
Creates a subscriber that connects to a Google Cloud Pub/Sub subscription and consumes messages using concurrent goroutine receivers. Supports Application Default Credentials and explicit service account credentials.
Schema
- id: <numeric>
  name: <string>
  description: <string>
  type: pubsub
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    project_id: <string>
    subscription_id: <string>
    credentials_json: <string>
    workers: <numeric>
    max_outstanding_messages: <numeric>
    max_outstanding_bytes: <numeric>
Configuration
Device
| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique numeric identifier |
| name | Y | - | Device name |
| description | N | - | Optional description |
| type | Y | - | Must be pubsub |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |
Connection
| Field | Required | Default | Description |
|---|---|---|---|
| project_id | Y | - | Google Cloud project ID |
| subscription_id | Y | - | Pub/Sub subscription name |
| credentials_json | N | - | Service account credentials JSON. When omitted, Application Default Credentials are used |
Performance
| Field | Required | Default | Description |
|---|---|---|---|
| workers | N | 0 | Number of goroutines for concurrent message processing. 0 defaults to GOMAXPROCS |
| max_outstanding_messages | N | 1000 | Maximum number of unprocessed messages held in memory at once |
| max_outstanding_bytes | N | 1000000000 | Maximum total byte size of unprocessed messages held in memory at once (1 GB) |
Details
The device connects to a single Pub/Sub subscription and opens a long-lived streaming pull connection. Message delivery uses the Google Cloud Pub/Sub SDK's Receive method, which dispatches the callback concurrently from multiple goroutines controlled by the workers setting. A mutex serializes all writes to the internal stream writers, so concurrent callbacks do not overwrite each other's data.
Messages are acknowledged after a successful write. If the write fails, the message is negatively acknowledged and redelivered. Empty payloads are acknowledged immediately without processing to prevent infinite redelivery loops.
The workers field controls the number of concurrent goroutine receivers within a single Receive call. This is the correct scaling mechanism for Pub/Sub throughput: multiple Receive calls on the same subscription compete for the same messages rather than parallelizing work, whereas increasing workers lets a single subscriber drain faster.
The max_outstanding_messages and max_outstanding_bytes limits cap how many messages are held in memory concurrently, bounding memory consumption under burst conditions. Setting these values too high can cause out-of-memory errors during traffic spikes.
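When credentials_json is omitted, the SDK resolves credentials using Application Default Credentials (ADC), which covers Workload Identity Federation for GKE and other Google-managed environments. Outside Google Cloud, ADC succeeds only if credentials have been provisioned on the host (for example through the GOOGLE_APPLICATION_CREDENTIALS environment variable); otherwise, a service account key JSON must be supplied explicitly in credentials_json.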
The subscription must already exist in the specified project. The device validates both project_id and subscription_id at startup and will not start if either is missing or empty. The connection state is set to connected once the consumer is successfully created.
Examples
Basic
Creating a basic Pub/Sub subscriber using Application Default Credentials...
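A minimal sketch of such a configuration, following the schema above. The id, name, project, and subscription values are hypothetical placeholders, not values from a real deployment:

```yaml
# Hypothetical values throughout; only the field names come from the schema.
- id: 1
  name: basic_pubsub
  type: pubsub
  properties:
    project_id: "my-gcp-project"
    subscription_id: "logs-subscription"
    # credentials_json is omitted, so Application Default Credentials are used
```

All Performance fields are left unset here, so the documented defaults apply.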
Service Account
Connecting with an explicit service account key...
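One way this might look, assuming credentials_json accepts the key file's JSON content inline as a string. The key shown is an abbreviated placeholder (a real service account key contains additional fields), and all names are hypothetical:

```yaml
# Hypothetical values; the inline JSON below is an abbreviated placeholder key.
- id: 2
  name: service_account_pubsub
  type: pubsub
  properties:
    project_id: "my-gcp-project"
    subscription_id: "logs-subscription"
    credentials_json: |
      {
        "type": "service_account",
        "project_id": "my-gcp-project",
        "private_key": "<redacted>",
        "client_email": "<service-account-email>"
      }
```

The YAML block scalar (`|`) keeps the JSON readable and avoids escaping its quotes.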
High-Volume
Optimizing for high-throughput message consumption...
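A sketch of a high-throughput setup under assumed numbers. The worker count and limits below are illustrative only and should be sized against the host's CPU and memory:

```yaml
# Hypothetical values; tune workers and limits to the actual host.
- id: 3
  name: high_volume_pubsub
  type: pubsub
  properties:
    project_id: "my-gcp-project"
    subscription_id: "high-volume-subscription"
    workers: 16                         # more concurrent receivers than the default
    max_outstanding_messages: 10000     # raised from the default of 1000
    max_outstanding_bytes: 2000000000   # 2 GB, raised from the default of 1 GB
```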
Setting max_outstanding_bytes above available memory can cause out-of-memory errors during traffic bursts.
Pipeline Processing
Applying custom processing to Pub/Sub messages...
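A sketch of attaching pre-processor pipelines to the device. It assumes pipelines are referenced by name in a list; the pipeline names here are hypothetical and would need to match pipelines defined elsewhere in the configuration:

```yaml
# Hypothetical values; the pipeline names are placeholders and the
# by-name reference format is an assumption.
- id: 4
  name: pipeline_pubsub
  type: pubsub
  pipelines:
    - normalize_events
    - drop_debug_messages
  properties:
    project_id: "my-gcp-project"
    subscription_id: "events-subscription"
```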
Pipelines are processed sequentially and can modify or drop messages before ingestion.
Memory-Constrained
Limiting memory usage in resource-constrained deployments...
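A sketch of a conservative setup for a small host. The specific limits are assumptions chosen for illustration; the point is that both outstanding-message caps sit well below their defaults:

```yaml
# Hypothetical values; limits chosen only to illustrate a low-memory profile.
- id: 5
  name: constrained_pubsub
  type: pubsub
  properties:
    project_id: "my-gcp-project"
    subscription_id: "edge-subscription"
    workers: 2                        # few concurrent receivers
    max_outstanding_messages: 100     # lowered from the default of 1000
    max_outstanding_bytes: 50000000   # 50 MB, lowered from the default of 1 GB
```

Lower limits bound memory at the cost of throughput, since fewer messages can be in flight at once.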