
CSV parser plugin concurrency panic when using metadata #17506

@carsonsaldanha

Description


Relevant telegraf.conf

[agent]
  interval = "30s"
  flush_interval = "30s"
  round_interval = true
  metric_batch_size = 20000
  metric_buffer_limit = 500000
  debug = true
  logfile = "debug.log"
  omit_hostname = true

[[inputs.http_listener_v2]]
  service_address = "tcp://:8080"
  data_format = "csv"
  csv_metadata_rows = 2
  csv_metadata_separators = [":"]
  csv_metadata_trim_set = "# "
  csv_header_row_count = 1
  csv_reset_mode = "always"

[[outputs.file]]
  files = ["output.log"]
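Here is what the three `csv_metadata_*` options above do to the two comment rows in the example data. This is a minimal Go sketch. It assumes, based on the option names and the CSV parser documentation, that each metadata row is split once on the first separator that occurs in it, and that the characters in `csv_metadata_trim_set` are trimmed from both ends of the key and the value. `parseMetadataRow` is a hypothetical helper written for illustration. It is not Telegraf's actual function.

```go
package main

import (
	"fmt"
	"strings"
)

// parseMetadataRow is a hypothetical helper that mimics how one metadata row
// could be turned into a key/value pair under csv_metadata_separators and
// csv_metadata_trim_set. ok is false if no separator yields a non-empty key.
func parseMetadataRow(row string, separators []string, trimSet string) (key, value string, ok bool) {
	row = strings.TrimRight(row, "\r\n")
	for _, sep := range separators {
		parts := strings.SplitN(row, sep, 2) // split on the first occurrence only
		if len(parts) < 2 {
			continue // this separator does not occur in the row
		}
		key = strings.Trim(parts[0], trimSet)
		if key == "" {
			continue
		}
		return key, strings.Trim(parts[1], trimSet), true
	}
	return "", "", false
}

func main() {
	separators := []string{":"} // csv_metadata_separators
	trimSet := "# "             // csv_metadata_trim_set
	for _, row := range []string{"# Meta: foo", "# File: bar"} {
		if k, v, ok := parseMetadataRow(row, separators, trimSet); ok {
			fmt.Printf("%s=%s\n", k, v)
		}
	}
}
```

Under those assumptions, the two rows yield the pairs `Meta=foo` and `File=bar`.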

Logs from Telegraf

fatal error: concurrent map writes

goroutine 26 [running]:
internal/runtime/maps.fatal({0x1094b1486?, 0x6?})
        runtime/panic.go:1046 +0x20
github.com/influxdata/telegraf/plugins/parsers/csv.parseCSV(0x14000982c40, {0x10d0638e0, 0x14000cde300?})
        github.com/influxdata/telegraf/plugins/parsers/csv/parser.go:286 +0x66c
github.com/influxdata/telegraf/plugins/parsers/csv.(*Parser).Parse(0x14000982c40, {0x1400042a600?, 0x10d09cff0?, 0x1400149e340?})
        github.com/influxdata/telegraf/plugins/parsers/csv/parser.go:232 +0x19c
github.com/influxdata/telegraf/models.(*RunningParser).Parse(0x140014a62d0, {0x1400042a600, 0x1ad, 0x200})
        github.com/influxdata/telegraf/models/running_parsers.go:78 +0x5c
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).serveWrite(0x14000982a80, {0x10d0ebfb0, 0x14000c5c000}, 0x14001d38140)
        github.com/influxdata/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:274 +0x1f0
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).authenticateIfSet(0x38155fa93?, 0x11282e0a0?, {0x10d0ebfb0?, 0x14000c5c000?}, 0x1400010eab8?)
        github.com/influxdata/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:411 +0xa4
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).ServeHTTP(0x14000982a80, {0x10d0ebfb0, 0x14000c5c000}, 0x14001d38140)
        github.com/influxdata/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:216 +0x1f0
net/http.serverHandler.ServeHTTP({0x10d09cff0?}, {0x10d0ebfb0?, 0x14000c5c000?}, 0x1?)
        net/http/server.go:3340 +0xb0
net/http.(*conn).serve(0x1400020ba70, {0x10d114db8, 0x14001f82120})
        net/http/server.go:2109 +0x528
created by net/http.(*Server).Serve in goroutine 8
        net/http/server.go:3493 +0x384

goroutine 1 [sync.WaitGroup.Wait]:
sync.runtime_SemacquireWaitGroup(0x1027898c0?, 0x0?)
        runtime/sema.go:114 +0x38
sync.(*WaitGroup).Wait(0x14000cec030)
        sync/waitgroup.go:206 +0xa8
github.com/influxdata/telegraf/agent.(*Agent).Run(0x14001d14018, {0x10d114df0, 0x140014a6410})
        github.com/influxdata/telegraf/agent/agent.go:208 +0x7d0
main.(*Telegraf).runAgent(0x140013f46e0, {0x10d114df0, 0x140014a6410}, 0x0?)
        github.com/influxdata/telegraf/cmd/telegraf/telegraf.go:565 +0x1184
main.(*Telegraf).reloadLoop(0x140013f46e0)
        github.com/influxdata/telegraf/cmd/telegraf/telegraf.go:207 +0x1b8
main.(*Telegraf).Run(0x140013f46e0)
        github.com/influxdata/telegraf/cmd/telegraf/telegraf_posix.go:20 +0xb0
main.runApp.func1(0x1400149e4c0)
        github.com/influxdata/telegraf/cmd/telegraf/main.go:257 +0x9a0
github.com/urfave/cli/v2.(*Command).Run(0x140014ac2c0, 0x1400149e4c0, {0x1400019d410, 0x3, 0x3})
        github.com/urfave/cli/v2@v2.27.7/command.go:276 +0x580
github.com/urfave/cli/v2.(*App).RunContext(0x1400104f200, {0x10d114b10, 0x112da3240}, {0x1400019d410, 0x3, 0x3})
        github.com/urfave/cli/v2@v2.27.7/app.go:333 +0x4c8
github.com/urfave/cli/v2.(*App).Run(...)
        github.com/urfave/cli/v2@v2.27.7/app.go:307
main.runApp({0x1400019d410, 0x3, 0x3}, {0x10d063158, 0x1400019e080}, {0x10d09c858, 0x14000c589e8}, {0x10d09c880, 0x140014a4000}, {0x10d1149c0, ...})
        github.com/influxdata/telegraf/cmd/telegraf/main.go:407 +0xd00
main.main()
        github.com/influxdata/telegraf/cmd/telegraf/main.go:421 +0xec

goroutine 6 [select]:
go.opencensus.io/stats/view.(*worker).start(0x14000c96a00)
        go.opencensus.io@v0.24.0/stats/view/worker.go:292 +0x84
created by go.opencensus.io/stats/view.init.0 in goroutine 1
        go.opencensus.io@v0.24.0/stats/view/worker.go:34 +0x94

goroutine 23 [syscall]:
os/signal.signal_recv()
        runtime/sigqueue.go:149 +0x2c
os/signal.loop()
        os/signal/signal_unix.go:23 +0x1c
created by os/signal.Notify.func1.1 in goroutine 1
        os/signal/signal.go:152 +0x28

goroutine 82 [select]:
main.(*Telegraf).reloadLoop.func1()
        github.com/influxdata/telegraf/cmd/telegraf/telegraf.go:185 +0x84
created by main.(*Telegraf).reloadLoop in goroutine 1
        github.com/influxdata/telegraf/cmd/telegraf/telegraf.go:184 +0x1a4

goroutine 8 [IO wait]:
internal/poll.runtime_pollWait(0x13a2e9e00, 0x72)
        runtime/netpoll.go:351 +0xa0
internal/poll.(*pollDesc).wait(0x14000c5a000?, 0x1028383ec?, 0x0)
        internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Accept(0x14000c5a000)
        internal/poll/fd_unix.go:613 +0x21c
net.(*netFD).accept(0x14000c5a000)
        net/fd_unix.go:161 +0x28
net.(*TCPListener).accept(0x14000cea000)
        net/tcpsock_posix.go:159 +0x24
net.(*TCPListener).Accept(0x14000cea000)
        net/tcpsock.go:380 +0x2c
net/http.(*Server).Serve(0x1400034c400, {0x10d0ec2b0, 0x14000cea000})
        net/http/server.go:3463 +0x24c
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).Start.func1()
        github.com/influxdata/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:180 +0x54
created by github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).Start in goroutine 1
        github.com/influxdata/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:178 +0x378

goroutine 9 [chan receive]:
github.com/influxdata/telegraf/agent.(*Agent).runOutputs(0x14001d14018, 0x140006980c0)
        github.com/influxdata/telegraf/agent/agent.go:864 +0x1c0
github.com/influxdata/telegraf/agent.(*Agent).Run.func1()
        github.com/influxdata/telegraf/agent/agent.go:177 +0x4c
created by github.com/influxdata/telegraf/agent.(*Agent).Run in goroutine 1
        github.com/influxdata/telegraf/agent/agent.go:175 +0x5d4

goroutine 10 [sync.WaitGroup.Wait]:
sync.runtime_SemacquireWaitGroup(0x1027898c0?, 0x30?)
        runtime/sema.go:114 +0x38
sync.(*WaitGroup).Wait(0x14001e08000)
        sync/waitgroup.go:206 +0xa8
github.com/influxdata/telegraf/agent.(*Agent).runInputs(0x14001d14018, {0x10d114df0, 0x140014a6410}, {0x0?, 0x0?, 0x11282e0a0?}, 0x140006980e0)
        github.com/influxdata/telegraf/agent/agent.go:445 +0x468
github.com/influxdata/telegraf/agent.(*Agent).Run.func5()
        github.com/influxdata/telegraf/agent/agent.go:205 +0x54
created by github.com/influxdata/telegraf/agent.(*Agent).Run in goroutine 1
        github.com/influxdata/telegraf/agent/agent.go:203 +0x7c8

goroutine 98 [select]:
github.com/influxdata/telegraf/agent.(*Agent).flushLoop(0x14001d14018, {0x10d114df0, 0x14001e8a000}, 0x140014aa8f0, {0x10d09cd48, 0x14001f82150})
        github.com/influxdata/telegraf/agent/agent.go:909 +0x12c
github.com/influxdata/telegraf/agent.(*Agent).runOutputs.func1(0x140014aa8f0)
        github.com/influxdata/telegraf/agent/agent.go:860 +0xe4
created by github.com/influxdata/telegraf/agent.(*Agent).runOutputs in goroutine 9
        github.com/influxdata/telegraf/agent/agent.go:854 +0x90

goroutine 114 [select]:
github.com/influxdata/telegraf/agent.(*AlignedTicker).run(0x14001e0a000, {0x10d114df0, 0x14001e0e000}, 0x14001e0e050)
        github.com/influxdata/telegraf/agent/tick.go:87 +0xd0
github.com/influxdata/telegraf/agent.(*AlignedTicker).start.func1()
        github.com/influxdata/telegraf/agent/tick.go:65 +0x50
created by github.com/influxdata/telegraf/agent.(*AlignedTicker).start in goroutine 10
        github.com/influxdata/telegraf/agent/tick.go:63 +0x144

goroutine 115 [select]:
github.com/influxdata/telegraf/agent.(*Agent).gatherLoop(0x14001d14018, {0x10d114df0, 0x140014a6410}, {0x10d160b00, 0x1400069e000}, 0x140014a8540, {0x10d09cd70, 0x14001e0a000}, 0x6fc23ac00)
        github.com/influxdata/telegraf/agent/agent.go:573 +0x98
github.com/influxdata/telegraf/agent.(*Agent).runInputs.func1(0x0?)
        github.com/influxdata/telegraf/agent/agent.go:441 +0x60
created by github.com/influxdata/telegraf/agent.(*Agent).runInputs in goroutine 10
        github.com/influxdata/telegraf/agent/agent.go:439 +0xc0

goroutine 38 [select]:
github.com/influxdata/telegraf/agent.(*RollingTicker).run(0x14001f82150, {0x10d114df0, 0x14001f8c000}, 0x14001f8c050)
        github.com/influxdata/telegraf/agent/tick.go:258 +0x94
github.com/influxdata/telegraf/agent.(*RollingTicker).start.func1()
        github.com/influxdata/telegraf/agent/tick.go:248 +0x50
created by github.com/influxdata/telegraf/agent.(*RollingTicker).start in goroutine 98
        github.com/influxdata/telegraf/agent/tick.go:246 +0x14c

goroutine 15 [runnable]:
internal/poll.runtime_pollWait(0x13a2e9c00, 0x72)
        runtime/netpoll.go:351 +0xa0
internal/poll.(*pollDesc).wait(0x14000c5a080?, 0x14000cea061?, 0x0)
        internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14000c5a080, {0x14000cea061, 0x1, 0x1})
        internal/poll/fd_unix.go:165 +0x1e0
net.(*netFD).Read(0x14000c5a080, {0x14000cea061?, 0x14000ff0240?, 0x612ac68a930b?})
        net/fd_posix.go:68 +0x28
net.(*conn).Read(0x1400019e188, {0x14000cea061?, 0x612ac68a930b?, 0x0?})
        net/net.go:196 +0x34
net/http.(*connReader).backgroundRead(0x14000cea040)
        net/http/server.go:702 +0x38
created by net/http.(*connReader).startBackgroundRead in goroutine 25
        net/http/server.go:698 +0xb8

goroutine 25 [sync.Cond.Wait]:
sync.runtime_notifyListWait(0x14000cea110, 0x0)
        runtime/sema.go:606 +0x140
sync.(*Cond).Wait(0x14000cea100)
        sync/cond.go:71 +0xa4
net/http.(*connReader).abortPendingRead(0x14000cea040)
        net/http/server.go:750 +0x9c
net/http.(*response).finishRequest(0x14000f532c0)
        net/http/server.go:1712 +0x80
net/http.(*conn).serve(0x1400020abd0, {0x10d114db8, 0x14001f82120})
        net/http/server.go:2116 +0x550
created by net/http.(*Server).Serve in goroutine 8
        net/http/server.go:3493 +0x384

goroutine 27 [IO wait]:
internal/poll.runtime_pollWait(0x13a2e9a00, 0x72)
        runtime/netpoll.go:351 +0xa0
internal/poll.(*pollDesc).wait(0x14000c5a100?, 0x1400149e2a1?, 0x0)
        internal/poll/fd_poll_runtime.go:84 +0x28
internal/poll.(*pollDesc).waitRead(...)
        internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0x14000c5a100, {0x1400149e2a1, 0x1, 0x1})
        internal/poll/fd_unix.go:165 +0x1e0
net.(*netFD).Read(0x14000c5a100, {0x1400149e2a1?, 0x0?, 0x0?})
        net/fd_posix.go:68 +0x28
net.(*conn).Read(0x1400019e1c0, {0x1400149e2a1?, 0x0?, 0x0?})
        net/net.go:196 +0x34
net/http.(*connReader).backgroundRead(0x1400149e280)
        net/http/server.go:702 +0x38
created by net/http.(*connReader).startBackgroundRead in goroutine 26
        net/http/server.go:698 +0xb8

System info

Telegraf 1.35.4, curl 8.7.1

Steps to reproduce

Save the config above as telegraf.conf and the data below as example.csv:

# Meta: foo
# File: bar
measurement,cpu,time_user,time_system,time_idle
cpu,cpu0,42,42,42

  1. Start Telegraf:
    telegraf --config telegraf.conf
  2. In a separate terminal, run the following command. It executes curl 100 times with at most 4 jobs in parallel:
    seq 1 100 | xargs -n1 -P4 curl -i -X POST "http://localhost:8080/telegraf" --data-binary @example.csv
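If you want to reproduce without `xargs`, a small Go program can approximate the curl loop above. It uses a fixed pool of 4 goroutines that together send 100 POST requests. This is a sketch and not part of Telegraf. `postConcurrently` is a hypothetical helper. The `Content-Type` it sets is an assumption, chosen to match what `curl --data-binary` sends by default.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
	"sync"
)

// postConcurrently sends total POST requests carrying body to url, with at most
// workers requests in flight at once (like `xargs -P4`). It returns how many
// requests received a 2xx response; transport errors are counted as failures.
func postConcurrently(url string, body []byte, total, workers int) int {
	jobs := make(chan struct{})
	var mu sync.Mutex
	succeeded := 0
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs {
				// Assumed to match curl --data-binary's default content type.
				resp, err := http.Post(url, "application/x-www-form-urlencoded", bytes.NewReader(body))
				if err != nil {
					continue
				}
				resp.Body.Close()
				if resp.StatusCode/100 == 2 {
					mu.Lock()
					succeeded++
					mu.Unlock()
				}
			}
		}()
	}
	for i := 0; i < total; i++ {
		jobs <- struct{}{} // hand one request to the next free worker
	}
	close(jobs)
	wg.Wait()
	return succeeded
}

func main() {
	// example.csv and the URL come from the reproduction steps above.
	body, err := os.ReadFile("example.csv")
	if err != nil {
		fmt.Fprintln(os.Stderr, "read example.csv:", err)
		return
	}
	n := postConcurrently("http://localhost:8080/telegraf", body, 100, 4)
	fmt.Printf("%d/100 requests succeeded\n", n)
}
```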

Expected behavior

Telegraf processes all data without errors or crashing

Actual behavior

Telegraf crashes entirely with "fatal error: concurrent map writes"

Additional info

When CSV data without metadata is processed in parallel, there is no problem: Telegraf does not crash and processes all data.

Save the config below as telegraf.conf:

[agent]
  interval = "30s"
  flush_interval = "30s"
  round_interval = true
  metric_batch_size = 20000
  metric_buffer_limit = 500000
  debug = true
  logfile = "debug.log"
  omit_hostname = true

[[inputs.http_listener_v2]]
  service_address = "tcp://:8080"
  data_format = "csv"
  csv_header_row_count = 1
  csv_reset_mode = "always"

[[outputs.file]]
  files = ["output.log"]

Save the data below as example.csv:

measurement,cpu,time_user,time_system,time_idle
cpu,cpu0,42,42,42

To run and test, follow the same steps as before.

Labels: bug (unexpected problem or unintended behavior)
