From 6e15738e8c48bc881fbe6e5c026a0f8289036007 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Fri, 25 Apr 2025 08:08:23 +0200 Subject: [PATCH 1/2] Improve DS routines (#5764) * Refs #22814: Data(p) test Signed-off-by: cferreiragonz * Refs #22814: Data(r/w) test Signed-off-by: cferreiragonz * Refs #22814: Tristate for ParticipantsAckStatus Signed-off-by: cferreiragonz * Refs #22814: Send direct messages to new clients Signed-off-by: cferreiragonz * Refs #22814: Review - Changes Signed-off-by: cferreiragonz * Refs #22814: Uncrustify Signed-off-by: cferreiragonz --------- Signed-off-by: cferreiragonz (cherry picked from commit 0fa7b1eba4fb21fae6113924bfc433babdded7f3) --- .../discovery/database/DiscoveryDataBase.cpp | 88 ++--- .../discovery/database/DiscoveryDataBase.hpp | 3 - .../DiscoveryParticipantsAckStatus.cpp | 19 +- .../DiscoveryParticipantsAckStatus.hpp | 36 +- .../database/DiscoverySharedInfo.cpp | 2 +- .../database/DiscoverySharedInfo.hpp | 8 +- .../discovery/participant/PDPServer.cpp | 21 +- .../discovery/participant/PDPServer.hpp | 8 + .../api/dds-pim/PubSubParticipant.hpp | 7 + .../common/BlackboxTestsDiscovery.cpp | 336 ++++++++++++++++++ 10 files changed, 474 insertions(+), 54 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp index a44828dfa14..ad01fdce3bd 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp @@ -36,6 +36,8 @@ namespace fastdds { namespace rtps { namespace ddb { +using ParticipantState = DiscoveryParticipantsAckStatus::ParticipantState; + DiscoveryDataBase::DiscoveryDataBase( const fastdds::rtps::GuidPrefix_t& server_guid_prefix) : server_guid_prefix_(server_guid_prefix) @@ -280,8 +282,8 @@ void DiscoveryDataBase::update_change_and_unmatch_( changes_to_release_.push_back(entity.update_and_unmatch(new_change)); // Manually set relevant participants ACK status of this server, and of the participant that sent the // change, to 1. This way, we avoid backprogation of the data. - entity.add_or_update_ack_participant(server_guid_prefix_, true); - entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, true); + entity.add_or_update_ack_participant(server_guid_prefix_, ParticipantState::ACKED); + entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, ParticipantState::ACKED); } void DiscoveryDataBase::add_ack_( @@ -305,7 +307,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -320,7 +322,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -335,7 +337,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -637,7 +639,7 @@ void DiscoveryDataBase::match_new_server_( if (server != participant_prefix) { // Make all known servers relevant to the new server, but not matched - part.second.add_or_update_ack_participant(server, false); + part.second.add_or_update_ack_participant(server, ParticipantState::PENDING_SEND); resend_new_pdp = true; } } @@ -650,7 +652,7 @@ void DiscoveryDataBase::match_new_server_( else { // Make the new server relevant to all known servers - part.second.add_or_update_ack_participant(participant_prefix, false); + part.second.add_or_update_ack_participant(participant_prefix, ParticipantState::PENDING_SEND); // Send DATA(p) of all known servers to the new participant add_pdp_to_send_(part.second.change()); } @@ -752,7 +754,7 @@ void DiscoveryDataBase::create_new_participant_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // If the DATA(p) it's from this server, it is already in history and we do nothing here if (change_guid.guidPrefix != server_guid_prefix_) @@ -854,7 +856,7 @@ void DiscoveryDataBase::update_participant_from_change_( if (ch->write_params.sample_identity().sequence_number() == participant_info.change()->write_params.sample_identity().sequence_number()) { - participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -904,7 +906,7 @@ void DiscoveryDataBase::create_writers_from_change_( if (ch->write_params.sample_identity().sequence_number() == writer_it->second.change()->write_params.sample_identity().sequence_number()) { - writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -952,7 +954,7 @@ void DiscoveryDataBase::create_writers_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // if topic is virtual, it must iterate over all readers if (topic_name == virtual_topic_) @@ -1022,7 +1024,7 @@ void DiscoveryDataBase::create_readers_from_change_( if (ch->write_params.sample_identity().sequence_number() == reader_it->second.change()->write_params.sample_identity().sequence_number()) { - reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -1070,7 +1072,7 @@ void DiscoveryDataBase::create_readers_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // If topic is virtual, it must iterate over all readers if (topic_name == virtual_topic_) @@ -1467,10 +1469,6 @@ bool DiscoveryDataBase::process_dirty_topics() // Find participants with writer info and participant with reader info in participants_ parts_reader_it = participants_.find(reader.guidPrefix); parts_writer_it = participants_.find(writer.guidPrefix); - // Find reader info in readers_ - readers_it = readers_.find(reader); - // Find writer info in writers_ - writers_it = writers_.find(writer); // Check in `participants_` whether the client with the reader has acknowledge the PDP of the client // with the writer. @@ -1478,26 +1476,35 @@ bool DiscoveryDataBase::process_dirty_topics() { if (parts_reader_it->second.is_matched(writer.guidPrefix)) { + // Find reader info in readers_ + readers_it = readers_.find(reader); // Check the status of the writer in `readers_[reader]::relevant_participants_builtin_ack_status`. if (readers_it != readers_.end() && readers_it->second.is_relevant_participant(writer.guidPrefix) && - !readers_it->second.is_matched(writer.guidPrefix)) + !readers_it->second.is_waiting_ack(writer.guidPrefix)) { // If the status is 0, add DATA(r) to a `edp_publications_to_send_` (if it's not there). if (add_edp_subscriptions_to_send_(readers_it->second.change())) { EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding DATA(r) to send: " << readers_it->second.change()->instanceHandle); + readers_it->second.add_or_update_ack_participant(writer.guidPrefix, + ParticipantState::WAITING_ACK); } } } else if (parts_reader_it->second.is_relevant_participant(writer.guidPrefix)) { - // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there). - if (add_pdp_to_send_(parts_reader_it->second.change())) + if (!parts_reader_it->second.is_waiting_ack(writer.guidPrefix)) { - EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: " - << parts_reader_it->second.change()->instanceHandle); + // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there). + if (add_pdp_to_send_(parts_reader_it->second.change())) + { + EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: " + << parts_reader_it->second.change()->instanceHandle); + parts_reader_it->second.add_or_update_ack_participant(writer.guidPrefix, + ParticipantState::WAITING_ACK); + } } // Set topic as not-clearable. is_clearable = false; @@ -1510,26 +1517,35 @@ bool DiscoveryDataBase::process_dirty_topics() { if (parts_writer_it->second.is_matched(reader.guidPrefix)) { + // Find writer info in writers_ + writers_it = writers_.find(writer); // Check the status of the reader in `writers_[writer]::relevant_participants_builtin_ack_status`. if (writers_it != writers_.end() && writers_it->second.is_relevant_participant(reader.guidPrefix) && - !writers_it->second.is_matched(reader.guidPrefix)) + !writers_it->second.is_waiting_ack(reader.guidPrefix)) { // If the status is 0, add DATA(w) to a `edp_subscriptions_to_send_` (if it's not there). if (add_edp_publications_to_send_(writers_it->second.change())) { EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding DATA(w) to send: " << writers_it->second.change()->instanceHandle); + writers_it->second.add_or_update_ack_participant(reader.guidPrefix, + ParticipantState::WAITING_ACK); } } } else if (parts_writer_it->second.is_relevant_participant(reader.guidPrefix)) { - // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there). - if (add_pdp_to_send_(parts_writer_it->second.change())) + if (!parts_writer_it->second.is_waiting_ack(reader.guidPrefix)) { - EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: " - << parts_writer_it->second.change()->instanceHandle); + // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there). + if (add_pdp_to_send_(parts_writer_it->second.change())) + { + EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: " + << parts_writer_it->second.change()->instanceHandle); + parts_writer_it->second.add_or_update_ack_participant(reader.guidPrefix, + ParticipantState::WAITING_ACK); + } } // Set topic as not-clearable. is_clearable = false; @@ -2350,18 +2366,6 @@ bool DiscoveryDataBase::add_pdp_to_send_( return false; } -bool DiscoveryDataBase::add_own_pdp_to_send_() -{ - if (!backup_in_progress()) - { - auto our_data_it = participants_.find(server_guid_prefix_); - assert(our_data_it != participants_.end()); - - return add_pdp_to_send_(our_data_it->second.change()); - } - return false; -} - bool DiscoveryDataBase::add_edp_publications_to_send_( eprosima::fastdds::rtps::CacheChange_t* change) { @@ -2494,7 +2498,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant @@ -2532,7 +2536,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant @@ -2592,7 +2596,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp index b09d2911d98..21fbea83888 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp @@ -351,9 +351,6 @@ class DiscoveryDataBase fastdds::rtps::WriterHistory* writer_history, const fastdds::rtps::GuidPrefix_t& entity_guid_prefix); - // Add own Data(p) in pdp_to_send if not already in it - bool add_own_pdp_to_send_(); - protected: // Change a cacheChange by update or new disposal diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp index 522862bbd97..04669bafa7b 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp @@ -34,7 +34,7 @@ namespace ddb { void DiscoveryParticipantsAckStatus::add_or_update_participant( const GuidPrefix_t& guid_p, - bool status = false) + ParticipantState status = ParticipantState::PENDING_SEND) { relevant_participants_map_[guid_p] = status; } @@ -45,13 +45,24 @@ void DiscoveryParticipantsAckStatus::remove_participant( relevant_participants_map_.erase(guid_p); } +bool DiscoveryParticipantsAckStatus::is_waiting_ack( + const GuidPrefix_t& guid_p) const +{ + auto it = relevant_participants_map_.find(guid_p); + if (it != relevant_participants_map_.end()) + { + return it->second >= ParticipantState::WAITING_ACK; + } + return false; +} + bool DiscoveryParticipantsAckStatus::is_matched( const GuidPrefix_t& guid_p) const { auto it = relevant_participants_map_.find(guid_p); if (it != relevant_participants_map_.end()) { - return it->second; + return it->second == ParticipantState::ACKED; } return false; } @@ -60,7 +71,7 @@ void DiscoveryParticipantsAckStatus::unmatch_all() { for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it) { - it->second = false; + it->second = ParticipantState::PENDING_SEND; } } @@ -89,7 +100,7 @@ bool DiscoveryParticipantsAckStatus::is_acked_by_all() const { for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it) { - if (!it->second) + if (it->second != ParticipantState::ACKED) { return false; } diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp index 4def2132454..804a1bfe461 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp @@ -45,15 +45,25 @@ class DiscoveryParticipantsAckStatus ~DiscoveryParticipantsAckStatus() = default; + enum class ParticipantState : uint8_t + { + PENDING_SEND, // Data(p) has not been sent yet + WAITING_ACK, // Data(p) has already been sent but ACK has not been received + ACKED // Data(p) has been acked + }; + void add_or_update_participant( const GuidPrefix_t& guid_p, - bool status); + ParticipantState status); void remove_participant( const GuidPrefix_t& guid_p); void unmatch_all(); + bool is_waiting_ack( + const GuidPrefix_t& guid_p) const; + bool is_matched( const GuidPrefix_t& guid_p) const; @@ -69,9 +79,31 @@ class DiscoveryParticipantsAckStatus private: - std::map relevant_participants_map_; + std::map relevant_participants_map_; }; +inline std::ostream& operator <<( + std::ostream& os, + DiscoveryParticipantsAckStatus::ParticipantState child) +{ + switch (child) + { + case DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND: + os << "PENDING_SEND"; + break; + case DiscoveryParticipantsAckStatus::ParticipantState::WAITING_ACK: + os << "WAITING_ACK"; + break; + case DiscoveryParticipantsAckStatus::ParticipantState::ACKED: + os << "ACKED"; + break; + default: + os << "UNKNOWN"; + break; + } + return os; +} + } /* namespace ddb */ } /* namespace rtps */ } /* namespace fastdds */ diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp index cd1d0abfca3..f803e42c310 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp @@ -36,7 +36,7 @@ DiscoverySharedInfo::DiscoverySharedInfo( : change_(change) { // the server already knows every message - add_or_update_ack_participant(known_participant, true); + add_or_update_ack_participant(known_participant, DiscoveryParticipantsAckStatus::ParticipantState::ACKED); } CacheChange_t* DiscoverySharedInfo::update_and_unmatch( diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp index 9765e62dffd..5c95f3ed506 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp @@ -56,7 +56,7 @@ class DiscoverySharedInfo void add_or_update_ack_participant( const GuidPrefix_t& guid_p, - bool status = false) + DiscoveryParticipantsAckStatus::ParticipantState status = DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND) { EPROSIMA_LOG_INFO( DISCOVERY_DATABASE, @@ -72,6 +72,12 @@ class DiscoverySharedInfo relevant_participants_builtin_ack_status_.remove_participant(guid_p); } + bool is_waiting_ack( + const GuidPrefix_t& guid_p) const + { + return relevant_participants_builtin_ack_status_.is_waiting_ack(guid_p); + } + bool is_matched( const GuidPrefix_t& guid_p) const { diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 0b2fd6546a5..c4b1957f534 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -641,7 +641,7 @@ void PDPServer::assignRemoteEndpoints( // Send the Data(p) to the client if (part_type == ParticipantType::CLIENT || part_type == ParticipantType::SUPER_CLIENT) { - discovery_db().add_own_pdp_to_send_(); + send_own_pdp(pdata); } } @@ -1619,7 +1619,26 @@ void PDPServer::send_announcement( EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Error sending announcement from server to clients"); } } +} + +void PDPServer::send_own_pdp( + ParticipantProxyData* pdata) +{ + std::vector remote_readers; + LocatorList locators; + + remote_readers.emplace_back(pdata->guid.guidPrefix, c_EntityId_SPDPReader); + + for (auto& locator : pdata->metatraffic_locators.unicast) + { + locators.push_back(locator); + } + send_announcement( + discovery_db().cache_change_own_participant(), + remote_readers, + locators + ); } bool PDPServer::read_backup( diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp index 932d7b35a98..dd38a2ad3fe 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.hpp @@ -137,6 +137,14 @@ class PDPServer : public fastdds::rtps::PDP LocatorList locators, bool dispose = false); + /** + * Sends own DATA(p) to the participant specified in @p pdata. + * Used to send first DATA(p) to new clients after discover them. + * @param pdata Pointer to the RTPSParticipantProxyData object. + * */ + void send_own_pdp( + ParticipantProxyData* pdata); + /** * These methods wouldn't be needed under perfect server operation (no need of dynamic endpoint allocation) * but must be implemented to solve server shutdown situations. diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index ad4d711e7fb..a67fb2e91a0 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -653,6 +653,13 @@ class PubSubParticipant return *this; } + PubSubParticipant& setup_transports( + eprosima::fastdds::rtps::BuiltinTransports transports) + { + participant_qos_.setup_transports(transports); + return *this; + } + PubSubParticipant& user_data( const std::vector& user_data) { diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index ae3be4c9b38..d843790f7ca 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -1770,3 +1770,339 @@ TEST(Discovery, discovery_cyclone_participant_with_custom_pid) /* Clean up */ factory->delete_participant(participant); } + +// This test checks that a Discover Server does not send duplicated PDP messages of itself when new clients +// are discovered +TEST_P(Discovery, discovery_server_pdp_messages_sent) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // One discovery server will be created, with multiple direct clients connected to it. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(p) sent. + + // Look for the PID_DOMAIN_ID in the message as it is only present in Data(p) messages + auto builtin_msg_is_data_p = [](CDRMessage_t& msg, std::atomic& num_data_p) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + // If inline_qos submessage is found we will have an additional Sentinel + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_DOMAIN_ID) + { + std::cout << "Data(p) sent by the server" << std::endl; + inline_qos_msg = false; + num_data_p.fetch_add(1u, std::memory_order_seq_cst); + break; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(p) messages sent + std::atomic num_data_p_sends{ 0 }; + auto test_transport = std::make_shared(); + test_transport->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_p(msg, num_data_p_sends); + }; + + // Create the main participant + auto server = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server, global_port); + + WireProtocolConfigQos server_wp_qos; + server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); + + server_wp_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + server_wp_qos.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport) + .wire_protocol(server_wp_qos); + + // Start the main participant + ASSERT_TRUE(server->init_participant()); + + // Create a client that connects to the first server + PubSubParticipant client_1(0u, 0u, 0u, 0u); + PubSubParticipant client_2(0u, 0u, 0u, 0u); + PubSubParticipant client_3(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server); + client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + client_qos.builtin.discovery_config.initial_announcements.count = 1; + // Init client 1 + ASSERT_TRUE(client_1.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 1u); + + // Init client 2 + ASSERT_TRUE(client_2.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 2, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 2u); + + // Init client 3 + ASSERT_TRUE(client_3.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 3, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); +} + +TEST_P(Discovery, discovery_server_edp_messages_sent) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // Two discovery servers will be created, each with a direct client connected to them. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(r/w) sent. + + // Look for the PID_ENDPOINT_GUID in the message as it is only present in Data(r/w) messages + auto builtin_msg_is_data_r_w = [](CDRMessage_t& msg, std::atomic& num_data_r_w) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_ENDPOINT_GUID) + { + std::cout << "Data (r/w) sent by the server" << std::endl; + num_data_r_w.fetch_add(1u, std::memory_order_seq_cst); + break; + } + else if (pid == eprosima::fastdds::dds::PID_VENDORID) + { + // Vendor ID is present in both Data(p) and Data(r/w) messages + inline_qos_msg = false; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(r/w) messages sent + std::atomic num_data_r_w_sends_s1{ 0 }; + std::atomic num_data_r_w_sends_s2{ 0 }; + auto test_transport_s1 = std::make_shared(); + test_transport_s1->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); + }; + + auto test_transport_s2 = std::make_shared(); + test_transport_s2->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); + }; + + // Create server 1 + auto server_1 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_1; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_1, global_port); + + WireProtocolConfigQos server_wp_qos_1; + server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos_1.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); + + server_wp_qos_1.builtin.discovery_config.leaseDuration = c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server_1->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s1) + .wire_protocol(server_wp_qos_1); + + // Start the main participant + ASSERT_TRUE(server_1->init_participant()); + + // Create server 2 + auto server_2 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_2 = locator_server_1; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_2, global_port + 1); + + WireProtocolConfigQos server_wp_qos_2 = server_wp_qos_1; + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); + // Configure 1 initial announcement as this Server will connect to the first one + server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; + server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + + // The main participant will use the test transport and a specific announcements configuration + server_2->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s2) + .wire_protocol(server_wp_qos_2); + + // Start the main participant + ASSERT_TRUE(server_2->init_participant()); + + // Both servers match + server_1->wait_discovery(std::chrono::seconds(5), 1, true); + server_2->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and match virtual endpoints + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Create a client that connects to their corresponding server + PubSubWriter client_1(TEST_TOPIC_NAME); + PubSubReader client_2(TEST_TOPIC_NAME); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 15, 0 }; + client_qos.builtin.discovery_config.initial_announcements.count = 0; + + // Init client 1 + client_1.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + // Init client 2 + client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_2); + client_2.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + ASSERT_TRUE(client_1.isInitialized()); + ASSERT_TRUE(client_2.isInitialized()); + + // Wait the lease announcement period to discover endpoints + server_1->wait_discovery(std::chrono::seconds(5), 2, true); + server_2->wait_discovery(std::chrono::seconds(5), 2, true); + + // Ensure that no additional Data(r/w) messages are sent by DS routine + std::this_thread::sleep_for(std::chrono::seconds(15)); + + EXPECT_EQ(num_data_r_w_sends_s1.load(std::memory_order::memory_order_seq_cst), 2u); + EXPECT_EQ(num_data_r_w_sends_s2.load(std::memory_order::memory_order_seq_cst), 2u); +} From 2b3abaed7d190e287cbf4904d886f43b526e66d5 Mon Sep 17 00:00:00 2001 From: Mario Dominguez Date: Mon, 12 May 2025 08:03:44 +0200 Subject: [PATCH 2/2] Fix pdata guid access to 3.1.x Signed-off-by: Mario Dominguez --- src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index c4b1957f534..7b80ffb0d33 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -1627,7 +1627,7 @@ void PDPServer::send_own_pdp( std::vector remote_readers; LocatorList locators; - remote_readers.emplace_back(pdata->guid.guidPrefix, c_EntityId_SPDPReader); + remote_readers.emplace_back(pdata->m_guid.guidPrefix, c_EntityId_SPDPReader); for (auto& locator : pdata->metatraffic_locators.unicast) {