Skip to content

fix: malformed packet crash #889

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/hyperion-proto/src/server_to_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ pub struct Unicast<'a> {
#[rkyv(derive(Debug))]
pub struct Flush;

/// The server must be prepared to handle other additional packets with this stream from the proxy after the server
/// sends [`Shutdown`] until the server receives [`PlayerDisconnect`] because proxy to server packets may already be
/// in transit.
#[derive(Archive, Deserialize, Serialize, Clone, Copy, PartialEq, Debug)]
pub struct Shutdown {
pub stream: u64,
}

#[derive(Archive, Deserialize, Serialize, Clone, PartialEq)]
pub enum ServerToProxyMessage<'a> {
UpdatePlayerChunkPositions(UpdatePlayerChunkPositions),
Expand All @@ -55,4 +63,5 @@ pub enum ServerToProxyMessage<'a> {
Unicast(Unicast<'a>),
SetReceiveBroadcasts(SetReceiveBroadcasts),
Flush(Flush),
Shutdown(Shutdown),
}
3 changes: 3 additions & 0 deletions crates/hyperion-proxy/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ impl BufferedEgress {
egress.handle_broadcast_local(instruction);
});
}
ArchivedServerToProxyMessage::Shutdown(pkt) => {
self.egress.handle_shutdown(pkt);
}
}
}

Expand Down
16 changes: 3 additions & 13 deletions crates/hyperion-proxy/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,11 @@ impl OrderedBytes {
data: Bytes::from_static(b""),
exclusions: None,
};
pub const SHUTDOWN: Self = Self {
order: u32::MAX - 1,
offset: 0,
data: Bytes::from_static(b""),
exclusions: None,
};

pub const fn is_flush(&self) -> bool {
self.order == u32::MAX
}

pub const fn is_shutdown(&self) -> bool {
self.order == u32::MAX - 1
}

pub const fn no_order(data: Bytes) -> Self {
Self {
order: 0,
Expand Down Expand Up @@ -120,8 +110,8 @@ impl PlayerHandle {
}

pub fn shutdown(&self) {
let _ = self.writer.try_send(OrderedBytes::SHUTDOWN);
self.writer.close().unwrap();
// Ignore error for if the channel is already closed
let _ = self.writer.close();
}

pub fn enable_receive_broadcasts(&self) {
Expand All @@ -143,7 +133,7 @@ impl PlayerHandle {
bail!("failed to send packet to player, channel is full: {is_full}");
}
Err(e) => {
self.writer.close().unwrap();
self.shutdown();
bail!("failed to send packet to player: {e}");
}
}
Expand Down
41 changes: 22 additions & 19 deletions crates/hyperion-proxy/src/egress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use bvh::{Aabb, Bvh};
use bytes::Bytes;
use glam::I16Vec2;
use hyperion_proto::{
ArchivedSetReceiveBroadcasts, ArchivedUnicast, ArchivedUpdatePlayerChunkPositions,
ChunkPosition,
ArchivedSetReceiveBroadcasts, ArchivedShutdown, ArchivedUnicast,
ArchivedUpdatePlayerChunkPositions, ChunkPosition,
};
use rustc_hash::FxBuildHasher;
use tracing::{Instrument, debug, error, info_span, instrument, warn};
Expand All @@ -19,8 +19,6 @@ use crate::{
pub struct Egress {
// todo: can we do some type of EntityId and SlotMap
player_registry: &'static papaya::HashMap<u64, PlayerHandle, FxBuildHasher>,

// todo: remove positions when player leaves
positions: &'static papaya::HashMap<u64, ChunkPosition, FxBuildHasher>,
}

Expand Down Expand Up @@ -81,7 +79,7 @@ impl Egress {

// imo it makes sense to read once... it is a fast loop
#[allow(clippy::significant_drop_in_scrutinee)]
for (player_id, player) in &players {
for (_, player) in &players {
if !player.can_receive_broadcasts() {
continue;
}
Expand All @@ -91,9 +89,7 @@ impl Egress {

if let Err(e) = player.send(to_send) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(player_id) {
result.shutdown();
}
player.shutdown();
}
}
}
Expand All @@ -107,12 +103,10 @@ impl Egress {

tokio::spawn(
async move {
for (id, player) in &players {
for (_, player) in &players {
if let Err(e) = player.send(OrderedBytes::FLUSH) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(id) {
result.shutdown();
}
player.shutdown();
}
}
}
Expand Down Expand Up @@ -171,9 +165,7 @@ impl Egress {

if let Err(e) = player.send(to_send) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(id) {
result.shutdown();
}
player.shutdown();
}
}
}
Expand Down Expand Up @@ -202,16 +194,14 @@ impl Egress {

let Some(player) = players.get(&id) else {
// expected to still happen infrequently
debug!("Player not found for id {id:?}");
warn!("Player not found for id {id:?}");
return;
};

// todo: handle error; kick player if cannot send (buffer full)
if let Err(e) = player.send(ordered) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(&id) {
result.shutdown();
}
player.shutdown();
}

drop(players);
Expand All @@ -230,4 +220,17 @@ impl Egress {

player.enable_receive_broadcasts();
}

#[instrument(skip_all)]
pub fn handle_shutdown(&self, pkt: &ArchivedShutdown) {
let player_registry = self.player_registry;
let players = player_registry.pin();
let Ok(stream) = rkyv::deserialize::<u64, !>(&pkt.stream);

if let Some(result) = players.get(&stream) {
result.shutdown();
} else {
error!("Player not found for stream {stream:?}");
}
}
}
17 changes: 6 additions & 11 deletions crates/hyperion-proxy/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ pub fn initiate_player_connection(
let mut packet_writer = PlayerPacketWriter::new(socket_writer, player_id);

while let Ok(outgoing_packet) = incoming_packet_receiver.recv().await {
if outgoing_packet.is_shutdown() {
return;
}

if outgoing_packet.is_flush() {
let time_start = std::time::Instant::now();
if let Err(e) = packet_writer.flush_pending_packets().await {
Expand Down Expand Up @@ -175,15 +171,14 @@ pub fn initiate_player_connection(
if let Err(e) = server_sender.send(disconnect).await {
warn!("failed to send player disconnect to server: {e}");
}

let map_ref = player_registry.pin();
map_ref.remove(&player_id);

let map_ref = player_positions.pin();
map_ref.remove(&player_id);

}
}

let map_ref = player_registry.pin();
map_ref.remove(&player_id);

let map_ref = player_positions.pin();
map_ref.remove(&player_id);
})
}

Expand Down
6 changes: 3 additions & 3 deletions crates/hyperion/src/egress/player_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use glam::DVec3;
use hyperion_crafting::{Action, CraftingRegistry, RecipeBookState};
use hyperion_utils::EntityExt;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tracing::{info, instrument};
use tracing::{info, instrument, warn};
use valence_protocol::{
ByteAngle, GameMode, Ident, PacketEncoder, RawBytes, VarInt, Velocity,
game_mode::OptGameMode,
Expand All @@ -29,7 +29,6 @@ pub use list::*;
use crate::{
config::Config,
egress::metadata::show_all,
ingress::PendingRemove,
net::{Compose, ConnectionId, DataBundle},
simulation::{
Comms, Name, Position, Uuid, Yaw,
Expand Down Expand Up @@ -598,7 +597,8 @@ impl Module for PlayerJoinModule {
crafting_registry,
config,
) {
entity.set(PendingRemove::new(e.to_string()));
warn!("player_join_world error: {e:?}");
compose.io_buf().shutdown(stream_id, world);
}
},
);
Expand Down
37 changes: 11 additions & 26 deletions crates/hyperion/src/ingress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ use crate::{
util::{TracingExt, mojang::MojangClient},
};

/// This marks players who have already been disconnected and about to be destructed. This should not be sent to
/// disconnect a player. Use [`crate::net::IoBuf::shutdown`] instead.
#[derive(Component, Debug)]
pub struct PendingRemove {
pub reason: String,
_private: u8,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the point of this _private field? why can't this just be a ZST

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flecs doesn't allow zero sized components. The _private field also prevents other modules from constructing PendingRemove since it should only be sent by the PlayerDisconnect proxy message.

Copy link
Member

@andrewgazelka andrewgazelka Apr 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flecs doesn't allow zero sized components. The _private field also prevents other modules from constructing PendingRemove since it should only be sent by the PlayerDisconnect proxy message.

when you use .with I believe flecs does. I have definitely used ZSTs before.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I change it to

#[derive(Component, Debug)]
pub struct PendingRemove;

I get the following error:

error[E0277]: the size of type `PendingRemove` should not be zero, should not be a tag.
   --> crates/hyperion/src/ingress/mod.rs:336:47
    |
336 |                 world.entity_from_id(*id).set(PendingRemove);
    |                                           --- ^^^^^^^^^^^^^ Supports only non-empty components
    |                                           |
    |                                           required by a bound introduced by this call
    |
    = help: the trait `flecs_ecs::core::DataComponent` is not implemented for `PendingRemove`
    = help: the following other types implement trait `flecs_ecs::core::DataComponent`:
              &T
              &mut T
              (T, U)
              ActiveAnimation
              AdditionalHearts
              AirSupply
              AppId
              ArrowsInEntity
            and 137 others
note: required by a bound in `flecs_ecs::core::entity_view::entity_view_mut::<impl flecs_ecs::core::EntityView<'a>>::set`
   --> /home/remote-dev/.cargo/git/checkouts/flecs-rust-182003fcd2303c9b/f2762ad/flecs_ecs/src/core/entity_view/entity_view_mut.rs:860:33
    |
860 |     pub fn set<T: ComponentId + DataComponent>(self, component: T) -> Self {
    |                                 ^^^^^^^^^^^^^ required by this bound in `flecs_ecs::core::entity_view::entity_view_mut::<impl EntityView<'a>>::set`

}

impl PendingRemove {
#[must_use]
pub fn new(reason: impl Into<String>) -> Self {
Self {
reason: reason.into(),
}
const fn new() -> Self {
Self { _private: 0 }
}
}

Expand Down Expand Up @@ -335,9 +335,7 @@ impl Module for IngressModule {
error!("failed to get id for disconnect stream {disconnect:?}");
continue;
};
world
.entity_from_id(*id)
.set(PendingRemove::new("disconnected"));
world.entity_from_id(*id).set(PendingRemove::new());
}
});

Expand Down Expand Up @@ -371,11 +369,10 @@ impl Module for IngressModule {
world,
&Uuid,
&Compose($),
&ConnectionId,
&PendingRemove,
)
.kind::<flecs::pipeline::PostLoad>()
.each_iter(move |it, row, (uuid, compose, io, pending_remove)| {
.each_iter(move |it, row, (uuid, compose, _pending_remove)| {
let system = it.system();
let entity = it.entity(row);
let uuids = &[uuid.0];
Expand All @@ -398,16 +395,6 @@ impl Module for IngressModule {
if let Err(e) = compose.broadcast(&pkt, system).send() {
error!("failed to send player remove packet: {e}");
}

if !pending_remove.reason.is_empty() {
let pkt = play::DisconnectS2c {
reason: pending_remove.reason.clone().into_cow_text(),
};

if let Err(e) = compose.unicast_no_compression(&pkt, *io, system) {
error!("failed to send disconnect packet: {e}");
}
}
});

world
Expand Down Expand Up @@ -481,7 +468,7 @@ impl Module for IngressModule {
Ok(frame) => frame,
Err(e) => {
error!("failed to decode packet: {e}");
entity.destruct();
compose.io_buf().shutdown(io_ref, &world);
break;
}
};
Expand All @@ -494,9 +481,7 @@ impl Module for IngressModule {
PacketState::Handshake => {
if process_handshake(login_state, &frame).is_err() {
error!("failed to process handshake");

entity.destruct();

compose.io_buf().shutdown(io_ref, &world);
break;
}
}
Expand All @@ -505,7 +490,7 @@ impl Module for IngressModule {
process_status(login_state, system, &frame, io_ref, compose)
{
error!("failed to process status packet: {e}");
entity.destruct();
compose.io_buf().shutdown(io_ref, &world);
break;
}
}
Expand Down Expand Up @@ -545,7 +530,7 @@ impl Module for IngressModule {
error!("failed to send login disconnect packet: {e}");
}

entity.destruct();
compose.io_buf().shutdown(io_ref, &world);
break;
}
}
Expand Down
20 changes: 20 additions & 0 deletions crates/hyperion/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,4 +636,24 @@ impl IoBuf {
let packet_len = u64::try_from(new_len - len - size_of::<u64>()).unwrap();
buffer[len..(len + 8)].copy_from_slice(&packet_len.to_be_bytes());
}

pub fn shutdown(&self, stream: ConnectionId, world: &World) {
let buffer = self.buffer.get(world);
let buffer = &mut *buffer.borrow_mut();

let to_send = hyperion_proto::Shutdown {
stream: stream.stream_id,
};

let to_send = ServerToProxyMessage::Shutdown(to_send);

let len = buffer.len();
buffer.write_u64::<byteorder::BigEndian>(0x00).unwrap();

rkyv::api::high::to_bytes_in::<_, rkyv::rancor::Error>(&to_send, &mut *buffer).unwrap();

let new_len = buffer.len();
let packet_len = u64::try_from(new_len - len - size_of::<u64>()).unwrap();
buffer[len..(len + 8)].copy_from_slice(&packet_len.to_be_bytes());
}
}
Loading