MQTT input
The MQTT input subscribes to one or more MQTT topics and forwards messages into a pipeline.
Both mqtt:// and mqtts:// brokers are supported.
Labels are applied with the following rules:
- Labels are processed in the order they appear in labels in TOML.
- For conflicts, later rules override earlier rules for the same label key.
- This ordering behavior applies to all rule types: static, field, and property.
- Payload field labels (field = "...") work with JSON and protobuf payloads.
- MQTT v5 property labels:
  - property = "content_type" reads the built-in MQTT v5 content_type property.
  - property = "<name>" reads the MQTT v5 user property named <name>.
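The ordering rules above can be sketched in a few lines of Python. This is an illustration only, not the bridge's implementation: the function name resolve_labels, the flat properties dictionary that merges content_type with user properties, and the choice to skip a rule when its field or property is missing are all assumptions. Nested field paths are not handled here.

```python
import json

def resolve_labels(rules, payload, properties):
    """Apply label rules in order; a later rule overwrites an earlier one for the same key."""
    labels = {}
    for rule in rules:
        if "static" in rule:
            labels.update(rule["static"])
        elif "field" in rule:
            value = payload.get(rule["field"])
            if value is not None:  # assumption: missing fields are skipped
                labels[rule["label"]] = str(value)
        elif "property" in rule:
            value = properties.get(rule["property"])
            if value is not None:  # assumption: missing properties are skipped
                labels[rule["label"]] = str(value)
    return labels

rules = [
    {"static": {"source": "mqtt"}},
    {"field": "device_id", "label": "device"},
    {"property": "tenant", "label": "tenant"},
    {"field": "origin", "label": "source"},  # same key as the static rule, defined later
]
payload = json.loads('{"device_id": "pump-7", "origin": "plc"}')
properties = {"content_type": "application/json", "tenant": "acme"}

labels = resolve_labels(rules, payload, properties)
print(labels)  # {'source': 'plc', 'device': 'pump-7', 'tenant': 'acme'}
```

The last rule wins for the source key because it appears later in the list, regardless of its rule type.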
Protobuf support
MQTT input supports protobuf payloads and can map protobuf fields to labels. Choose one of these modes.
- Schema mode (recommended)
  - schema_path must point to a protobuf descriptor set file generated by protoc. Example command: protoc --include_imports --descriptor_set_out=schema.desc schema.proto.
  - schema_name is the protobuf message type to decode from the payload, e.g. my.package.Message.
  - Use { field = "...", label = "..." } to extract labels by field path.
  - The schema is emitted as attachment $schema in JSON with fields: encoding (payload encoding, protobuf), topic (MQTT publish topic that produced the record), schema_name (same as configured), and schema (the configured schema file content, base64-encoded).
  Use this mode when you want self-describing data and easier downstream inspection.
- No-schema mode
  - Omit both schema_path and schema_name.
  - Use { field_id = <id>, field_type = "<type>", label = "..." } to extract labels from raw protobuf wire fields.
  Use this mode for lightweight setups when you only need a few known fields.
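To make the no-schema mode more concrete, the sketch below shows how a field can be located by number in a raw protobuf message without a descriptor, using the standard protobuf wire format (varint keys, wire types 0, 1, 2, and 5). The helper names read_varint and find_field are hypothetical, and the bridge's actual decoder may differ, for example in how it handles repeated or nested fields.

```python
def read_varint(buf, pos):
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def find_field(buf, field_id):
    """Return the raw value of the first top-level field with this number, or None."""
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:    # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 1:  # 64-bit fixed
            value, pos = buf[pos:pos + 8], pos + 8
        elif wire_type == 2:  # length-delimited (string, bytes, nested message)
            length, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        elif wire_type == 5:  # 32-bit fixed
            value, pos = buf[pos:pos + 4], pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        if number == field_id:
            return value
    return None

# Hand-encoded message: field 1 (string) = "meter-42", field 2 (varint) = 150
message = bytes([0x0A, 0x08]) + b"meter-42" + bytes([0x10, 0x96, 0x01])

meter_id = find_field(message, 1).decode("utf-8")  # field_type = "string"
reading = find_field(message, 2)
print(meter_id, reading)  # meter-42 150
```

The wire format carries only field numbers and wire types, which is why this mode needs field_type in the rule: the same length-delimited bytes could be a string, raw bytes, or a nested message.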
Configuration
Shared input settings
[inputs.mqtt.main]
broker = "mqtts://broker.example.com:8883"
client_id = "reduct-bridge"
version = "v5"
qos = 1
username = "bridge"
password = "${MQTT_PASSWORD}"
entry_prefix = "/mqtt"
JSON topic
[[inputs.mqtt.main.topics]]
name = "factory/+/telemetry"
entry_name = "telemetry"
content_type = "application/json"
labels = [
{ field = "device_id", label = "device" },
{ field = "site", label = "site" },
{ static = { source = "mqtt" } },
{ property = "content_type", label = "mime" },
{ property = "tenant", label = "tenant" }
]
Protobuf topic with schema (recommended)
[[inputs.mqtt.main.topics]]
name = "factory/electrical/+/power"
entry_name = "power"
content_type = "application/protobuf"
schema_path = "./factory.desc"
schema_name = "factory.PowerReading"
labels = [
{ field = "device_id", label = "meter_id" },
{ field = "panel", label = "panel" }
]
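For a topic configured in schema mode, the $schema attachment has the four fields listed in the Protobuf support section. The following sketch builds an attachment of that shape and shows how a consumer could recover the descriptor bytes. The function name build_schema_attachment, the example publish topic, and the placeholder descriptor bytes are hypothetical, and the use of the standard base64 alphabet is an assumption.

```python
import base64
import json

def build_schema_attachment(topic, schema_name, descriptor_bytes):
    """Assemble a dictionary with the fields documented for the $schema attachment."""
    return {
        "encoding": "protobuf",
        "topic": topic,
        "schema_name": schema_name,
        # assumption: standard base64 alphabet with padding
        "schema": base64.b64encode(descriptor_bytes).decode("ascii"),
    }

# Placeholder bytes standing in for the contents of ./factory.desc
descriptor_bytes = b"\x0a\x0dfactory.proto"

attachment = build_schema_attachment(
    "factory/electrical/m1/power", "factory.PowerReading", descriptor_bytes
)
print(json.dumps(attachment, indent=2))

# A downstream consumer can recover the descriptor set to decode payloads
recovered = base64.b64decode(attachment["schema"])
```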
Protobuf topic without schema
[[inputs.mqtt.main.topics]]
name = "factory/electrical/+/power/raw"
content_type = "application/protobuf"
labels = [
{ field_id = 1, field_type = "string", label = "meter_id" }
]
Static-only topic
[[inputs.mqtt.main.topics]]
name = "factory/+/events"
labels = [
{ static = { source = "mqtt-events" } }
]
entry_name, content_type, and labels are configured per topic. content_type
is used as the default record content type when an MQTT v5 publish does not provide
a content_type property; for MQTT v3 it is used directly. Use bare names such as
tenant, status, or id for MQTT v5 user properties.
schema_path and schema_name must be set together.
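The content type fallback and the schema pairing rule can be summarized in a short sketch. The function names and the dictionary representation of a topic and of publish properties are hypothetical; what the bridge does when neither the publish nor the topic supplies a content type is not specified here, so the sketch simply returns None in that case.

```python
def record_content_type(version, topic_content_type, publish_properties=None):
    """Pick the record content type: the v5 publish property wins, else the topic setting."""
    if version == "v5" and publish_properties:
        provided = publish_properties.get("content_type")
        if provided:
            return provided
    # MQTT v3 has no publish properties, so the topic setting is used directly
    return topic_content_type

def validate_schema_settings(topic):
    """Reject a topic that sets only one of schema_path and schema_name."""
    if ("schema_path" in topic) != ("schema_name" in topic):
        raise ValueError("schema_path and schema_name must be set together")

from_publish = record_content_type("v5", "application/json", {"content_type": "text/plain"})
from_topic_v5 = record_content_type("v5", "application/json", {})
from_topic_v3 = record_content_type("v3", "application/json")

validate_schema_settings({"schema_path": "./factory.desc", "schema_name": "factory.PowerReading"})
validate_schema_settings({"name": "factory/+/events"})  # no-schema topics are fine
```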
MQTT v3 example
[inputs.mqtt.legacy]
broker = "mqtt://broker.example.com:1883"
client_id = "reduct-bridge-legacy"
version = "v3"
qos = 0
username = "legacy_user"
password = "${LEGACY_MQTT_PASSWORD}"
entry_prefix = "/mqtt"
[[inputs.mqtt.legacy.topics]]
name = "legacy/+/data"
entry_name = "legacy"
content_type = "application/json"
labels = [
{ field = "line", label = "line" },
{ static = { source = "mqtt-v3" } }
]