Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/fuel-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fuel_streams_core::{
nats::{types::DeliverPolicy, FuelNetwork, NatsClient, NatsClientOpts},
nats::{types::DeliverPolicy, NatsClient, NatsClientOpts},
types::{Block, Transaction},
StreamEncoder,
Streamable,
Expand Down Expand Up @@ -44,7 +44,8 @@ async fn main() -> anyhow::Result<()> {

db.use_ns("fuel_indexer").use_db("fuel_indexer").await?;

let nats_client_opts = NatsClientOpts::admin_opts(FuelNetwork::Testnet);
let nats_client_opts = NatsClientOpts::admin_opts(None)
.with_custom_url("nats:4222".to_string());
let nats_client = NatsClient::connect(&nats_client_opts).await?;

tokio::try_join!(
Expand Down
12 changes: 8 additions & 4 deletions crates/fuel-streams-core/src/nats/nats_client_opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,21 @@ pub struct NatsClientOpts {
}

impl NatsClientOpts {
pub fn new(network: FuelNetwork) -> Self {
pub fn new(network: Option<FuelNetwork>) -> Self {
Self {
url: network.to_url(),
url: network.unwrap_or_default().to_url(),
role: NatsUserRole::default(),
namespace: NatsNamespace::default(),
timeout_secs: 5,
}
}

pub fn default_opts(network: FuelNetwork) -> Self {
pub fn default_opts(network: Option<FuelNetwork>) -> Self {
Self::new(network).with_role(NatsUserRole::Default)
}

#[cfg(any(test, feature = "test-helpers"))]
pub fn admin_opts(network: FuelNetwork) -> Self {
pub fn admin_opts(network: Option<FuelNetwork>) -> Self {
Self::new(network).with_role(NatsUserRole::Admin)
}

Expand All @@ -107,6 +107,10 @@ impl NatsClientOpts {
}
}

pub fn with_custom_url(self, url: String) -> Self {
Self { url, ..self }
}

#[cfg(any(test, feature = "test-helpers"))]
pub fn with_rdn_namespace(self) -> Self {
let namespace = format!(r"namespace-{}", Self::random_int());
Expand Down
11 changes: 5 additions & 6 deletions crates/fuel-streams-publisher/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! to publish streams that can consumed via the `fuel-streams` SDK.

use clap::Parser;
use fuel_streams::types::FuelNetwork;

/// CLI structure for parsing command-line arguments.
///
Expand All @@ -13,12 +12,12 @@ pub struct Cli {
/// Fuel Network to connect to.
#[arg(
long,
value_name = "NETWORK",
env = "NETWORK",
default_value = "Local",
value_parser = clap::value_parser!(FuelNetwork)
value_name = "NATS_URL",
env = "NATS_URL",
default_value = "nats:4222",
help = "NATS URL to connect to."
)]
pub network: FuelNetwork,
pub nats_url: String,
/// Flattened command structure for Fuel Core configuration.
#[command(flatten)]
pub fuel_core_config: fuel_core_bin::cli::run::Command,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-streams-publisher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> anyhow::Result<()> {

let publisher = fuel_streams_publisher::Publisher::new(
Arc::clone(&fuel_core),
cli.network,
cli.nats_url,
telemetry.clone(),
)
.await?;
Expand Down
5 changes: 3 additions & 2 deletions crates/fuel-streams-publisher/src/publisher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ pub struct Publisher {
impl Publisher {
pub async fn new(
fuel_core: Arc<dyn FuelCoreLike>,
network: FuelNetwork,
nats_url: String,
telemetry: Arc<Telemetry>,
) -> anyhow::Result<Self> {
let nats_client_opts = NatsClientOpts::admin_opts(network);
let nats_client_opts =
NatsClientOpts::admin_opts(None).with_custom_url(nats_url);
let nats_client = NatsClient::connect(&nats_client_opts).await?;
let streams = Arc::new(Streams::new(&nats_client).await);

Expand Down
12 changes: 7 additions & 5 deletions crates/fuel-streams-publisher/src/server/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ mod tests {
use fuel_core::service::Config;
use fuel_core_bin::FuelService;
use fuel_core_services::State;
use fuel_streams::types::{FuelNetwork, NatsClientOpts};

use crate::{
server::state::{HealthResponse, ServerState},
Expand All @@ -84,10 +83,13 @@ mod tests {
let telemetry = Telemetry::new().await.unwrap();

let fuel_core = FuelCore::from(fuel_service);
let publisher =
Publisher::new(fuel_core.arc(), FuelNetwork::Local, telemetry)
.await
.unwrap();
let publisher = Publisher::new(
fuel_core.arc(),
"nats://nats:4222".to_string(),
telemetry,
)
.await
.unwrap();
let state = ServerState::new(publisher).await;
assert!(state.publisher.nats_client.is_connected());

Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-streams/src/client/client_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Client {
/// # }
/// ```
pub async fn connect(network: FuelNetwork) -> Result<Self, crate::Error> {
let opts = NatsClientOpts::new(network);
let opts = NatsClientOpts::new(Some(network));
let conn = NatsClient::connect(&opts)
.await
.map_err(ClientError::ConnectionFailed)?;
Expand Down
4 changes: 2 additions & 2 deletions tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ impl Streams {
}

pub async fn server_setup() -> BoxedResult<(NatsClient, Streams)> {
let opts =
NatsClientOpts::admin_opts(FuelNetwork::Local).with_rdn_namespace();
let opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace();
let client = NatsClient::connect(&opts).await?;
let streams = Streams::new(&client).await;
Ok((client, streams))
Expand Down
4 changes: 2 additions & 2 deletions tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn main() -> BoxedResult<()> {
.expect("Failed to change directory to workspace root");

// ensure nats is connected and running
let client_opts = NatsClientOpts::admin_opts(FuelNetwork::Local)
let client_opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);
let is_connected = Client::with_opts(&client_opts)
Expand Down Expand Up @@ -124,7 +124,7 @@ async fn main() -> BoxedResult<()> {
}
}
_ = action_interval.tick() => {
let client_opts = NatsClientOpts::admin_opts(FuelNetwork::Local)
let client_opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);
let is_nats_connected = Client::with_opts(&client_opts).await.ok().map(|c| c.conn.is_connected()).unwrap_or_default();
Expand Down
24 changes: 12 additions & 12 deletions tests/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn conn_streams_has_required_streams() -> BoxedResult<()> {

#[tokio::test]
async fn fuel_streams_client_connection() -> BoxedResult<()> {
let opts = NatsClientOpts::admin_opts(FuelNetwork::Local);
let opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local));
let client = NatsClient::connect(&opts).await?;
assert!(client.is_connected());
let client = Client::with_opts(&opts).await?;
Expand All @@ -51,7 +51,7 @@ async fn fuel_streams_client_connection() -> BoxedResult<()> {

#[tokio::test]
async fn multiple_client_connections() -> BoxedResult<()> {
let opts = NatsClientOpts::admin_opts(FuelNetwork::Local);
let opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local));
let tasks: Vec<_> = (0..100)
.map(|_| {
let opts = opts.clone();
Expand All @@ -69,7 +69,7 @@ async fn multiple_client_connections() -> BoxedResult<()> {

#[tokio::test]
async fn public_user_cannot_create_streams() -> BoxedResult<()> {
let opts = NatsClientOpts::default_opts(FuelNetwork::Local)
let opts = NatsClientOpts::default_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);
let client = NatsClient::connect(&opts).await?;
Expand All @@ -91,7 +91,7 @@ async fn public_user_cannot_create_streams() -> BoxedResult<()> {

#[tokio::test]
async fn public_user_cannot_create_stores() -> BoxedResult<()> {
let opts = NatsClientOpts::default_opts(FuelNetwork::Local)
let opts = NatsClientOpts::default_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);

Expand All @@ -112,7 +112,7 @@ async fn public_user_cannot_create_stores() -> BoxedResult<()> {

#[tokio::test]
async fn public_user_cannot_delete_stores() -> BoxedResult<()> {
let opts = NatsClientOpts::admin_opts(FuelNetwork::Local)
let opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);

Expand All @@ -127,7 +127,7 @@ async fn public_user_cannot_delete_stores() -> BoxedResult<()> {
})
.await?;

let opts = NatsClientOpts::default_opts(FuelNetwork::Local)
let opts = NatsClientOpts::default_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);
let client = NatsClient::connect(&opts).await?;
Expand All @@ -143,7 +143,7 @@ async fn public_user_cannot_delete_stores() -> BoxedResult<()> {

#[tokio::test]
async fn public_user_cannot_delete_stream() -> BoxedResult<()> {
let opts = NatsClientOpts::admin_opts(FuelNetwork::Local)
let opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);
let client = NatsClient::connect(&opts).await?;
Expand Down Expand Up @@ -177,7 +177,7 @@ async fn public_user_cannot_delete_stream() -> BoxedResult<()> {

#[tokio::test]
async fn public_user_can_access_streams_after_created() {
let opts = NatsClientOpts::new(FuelNetwork::Local)
let opts = NatsClientOpts::new(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);

Expand All @@ -191,7 +191,7 @@ async fn public_user_can_access_streams_after_created() {
#[tokio::test]
async fn public_and_admin_user_can_access_streams_after_created(
) -> BoxedResult<()> {
let admin_opts = NatsClientOpts::admin_opts(FuelNetwork::Local);
let admin_opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local));
let admin_tasks: Vec<BoxFuture<'_, Result<(), NatsError>>> = (0..100)
.map(|_| {
let opts: NatsClientOpts = admin_opts.clone();
Expand All @@ -204,7 +204,7 @@ async fn public_and_admin_user_can_access_streams_after_created(
})
.collect();

let public_opts = NatsClientOpts::default_opts(FuelNetwork::Local);
let public_opts = NatsClientOpts::default_opts(Some(FuelNetwork::Local));
let public_tasks: Vec<BoxFuture<'_, Result<(), NatsError>>> = (0..100)
.map(|_| {
let opts: NatsClientOpts = public_opts.clone();
Expand All @@ -229,7 +229,7 @@ async fn public_and_admin_user_can_access_streams_after_created(

#[tokio::test]
async fn admin_user_can_delete_stream() -> BoxedResult<()> {
let opts = NatsClientOpts::admin_opts(FuelNetwork::Local)
let opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);
let client = NatsClient::connect(&opts).await?;
Expand All @@ -254,7 +254,7 @@ async fn admin_user_can_delete_stream() -> BoxedResult<()> {

#[tokio::test]
async fn admin_user_can_delete_stores() -> BoxedResult<()> {
let opts = NatsClientOpts::admin_opts(FuelNetwork::Local)
let opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace()
.with_timeout(1);

Expand Down
4 changes: 2 additions & 2 deletions tests/tests/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ fn create_test_block() -> ImporterResult {
}

async fn nats_client() -> NatsClient {
let nats_client_opts =
NatsClientOpts::admin_opts(FuelNetwork::Local).with_rdn_namespace();
let nats_client_opts = NatsClientOpts::admin_opts(Some(FuelNetwork::Local))
.with_rdn_namespace();
NatsClient::connect(&nats_client_opts)
.await
.expect("NATS connection failed")
Expand Down
Loading