Skip to content

[22814] Improve DS routines (backport #5764) #5783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 46 additions & 42 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace fastdds {
namespace rtps {
namespace ddb {

using ParticipantState = DiscoveryParticipantsAckStatus::ParticipantState;

DiscoveryDataBase::DiscoveryDataBase(
const fastdds::rtps::GuidPrefix_t& server_guid_prefix)
: server_guid_prefix_(server_guid_prefix)
Expand Down Expand Up @@ -280,8 +282,8 @@ void DiscoveryDataBase::update_change_and_unmatch_(
changes_to_release_.push_back(entity.update_and_unmatch(new_change));
// Manually set relevant participants ACK status of this server, and of the participant that sent the
// change, to 1. This way, we avoid backprogation of the data.
entity.add_or_update_ack_participant(server_guid_prefix_, true);
entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, true);
entity.add_or_update_ack_participant(server_guid_prefix_, ParticipantState::ACKED);
entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, ParticipantState::ACKED);
}

void DiscoveryDataBase::add_ack_(
Expand All @@ -305,7 +307,7 @@ void DiscoveryDataBase::add_ack_(
// database has been updated, so this ACK is not relevant anymore
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
{
it->second.add_or_update_ack_participant(acked_entity, true);
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
}
}
}
Expand All @@ -320,7 +322,7 @@ void DiscoveryDataBase::add_ack_(
// database has been updated, so this ACK is not relevant anymore
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
{
it->second.add_or_update_ack_participant(acked_entity, true);
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
}
}
}
Expand All @@ -335,7 +337,7 @@ void DiscoveryDataBase::add_ack_(
// database has been updated, so this ACK is not relevant anymore
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
{
it->second.add_or_update_ack_participant(acked_entity, true);
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
}
}
}
Expand Down Expand Up @@ -637,7 +639,7 @@ void DiscoveryDataBase::match_new_server_(
if (server != participant_prefix)
{
// Make all known servers relevant to the new server, but not matched
part.second.add_or_update_ack_participant(server, false);
part.second.add_or_update_ack_participant(server, ParticipantState::PENDING_SEND);
resend_new_pdp = true;
}
}
Expand All @@ -650,7 +652,7 @@ void DiscoveryDataBase::match_new_server_(
else
{
// Make the new server relevant to all known servers
part.second.add_or_update_ack_participant(participant_prefix, false);
part.second.add_or_update_ack_participant(participant_prefix, ParticipantState::PENDING_SEND);
// Send DATA(p) of all known servers to the new participant
add_pdp_to_send_(part.second.change());
}
Expand Down Expand Up @@ -752,7 +754,7 @@ void DiscoveryDataBase::create_new_participant_from_change_(

// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
// we avoid backprogation of the data.
ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);

// If the DATA(p) it's from this server, it is already in history and we do nothing here
if (change_guid.guidPrefix != server_guid_prefix_)
Expand Down Expand Up @@ -854,7 +856,7 @@ void DiscoveryDataBase::update_participant_from_change_(
if (ch->write_params.sample_identity().sequence_number() ==
participant_info.change()->write_params.sample_identity().sequence_number())
{
participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
}

// we release it if it's the same or if it is lower
Expand Down Expand Up @@ -904,7 +906,7 @@ void DiscoveryDataBase::create_writers_from_change_(
if (ch->write_params.sample_identity().sequence_number() ==
writer_it->second.change()->write_params.sample_identity().sequence_number())
{
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
}

// we release it if it's the same or if it is lower
Expand Down Expand Up @@ -952,7 +954,7 @@ void DiscoveryDataBase::create_writers_from_change_(

// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
// we avoid backprogation of the data.
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);

// if topic is virtual, it must iterate over all readers
if (topic_name == virtual_topic_)
Expand Down Expand Up @@ -1022,7 +1024,7 @@ void DiscoveryDataBase::create_readers_from_change_(
if (ch->write_params.sample_identity().sequence_number() ==
reader_it->second.change()->write_params.sample_identity().sequence_number())
{
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
}

// we release it if it's the same or if it is lower
Expand Down Expand Up @@ -1070,7 +1072,7 @@ void DiscoveryDataBase::create_readers_from_change_(

// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
// we avoid backprogation of the data.
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);

// If topic is virtual, it must iterate over all readers
if (topic_name == virtual_topic_)
Expand Down Expand Up @@ -1467,37 +1469,42 @@ bool DiscoveryDataBase::process_dirty_topics()
// Find participants with writer info and participant with reader info in participants_
parts_reader_it = participants_.find(reader.guidPrefix);
parts_writer_it = participants_.find(writer.guidPrefix);
// Find reader info in readers_
readers_it = readers_.find(reader);
// Find writer info in writers_
writers_it = writers_.find(writer);

// Check in `participants_` whether the client with the reader has acknowledge the PDP of the client
// with the writer.
if (parts_reader_it != participants_.end())
{
if (parts_reader_it->second.is_matched(writer.guidPrefix))
{
// Find reader info in readers_
readers_it = readers_.find(reader);
// Check the status of the writer in `readers_[reader]::relevant_participants_builtin_ack_status`.
if (readers_it != readers_.end() &&
readers_it->second.is_relevant_participant(writer.guidPrefix) &&
!readers_it->second.is_matched(writer.guidPrefix))
!readers_it->second.is_waiting_ack(writer.guidPrefix))
{
// If the status is 0, add DATA(r) to a `edp_publications_to_send_` (if it's not there).
if (add_edp_subscriptions_to_send_(readers_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding DATA(r) to send: "
<< readers_it->second.change()->instanceHandle);
readers_it->second.add_or_update_ack_participant(writer.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
}
else if (parts_reader_it->second.is_relevant_participant(writer.guidPrefix))
{
// Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_reader_it->second.change()))
if (!parts_reader_it->second.is_waiting_ack(writer.guidPrefix))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: "
<< parts_reader_it->second.change()->instanceHandle);
// Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_reader_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: "
<< parts_reader_it->second.change()->instanceHandle);
parts_reader_it->second.add_or_update_ack_participant(writer.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
// Set topic as not-clearable.
is_clearable = false;
Expand All @@ -1510,26 +1517,35 @@ bool DiscoveryDataBase::process_dirty_topics()
{
if (parts_writer_it->second.is_matched(reader.guidPrefix))
{
// Find writer info in writers_
writers_it = writers_.find(writer);
// Check the status of the reader in `writers_[writer]::relevant_participants_builtin_ack_status`.
if (writers_it != writers_.end() &&
writers_it->second.is_relevant_participant(reader.guidPrefix) &&
!writers_it->second.is_matched(reader.guidPrefix))
!writers_it->second.is_waiting_ack(reader.guidPrefix))
{
// If the status is 0, add DATA(w) to a `edp_subscriptions_to_send_` (if it's not there).
if (add_edp_publications_to_send_(writers_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding DATA(w) to send: "
<< writers_it->second.change()->instanceHandle);
writers_it->second.add_or_update_ack_participant(reader.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
}
else if (parts_writer_it->second.is_relevant_participant(reader.guidPrefix))
{
// Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_writer_it->second.change()))
if (!parts_writer_it->second.is_waiting_ack(reader.guidPrefix))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: "
<< parts_writer_it->second.change()->instanceHandle);
// Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_writer_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: "
<< parts_writer_it->second.change()->instanceHandle);
parts_writer_it->second.add_or_update_ack_participant(reader.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
// Set topic as not-clearable.
is_clearable = false;
Expand Down Expand Up @@ -2350,18 +2366,6 @@ bool DiscoveryDataBase::add_pdp_to_send_(
return false;
}

bool DiscoveryDataBase::add_own_pdp_to_send_()
{
if (!backup_in_progress())
{
auto our_data_it = participants_.find(server_guid_prefix_);
assert(our_data_it != participants_.end());

return add_pdp_to_send_(our_data_it->second.change());
}
return false;
}

bool DiscoveryDataBase::add_edp_publications_to_send_(
eprosima::fastdds::rtps::CacheChange_t* change)
{
Expand Down Expand Up @@ -2494,7 +2498,7 @@ bool DiscoveryDataBase::from_json(
// Populate GuidPrefix_t
std::istringstream(it_ack.key()) >> prefix_aux_ack;

dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
}

// Add Participant
Expand Down Expand Up @@ -2532,7 +2536,7 @@ bool DiscoveryDataBase::from_json(
// Populate GuidPrefix_t
std::istringstream(it_ack.key()) >> prefix_aux_ack;

dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
}

// Add Participant
Expand Down Expand Up @@ -2592,7 +2596,7 @@ bool DiscoveryDataBase::from_json(
// Populate GuidPrefix_t
std::istringstream(it_ack.key()) >> prefix_aux_ack;

dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
}

// Add Participant
Expand Down
3 changes: 0 additions & 3 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,6 @@ class DiscoveryDataBase
fastdds::rtps::WriterHistory* writer_history,
const fastdds::rtps::GuidPrefix_t& entity_guid_prefix);

// Add own Data(p) in pdp_to_send if not already in it
bool add_own_pdp_to_send_();

protected:

// Change a cacheChange by update or new disposal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace ddb {

void DiscoveryParticipantsAckStatus::add_or_update_participant(
const GuidPrefix_t& guid_p,
bool status = false)
ParticipantState status = ParticipantState::PENDING_SEND)
{
relevant_participants_map_[guid_p] = status;
}
Expand All @@ -45,13 +45,24 @@ void DiscoveryParticipantsAckStatus::remove_participant(
relevant_participants_map_.erase(guid_p);
}

bool DiscoveryParticipantsAckStatus::is_waiting_ack(
const GuidPrefix_t& guid_p) const
{
auto it = relevant_participants_map_.find(guid_p);
if (it != relevant_participants_map_.end())
{
return it->second >= ParticipantState::WAITING_ACK;
}
return false;
}

bool DiscoveryParticipantsAckStatus::is_matched(
const GuidPrefix_t& guid_p) const
{
auto it = relevant_participants_map_.find(guid_p);
if (it != relevant_participants_map_.end())
{
return it->second;
return it->second == ParticipantState::ACKED;
}
return false;
}
Expand All @@ -60,7 +71,7 @@ void DiscoveryParticipantsAckStatus::unmatch_all()
{
for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it)
{
it->second = false;
it->second = ParticipantState::PENDING_SEND;
}
}

Expand Down Expand Up @@ -89,7 +100,7 @@ bool DiscoveryParticipantsAckStatus::is_acked_by_all() const
{
for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it)
{
if (!it->second)
if (it->second != ParticipantState::ACKED)
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,25 @@ class DiscoveryParticipantsAckStatus

~DiscoveryParticipantsAckStatus() = default;

enum class ParticipantState : uint8_t
{
PENDING_SEND, // Data(p) has not been sent yet
WAITING_ACK, // Data(p) has already been sent but ACK has not been received
ACKED // Data(p) has been acked
};

void add_or_update_participant(
const GuidPrefix_t& guid_p,
bool status);
ParticipantState status);

void remove_participant(
const GuidPrefix_t& guid_p);

void unmatch_all();

bool is_waiting_ack(
const GuidPrefix_t& guid_p) const;

bool is_matched(
const GuidPrefix_t& guid_p) const;

Expand All @@ -69,9 +79,31 @@ class DiscoveryParticipantsAckStatus

private:

std::map<GuidPrefix_t, bool> relevant_participants_map_;
std::map<GuidPrefix_t, ParticipantState> relevant_participants_map_;
};

inline std::ostream& operator <<(
std::ostream& os,
DiscoveryParticipantsAckStatus::ParticipantState child)
{
switch (child)
{
case DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND:
os << "PENDING_SEND";
break;
case DiscoveryParticipantsAckStatus::ParticipantState::WAITING_ACK:
os << "WAITING_ACK";
break;
case DiscoveryParticipantsAckStatus::ParticipantState::ACKED:
os << "ACKED";
break;
default:
os << "UNKNOWN";
break;
}
return os;
}

} /* namespace ddb */
} /* namespace rtps */
} /* namespace fastdds */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ DiscoverySharedInfo::DiscoverySharedInfo(
: change_(change)
{
// the server already knows every message
add_or_update_ack_participant(known_participant, true);
add_or_update_ack_participant(known_participant, DiscoveryParticipantsAckStatus::ParticipantState::ACKED);
}

CacheChange_t* DiscoverySharedInfo::update_and_unmatch(
Expand Down
Loading
Loading