Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ new_features:
functions by name via ``envoy_dynamic_module_callback_register_function`` and other modules
can resolve them via ``envoy_dynamic_module_callback_get_function``, enabling zero-copy
cross-module interactions analogous to ``dlsym``.
- area: dynamic_modules
change: |
Network filter read and write buffers now persist after ``on_read``/``on_write`` callbacks, allowing
modules to access buffered data from ``on_scheduled`` and other callbacks. Added
``envoy_dynamic_module_callback_network_filter_get_cluster_host_count`` to query cluster host counts
by name, enabling scale-to-zero and custom load balancing decisions in network filters.
- area: dynamic_modules
change: |
Added metrics definition and update support for bootstrap dynamic modules.
Expand Down
47 changes: 35 additions & 12 deletions source/extensions/dynamic_modules/abi/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -2877,8 +2877,8 @@ void envoy_dynamic_module_callback_network_get_socket_options(
* envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks_size is called by the module
* to get the number of chunks in the current read data buffer. Combined with
* envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks, this can be used to iterate
* over all chunks in the read buffer. This is only valid during the
* envoy_dynamic_module_on_network_filter_read callback.
* over all chunks in the read buffer. This is valid after the first
* envoy_dynamic_module_on_network_filter_read callback for the lifetime of the connection.
*
* @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object.
* @return the number of chunks in the read buffer. 0 if the buffer is not available or empty.
Expand All @@ -2888,8 +2888,9 @@ size_t envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks_size(

/**
* envoy_dynamic_module_callback_network_filter_get_read_buffer_size is called by the module to
* get the total size of the current read data buffer. This is only valid during the
* envoy_dynamic_module_on_network_filter_read callback.
* get the total size of the current read data buffer. This is valid after the first
* envoy_dynamic_module_on_network_filter_read callback for the lifetime of the connection.
*
* @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object.
* @return the total size of the read buffer. 0 if the buffer is not available or empty.
*/
Expand All @@ -2898,8 +2899,8 @@ size_t envoy_dynamic_module_callback_network_filter_get_read_buffer_size(

/**
* envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks is called by the module to
* get the current read data buffer as chunks. This is only valid during the
* envoy_dynamic_module_on_network_filter_read callback.
* get the current read data buffer as chunks. This is valid after the first
* envoy_dynamic_module_on_network_filter_read callback for the lifetime of the connection.
*
* PRECONDITION: The module must ensure that the result_buffer_vector is valid and has enough length
* to store all the chunks. The module can use
Expand All @@ -2920,8 +2921,8 @@ bool envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks(
* envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks_size is called by the module
* to get the number of chunks in the current write data buffer. Combined with
* envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks, this can be used to iterate
* over all chunks in the write buffer. This is only valid during the
* envoy_dynamic_module_on_network_filter_write callback.
* over all chunks in the write buffer. This is valid after the first
* envoy_dynamic_module_on_network_filter_write callback for the lifetime of the connection.
*
* @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object.
* @return the number of chunks in the write buffer. 0 if the buffer is not available or empty.
Expand All @@ -2931,8 +2932,9 @@ size_t envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks_size

/**
* envoy_dynamic_module_callback_network_filter_get_write_buffer_size is called by the module to
* get the total size of the current write data buffer. This is only valid during the
* envoy_dynamic_module_on_network_filter_write callback.
* get the total size of the current write data buffer. This is valid after the first
* envoy_dynamic_module_on_network_filter_write callback for the lifetime of the connection.
*
* @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object.
* @return the total size of the write buffer. 0 if the buffer is not available or empty.
*/
Expand All @@ -2941,8 +2943,8 @@ size_t envoy_dynamic_module_callback_network_filter_get_write_buffer_size(

/**
* envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks is called by the module to
* get the current write data buffer as chunks. This is only valid during the
* envoy_dynamic_module_on_network_filter_write callback.
* get the current write data buffer as chunks. This is valid after the first
* envoy_dynamic_module_on_network_filter_write callback for the lifetime of the connection.
*
* PRECONDITION: The module must ensure that the result_buffer_vector is valid and has enough length
* to store all the chunks. The module can use
Expand Down Expand Up @@ -3475,6 +3477,27 @@ envoy_dynamic_module_callback_network_filter_record_histogram_value(

// ---------------------- Upstream Host Access Callbacks -----------------------

/**
* envoy_dynamic_module_callback_network_filter_get_cluster_host_count retrieves the host counts for
* a cluster by name. This provides visibility into the cluster's health state and can be used to
* implement scale-to-zero logic or custom load balancing decisions.
*
* @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object.
* @param cluster_name is the name of the cluster to query owned by the module.
* @param priority is the priority level to query (0 for default priority).
* @param total_count is the pointer to store the total number of hosts. Can be null if not needed.
* @param healthy_count is the pointer to store the number of healthy hosts. Can be null if not
* needed.
* @param degraded_count is the pointer to store the number of degraded hosts. Can be null if not
* needed.
* @return true if the counts were retrieved successfully, false otherwise (e.g., cluster not
* found).
*/
bool envoy_dynamic_module_callback_network_filter_get_cluster_host_count(
envoy_dynamic_module_type_network_filter_envoy_ptr filter_envoy_ptr,
envoy_dynamic_module_type_module_buffer cluster_name, uint32_t priority, size_t* total_count,
size_t* healthy_count, size_t* degraded_count);

/**
* envoy_dynamic_module_callback_network_filter_get_upstream_host_address is called by the module
* to get the address and port of the currently selected upstream host.
Expand Down
37 changes: 35 additions & 2 deletions source/extensions/dynamic_modules/sdk/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4189,10 +4189,12 @@ pub trait NetworkFilter<ENF: EnvoyNetworkFilter> {
/// The trait that represents the Envoy network filter.
/// This is used in [`NetworkFilter`] to interact with the underlying Envoy network filter object.
pub trait EnvoyNetworkFilter {
/// Get the read buffer chunks. This is only valid during the on_read callback.
/// Get the read buffer chunks. This is valid after the first on_read callback for the lifetime
/// of the connection.
fn get_read_buffer_chunks(&mut self) -> (Vec<EnvoyBuffer>, usize);

/// Get the write buffer chunks. This is only valid during the on_write callback.
/// Get the write buffer chunks. This is valid after the first on_write callback for the lifetime
/// of the connection.
fn get_write_buffer_chunks(&mut self) -> (Vec<EnvoyBuffer>, usize);

/// Drain bytes from the beginning of the read buffer.
Expand Down Expand Up @@ -4395,6 +4397,12 @@ pub trait EnvoyNetworkFilter {
value: u64,
) -> Result<(), envoy_dynamic_module_type_metrics_result>;

/// Retrieve the host counts for a cluster by name at the given priority level.
/// Returns None if the cluster is not found or the priority level does not exist.
///
/// This is useful for implementing scale-to-zero logic or custom load balancing decisions.
fn get_cluster_host_count(&self, cluster_name: &str, priority: u32) -> Option<ClusterHostCount>;

/// Get the upstream host address and port if an upstream host is selected.
/// Returns None if no upstream host is set or the address is not an IP.
fn get_upstream_host_address(&self) -> Option<(String, u32)>;
Expand Down Expand Up @@ -5424,6 +5432,31 @@ impl EnvoyNetworkFilter for EnvoyNetworkFilterImpl {
}
}

fn get_cluster_host_count(&self, cluster_name: &str, priority: u32) -> Option<ClusterHostCount> {
let mut total: usize = 0;
let mut healthy: usize = 0;
let mut degraded: usize = 0;
let success = unsafe {
abi::envoy_dynamic_module_callback_network_filter_get_cluster_host_count(
self.raw,
str_to_module_buffer(cluster_name),
priority,
&mut total as *mut _,
&mut healthy as *mut _,
&mut degraded as *mut _,
)
};
if success {
Some(ClusterHostCount {
total,
healthy,
degraded,
})
} else {
None
}
}

fn get_upstream_host_address(&self) -> Option<(String, u32)> {
let mut address = abi::envoy_dynamic_module_type_envoy_buffer {
ptr: std::ptr::null_mut(),
Expand Down
87 changes: 87 additions & 0 deletions source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,93 @@ fn test_envoy_dynamic_module_on_udp_listener_filter_callbacks() {
assert!(ON_DATA_CALLED.load(std::sync::atomic::Ordering::SeqCst));
}

// =============================================================================
// Cluster Host Count FFI stubs and tests.
// =============================================================================

struct MockClusterHostCount {
total: usize,
healthy: usize,
degraded: usize,
}

static MOCK_CLUSTER_HOST_COUNT: std::sync::Mutex<Option<MockClusterHostCount>> =
std::sync::Mutex::new(None);

fn reset_cluster_host_count_mock() {
*MOCK_CLUSTER_HOST_COUNT.lock().unwrap() = None;
}

fn set_cluster_host_count_mock(count: MockClusterHostCount) {
*MOCK_CLUSTER_HOST_COUNT.lock().unwrap() = Some(count);
}

#[no_mangle]
pub extern "C" fn envoy_dynamic_module_callback_network_filter_get_cluster_host_count(
_filter_envoy_ptr: abi::envoy_dynamic_module_type_network_filter_envoy_ptr,
_cluster_name: abi::envoy_dynamic_module_type_module_buffer,
_priority: u32,
total_count: *mut usize,
healthy_count: *mut usize,
degraded_count: *mut usize,
) -> bool {
let guard = MOCK_CLUSTER_HOST_COUNT.lock().unwrap();
match &*guard {
Some(count) => {
if !total_count.is_null() {
unsafe {
*total_count = count.total;
}
}
if !healthy_count.is_null() {
unsafe {
*healthy_count = count.healthy;
}
}
if !degraded_count.is_null() {
unsafe {
*degraded_count = count.degraded;
}
}
true
},
None => false,
}
}

#[test]
fn test_get_cluster_host_count_success() {
reset_cluster_host_count_mock();
set_cluster_host_count_mock(MockClusterHostCount {
total: 10,
healthy: 8,
degraded: 1,
});

let filter = EnvoyNetworkFilterImpl {
raw: std::ptr::null_mut(),
};

let result = filter.get_cluster_host_count("test_cluster", 0);
assert!(result.is_some());
let count = result.unwrap();
assert_eq!(count.total, 10);
assert_eq!(count.healthy, 8);
assert_eq!(count.degraded, 1);
}

#[test]
fn test_get_cluster_host_count_not_found() {
reset_cluster_host_count_mock();

let filter = EnvoyNetworkFilterImpl {
raw: std::ptr::null_mut(),
};

let result = filter.get_cluster_host_count("nonexistent_cluster", 0);
assert!(result.is_none());
}

// =============================================================================
// Upstream Host Access and StartTLS FFI stubs for testing.
// =============================================================================
Expand Down
30 changes: 30 additions & 0 deletions source/extensions/filters/network/dynamic_modules/abi_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,36 @@ envoy_dynamic_module_callback_network_filter_record_histogram_value(
// Upstream Host Access Callbacks
// -----------------------------------------------------------------------------

bool envoy_dynamic_module_callback_network_filter_get_cluster_host_count(
envoy_dynamic_module_type_network_filter_envoy_ptr filter_envoy_ptr,
envoy_dynamic_module_type_module_buffer cluster_name, uint32_t priority, size_t* total_count,
size_t* healthy_count, size_t* degraded_count) {
auto* filter = static_cast<DynamicModuleNetworkFilter*>(filter_envoy_ptr);
auto* tl_cluster = filter->getFilterConfig().cluster_manager_.getThreadLocalCluster(
absl::string_view(cluster_name.ptr, cluster_name.length));
if (tl_cluster == nullptr) {
return false;
}
const auto& priority_set = tl_cluster->prioritySet();
if (priority >= priority_set.hostSetsPerPriority().size()) {
return false;
}
const auto& host_set = priority_set.hostSetsPerPriority()[priority];
if (host_set == nullptr) {
return false;
}
if (total_count != nullptr) {
*total_count = host_set->hosts().size();
}
if (healthy_count != nullptr) {
*healthy_count = host_set->healthyHosts().size();
}
if (degraded_count != nullptr) {
*degraded_count = host_set->degradedHosts().size();
}
return true;
}

bool envoy_dynamic_module_callback_network_filter_get_upstream_host_address(
envoy_dynamic_module_type_network_filter_envoy_ptr filter_envoy_ptr,
envoy_dynamic_module_type_envoy_buffer* address_out, uint32_t* port_out) {
Expand Down
12 changes: 8 additions & 4 deletions source/extensions/filters/network/dynamic_modules/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,27 @@ Network::FilterStatus DynamicModuleNetworkFilter::onData(Buffer::Instance& data,
if (in_module_filter_ == nullptr) {
return Network::FilterStatus::Continue;
}
// Set the current read buffer for ABI callbacks.
// Set the current read buffer for ABI callbacks. The buffer pointer is kept after the callback
// returns so that modules can access buffered data outside of on_read (e.g., in on_scheduled or
// on_http_callout_done). The buffer is the connection's persistent read buffer and remains valid
// for the lifetime of the connection.
current_read_buffer_ = &data;
auto status = config_->on_network_filter_read_(thisAsVoidPtr(), in_module_filter_, data.length(),
end_stream);
current_read_buffer_ = nullptr;
return toEnvoyFilterStatus(status);
}

Network::FilterStatus DynamicModuleNetworkFilter::onWrite(Buffer::Instance& data, bool end_stream) {
if (in_module_filter_ == nullptr) {
return Network::FilterStatus::Continue;
}
// Set the current write buffer for ABI callbacks.
// Set the current write buffer for ABI callbacks. The buffer pointer is kept after the callback
// returns so that modules can access buffered data outside of on_write (e.g., in on_scheduled).
// The buffer is the connection's persistent write buffer and remains valid for the lifetime of
// the connection.
current_write_buffer_ = &data;
auto status = config_->on_network_filter_write_(thisAsVoidPtr(), in_module_filter_, data.length(),
end_stream);
current_write_buffer_ = nullptr;
return toEnvoyFilterStatus(status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ class DynamicModuleNetworkFilter : public Network::Filter,
Network::ReadFilterCallbacks* read_callbacks_ = nullptr;
Network::WriteFilterCallbacks* write_callbacks_ = nullptr;

// Current buffers, only valid during callbacks.
// Current buffers. Set on the first on_read/on_write callback and kept for the lifetime of the
// connection so that modules can access buffered data outside of on_read/on_write callbacks.
Buffer::Instance* current_read_buffer_ = nullptr;
Buffer::Instance* current_write_buffer_ = nullptr;

Expand Down
Loading
Loading