[inputs.mqtt_consumer] A panic may occur if the MQTT server loses network connectivity #17564

@lnxpgn

Description

Relevant telegraf.conf

telegraf.conf


[agent]
  interval = "30s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = "0s"
  omit_hostname = true


telegraf.d/input-mqtt-server-1.conf


[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt-server-1:9009"]
  topics = [
    "sensor/#",
  ]
  topic_tag = ""
  qos = 2
  keepalive = "30s"
  ping_timeout = "10s"
  persistent_session = true
  client_id = "t-mqtt-server-1-n10"
  username="xxx"
  password="xxx"
  data_format = "json"
  json_time_key = "time"


telegraf.d/input-mqtt-server-2.conf


[[inputs.mqtt_consumer]]
  servers = ["wss://mqtt-server-2:443/mqtt"]
  topics = [
    "sensor/#",
  ]
  topic_tag = ""
  qos = 2
  keepalive = "30s"
  ping_timeout = "10s"
  persistent_session = true
  client_id = "t-mqtt-server-2-n10"
  username="xxx"
  password="xxx"
  data_format = "json"
  json_time_key = "time"

Logs from Telegraf

Sep  2 17:52:21 tvikgge telegraf: 2025-09-02T09:52:21Z E! [inputs.mqtt_consumer] Error in plugin: connection lost: EOF
Sep  2 17:52:21 tvikgge telegraf: panic: send on closed channel
Sep  2 17:52:21 tvikgge telegraf: goroutine 52 [running]:
Sep  2 17:52:21 tvikgge telegraf: github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch.func2.ackFunc.4()
Sep  2 17:52:21 tvikgge telegraf: /home/dev/go/pkg/mod/github.com/eclipse/[email protected]/net.go:464 +0x28e
Sep  2 17:52:21 tvikgge telegraf: sync.(*Once).doSlow(0x0?, 0x56280?)
Sep  2 17:52:21 tvikgge telegraf: /home/dev/go-pkg/src/sync/once.go:78 +0xab
Sep  2 17:52:21 tvikgge telegraf: sync.(*Once).Do(...)
Sep  2 17:52:21 tvikgge telegraf: /home/dev/go-pkg/src/sync/once.go:69
Sep  2 17:52:21 tvikgge telegraf: github.com/eclipse/paho%2emqtt%2egolang.(*message).Ack(0x1047e80?)
Sep  2 17:52:21 tvikgge telegraf: /home/dev/go/pkg/mod/github.com/eclipse/[email protected]/message.go:77 +0x25
Sep  2 17:52:21 tvikgge telegraf: github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer.(*MQTTConsumer).onDelivered(0xc0000ca508, {0x13cf190, 0xc000143c80})
Sep  2 17:52:21 tvikgge telegraf: /home/dev/telegraf-1.35.4/plugins/inputs/mqtt_consumer/mqtt_consumer.go:240 +0x12d
Sep  2 17:52:21 tvikgge telegraf: github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer.(*MQTTConsumer).Start.func1()
Sep  2 17:52:21 tvikgge telegraf: /home/dev/telegraf-1.35.4/plugins/inputs/mqtt_consumer/mqtt_consumer.go:145 +0x79
Sep  2 17:52:21 tvikgge telegraf: created by github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer.(*MQTTConsumer).Start in goroutine 1
Sep  2 17:52:21 tvikgge telegraf: /home/dev/telegraf-1.35.4/plugins/inputs/mqtt_consumer/mqtt_consumer.go:138 +0x147
Sep  2 17:52:21 tvikgge systemd: telegraf.service: main process exited, code=exited, status=2/INVALIDARGUMENT
Sep  2 17:52:21 tvikgge systemd: Unit telegraf.service entered failed state.
Sep  2 17:52:21 tvikgge systemd: telegraf.service failed.
Sep  2 17:52:21 tvikgge systemd: telegraf.service holdoff time over, scheduling restart.
Sep  2 17:52:21 tvikgge systemd: Stopped Telegraf.
Sep  2 17:52:21 tvikgge systemd: Starting Telegraf...

System info

Telegraf 1.35.4, CentOS Linux release 7.9.2009

Docker

No response

Steps to reproduce

  1. Let an MQTT server lose network connectivity while Telegraf is subscribed to it. A panic may occur, but it does not happen every time.

Expected behavior

After the network connection with the MQTT server is closed, no panic should occur.

Actual behavior

After the network connection with the MQTT server is closed, a panic occurs.
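The `send on closed channel` message in the log is a Go runtime panic. As a minimal sketch of that class of failure only (not the actual paho code path), the following assumes a hypothetical `lateAck` helper in which a teardown step closes a channel before a late acknowledgement tries to send on it. The panic is recovered here purely so the message can be printed.

```go
package main

import "fmt"

// lateAck is a hypothetical helper, not part of paho or Telegraf.
// It mimics an acknowledgement that sends on a channel which the
// connection teardown may already have closed. It returns the
// recovered panic value as a string, or "" if the send succeeded.
func lateAck(acks chan int, closedFirst bool) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	if closedFirst {
		close(acks) // teardown wins the race
	}
	acks <- 1 // panics if the channel is already closed
	return ""
}

func main() {
	fmt.Println(lateAck(make(chan int, 1), true))        // teardown happened first
	fmt.Println(lateAck(make(chan int, 1), false) == "") // normal case, no panic
}
```

Because the outcome depends on whether the close or the send happens first, a race of this kind would be consistent with the panic appearing only some of the time.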

Additional info

I modified telegraf-1.35.4/plugins/inputs/mqtt_consumer/mqtt_consumer.go as shown below, but I'm not sure the change is correct, as I'm not very familiar with the Telegraf codebase.

--- telegraf-1.35.4/plugins/inputs/mqtt_consumer/mqtt_consumer.go   2025-08-19 02:43:39
+++ mqtt_consumer.go    2025-09-03 10:57:38
@@ -236,8 +236,15 @@
        return
    }

-   if track.Delivered() && m.PersistentSession {
-       msg.Ack()
+   if track.Delivered() && m.PersistentSession && m.client.IsConnected() {
+       func() {
+           defer func() {
+               if r := recover(); r != nil {
+                   m.Log.Warnf("ack message failed, connection lost: %v", r)
+               }
+           }()
+           msg.Ack()
+       }()
    }

    delete(m.messages, track.ID())

Labels

bug (unexpected problem or unintended behavior), upstream (bug or issues that rely on dependency fixes)
