
Enable OAUTHBEARER authentication support in rdkafka #180

@lpsinger

Is your feature request related to a problem? Please describe.
I would like to see support for OAUTHBEARER authentication. This feature has been present in librdkafka for a while now, but its built-in OIDC token retrieval (sasl.oauthbearer.method set to oidc, as in the configuration below) requires librdkafka to be built and linked against libcurl.
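Some background on why libcurl is needed: when sasl.oauthbearer.method is oidc, librdkafka fetches access tokens from the token endpoint itself, using the OAuth 2.0 client-credentials grant (RFC 6749, section 4.4), so it needs an HTTP client. The Python sketch below only builds the general shape of that request and never sends it. It is not librdkafka's code, whose exact headers and fields may differ. The function name build_token_request and the placeholder credentials are hypothetical.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret, scope=""):
    """Build (but do not send) an OAuth 2.0 client-credentials token request.

    Hypothetical illustration of the HTTP exchange that an OIDC client such as
    librdkafka performs; not taken from librdkafka's source.
    """
    # Client credentials are sent in an HTTP Basic Authorization header.
    credentials = f"{client_id}:{client_secret}".encode()
    auth = "Basic " + base64.b64encode(credentials).decode("ascii")
    # Form-encoded body requesting the client_credentials grant.
    fields = {"grant_type": "client_credentials"}
    if scope:
        fields["scope"] = scope
    body = urllib.parse.urlencode(fields).encode("ascii")
    return urllib.request.Request(
        token_url,
        data=body,
        method="POST",
        headers={
            "Authorization": auth,
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
    )


# Placeholder credentials; the endpoint URL is the one from the config below.
req = build_token_request(
    "https://auth.gcn.nasa.gov/oauth2/token", "my-client-id", "my-client-secret"
)
print(req.get_method(), req.full_url)
```

A librdkafka built without libcurl has no HTTP client with which to perform this exchange, which is why the curl feature of the Rust bindings matters here.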

My project is NASA's General Coordinates Network (GCN), a public collaboration platform run by NASA for the astronomy research community to share alerts and rapid communications about high-energy, multimessenger, and transient phenomena. GCN distributes alerts between space- and ground-based observatories, physics experiments, and thousands of astronomers around the world. See https://gcn.nasa.gov.

Describe the solution you'd like
Enable OAUTHBEARER support in the Pathway binaries. This should only require adding the curl or curl-static feature flag to the rdkafka dependency in the Cargo.toml file. I tried this myself in #177, although it is not working yet. I could use some expert help from the team on this as I am not yet familiar with Pathway's build process.

Describe alternatives you've considered
I wrote a small Rust program to confirm that OAUTHBEARER authentication does work with the rdkafka Rust bindings: https://github.com/nasa-gcn/gcn-kafka-rust/blob/main/examples/consumer.rs

Additional context
Here is a minimal reproducer. It uses a small utility library of our own (https://github.com/nasa-gcn/gcn-kafka-python; pip install gcn-kafka) for the sole purpose of producing the librdkafka config dictionary.

import pathway as pw
from gcn_kafka.core import get_config
from pprint import pprint

rdkafka_config = get_config(
    "consumer",
    {
        "client_id": "fill me in",
        "client_secret": "fill me in",
    },
)
pprint(rdkafka_config)

source = pw.io.kafka.read(rdkafka_config, "gcn.heartbeat")
pw.debug.compute_and_print(source)
pw.run()

When I run this, I get the following error message:

$ python pathway_example.py 
{'bootstrap.servers': 'kafka.gcn.nasa.gov',
 'group.id': '5444ff4c-d9b4-49d3-a75e-c3dc3b7ebf4c',
 'https.ca.location': '/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem',
 'sasl.mechanisms': 'OAUTHBEARER',
 'sasl.oauthbearer.client.id': '(redacted)',
 'sasl.oauthbearer.client.secret': '(redacted)',
 'sasl.oauthbearer.method': 'oidc',
 'sasl.oauthbearer.token.endpoint.url': 'https://auth.gcn.nasa.gov/oauth2/token',
 'security.protocol': 'sasl_ssl',
 'ssl.ca.location': '/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem'}
[2026-01-26T11:01:52]:INFO:Preparing Pathway computation

thread 'pathway:work-0' (6135230) panicked at src/engine/report_error.rs:83:50:
ValueError: Creating Kafka consumer failed: Client config error: No such configuration property: "https.ca.location" https.ca.location /Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/Users/lpsinger/src/shigawire/pathway_example.py", line 15, in <module>
    pw.debug.compute_and_print(source)
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/runtime_type_check.py", line 19, in with_type_validation
    return beartype.beartype(f)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<@beartype(pathway.debug.compute_and_print) at 0x119451bc0>", line 45, in compute_and_print
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/trace.py", line 134, in _pathway_trace_marker
    _reraise_with_user_frame(e)
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/trace.py", line 100, in _reraise_with_user_frame
    raise e
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/debug/__init__.py", line 237, in compute_and_print
    _compute_and_print_internal(
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/debug/__init__.py", line 124, in _compute_and_print_internal
    captured = _compute_tables(*tables, _stacklevel=_stacklevel + 1, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/runtime_type_check.py", line 19, in with_type_validation
    return beartype.beartype(f)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<@beartype(pathway.debug._compute_tables) at 0x10231dd00>", line 45, in _compute_tables
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/debug/__init__.py", line 50, in _compute_tables
    captured = GraphRunner(
               ^^^^^^^^^^^^
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/__init__.py", line 103, in run_tables
    return self._run(nodes, output_tables=tables, after_build=after_build)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/__init__.py", line 209, in _run
    return api.run_with_new_graph(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/__init__.py", line 183, in logic
    storage_graph.build_scope(scope, state, self)
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/storage_graph.py", line 330, in build_scope
    handler.run(operator, self.output_storages[operator])
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/operator_handler.py", line 88, in run
    self._run(operator, output_storages)
  File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/operator_handler.py", line 138, in _run
    materialized_table = self.scope.connector_table(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Creating Kafka consumer failed: Client config error: No such configuration property: "https.ca.location" https.ca.location /Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem
Occurred here:
    Line: source = pw.io.kafka.read(rdkafka_config, "gcn.heartbeat")
    File: /Users/lpsinger/src/shigawire/pathway_example.py:14
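For anyone who wants to reproduce this without installing gcn-kafka, the configuration printed above can be approximated by hand. This is a minimal sketch whose keys and values are copied from that printout. It is not the gcn_kafka get_config implementation, and the helper name make_oauthbearer_config is hypothetical. Leaving ca_location unset omits https.ca.location, the property rejected in the error above. Whether the bundled librdkafka then accepts the rest of the OIDC configuration is untested here.

```python
import uuid
from typing import Dict, Optional


def make_oauthbearer_config(
    client_id: str, client_secret: str, ca_location: Optional[str] = None
) -> Dict[str, str]:
    """Hypothetical helper mirroring the librdkafka config printed in this issue."""
    config = {
        "bootstrap.servers": "kafka.gcn.nasa.gov",
        # Fresh consumer group per run, like the UUID in the printed config.
        "group.id": str(uuid.uuid4()),
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms": "OAUTHBEARER",
        # OIDC method: librdkafka fetches tokens itself (needs libcurl).
        "sasl.oauthbearer.method": "oidc",
        "sasl.oauthbearer.client.id": client_id,
        "sasl.oauthbearer.client.secret": client_secret,
        "sasl.oauthbearer.token.endpoint.url": "https://auth.gcn.nasa.gov/oauth2/token",
    }
    if ca_location is not None:
        config["ssl.ca.location"] = ca_location
        # Rejected as unknown by the librdkafka build in the error above.
        config["https.ca.location"] = ca_location
    return config


cfg = make_oauthbearer_config("fill me in", "fill me in")
```

With Pathway installed, the resulting dictionary can be passed to pw.io.kafka.read(cfg, "gcn.heartbeat") in the same way as in the reproducer.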

Labels: enhancement (New feature or request)
