Skip to content

Commit de7b95e

Browse files
committed
refactor: improve price stream async handling and define SharedPriceMap
1 parent 8a86eb3 commit de7b95e

File tree

1 file changed

+20
-17
lines changed

1 file changed

+20
-17
lines changed

src/pyth.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use std::fs;
99

1010
const BASE_URL: &str = "https://hermes.pyth.network";
1111

12+
type SharedPriceMap = Arc<Mutex<Vec<(String, f64)>>>;
13+
1214
/// 訂閱 Pyth 即時價格串流,並將價格回傳給 callback 函數。
1315
///
1416
/// # 參數
@@ -48,7 +50,9 @@ where
4850
}
4951
Ok(_) => {} // 略過 Ping/Comment
5052
Err(e) => {
51-
eprintln!("SSE 錯誤: {}", e);
53+
eprintln!("SSE 錯誤: {},3 秒後嘗試重連", e);
54+
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
55+
break;
5256
}
5357
}
5458
}
@@ -68,31 +72,30 @@ pub async fn get_pyth_feed_id(symbol: &str, category: &str) -> String {
6872
return raw.to_string();
6973
}
7074

71-
pub fn spawn_price_stream(symbol: &str, category: &str, prices: Arc<Mutex<Vec<(String, f64)>>>) {
75+
pub fn spawn_price_stream(symbol: &str, category: &str, prices: SharedPriceMap) {
7276
let symbol = symbol.to_string();
7377
let category = category.to_string();
7478
tokio::spawn(async move {
7579
let id = get_pyth_feed_id(&symbol, &category).await;
7680
let symbol_clone = symbol.clone();
7781
if let Err(e) = get_price_stream_from_pyth(id.as_str(), move |price| {
78-
update_price(&symbol_clone, price, &prices)
79-
})
80-
.await
82+
let prices = Arc::clone(&prices);
83+
let symbol_clone = symbol_clone.clone();
84+
tokio::spawn(async move {
85+
update_price(&symbol_clone, price, &prices).await;
86+
});
87+
}).await
8188
{
8289
eprintln!("Error occurred for {}: {}", symbol, e);
8390
}
8491
});
8592
}
8693

87-
fn update_price(symbol: &str, price: f64, prices: &Arc<Mutex<Vec<(String, f64)>>>) {
88-
let symbol = symbol.to_string(); // Clone symbol to ensure it is owned
89-
let prices = Arc::clone(prices); // Clone Arc to ensure it is owned
90-
tokio::spawn(async move {
91-
let mut prices = prices.lock().await;
92-
if let Some(entry) = prices.iter_mut().find(|(s, _)| s == &symbol) {
93-
entry.1 = price;
94-
} else {
95-
prices.push((symbol, price));
96-
}
97-
});
98-
}
94+
async fn update_price(symbol: &str, price: f64, prices: &Arc<Mutex<Vec<(String, f64)>>>) {
95+
let mut prices = prices.lock().await;
96+
if let Some(entry) = prices.iter_mut().find(|(s, _)| s == symbol) {
97+
entry.1 = price;
98+
} else {
99+
prices.push((symbol.to_string(), price));
100+
}
101+
}

0 commit comments

Comments
 (0)