Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0561fde
first pass at adding waypoint caching
adambrett40 Feb 6, 2025
46c43f8
better version, needs testing
adambrett40 Feb 6, 2025
5ed355d
extract caching to function
adambrett40 Feb 8, 2025
16a65ef
Merge branch 'main' into HEAD
adambrett40 Feb 8, 2025
420f74c
basic test for caching
adambrett40 Feb 9, 2025
dd20a13
cache testing update
adambrett40 Feb 9, 2025
1d655fb
tests pass... kinda (on first try. need to cleanup added files on tea…
adambrett40 Feb 9, 2025
7da2df6
updated teardown so consecutive tests pass
adambrett40 Feb 9, 2025
08f715f
cleaned up and made paths constant
adambrett40 Feb 24, 2025
70493d2
Merge remote-tracking branch 'origin/main' into user/adambrett40/259-…
adambrett40 Feb 24, 2025
84f2a2e
removed comments
adambrett40 Feb 24, 2025
bdc81d9
minor cleanup
adambrett40 Feb 27, 2025
995ef9a
fixed copy/paste error
adambrett40 Feb 27, 2025
23bbf88
make cacheGlobalWaypoints async
adambrett40 Mar 3, 2025
0721dfa
made caching async, ros node run virtual iridium in background
adambrett40 Mar 5, 2025
406526a
Merge remote-tracking branch 'origin/main' into user/adambrett40/259-…
adambrett40 Mar 5, 2025
e83d6db
Merge remote-tracking branch 'origin/main' into user/adambrett40/259-…
adambrett40 Mar 8, 2025
c9b328b
made requested changes
adambrett40 Mar 22, 2025
e61bc01
Merge remote-tracking branch 'origin/main' into user/adambrett40/259-…
adambrett40 Mar 22, 2025
d0246a5
fixed extra merge conflict
adambrett40 Mar 29, 2025
746baa6
Merge remote-tracking branch 'origin/main' into user/adambrett40/259-…
adambrett40 Mar 29, 2025
6342120
Merge remote-tracking branch 'origin/main' into user/adambrett40/259-…
adambrett40 Mar 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/network_systems/lib/cmn_hdrs/shared_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@ static const std::string DEV = "development";
constexpr unsigned int MAX_LOCAL_TO_REMOTE_PAYLOAD_SIZE_BYTES = 340;
constexpr unsigned int MAX_REMOTE_TO_LOCAL_PAYLOAD_SIZE_BYTES = 270;

inline std::string getCachePath()
{
const char * ros_ws = std::getenv("ROS_WORKSPACE"); //NOLINT (concurrency-mt-unsafe)
return (ros_ws != nullptr ? std::string(ros_ws) : "/workspaces/sailbot_workspace") +
"/build/network_systems/projects/local_transceiver/global_waypoint_cache";
}

inline std::string getCacheTempPath()
{
const char * ros_ws = std::getenv("ROS_WORKSPACE"); //NOLINT (concurrency-mt-unsafe)
return (ros_ws != nullptr ? std::string(ros_ws) : "/workspaces/sailbot_workspace") +
"/build/network_systems/projects/local_transceiver/global_waypoint_cache_temp";
}

static const std::string CACHE_PATH = getCachePath();
static const std::string CACHE_TEMP_PATH = getCacheTempPath();

constexpr int NUM_BATTERIES = []() constexpr
{
using batteries_arr = custom_interfaces::msg::Batteries::_batteries_type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ class LocalTransceiver
*/
std::optional<std::string> debugSend(const std::string & cmd);

/**
* @brief Asynchronously save the received serialized data to a local file
*
* @param receivedDataBuffer string to cache
* @return future object representing completion of the function
*/
static void cacheGlobalWaypoints(std::string receivedDataBuffer);

/**
* @brief Retrieve the latest message from the remote server via the serial port
*
Expand All @@ -121,6 +129,22 @@ class LocalTransceiver
// TEST
bool checkMailbox();

/**
* @brief Read and parse the data from the global waypoints file, if it exists
*
* @return The global waypoints from the cache
*/
static std::optional<custom_interfaces::msg::Path> getCache();

//temp public for testing
/**
* @brief Parse the message received from the remote server
*
* @param msg message received from the remote server
* @return the data byte string payload from the message
*/
static custom_interfaces::msg::Path parseInMsg(const std::string & msg);

private:
// Serial port read/write timeout
constexpr static const struct timeval TIMEOUT
Expand Down Expand Up @@ -154,14 +178,6 @@ class LocalTransceiver

std::optional<std::string> readRsp();

/**
* @brief Parse the message received from the remote server
*
* @param msg message received from the remote server
* @return the data byte string payload from the message
*/
static custom_interfaces::msg::Path parseInMsg(const std::string & msg);

/**
* @brief Convert a boost::asio::streambuf into a string
* @warning Flushes the streambuf object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "at_cmds.h"
#include "cmn_hdrs/ros_info.h"
#include "cmn_hdrs/shared_constants.h"
#include "filesystem"
#include "fstream"
#include "global_path.pb.h"
#include "sensors.pb.h"
#include "waypoint.pb.h"
Expand Down Expand Up @@ -85,11 +87,14 @@ LocalTransceiver::LocalTransceiver(const std::string & port_name, const uint32_t
serial_.set_option(bio::serial_port_base::baud_rate(baud_rate));
// Set a timeout for read/write operations on the serial port
setsockopt(serial_.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &TIMEOUT, sizeof(TIMEOUT));
std::ofstream cacheFile(CACHE_PATH);
cacheFile.close();
};

LocalTransceiver::~LocalTransceiver()
{
// Intentionally left blank
std::filesystem::remove(CACHE_PATH);
std::filesystem::remove(CACHE_TEMP_PATH);
}

void LocalTransceiver::stop()
Expand Down Expand Up @@ -205,6 +210,19 @@ std::optional<std::string> LocalTransceiver::debugSend(const std::string & cmd)
return readRsp();
}

void LocalTransceiver::cacheGlobalWaypoints(std::string receivedDataBuffer)
{
std::filesystem::path cache{CACHE_PATH};
std::filesystem::path cache_temp{CACHE_TEMP_PATH};
std::ofstream writeFile(CACHE_TEMP_PATH, std::ios::binary);
if (!writeFile) {
std::cerr << "Failed to create temp cache file" << std::endl;
}
writeFile.write(receivedDataBuffer.data(), static_cast<std::streamsize>(receivedDataBuffer.size()));
writeFile.close();
std::filesystem::rename(CACHE_TEMP_PATH, CACHE_PATH);
}

custom_interfaces::msg::Path LocalTransceiver::receive()
{
static constexpr int MAX_NUM_RETRIES = 20;
Expand Down Expand Up @@ -304,8 +322,12 @@ custom_interfaces::msg::Path LocalTransceiver::receive()

break;
}

std::future<void> fut = std::async(std::launch::async, cacheGlobalWaypoints, receivedDataBuffer);

custom_interfaces::msg::Path to_publish = parseInMsg(receivedDataBuffer);

fut.get();
return to_publish;
}

Expand Down Expand Up @@ -340,6 +362,17 @@ custom_interfaces::msg::Path LocalTransceiver::parseInMsg(const std::string & ms
return soln;
}

std::optional<custom_interfaces::msg::Path> LocalTransceiver::getCache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make this function async as well

{
if (std::filesystem::exists(CACHE_PATH) && std::filesystem::file_size(CACHE_PATH) > 0) {
std::ifstream input(CACHE_PATH, std::ios::binary);
std::string cachedDataBuffer(std::istreambuf_iterator<char>(input), {});
custom_interfaces::msg::Path to_publish = parseInMsg(cachedDataBuffer);
return std::make_optional(to_publish);
}
return std::nullopt;
}

bool LocalTransceiver::rcvRsp(const AT::Line & expected_rsp)
{
bio::streambuf buf;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <bits/stdc++.h>

#include <chrono>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -44,16 +46,32 @@ class LocalTransceiverIntf : public NetNode
} else if (mode == SYSTEM_MODE::DEV) {
default_port = LOCAL_TRANSCEIVER_TEST_PORT;
std::string run_iridium_cmd = "$ROS_WORKSPACE/scripts/run_virtual_iridium.sh";
int result = std::system(run_iridium_cmd.c_str()); //NOLINT(concurrency-mt-unsafe)
if (result != 0) {
std::string msg = "Error: could not start virtual iridium";
std::cerr << msg << std::endl;
throw std::exception();
std::thread vi_thread(std::system, run_iridium_cmd.c_str());
//vi needs to run in background
vi_thread.detach();

const int MAX_ATTEMPTS = 100; // 100ms timeout, should only take <5
int attempts = 0;
const int SLEEP_MS = 1;
const int IOCTL_ERR_CODE = 256;
std::string set_baud_cmd = "stty 19200 < $LOCAL_TRANSCEIVER_TEST_PORT 2>/dev/null";
//silence stderr to not clutter console while polling
while (attempts < MAX_ATTEMPTS) {
if (std::filesystem::exists(LOCAL_TRANSCEIVER_TEST_PORT)) {
int result = std::system(set_baud_cmd.c_str()); //NOLINT(concurrency-mt-unsafe)
if (result == 0) {
break;
}
if (result != IOCTL_ERR_CODE) {
std::cerr << "Failed to set baud rate with code: " << result << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS));
attempts++;
}
std::string set_baud_cmd = "stty 19200 < $LOCAL_TRANSCEIVER_TEST_PORT";
result = std::system(set_baud_cmd.c_str()); //NOLINT(concurrency-mt-unsafe)
if (result != 0) {
std::string msg = "Error: could not set baud rate for virtual iridium";

if (attempts == MAX_ATTEMPTS) {
std::string msg = "Error: could not start virtual iridium";
std::cerr << msg << std::endl;
throw std::exception();
}
Expand All @@ -71,6 +89,9 @@ class LocalTransceiverIntf : public NetNode
this->get_logger(), "Running Local Transceiver in mode: %s, with port: %s.", mode.c_str(), port.c_str());
lcl_trns_ = std::make_unique<LocalTransceiver>(port, SATELLITE_BAUD_RATE);

std::future<std::optional<custom_interfaces::msg::Path>> fut =
std::async(std::launch::async, lcl_trns_->getCache);

static constexpr int ROS_Q_SIZE = 5;
static constexpr auto TIMER_INTERVAL = std::chrono::milliseconds(300000);
timer_ = this->create_wall_timer(TIMER_INTERVAL, std::bind(&LocalTransceiverIntf::pub_cb, this));
Expand All @@ -91,6 +112,11 @@ class LocalTransceiverIntf : public NetNode
sub_local_path_data = this->create_subscription<custom_interfaces::msg::LPathData>(
ros_topics::LOCAL_PATH, ROS_Q_SIZE,
std::bind(&LocalTransceiverIntf::sub_local_path_data_cb, this, std::placeholders::_1));

std::optional<custom_interfaces::msg::Path> msg = fut.get();
if (msg) {
pub_->publish(*msg);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "at_cmds.h"
#include "cmn_hdrs/shared_constants.h"
#include "filesystem"
#include "global_path.pb.h"
#include "local_transceiver.h"
#include "sensors.pb.h"
Expand Down Expand Up @@ -45,7 +46,12 @@ class TestLocalTransceiver : public ::testing::Test
}
}

static void TearDownTestSuite() { http_echo_server_proc_.terminate(); }
static void TearDownTestSuite()
{
http_echo_server_proc_.terminate();
std::filesystem::remove(CACHE_PATH);
std::filesystem::remove(CACHE_TEMP_PATH);
}

TestLocalTransceiver()
{
Expand Down Expand Up @@ -227,6 +233,73 @@ TEST_F(TestLocalTransceiver, parseInMsgValid)
EXPECT_EQ(parsed_test.waypoints[1].longitude, holder);
}

TEST_F(TestLocalTransceiver, checkCache)
{
constexpr float holder_lat = 14.3;
constexpr float holder_long = -177.6;
constexpr float updated_lat = 17.9;
constexpr float updated_long = 0.1;
std::vector<custom_interfaces::msg::HelperLatLon> waypoints;

Polaris::GlobalPath path;

Polaris::Waypoint * waypoint_a = path.add_waypoints();
waypoint_a->set_latitude(holder_lat);
waypoint_a->set_longitude(holder_long);

Polaris::Waypoint * waypoint_b = path.add_waypoints();
waypoint_b->set_latitude(holder_lat);
waypoint_b->set_longitude(holder_long);

std::string serialized_test = path.SerializeAsString();
std::filesystem::path cache{CACHE_PATH};

ASSERT_TRUE(std::filesystem::exists(cache));
ASSERT_TRUE(std::filesystem::file_size(CACHE_PATH) == 0);

LocalTransceiver::cacheGlobalWaypoints(serialized_test);

ASSERT_TRUE(std::filesystem::exists(cache));
ASSERT_TRUE(std::filesystem::file_size(CACHE_PATH) > 0);

auto cache_obj = LocalTransceiver::getCache();

ASSERT_TRUE(cache_obj);

custom_interfaces::msg::Path parsed_cache = *cache_obj;
custom_interfaces::msg::Path parsed_test = LocalTransceiver::parseInMsg(serialized_test);

EXPECT_EQ(parsed_test.waypoints[0].latitude, parsed_cache.waypoints[0].latitude);
EXPECT_EQ(parsed_test.waypoints[0].longitude, parsed_cache.waypoints[0].longitude);
EXPECT_EQ(parsed_test.waypoints[1].latitude, parsed_cache.waypoints[0].latitude);
EXPECT_EQ(parsed_test.waypoints[1].longitude, parsed_cache.waypoints[0].longitude);

waypoint_a->set_latitude(updated_lat);
waypoint_a->set_longitude(updated_long);

waypoint_b->set_latitude(updated_lat);
waypoint_b->set_longitude(updated_long);

serialized_test = path.SerializeAsString();
LocalTransceiver::cacheGlobalWaypoints(serialized_test);

std::filesystem::path cache_temp{CACHE_TEMP_PATH};
ASSERT_TRUE(std::filesystem::exists(cache));
ASSERT_FALSE(std::filesystem::exists(CACHE_TEMP_PATH));

cache_obj = LocalTransceiver::getCache();

ASSERT_TRUE(std::filesystem::exists(cache));
ASSERT_TRUE(cache_obj);
parsed_cache = *cache_obj;

parsed_test = LocalTransceiver::parseInMsg(serialized_test);
EXPECT_EQ(parsed_test.waypoints[0].latitude, parsed_cache.waypoints[0].latitude);
EXPECT_EQ(parsed_test.waypoints[0].longitude, parsed_cache.waypoints[0].longitude);
EXPECT_EQ(parsed_test.waypoints[1].latitude, parsed_cache.waypoints[0].latitude);
EXPECT_EQ(parsed_test.waypoints[1].longitude, parsed_cache.waypoints[0].longitude);
}

std::mutex port_mutex;

TEST_F(TestLocalTransceiver, testMailboxBlackbox)
Expand Down
Loading