Skip to content

Commit 2fea772

Browse files
Fix sockets causing actix hangs (#3089)
* Fix sockets causing actix hangs * Fix fmt issues * Retry failed S3 uploads * Ignore launcher socket from sentry
1 parent 24765db commit 2fea772

File tree

4 files changed

+37
-32
lines changed

4 files changed

+37
-32
lines changed

apps/labrinth/src/file_hosting/s3_host.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ impl FileHost for S3Host {
7474
content_type,
7575
)
7676
.await
77-
.map_err(|_| {
78-
FileHostingError::S3Error(
79-
"Error while uploading file to S3".to_string(),
80-
)
77+
.map_err(|err| {
78+
FileHostingError::S3Error(format!(
79+
"Error while uploading file {file_name} to S3: {err}"
80+
))
8181
})?;
8282

8383
Ok(UploadFileData {
@@ -100,10 +100,10 @@ impl FileHost for S3Host {
100100
self.bucket
101101
.delete_object(format!("/{file_name}"))
102102
.await
103-
.map_err(|_| {
104-
FileHostingError::S3Error(
105-
"Error while deleting file from S3".to_string(),
106-
)
103+
.map_err(|err| {
104+
FileHostingError::S3Error(format!(
105+
"Error while deleting file {file_name} to S3: {err}"
106+
))
107107
})?;
108108

109109
Ok(DeleteFileData {

apps/labrinth/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ async fn main() -> std::io::Result<()> {
9292

9393
let prometheus = PrometheusMetricsBuilder::new("labrinth")
9494
.endpoint("/metrics")
95+
.exclude("/_internal/launcher_socket")
9596
.build()
9697
.expect("Failed to create prometheus metrics middleware");
9798

apps/labrinth/src/routes/internal/statuses.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ use crate::queue::socket::ActiveSockets;
1010
use crate::routes::ApiError;
1111
use actix_web::web::{Data, Payload};
1212
use actix_web::{get, web, HttpRequest, HttpResponse};
13-
use actix_ws::AggregatedMessage;
13+
use actix_ws::Message;
1414
use chrono::Utc;
15-
use futures_util::StreamExt;
15+
use futures_util::{StreamExt, TryStreamExt};
1616
use serde::{Deserialize, Serialize};
1717
use sqlx::PgPool;
1818

@@ -128,13 +128,13 @@ pub async fn ws_init(
128128
)
129129
.await?;
130130

131-
let mut stream = msg_stream.aggregate_continuations();
131+
let mut stream = msg_stream.into_stream();
132132

133133
actix_web::rt::spawn(async move {
134134
// receive messages from websocket
135135
while let Some(msg) = stream.next().await {
136136
match msg {
137-
Ok(AggregatedMessage::Text(text)) => {
137+
Ok(Message::Text(text)) => {
138138
if let Ok(message) =
139139
serde_json::from_str::<ClientToServerMessage>(&text)
140140
{
@@ -159,10 +159,14 @@ pub async fn ws_init(
159159
status.profile_name = profile_name;
160160
status.last_update = Utc::now();
161161

162+
let user_status = status.clone();
163+
// We drop the pair to avoid holding the lock for too long
164+
drop(pair);
165+
162166
let _ = broadcast_friends(
163167
user.id,
164168
ServerToClientMessage::StatusUpdate {
165-
status: status.clone(),
169+
status: user_status,
166170
},
167171
&pool,
168172
&db,
@@ -175,15 +179,14 @@ pub async fn ws_init(
175179
}
176180
}
177181

178-
Ok(AggregatedMessage::Close(_)) => {
182+
Ok(Message::Close(_)) => {
179183
let _ = close_socket(user.id, &pool, &db).await;
180184
}
181185

182-
Ok(AggregatedMessage::Ping(msg)) => {
183-
if let Some(mut socket) = db.auth_sockets.get_mut(&user.id)
184-
{
185-
let (_, socket) = socket.value_mut();
186-
let _ = socket.pong(&msg).await;
186+
Ok(Message::Ping(msg)) => {
187+
if let Some(socket) = db.auth_sockets.get(&user.id) {
188+
let (_, socket) = socket.value();
189+
let _ = socket.clone().pong(&msg).await;
187190
}
188191
}
189192

@@ -218,12 +221,11 @@ pub async fn broadcast_friends(
218221
};
219222

220223
if friend.accepted {
221-
if let Some(mut socket) =
222-
sockets.auth_sockets.get_mut(&friend_id.into())
223-
{
224-
let (_, socket) = socket.value_mut();
224+
if let Some(socket) = sockets.auth_sockets.get(&friend_id.into()) {
225+
let (_, socket) = socket.value();
225226

226-
let _ = socket.text(serde_json::to_string(&message)?).await;
227+
let _ =
228+
socket.clone().text(serde_json::to_string(&message)?).await;
227229
}
228230
}
229231
}

apps/labrinth/src/routes/v3/friends.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,13 @@ pub async fn add_friend(
7878
) -> Result<(), ApiError> {
7979
if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) {
8080
let (friend_status, _) = pair.value();
81-
if let Some(mut socket) =
82-
sockets.auth_sockets.get_mut(&friend_id.into())
81+
if let Some(socket) =
82+
sockets.auth_sockets.get(&friend_id.into())
8383
{
84-
let (_, socket) = socket.value_mut();
84+
let (_, socket) = socket.value();
8585

8686
let _ = socket
87+
.clone()
8788
.text(serde_json::to_string(
8889
&ServerToClientMessage::StatusUpdate {
8990
status: friend_status.clone(),
@@ -120,11 +121,11 @@ pub async fn add_friend(
120121
.insert(&mut transaction)
121122
.await?;
122123

123-
if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into())
124-
{
125-
let (_, socket) = socket.value_mut();
124+
if let Some(socket) = db.auth_sockets.get(&friend.id.into()) {
125+
let (_, socket) = socket.value();
126126

127127
if socket
128+
.clone()
128129
.text(serde_json::to_string(
129130
&ServerToClientMessage::FriendRequest { from: user.id },
130131
)?)
@@ -177,10 +178,11 @@ pub async fn remove_friend(
177178
)
178179
.await?;
179180

180-
if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) {
181-
let (_, socket) = socket.value_mut();
181+
if let Some(socket) = db.auth_sockets.get(&friend.id.into()) {
182+
let (_, socket) = socket.value();
182183

183184
let _ = socket
185+
.clone()
184186
.text(serde_json::to_string(
185187
&ServerToClientMessage::FriendRequestRejected {
186188
from: user.id,

0 commit comments

Comments
 (0)