MQTT input

The MQTT input subscribes to one or more MQTT topics and forwards messages into a pipeline. Both mqtt:// and mqtts:// brokers are supported.

Labels are applied with the following rules:

  1. Label rules are processed in the order they appear in the labels array of the TOML configuration.
  2. When two rules set the same label key, the later rule overrides the earlier one.
  3. This ordering applies to all rule types: static, field, and property.
  4. Payload field labels (field = "...") work with JSON and protobuf payloads.
  5. MQTT v5 property labels (property = "...") come in two forms:
     • property = "content_type" reads the built-in MQTT v5 content_type property.
     • property = "<name>" reads the MQTT v5 user property named <name>.
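
As a minimal sketch of the override rule, the hypothetical topic below sets the site label twice: first as a static default, then from a payload field. Because the field rule comes later in the array, it takes precedence for that key. The default value "unknown" is an illustrative assumption, and this page does not specify what happens when the payload lacks the field.

```toml
# Hypothetical topic, used only to illustrate rule ordering.
[[inputs.mqtt.main.topics]]
name = "factory/+/telemetry"
entry_name = "telemetry"
content_type = "application/json"
labels = [
  # Earlier rule: static value for the "site" key.
  { static = { site = "unknown" } },
  # Later rule for the same key: overrides the static value above.
  { field = "site", label = "site" }
]
```

Swapping the two rules would reverse the precedence, so the static value would win.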

Protobuf support

The MQTT input supports protobuf payloads and can map protobuf fields to labels. Choose one of the following modes:

  1. Schema mode (recommended)
  • schema_path must point to a protobuf descriptor set file generated by protoc. Example command: protoc --include_imports --descriptor_set_out=schema.desc schema.proto.
  • schema_name is the protobuf message type to decode from the payload, e.g. my.package.Message.
  • Use { field = "...", label = "..." } to extract labels by field path.
  • The schema is emitted as a JSON attachment named $schema with the following fields: encoding (the payload encoding, protobuf), topic (the MQTT publish topic that produced the record), schema_name (as configured), and schema (the content of the configured schema file, base64-encoded).

Use this mode when you want self-describing data and easier downstream inspection.

  2. No-schema mode
  • Omit both schema_path and schema_name.
  • Use { field_id = <id>, field_type = "<type>", label = "..." } to extract labels from raw protobuf wire fields.

Use this mode for lightweight setups when you only need a few known fields.

Configuration

Shared input settings

[inputs.mqtt.main]
broker = "mqtts://broker.example.com:8883"
client_id = "reduct-bridge"
version = "v5"
qos = 1
username = "bridge"
password = "${MQTT_PASSWORD}"
entry_prefix = "/mqtt"

JSON topic

[[inputs.mqtt.main.topics]]
name = "factory/+/telemetry"
entry_name = "telemetry"
content_type = "application/json"
labels = [
{ field = "device_id", label = "device" },
{ field = "site", label = "site" },
{ static = { source = "mqtt" } },
{ property = "content_type", label = "mime" },
{ property = "tenant", label = "tenant" }
]

Protobuf topic with schema

[[inputs.mqtt.main.topics]]
name = "factory/electrical/+/power"
entry_name = "power"
content_type = "application/protobuf"
schema_path = "./factory.desc"
schema_name = "factory.PowerReading"
labels = [
{ field = "device_id", label = "meter_id" },
{ field = "panel", label = "panel" }
]

Protobuf topic without schema

[[inputs.mqtt.main.topics]]
name = "factory/electrical/+/power/raw"
content_type = "application/protobuf"
labels = [
{ field_id = 1, field_type = "string", label = "meter_id" }
]

Static-only topic

[[inputs.mqtt.main.topics]]
name = "factory/+/events"
labels = [
{ static = { source = "mqtt-events" } }
]

entry_name, content_type, and labels are configured per topic. With MQTT v5, content_type is a fallback: it sets the record content type only when a publish does not provide a content_type property. With MQTT v3, it is always used. For MQTT v5 user properties, use bare names such as tenant, status, or id. schema_path and schema_name must be set together; omit both for no-schema mode.
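
The per-topic settings above can be sketched with a hypothetical MQTT v5 topic that labels records from user properties. The topic name, entry name, and label names are assumptions chosen for illustration; only the property names tenant, status, and id come from the text above.

```toml
# Hypothetical MQTT v5 topic; user properties are referenced by bare name.
[[inputs.mqtt.main.topics]]
name = "factory/+/status"
entry_name = "status"
# Fallback only: applies when a v5 publish has no content_type property.
content_type = "application/json"
labels = [
  { property = "tenant", label = "tenant" },
  { property = "status", label = "status" },
  { property = "id", label = "device_id" }
]
```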

MQTT v3 example

[inputs.mqtt.legacy]
broker = "mqtt://broker.example.com:1883"
client_id = "reduct-bridge-legacy"
version = "v3"
qos = 0

username = "legacy_user"
password = "${LEGACY_MQTT_PASSWORD}"

entry_prefix = "/mqtt"

[[inputs.mqtt.legacy.topics]]
name = "legacy/+/data"
entry_name = "legacy"
content_type = "application/json"
labels = [
{ field = "line", label = "line" },
{ static = { source = "mqtt-v3" } }
]