diff --git a/changelogs/current.yaml b/changelogs/current.yaml index a9936106b6eac..c76a8b0222cf0 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -275,6 +275,12 @@ new_features: functions by name via ``envoy_dynamic_module_callback_register_function`` and other modules can resolve them via ``envoy_dynamic_module_callback_get_function``, enabling zero-copy cross-module interactions analogous to ``dlsym``. +- area: dynamic_modules + change: | + Network filter read and write buffers now persist after ``on_read``/``on_write`` callbacks, allowing + modules to access buffered data from ``on_scheduled`` and other callbacks. Added + ``envoy_dynamic_module_callback_network_filter_get_cluster_host_count`` to query cluster host counts + by name, enabling scale-to-zero and custom load balancing decisions in network filters. - area: dynamic_modules change: | Added metrics definition and update support for bootstrap dynamic modules. diff --git a/source/extensions/dynamic_modules/abi/abi.h b/source/extensions/dynamic_modules/abi/abi.h index 9a4d0d9edc289..a3a1b895cd1e9 100644 --- a/source/extensions/dynamic_modules/abi/abi.h +++ b/source/extensions/dynamic_modules/abi/abi.h @@ -2877,8 +2877,8 @@ void envoy_dynamic_module_callback_network_get_socket_options( * envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks_size is called by the module * to get the number of chunks in the current read data buffer. Combined with * envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks, this can be used to iterate - * over all chunks in the read buffer. This is only valid during the - * envoy_dynamic_module_on_network_filter_read callback. + * over all chunks in the read buffer. This is valid after the first + * envoy_dynamic_module_on_network_filter_read callback for the lifetime of the connection. * * @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object. * @return the number of chunks in the read buffer. 0 if the buffer is not available or empty. @@ -2888,8 +2888,9 @@ size_t envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks_size( /** * envoy_dynamic_module_callback_network_filter_get_read_buffer_size is called by the module to - * get the total size of the current read data buffer. This is only valid during the - * envoy_dynamic_module_on_network_filter_read callback. + * get the total size of the current read data buffer. This is valid after the first + * envoy_dynamic_module_on_network_filter_read callback for the lifetime of the connection. + * * @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object. * @return the total size of the read buffer. 0 if the buffer is not available or empty. */ @@ -2898,8 +2899,8 @@ size_t envoy_dynamic_module_callback_network_filter_get_read_buffer_size( /** * envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks is called by the module to - * get the current read data buffer as chunks. This is only valid during the - * envoy_dynamic_module_on_network_filter_read callback. + * get the current read data buffer as chunks. This is valid after the first + * envoy_dynamic_module_on_network_filter_read callback for the lifetime of the connection. * * PRECONDITION: The module must ensure that the result_buffer_vector is valid and has enough length * to store all the chunks. The module can use @@ -2920,8 +2921,8 @@ bool envoy_dynamic_module_callback_network_filter_get_read_buffer_chunks( * envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks_size is called by the module * to get the number of chunks in the current write data buffer. Combined with * envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks, this can be used to iterate - * over all chunks in the write buffer. This is only valid during the - * envoy_dynamic_module_on_network_filter_write callback. + * over all chunks in the write buffer. This is valid after the first + * envoy_dynamic_module_on_network_filter_write callback for the lifetime of the connection. * * @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object. * @return the number of chunks in the write buffer. 0 if the buffer is not available or empty. @@ -2931,8 +2932,9 @@ size_t envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks_size /** * envoy_dynamic_module_callback_network_filter_get_write_buffer_size is called by the module to - * get the total size of the current write data buffer. This is only valid during the - * envoy_dynamic_module_on_network_filter_write callback. + * get the total size of the current write data buffer. This is valid after the first + * envoy_dynamic_module_on_network_filter_write callback for the lifetime of the connection. + * * @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object. * @return the total size of the write buffer. 0 if the buffer is not available or empty. */ @@ -2941,8 +2943,8 @@ size_t envoy_dynamic_module_callback_network_filter_get_write_buffer_size( /** * envoy_dynamic_module_callback_network_filter_get_write_buffer_chunks is called by the module to - * get the current write data buffer as chunks. This is only valid during the - * envoy_dynamic_module_on_network_filter_write callback. + * get the current write data buffer as chunks. This is valid after the first + * envoy_dynamic_module_on_network_filter_write callback for the lifetime of the connection. * * PRECONDITION: The module must ensure that the result_buffer_vector is valid and has enough length * to store all the chunks. The module can use @@ -3475,6 +3477,27 @@ envoy_dynamic_module_callback_network_filter_record_histogram_value( // ---------------------- Upstream Host Access Callbacks ----------------------- +/** + * envoy_dynamic_module_callback_network_filter_get_cluster_host_count retrieves the host counts for + * a cluster by name. This provides visibility into the cluster's health state and can be used to + * implement scale-to-zero logic or custom load balancing decisions. + * + * @param filter_envoy_ptr is the pointer to the DynamicModuleNetworkFilter object. + * @param cluster_name is the name of the cluster to query owned by the module. + * @param priority is the priority level to query (0 for default priority). + * @param total_count is the pointer to store the total number of hosts. Can be null if not needed. + * @param healthy_count is the pointer to store the number of healthy hosts. Can be null if not + * needed. + * @param degraded_count is the pointer to store the number of degraded hosts. Can be null if not + * needed. + * @return true if the counts were retrieved successfully, false otherwise (e.g., cluster not + * found). + */ +bool envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + envoy_dynamic_module_type_network_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_module_buffer cluster_name, uint32_t priority, size_t* total_count, + size_t* healthy_count, size_t* degraded_count); + /** * envoy_dynamic_module_callback_network_filter_get_upstream_host_address is called by the module * to get the address and port of the currently selected upstream host. diff --git a/source/extensions/dynamic_modules/sdk/rust/src/lib.rs b/source/extensions/dynamic_modules/sdk/rust/src/lib.rs index 40830fddb3d77..df4f057dbab63 100644 --- a/source/extensions/dynamic_modules/sdk/rust/src/lib.rs +++ b/source/extensions/dynamic_modules/sdk/rust/src/lib.rs @@ -4189,10 +4189,12 @@ pub trait NetworkFilter { /// The trait that represents the Envoy network filter. /// This is used in [`NetworkFilter`] to interact with the underlying Envoy network filter object. pub trait EnvoyNetworkFilter { - /// Get the read buffer chunks. This is only valid during the on_read callback. + /// Get the read buffer chunks. This is valid after the first on_read callback for the lifetime + /// of the connection. fn get_read_buffer_chunks(&mut self) -> (Vec, usize); - /// Get the write buffer chunks. This is only valid during the on_write callback. + /// Get the write buffer chunks. This is valid after the first on_write callback for the lifetime + /// of the connection. fn get_write_buffer_chunks(&mut self) -> (Vec, usize); /// Drain bytes from the beginning of the read buffer. @@ -4395,6 +4397,12 @@ pub trait EnvoyNetworkFilter { value: u64, ) -> Result<(), envoy_dynamic_module_type_metrics_result>; + /// Retrieve the host counts for a cluster by name at the given priority level. + /// Returns None if the cluster is not found or the priority level does not exist. + /// + /// This is useful for implementing scale-to-zero logic or custom load balancing decisions. + fn get_cluster_host_count(&self, cluster_name: &str, priority: u32) -> Option; + /// Get the upstream host address and port if an upstream host is selected. /// Returns None if no upstream host is set or the address is not an IP. fn get_upstream_host_address(&self) -> Option<(String, u32)>; @@ -5424,6 +5432,31 @@ impl EnvoyNetworkFilter for EnvoyNetworkFilterImpl { } } + fn get_cluster_host_count(&self, cluster_name: &str, priority: u32) -> Option { + let mut total: usize = 0; + let mut healthy: usize = 0; + let mut degraded: usize = 0; + let success = unsafe { + abi::envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + self.raw, + str_to_module_buffer(cluster_name), + priority, + &mut total as *mut _, + &mut healthy as *mut _, + &mut degraded as *mut _, + ) + }; + if success { + Some(ClusterHostCount { + total, + healthy, + degraded, + }) + } else { + None + } + } + fn get_upstream_host_address(&self) -> Option<(String, u32)> { let mut address = abi::envoy_dynamic_module_type_envoy_buffer { ptr: std::ptr::null_mut(), diff --git a/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs b/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs index b09828a354f36..92728a10081a1 100644 --- a/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs +++ b/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs @@ -992,6 +992,93 @@ fn test_envoy_dynamic_module_on_udp_listener_filter_callbacks() { assert!(ON_DATA_CALLED.load(std::sync::atomic::Ordering::SeqCst)); } +// ============================================================================= +// Cluster Host Count FFI stubs and tests. +// ============================================================================= + +struct MockClusterHostCount { + total: usize, + healthy: usize, + degraded: usize, +} + +static MOCK_CLUSTER_HOST_COUNT: std::sync::Mutex> = + std::sync::Mutex::new(None); + +fn reset_cluster_host_count_mock() { + *MOCK_CLUSTER_HOST_COUNT.lock().unwrap() = None; +} + +fn set_cluster_host_count_mock(count: MockClusterHostCount) { + *MOCK_CLUSTER_HOST_COUNT.lock().unwrap() = Some(count); +} + +#[no_mangle] +pub extern "C" fn envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + _filter_envoy_ptr: abi::envoy_dynamic_module_type_network_filter_envoy_ptr, + _cluster_name: abi::envoy_dynamic_module_type_module_buffer, + _priority: u32, + total_count: *mut usize, + healthy_count: *mut usize, + degraded_count: *mut usize, +) -> bool { + let guard = MOCK_CLUSTER_HOST_COUNT.lock().unwrap(); + match &*guard { + Some(count) => { + if !total_count.is_null() { + unsafe { + *total_count = count.total; + } + } + if !healthy_count.is_null() { + unsafe { + *healthy_count = count.healthy; + } + } + if !degraded_count.is_null() { + unsafe { + *degraded_count = count.degraded; + } + } + true + }, + None => false, + } +} + +#[test] +fn test_get_cluster_host_count_success() { + reset_cluster_host_count_mock(); + set_cluster_host_count_mock(MockClusterHostCount { + total: 10, + healthy: 8, + degraded: 1, + }); + + let filter = EnvoyNetworkFilterImpl { + raw: std::ptr::null_mut(), + }; + + let result = filter.get_cluster_host_count("test_cluster", 0); + assert!(result.is_some()); + let count = result.unwrap(); + assert_eq!(count.total, 10); + assert_eq!(count.healthy, 8); + assert_eq!(count.degraded, 1); +} + +#[test] +fn test_get_cluster_host_count_not_found() { + reset_cluster_host_count_mock(); + + let filter = EnvoyNetworkFilterImpl { + raw: std::ptr::null_mut(), + }; + + let result = filter.get_cluster_host_count("nonexistent_cluster", 0); + assert!(result.is_none()); +} + // ============================================================================= // Upstream Host Access and StartTLS FFI stubs for testing. // ============================================================================= diff --git a/source/extensions/filters/network/dynamic_modules/abi_impl.cc b/source/extensions/filters/network/dynamic_modules/abi_impl.cc index 67b8e88e2ae9e..7efdc7e133354 100644 --- a/source/extensions/filters/network/dynamic_modules/abi_impl.cc +++ b/source/extensions/filters/network/dynamic_modules/abi_impl.cc @@ -830,6 +830,36 @@ envoy_dynamic_module_callback_network_filter_record_histogram_value( // Upstream Host Access Callbacks // ----------------------------------------------------------------------------- +bool envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + envoy_dynamic_module_type_network_filter_envoy_ptr filter_envoy_ptr, + envoy_dynamic_module_type_module_buffer cluster_name, uint32_t priority, size_t* total_count, + size_t* healthy_count, size_t* degraded_count) { + auto* filter = static_cast(filter_envoy_ptr); + auto* tl_cluster = filter->getFilterConfig().cluster_manager_.getThreadLocalCluster( + absl::string_view(cluster_name.ptr, cluster_name.length)); + if (tl_cluster == nullptr) { + return false; + } + const auto& priority_set = tl_cluster->prioritySet(); + if (priority >= priority_set.hostSetsPerPriority().size()) { + return false; + } + const auto& host_set = priority_set.hostSetsPerPriority()[priority]; + if (host_set == nullptr) { + return false; + } + if (total_count != nullptr) { + *total_count = host_set->hosts().size(); + } + if (healthy_count != nullptr) { + *healthy_count = host_set->healthyHosts().size(); + } + if (degraded_count != nullptr) { + *degraded_count = host_set->degradedHosts().size(); + } + return true; +} + bool envoy_dynamic_module_callback_network_filter_get_upstream_host_address( envoy_dynamic_module_type_network_filter_envoy_ptr filter_envoy_ptr, envoy_dynamic_module_type_envoy_buffer* address_out, uint32_t* port_out) { diff --git a/source/extensions/filters/network/dynamic_modules/filter.cc b/source/extensions/filters/network/dynamic_modules/filter.cc index 4ce83673a1b89..3a415fb536983 100644 --- a/source/extensions/filters/network/dynamic_modules/filter.cc +++ b/source/extensions/filters/network/dynamic_modules/filter.cc @@ -100,11 +100,13 @@ Network::FilterStatus DynamicModuleNetworkFilter::onData(Buffer::Instance& data, if (in_module_filter_ == nullptr) { return Network::FilterStatus::Continue; } - // Set the current read buffer for ABI callbacks. + // Set the current read buffer for ABI callbacks. The buffer pointer is kept after the callback + // returns so that modules can access buffered data outside of on_read (e.g., in on_scheduled or + // on_http_callout_done). The buffer is the connection's persistent read buffer and remains valid + // for the lifetime of the connection. current_read_buffer_ = &data; auto status = config_->on_network_filter_read_(thisAsVoidPtr(), in_module_filter_, data.length(), end_stream); - current_read_buffer_ = nullptr; return toEnvoyFilterStatus(status); } @@ -112,11 +114,13 @@ Network::FilterStatus DynamicModuleNetworkFilter::onWrite(Buffer::Instance& data if (in_module_filter_ == nullptr) { return Network::FilterStatus::Continue; } - // Set the current write buffer for ABI callbacks. + // Set the current write buffer for ABI callbacks. The buffer pointer is kept after the callback + // returns so that modules can access buffered data outside of on_write (e.g., in on_scheduled). + // The buffer is the connection's persistent write buffer and remains valid for the lifetime of + // the connection. current_write_buffer_ = &data; auto status = config_->on_network_filter_write_(thisAsVoidPtr(), in_module_filter_, data.length(), end_stream); - current_write_buffer_ = nullptr; return toEnvoyFilterStatus(status); } diff --git a/source/extensions/filters/network/dynamic_modules/filter.h b/source/extensions/filters/network/dynamic_modules/filter.h index e94eb13ba88a7..bbccb87dd5b7d 100644 --- a/source/extensions/filters/network/dynamic_modules/filter.h +++ b/source/extensions/filters/network/dynamic_modules/filter.h @@ -176,7 +176,8 @@ class DynamicModuleNetworkFilter : public Network::Filter, Network::ReadFilterCallbacks* read_callbacks_ = nullptr; Network::WriteFilterCallbacks* write_callbacks_ = nullptr; - // Current buffers, only valid during callbacks. + // Current buffers. Set on the first on_read/on_write callback and kept for the lifetime of the + // connection so that modules can access buffered data outside of on_read/on_write callbacks. Buffer::Instance* current_read_buffer_ = nullptr; Buffer::Instance* current_write_buffer_ = nullptr; diff --git a/test/extensions/dynamic_modules/network/abi_impl_test.cc b/test/extensions/dynamic_modules/network/abi_impl_test.cc index a118f04ea5101..749a08f2038b9 100644 --- a/test/extensions/dynamic_modules/network/abi_impl_test.cc +++ b/test/extensions/dynamic_modules/network/abi_impl_test.cc @@ -1585,6 +1585,115 @@ TEST_F(DynamicModuleNetworkFilterHttpCalloutTest, FilterDestructionCancelsPendin filter_.reset(); } +// ============================================================================= +// Tests for cluster host count. +// ============================================================================= + +TEST_F(DynamicModuleNetworkFilterAbiCallbackTest, GetClusterHostCountClusterNotFound) { + std::string cluster_name = "nonexistent_cluster"; + EXPECT_CALL(cluster_manager_, getThreadLocalCluster(absl::string_view{cluster_name})) + .WillOnce(testing::Return(nullptr)); + + size_t total = 0, healthy = 0, degraded = 0; + envoy_dynamic_module_type_module_buffer name_buf = {cluster_name.data(), cluster_name.size()}; + EXPECT_FALSE(envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + filterPtr(), name_buf, 0, &total, &healthy, °raded)); +} + +TEST_F(DynamicModuleNetworkFilterAbiCallbackTest, GetClusterHostCountInvalidPriority) { + std::string cluster_name = "test_cluster"; + NiceMock thread_local_cluster; + EXPECT_CALL(cluster_manager_, getThreadLocalCluster(absl::string_view{cluster_name})) + .WillOnce(testing::Return(&thread_local_cluster)); + + // Priority 99 should exceed available priorities. + size_t total = 0, healthy = 0, degraded = 0; + envoy_dynamic_module_type_module_buffer name_buf = {cluster_name.data(), cluster_name.size()}; + EXPECT_FALSE(envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + filterPtr(), name_buf, 99, &total, &healthy, °raded)); +} + +TEST_F(DynamicModuleNetworkFilterAbiCallbackTest, GetClusterHostCountSuccess) { + std::string cluster_name = "test_cluster"; + NiceMock thread_local_cluster; + EXPECT_CALL(cluster_manager_, getThreadLocalCluster(absl::string_view{cluster_name})) + .WillOnce(testing::Return(&thread_local_cluster)); + + // Set up hosts in the mock host set. + auto* mock_host_set = thread_local_cluster.cluster_.priority_set_.getMockHostSet(0); + mock_host_set->hosts_.resize(5); + mock_host_set->healthy_hosts_.resize(3); + mock_host_set->degraded_hosts_.resize(1); + + size_t total = 0, healthy = 0, degraded = 0; + envoy_dynamic_module_type_module_buffer name_buf = {cluster_name.data(), cluster_name.size()}; + EXPECT_TRUE(envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + filterPtr(), name_buf, 0, &total, &healthy, °raded)); + EXPECT_EQ(total, 5); + EXPECT_EQ(healthy, 3); + EXPECT_EQ(degraded, 1); +} + +TEST_F(DynamicModuleNetworkFilterAbiCallbackTest, GetClusterHostCountNullOutputParams) { + std::string cluster_name = "test_cluster"; + NiceMock thread_local_cluster; + EXPECT_CALL(cluster_manager_, getThreadLocalCluster(absl::string_view{cluster_name})) + .WillRepeatedly(testing::Return(&thread_local_cluster)); + + auto* mock_host_set = thread_local_cluster.cluster_.priority_set_.getMockHostSet(0); + mock_host_set->hosts_.resize(10); + mock_host_set->healthy_hosts_.resize(8); + mock_host_set->degraded_hosts_.resize(2); + + envoy_dynamic_module_type_module_buffer name_buf = {cluster_name.data(), cluster_name.size()}; + + // Call with nullptr for some output params - should still succeed. + EXPECT_TRUE(envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + filterPtr(), name_buf, 0, nullptr, nullptr, nullptr)); + + // Call with only total. + size_t total = 0; + EXPECT_TRUE(envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + filterPtr(), name_buf, 0, &total, nullptr, nullptr)); + EXPECT_EQ(total, 10); +} + +TEST_F(DynamicModuleNetworkFilterAbiCallbackTest, GetClusterHostCountDifferentPriority) { + std::string cluster_name = "test_cluster"; + NiceMock thread_local_cluster; + EXPECT_CALL(cluster_manager_, getThreadLocalCluster(absl::string_view{cluster_name})) + .WillRepeatedly(testing::Return(&thread_local_cluster)); + + // Set up priority 0 with 5 hosts. + auto* mock_host_set_0 = thread_local_cluster.cluster_.priority_set_.getMockHostSet(0); + mock_host_set_0->hosts_.resize(5); + mock_host_set_0->healthy_hosts_.resize(5); + mock_host_set_0->degraded_hosts_.resize(0); + + // Set up priority 1 with 3 hosts. + auto* mock_host_set_1 = thread_local_cluster.cluster_.priority_set_.getMockHostSet(1); + mock_host_set_1->hosts_.resize(3); + mock_host_set_1->healthy_hosts_.resize(2); + mock_host_set_1->degraded_hosts_.resize(1); + + envoy_dynamic_module_type_module_buffer name_buf = {cluster_name.data(), cluster_name.size()}; + + // Check priority 0. + size_t total = 0, healthy = 0, degraded = 0; + EXPECT_TRUE(envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + filterPtr(), name_buf, 0, &total, &healthy, °raded)); + EXPECT_EQ(total, 5); + EXPECT_EQ(healthy, 5); + EXPECT_EQ(degraded, 0); + + // Check priority 1. + EXPECT_TRUE(envoy_dynamic_module_callback_network_filter_get_cluster_host_count( + filterPtr(), name_buf, 1, &total, &healthy, °raded)); + EXPECT_EQ(total, 3); + EXPECT_EQ(healthy, 2); + EXPECT_EQ(degraded, 1); +} + // ============================================================================= // Tests for upstream host access. // ============================================================================= diff --git a/test/extensions/dynamic_modules/network/filter_test.cc b/test/extensions/dynamic_modules/network/filter_test.cc index f4d4b4b2616c9..a8e98024c21bf 100644 --- a/test/extensions/dynamic_modules/network/filter_test.cc +++ b/test/extensions/dynamic_modules/network/filter_test.cc @@ -58,9 +58,9 @@ TEST_F(DynamicModuleNetworkFilterTest, BasicDataFlow) { EXPECT_EQ(Network::FilterStatus::Continue, filter->onWrite(write_data, false)); EXPECT_EQ(Network::FilterStatus::Continue, filter->onWrite(write_data, true)); - // Verify buffer is cleared after callbacks. - EXPECT_EQ(nullptr, filter->currentReadBuffer()); - EXPECT_EQ(nullptr, filter->currentWriteBuffer()); + // Verify buffers persist after callbacks for access from on_scheduled and other callbacks. + EXPECT_NE(nullptr, filter->currentReadBuffer()); + EXPECT_NE(nullptr, filter->currentWriteBuffer()); } TEST_F(DynamicModuleNetworkFilterTest, AllConnectionEvents) {