Skip to content

Commit 188a3e4

Browse files
authored
fix: malformed packet crash (#889)
When a client sends a malformed packet, the server will crash because it used entity.destruct to disconnect players for malformed packets which leads to an abort because, when the server receives PlayerDisconnect from the proxy, the server will try to set PendingRemove on the already-destructed entity. This PR fixes these errors on player disconnects.
1 parent 347728c commit 188a3e4

File tree

8 files changed

+77
-81
lines changed

8 files changed

+77
-81
lines changed

crates/hyperion-proto/src/server_to_proxy.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ pub struct Unicast<'a> {
4747
#[rkyv(derive(Debug))]
4848
pub struct Flush;
4949

50+
/// The server must be prepared to handle other additional packets with this stream from the proxy after the server
51+
/// sends [`Shutdown`] until the server receives [`crate::PlayerDisconnect`] because proxy to server packets may
52+
/// already be in transit.
53+
#[derive(Archive, Deserialize, Serialize, Clone, Copy, PartialEq, Debug)]
54+
pub struct Shutdown {
55+
pub stream: u64,
56+
}
57+
5058
#[derive(Archive, Deserialize, Serialize, Clone, PartialEq)]
5159
pub enum ServerToProxyMessage<'a> {
5260
UpdatePlayerChunkPositions(UpdatePlayerChunkPositions),
@@ -55,4 +63,5 @@ pub enum ServerToProxyMessage<'a> {
5563
Unicast(Unicast<'a>),
5664
SetReceiveBroadcasts(SetReceiveBroadcasts),
5765
Flush(Flush),
66+
Shutdown(Shutdown),
5867
}

crates/hyperion-proxy/src/cache.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,9 @@ impl BufferedEgress {
323323
egress.handle_broadcast_local(instruction);
324324
});
325325
}
326+
ArchivedServerToProxyMessage::Shutdown(pkt) => {
327+
self.egress.handle_shutdown(pkt);
328+
}
326329
}
327330
}
328331

crates/hyperion-proxy/src/data.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,11 @@ impl OrderedBytes {
4040
data: Bytes::from_static(b""),
4141
exclusions: None,
4242
};
43-
pub const SHUTDOWN: Self = Self {
44-
order: u32::MAX - 1,
45-
offset: 0,
46-
data: Bytes::from_static(b""),
47-
exclusions: None,
48-
};
4943

5044
pub const fn is_flush(&self) -> bool {
5145
self.order == u32::MAX
5246
}
5347

54-
pub const fn is_shutdown(&self) -> bool {
55-
self.order == u32::MAX - 1
56-
}
57-
5848
pub const fn no_order(data: Bytes) -> Self {
5949
Self {
6050
order: 0,
@@ -120,8 +110,8 @@ impl PlayerHandle {
120110
}
121111

122112
pub fn shutdown(&self) {
123-
let _ = self.writer.try_send(OrderedBytes::SHUTDOWN);
124-
self.writer.close().unwrap();
113+
// Ignore error for if the channel is already closed
114+
let _ = self.writer.close();
125115
}
126116

127117
pub fn enable_receive_broadcasts(&self) {
@@ -143,7 +133,7 @@ impl PlayerHandle {
143133
bail!("failed to send packet to player, channel is full: {is_full}");
144134
}
145135
Err(e) => {
146-
self.writer.close().unwrap();
136+
self.shutdown();
147137
bail!("failed to send packet to player: {e}");
148138
}
149139
}

crates/hyperion-proxy/src/egress.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use bvh::{Aabb, Bvh};
44
use bytes::Bytes;
55
use glam::I16Vec2;
66
use hyperion_proto::{
7-
ArchivedSetReceiveBroadcasts, ArchivedUnicast, ArchivedUpdatePlayerChunkPositions,
8-
ChunkPosition,
7+
ArchivedSetReceiveBroadcasts, ArchivedShutdown, ArchivedUnicast,
8+
ArchivedUpdatePlayerChunkPositions, ChunkPosition,
99
};
1010
use rustc_hash::FxBuildHasher;
1111
use tracing::{Instrument, debug, error, info_span, instrument, warn};
@@ -19,8 +19,6 @@ use crate::{
1919
pub struct Egress {
2020
// todo: can we do some type of EntityId and SlotMap
2121
player_registry: &'static papaya::HashMap<u64, PlayerHandle, FxBuildHasher>,
22-
23-
// todo: remove positions when player leaves
2422
positions: &'static papaya::HashMap<u64, ChunkPosition, FxBuildHasher>,
2523
}
2624

@@ -81,7 +79,7 @@ impl Egress {
8179

8280
// imo it makes sense to read once... it is a fast loop
8381
#[allow(clippy::significant_drop_in_scrutinee)]
84-
for (player_id, player) in &players {
82+
for (_, player) in &players {
8583
if !player.can_receive_broadcasts() {
8684
continue;
8785
}
@@ -91,9 +89,7 @@ impl Egress {
9189

9290
if let Err(e) = player.send(to_send) {
9391
warn!("Failed to send data to player: {:?}", e);
94-
if let Some(result) = players.remove(player_id) {
95-
result.shutdown();
96-
}
92+
player.shutdown();
9793
}
9894
}
9995
}
@@ -107,12 +103,10 @@ impl Egress {
107103

108104
tokio::spawn(
109105
async move {
110-
for (id, player) in &players {
106+
for (_, player) in &players {
111107
if let Err(e) = player.send(OrderedBytes::FLUSH) {
112108
warn!("Failed to send data to player: {:?}", e);
113-
if let Some(result) = players.remove(id) {
114-
result.shutdown();
115-
}
109+
player.shutdown();
116110
}
117111
}
118112
}
@@ -171,9 +165,7 @@ impl Egress {
171165

172166
if let Err(e) = player.send(to_send) {
173167
warn!("Failed to send data to player: {:?}", e);
174-
if let Some(result) = players.remove(id) {
175-
result.shutdown();
176-
}
168+
player.shutdown();
177169
}
178170
}
179171
}
@@ -202,16 +194,14 @@ impl Egress {
202194

203195
let Some(player) = players.get(&id) else {
204196
// expected to still happen infrequently
205-
debug!("Player not found for id {id:?}");
197+
warn!("Player not found for id {id:?}");
206198
return;
207199
};
208200

209201
// todo: handle error; kick player if cannot send (buffer full)
210202
if let Err(e) = player.send(ordered) {
211203
warn!("Failed to send data to player: {:?}", e);
212-
if let Some(result) = players.remove(&id) {
213-
result.shutdown();
214-
}
204+
player.shutdown();
215205
}
216206

217207
drop(players);
@@ -230,4 +220,17 @@ impl Egress {
230220

231221
player.enable_receive_broadcasts();
232222
}
223+
224+
#[instrument(skip_all)]
225+
pub fn handle_shutdown(&self, pkt: &ArchivedShutdown) {
226+
let player_registry = self.player_registry;
227+
let players = player_registry.pin();
228+
let Ok(stream) = rkyv::deserialize::<u64, !>(&pkt.stream);
229+
230+
if let Some(result) = players.get(&stream) {
231+
result.shutdown();
232+
} else {
233+
error!("Player not found for stream {stream:?}");
234+
}
235+
}
233236
}

crates/hyperion-proxy/src/player.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,6 @@ pub fn initiate_player_connection(
115115
let mut packet_writer = PlayerPacketWriter::new(socket_writer, player_id);
116116

117117
while let Ok(outgoing_packet) = incoming_packet_receiver.recv().await {
118-
if outgoing_packet.is_shutdown() {
119-
return;
120-
}
121-
122118
if outgoing_packet.is_flush() {
123119
let time_start = std::time::Instant::now();
124120
if let Err(e) = packet_writer.flush_pending_packets().await {
@@ -175,15 +171,14 @@ pub fn initiate_player_connection(
175171
if let Err(e) = server_sender.send(disconnect).await {
176172
warn!("failed to send player disconnect to server: {e}");
177173
}
178-
179-
let map_ref = player_registry.pin();
180-
map_ref.remove(&player_id);
181-
182-
let map_ref = player_positions.pin();
183-
map_ref.remove(&player_id);
184-
185174
}
186175
}
176+
177+
let map_ref = player_registry.pin();
178+
map_ref.remove(&player_id);
179+
180+
let map_ref = player_positions.pin();
181+
map_ref.remove(&player_id);
187182
})
188183
}
189184

crates/hyperion/src/egress/player_join/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use glam::DVec3;
66
use hyperion_crafting::{Action, CraftingRegistry, RecipeBookState};
77
use hyperion_utils::EntityExt;
88
use rayon::iter::{IntoParallelIterator, ParallelIterator};
9-
use tracing::{info, instrument};
9+
use tracing::{info, instrument, warn};
1010
use valence_protocol::{
1111
ByteAngle, GameMode, Ident, PacketEncoder, RawBytes, VarInt, Velocity,
1212
game_mode::OptGameMode,
@@ -29,7 +29,6 @@ pub use list::*;
2929
use crate::{
3030
config::Config,
3131
egress::metadata::show_all,
32-
ingress::PendingRemove,
3332
net::{Compose, ConnectionId, DataBundle},
3433
simulation::{
3534
Comms, Name, Position, Uuid, Yaw,
@@ -598,7 +597,8 @@ impl Module for PlayerJoinModule {
598597
crafting_registry,
599598
config,
600599
) {
601-
entity.set(PendingRemove::new(e.to_string()));
600+
warn!("player_join_world error: {e:?}");
601+
compose.io_buf().shutdown(stream_id, world);
602602
}
603603
},
604604
);

crates/hyperion/src/ingress/mod.rs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,10 @@ use crate::{
3838
util::{TracingExt, mojang::MojangClient},
3939
};
4040

41+
/// This marks players who have already been disconnected and about to be destructed. This component should not be
42+
/// added to an entity to disconnect a player. Use [`crate::net::IoBuf::shutdown`] instead.
4143
#[derive(Component, Debug)]
42-
pub struct PendingRemove {
43-
pub reason: String,
44-
}
45-
46-
impl PendingRemove {
47-
#[must_use]
48-
pub fn new(reason: impl Into<String>) -> Self {
49-
Self {
50-
reason: reason.into(),
51-
}
52-
}
53-
}
44+
pub struct PendingRemove;
5445

5546
fn process_handshake(
5647
login_state: &mut PacketState,
@@ -335,13 +326,11 @@ impl Module for IngressModule {
335326
for disconnect in recv.player_disconnect.lock().drain(..) {
336327
// will initiate the removal of entity
337328
info!("queue pending remove");
338-
let Some(id) = lookup.get(&disconnect).copied() else {
329+
let Some(entity_id) = lookup.get(&disconnect).copied() else {
339330
error!("failed to get id for disconnect stream {disconnect:?}");
340331
continue;
341332
};
342-
world
343-
.entity_from_id(*id)
344-
.set(PendingRemove::new("disconnected"));
333+
world.entity_from_id(*entity_id).add(id::<PendingRemove>());
345334
}
346335
});
347336

@@ -375,11 +364,10 @@ impl Module for IngressModule {
375364
world,
376365
&Uuid,
377366
&Compose($),
378-
&ConnectionId,
379-
&PendingRemove,
380367
)
381368
.kind(id::<flecs::pipeline::PostLoad>())
382-
.each_iter(move |it, row, (uuid, compose, io, pending_remove)| {
369+
.with(id::<PendingRemove>())
370+
.each_iter(move |it, row, (uuid, compose)| {
383371
let system = it.system();
384372
let entity = it.entity(row).expect("row must be in bounds");
385373
let uuids = &[uuid.0];
@@ -402,16 +390,6 @@ impl Module for IngressModule {
402390
if let Err(e) = compose.broadcast(&pkt, system).send() {
403391
error!("failed to send player remove packet: {e}");
404392
}
405-
406-
if !pending_remove.reason.is_empty() {
407-
let pkt = play::DisconnectS2c {
408-
reason: pending_remove.reason.clone().into_cow_text(),
409-
};
410-
411-
if let Err(e) = compose.unicast_no_compression(&pkt, *io, system) {
412-
error!("failed to send disconnect packet: {e}");
413-
}
414-
}
415393
});
416394

417395
world
@@ -485,7 +463,7 @@ impl Module for IngressModule {
485463
Ok(frame) => frame,
486464
Err(e) => {
487465
error!("failed to decode packet: {e}");
488-
entity.destruct();
466+
compose.io_buf().shutdown(io_ref, &world);
489467
break;
490468
}
491469
};
@@ -498,9 +476,7 @@ impl Module for IngressModule {
498476
PacketState::Handshake => {
499477
if process_handshake(login_state, &frame).is_err() {
500478
error!("failed to process handshake");
501-
502-
entity.destruct();
503-
479+
compose.io_buf().shutdown(io_ref, &world);
504480
break;
505481
}
506482
}
@@ -509,7 +485,7 @@ impl Module for IngressModule {
509485
process_status(login_state, system, &frame, io_ref, compose)
510486
{
511487
error!("failed to process status packet: {e}");
512-
entity.destruct();
488+
compose.io_buf().shutdown(io_ref, &world);
513489
break;
514490
}
515491
}
@@ -549,7 +525,7 @@ impl Module for IngressModule {
549525
error!("failed to send login disconnect packet: {e}");
550526
}
551527

552-
entity.destruct();
528+
compose.io_buf().shutdown(io_ref, &world);
553529
break;
554530
}
555531
}

crates/hyperion/src/net/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,4 +636,24 @@ impl IoBuf {
636636
let packet_len = u64::try_from(new_len - len - size_of::<u64>()).unwrap();
637637
buffer[len..(len + 8)].copy_from_slice(&packet_len.to_be_bytes());
638638
}
639+
640+
pub fn shutdown(&self, stream: ConnectionId, world: &World) {
641+
let buffer = self.buffer.get(world);
642+
let buffer = &mut *buffer.borrow_mut();
643+
644+
let to_send = hyperion_proto::Shutdown {
645+
stream: stream.stream_id,
646+
};
647+
648+
let to_send = ServerToProxyMessage::Shutdown(to_send);
649+
650+
let len = buffer.len();
651+
buffer.write_u64::<byteorder::BigEndian>(0x00).unwrap();
652+
653+
rkyv::api::high::to_bytes_in::<_, rkyv::rancor::Error>(&to_send, &mut *buffer).unwrap();
654+
655+
let new_len = buffer.len();
656+
let packet_len = u64::try_from(new_len - len - size_of::<u64>()).unwrap();
657+
buffer[len..(len + 8)].copy_from_slice(&packet_len.to_be_bytes());
658+
}
639659
}

0 commit comments

Comments
 (0)