A Temporal integration for benthos/redpanda-connect/bento.
Easily execute Temporal workflows from:
amqp, aws_kinesis, aws_s3, aws_sqs, azure_blob_storage, azure_cosmosdb, azure_queue_storage, azure_table_storage, beanstalkd, cassandra, discord, gcp_bigquery_select, gcp_cloud_storage, gcp_pubsub, hdfs, kafka, mongodb, mqtt, nats, nats_jetstream, nsq, pulsar, redis_list, redis_pubsub, redis_scan, redis_streams, sftp, sql, webhooks, and more!
Launch a webhook server for executing dynamic workflows:
- Import this plugin
- Redpanda Connect
// in main.go
package main

import (
	"context"

	_ "github.com/cludden/benthos-plugin-temporal/pkg/connect/all"
	"github.com/redpanda-data/benthos/v4/public/service"
	_ "github.com/redpanda-data/connect/public/bundle/free/v4"
)

func main() {
	service.RunCLI(context.Background())
}
- Bento
// in main.go
package main

import (
	"context"

	_ "github.com/cludden/benthos-plugin-temporal/pkg/bento/all"
	_ "github.com/warpstreamlabs/bento/public/components/all"
	"github.com/warpstreamlabs/bento/public/service"
)

func main() {
	service.RunCLI(context.Background())
}
- Use plugin in stream configuration
# in config.yml
input:
  http_server:
    address: 0.0.0.0:8080
    path: /temporal
pipeline:
  processors:
    # extract task_queue, workflow_type, and workflow_id from http headers, query parameters, or json payload
    - mapping: |
        root = this.without("@task_queue", "@workflow_type", "@workflow_id")
        root."@metadata" = @
        meta task_queue = @."task_queue".or(this."@task_queue").or("example")
        meta workflow_id = @."workflow_id".or(this."@workflow_id").or("example/%s".format(uuid_v4()))
        meta workflow_type = @."workflow_type".or(this."@workflow_type").or("example")
output:
  temporal_workflow:
    address: localhost:7233
    task_queue: '${! @.task_queue }'
    workflow_id: '${! @.workflow_id }'
    workflow_type: '${! @.workflow_type }'
- Start Temporal server
temporal server start-dev
- In a separate terminal, run the application
go run main.go -c config.yml
- In a separate terminal, trigger a workflow
curl http://localhost:8080/temporal \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"@task_queue":"example","@workflow_type":"example","foo":"bar"}'
See the example directory for complete examples.
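The mapping in the stream configuration above resolves each of task_queue, workflow_id, and workflow_type through a fallback chain: message metadata first, then the "@"-prefixed field in the JSON payload, then a default. The Go sketch below mirrors that precedence for illustration only. The `resolve` helper is hypothetical and not part of this plugin, and it only approximates Bloblang's `.or()` by treating a missing or null value as the trigger for the next fallback.

```go
package main

import "fmt"

// resolve is a hypothetical helper that mirrors the fallback chain in the
// mapping: metadata value, then the "@"-prefixed payload field, then a default.
func resolve(meta map[string]string, payload map[string]any, key, fallback string) string {
	// 1. Metadata (http headers and query parameters in the example config).
	if v, ok := meta[key]; ok {
		return v
	}
	// 2. The "@"-prefixed field in the JSON payload, if present and not null.
	if v, ok := payload["@"+key]; ok && v != nil {
		return fmt.Sprint(v)
	}
	// 3. The default.
	return fallback
}

func main() {
	meta := map[string]string{"workflow_type": "from-header"}
	payload := map[string]any{
		"@task_queue":    "from-body",
		"@workflow_type": "ignored",
		"foo":            "bar",
	}

	fmt.Println(resolve(meta, payload, "workflow_type", "example")) // from-header
	fmt.Println(resolve(meta, payload, "task_queue", "example"))    // from-body
	fmt.Println(resolve(meta, payload, "workflow_id", "fallback"))  // fallback
}
```

In the real mapping the workflow_id default is generated per message with uuid_v4(), so requests that do not supply an ID each get a distinct one.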
The verify_hmac_sha256 processor securely verifies an hmac_sha256 signature without leaking timing information.
- secret (<InterpolatedString>) - hmac secret
- signature (<InterpolatedString>) - signature to verify (excluding any prefix/suffix)
- string_to_sign (<Mapping>) - string to sign
GitHub Webhook:
input:
  http_server:
    path: /post
pipeline:
  processors:
    - verify_hmac_sha256:
        secret: '${! env("WEBHOOK_SECRET") }'
        signature: '${! @."X-Hub-Signature-256".trim_prefix("sha256=") }'
        string_to_sign: root = content()
output:
  reject_errored:
    sync_response: {}
Slack Request:
input:
  http_server:
    path: /post
pipeline:
  processors:
    - verify_hmac_sha256:
        secret: '${! env("WEBHOOK_SECRET") }'
        signature: '${! @."X-Slack-Signature".trim_prefix("v0=") }'
        string_to_sign: root = "v0:%s:%s".format(@."X-Slack-Request-Timestamp", content())
output:
  reject_errored:
    sync_response: {}
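For readers unfamiliar with timing-safe verification, the following Go sketch shows the general technique the verify_hmac_sha256 processor describes: recompute the HMAC-SHA256 of the string to sign and compare it to the supplied signature in constant time. It is not this plugin's implementation. The helper names `verifyHMACSHA256` and `sign` are hypothetical, and the sketch assumes the signature is hex-encoded, as it is in the GitHub and Slack headers used above.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// verifyHMACSHA256 (hypothetical helper) reports whether signatureHex is the
// hex-encoded HMAC-SHA256 of stringToSign under secret.
func verifyHMACSHA256(secret, stringToSign []byte, signatureHex string) bool {
	want, err := hex.DecodeString(signatureHex)
	if err != nil {
		return false // not valid hex, so it cannot match
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(stringToSign)
	// hmac.Equal compares in constant time, so the comparison itself does not
	// reveal how many leading bytes matched.
	return hmac.Equal(mac.Sum(nil), want)
}

// sign (hypothetical helper) produces a signature the way a sender would.
func sign(secret, stringToSign []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(stringToSign)
	return hex.EncodeToString(mac.Sum(nil))
}

func main() {
	secret := []byte("demo-secret") // stand-in for WEBHOOK_SECRET
	body := []byte(`{"foo":"bar"}`)

	// Slack-style base string: "v0:<timestamp>:<body>".
	toSign := []byte(fmt.Sprintf("v0:%s:%s", "1700000000", body))
	sig := sign(secret, toSign)

	fmt.Println(verifyHMACSHA256(secret, toSign, sig)) // true
	fmt.Println(verifyHMACSHA256(secret, body, sig))   // false: wrong base string
}
```

A plain `==` or `bytes.Equal` on the two digests would return the same answer, but its running time can depend on where the first mismatch occurs, which is the leak that the constant-time comparison avoids.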
The temporal_workflow output executes a Temporal workflow for each message, using the message as input.
- address <string> - temporal cluster address
- codec_auth [string] - codec endpoint authorization header
- codec_endpoint [string] - remote codec server endpoint
- detach [InterpolatedString] - boolean indicating whether the output should wait for workflow completion before acknowledging a message
- namespace [string] - temporal namespace name
- search_attributes [Mapping] - bloblang mapping defining workflow search attributes
- task_queue <InterpolatedString> - temporal worker task queue name
- tls.ca_data [string] - pem-encoded ca data
- tls.ca_file [string] - path to pem-encoded ca certificate
- tls.cert_data [string] - pem-encoded client certificate data
- tls.cert_file [string] - path to pem-encoded client certificate
- tls.disable_host_verification [bool] - disables tls host verification
- tls.key_data [string] - pem-encoded client private key
- tls.key_file [string] - path to pem-encoded client private key
- tls.server_name [string] - overrides target tls server name
- workflow_id <InterpolatedString> - temporal workflow id
- workflow_type <InterpolatedString> - temporal workflow type
output:
  temporal_workflow:
    address: localhost:7233
    task_queue: example
    workflow_id: test/${! uuid_v4() }
    workflow_type: ${! @.workflow_type.or(this."@workflow_type").or("test") }
Licensed under the MIT License
Copyright (c) 2024 Chris Ludden