Description
Is your feature request related to a problem? Please describe.
I would like to see support for OAUTHBEARER authentication. librdkafka has supported this for a while now, but only when it is built and linked against libcurl.
My project is NASA's General Coordinates Network (GCN), a public collaboration platform run by NASA for the astronomy research community to share alerts and rapid communications about high-energy, multimessenger, and transient phenomena. GCN distributes alerts between space- and ground-based observatories, physics experiments, and thousands of astronomers around the world. See https://gcn.nasa.gov.
Describe the solution you'd like
Enable OAUTHBEARER support in the Pathway binaries. This should only require adding the curl or curl-static feature flag to the rdkafka dependency in the Cargo.toml file. I tried this myself in #177, although it is not working yet. I could use some expert help from the team on this as I am not yet familiar with Pathway's build process.
Describe alternatives you've considered
I wrote a small Rust program to confirm for myself that OAUTHBEARER authentication does work with the rdkafka Rust bindings: https://github.com/nasa-gcn/gcn-kafka-rust/blob/main/examples/consumer.rs
Additional context
Here is a minimal reproducer. It uses a small utility library of our own (https://github.com/nasa-gcn/gcn-kafka-python; pip install gcn-kafka) for the sole purpose of producing the librdkafka config dictionary.
import pathway as pw
from gcn_kafka.core import get_config
from pprint import pprint

rdkafka_config = get_config(
    "consumer",
    {
        "client_id": "fill me in",
        "client_secret": "fill me in",
    },
)
pprint(rdkafka_config)

source = pw.io.kafka.read(rdkafka_config, "gcn.heartbeat")
pw.debug.compute_and_print(source)
pw.run()

When I run this, I get the following error message:
$ python pathway_example.py
{'bootstrap.servers': 'kafka.gcn.nasa.gov',
'group.id': '5444ff4c-d9b4-49d3-a75e-c3dc3b7ebf4c',
'https.ca.location': '/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem',
'sasl.mechanisms': 'OAUTHBEARER',
'sasl.oauthbearer.client.id': '(redacted)',
'sasl.oauthbearer.client.secret': '(redacted)',
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.token.endpoint.url': 'https://auth.gcn.nasa.gov/oauth2/token',
'security.protocol': 'sasl_ssl',
'ssl.ca.location': '/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem'}
[2026-01-26T11:01:52]:INFO:Preparing Pathway computation
thread 'pathway:work-0' (6135230) panicked at src/engine/report_error.rs:83:50:
ValueError: Creating Kafka consumer failed: Client config error: No such configuration property: "https.ca.location" https.ca.location /Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/Users/lpsinger/src/shigawire/pathway_example.py", line 15, in <module>
pw.debug.compute_and_print(source)
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/runtime_type_check.py", line 19, in with_type_validation
return beartype.beartype(f)(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<@beartype(pathway.debug.compute_and_print) at 0x119451bc0>", line 45, in compute_and_print
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/trace.py", line 134, in _pathway_trace_marker
_reraise_with_user_frame(e)
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/trace.py", line 100, in _reraise_with_user_frame
raise e
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/debug/__init__.py", line 237, in compute_and_print
_compute_and_print_internal(
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/debug/__init__.py", line 124, in _compute_and_print_internal
captured = _compute_tables(*tables, _stacklevel=_stacklevel + 1, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/runtime_type_check.py", line 19, in with_type_validation
return beartype.beartype(f)(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<@beartype(pathway.debug._compute_tables) at 0x10231dd00>", line 45, in _compute_tables
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/debug/__init__.py", line 50, in _compute_tables
captured = GraphRunner(
^^^^^^^^^^^^
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/__init__.py", line 103, in run_tables
return self._run(nodes, output_tables=tables, after_build=after_build)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/__init__.py", line 209, in _run
return api.run_with_new_graph(
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/__init__.py", line 183, in logic
storage_graph.build_scope(scope, state, self)
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/storage_graph.py", line 330, in build_scope
handler.run(operator, self.output_storages[operator])
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/operator_handler.py", line 88, in run
self._run(operator, output_storages)
File "/Users/lpsinger/Library/Python/3.12/lib/python/site-packages/pathway/internals/graph_runner/operator_handler.py", line 138, in _run
materialized_table = self.scope.connector_table(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Creating Kafka consumer failed: Client config error: No such configuration property: "https.ca.location" https.ca.location /Users/lpsinger/Library/Python/3.12/lib/python/site-packages/certifi/cacert.pem
Occurred here:
Line: source = pw.io.kafka.read(rdkafka_config, "gcn.heartbeat")
File: /Users/lpsinger/src/shigawire/pathway_example.py:14
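
To check whether `https.ca.location` is the only property the bundled librdkafka rejects, one option is to pull the property name out of the error text, drop that key, and retry. Below is a minimal sketch of that diagnostic step, assuming the error message keeps the form shown in the traceback above. The helper names `rejected_property` and `without_property` are hypothetical and not part of Pathway or gcn-kafka, and the certificate path is a placeholder. Dropping the key does not enable OAUTHBEARER; it would only surface the next rejected property, if there is one.

```python
import re
from typing import Optional

# Matches the librdkafka message quoted in the traceback above.
UNKNOWN_PROPERTY = re.compile(r'No such configuration property: "([^"]+)"')


def rejected_property(message: str) -> Optional[str]:
    """Return the property named in a 'No such configuration property' error, or None."""
    match = UNKNOWN_PROPERTY.search(message)
    return match.group(1) if match else None


def without_property(config: dict, name: Optional[str]) -> dict:
    """Return a copy of the config with one key dropped (for diagnosis only)."""
    return {key: value for key, value in config.items() if key != name}


# Error text copied from the traceback; the config is a shortened stand-in.
message = (
    "Creating Kafka consumer failed: Client config error: "
    'No such configuration property: "https.ca.location"'
)
config = {
    "sasl.mechanisms": "OAUTHBEARER",
    "sasl.oauthbearer.method": "oidc",
    "https.ca.location": "/path/to/cacert.pem",  # placeholder path
}

name = rejected_property(message)
trimmed = without_property(config, name)
print(name)
print(sorted(trimmed))
```

Passing `trimmed` to `pw.io.kafka.read` in place of `rdkafka_config` would then show whether the remaining OAUTHBEARER properties are accepted by the build.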