Skip to content

Commit 390c5a0

Browse files
committed
Merge remote-tracking branch 'upstream/main' into codex/find-and-fix-a-bug-in-codebase
2 parents 7c9303d + 188a3e4 commit 390c5a0

File tree

8 files changed

+77
-81
lines changed

8 files changed

+77
-81
lines changed

crates/hyperion-proto/src/server_to_proxy.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ pub struct Unicast<'a> {
4747
#[rkyv(derive(Debug))]
4848
pub struct Flush;
4949

50+
/// The server must be prepared to handle other additional packets with this stream from the proxy after the server
51+
/// sends [`Shutdown`] until the server receives [`crate::PlayerDisconnect`] because proxy to server packets may
52+
/// already be in transit.
53+
#[derive(Archive, Deserialize, Serialize, Clone, Copy, PartialEq, Debug)]
54+
pub struct Shutdown {
55+
pub stream: u64,
56+
}
57+
5058
#[derive(Archive, Deserialize, Serialize, Clone, PartialEq)]
5159
pub enum ServerToProxyMessage<'a> {
5260
UpdatePlayerChunkPositions(UpdatePlayerChunkPositions),
@@ -55,4 +63,5 @@ pub enum ServerToProxyMessage<'a> {
5563
Unicast(Unicast<'a>),
5664
SetReceiveBroadcasts(SetReceiveBroadcasts),
5765
Flush(Flush),
66+
Shutdown(Shutdown),
5867
}

crates/hyperion-proxy/src/cache.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,9 @@ impl BufferedEgress {
323323
egress.handle_broadcast_local(instruction);
324324
});
325325
}
326+
ArchivedServerToProxyMessage::Shutdown(pkt) => {
327+
self.egress.handle_shutdown(pkt);
328+
}
326329
}
327330
}
328331

crates/hyperion-proxy/src/data.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,11 @@ impl OrderedBytes {
4040
data: Bytes::from_static(b""),
4141
exclusions: None,
4242
};
43-
pub const SHUTDOWN: Self = Self {
44-
order: u32::MAX - 1,
45-
offset: 0,
46-
data: Bytes::from_static(b""),
47-
exclusions: None,
48-
};
4943

5044
pub const fn is_flush(&self) -> bool {
5145
self.order == u32::MAX
5246
}
5347

54-
pub const fn is_shutdown(&self) -> bool {
55-
self.order == u32::MAX - 1
56-
}
57-
5848
pub const fn no_order(data: Bytes) -> Self {
5949
Self {
6050
order: 0,
@@ -120,8 +110,8 @@ impl PlayerHandle {
120110
}
121111

122112
pub fn shutdown(&self) {
123-
let _ = self.writer.try_send(OrderedBytes::SHUTDOWN);
124-
self.writer.close().unwrap();
113+
// Ignore error for if the channel is already closed
114+
let _ = self.writer.close();
125115
}
126116

127117
pub fn enable_receive_broadcasts(&self) {
@@ -143,7 +133,7 @@ impl PlayerHandle {
143133
bail!("failed to send packet to player, channel is full: {is_full}");
144134
}
145135
Err(e) => {
146-
self.writer.close().unwrap();
136+
self.shutdown();
147137
bail!("failed to send packet to player: {e}");
148138
}
149139
}

crates/hyperion-proxy/src/egress.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use bvh::{Aabb, Bvh};
44
use bytes::Bytes;
55
use glam::I16Vec2;
66
use hyperion_proto::{
7-
ArchivedSetReceiveBroadcasts, ArchivedUnicast, ArchivedUpdatePlayerChunkPositions,
8-
ChunkPosition,
7+
ArchivedSetReceiveBroadcasts, ArchivedShutdown, ArchivedUnicast,
8+
ArchivedUpdatePlayerChunkPositions, ChunkPosition,
99
};
1010
use rustc_hash::FxBuildHasher;
1111
use tracing::{Instrument, debug, error, info_span, instrument, warn};
@@ -19,8 +19,6 @@ use crate::{
1919
pub struct Egress {
2020
// todo: can we do some type of EntityId and SlotMap
2121
player_registry: &'static papaya::HashMap<u64, PlayerHandle, FxBuildHasher>,
22-
23-
// todo: remove positions when player leaves
2422
positions: &'static papaya::HashMap<u64, ChunkPosition, FxBuildHasher>,
2523
}
2624

@@ -81,7 +79,7 @@ impl Egress {
8179

8280
// imo it makes sense to read once... it is a fast loop
8381
#[allow(clippy::significant_drop_in_scrutinee)]
84-
for (player_id, player) in &players {
82+
for (_, player) in &players {
8583
if !player.can_receive_broadcasts() {
8684
continue;
8785
}
@@ -91,9 +89,7 @@ impl Egress {
9189

9290
if let Err(e) = player.send(to_send) {
9391
warn!("Failed to send data to player: {:?}", e);
94-
if let Some(result) = players.remove(player_id) {
95-
result.shutdown();
96-
}
92+
player.shutdown();
9793
}
9894
}
9995
}
@@ -107,12 +103,10 @@ impl Egress {
107103

108104
tokio::spawn(
109105
async move {
110-
for (id, player) in &players {
106+
for (_, player) in &players {
111107
if let Err(e) = player.send(OrderedBytes::FLUSH) {
112108
warn!("Failed to send data to player: {:?}", e);
113-
if let Some(result) = players.remove(id) {
114-
result.shutdown();
115-
}
109+
player.shutdown();
116110
}
117111
}
118112
}
@@ -171,9 +165,7 @@ impl Egress {
171165

172166
if let Err(e) = player.send(to_send) {
173167
warn!("Failed to send data to player: {:?}", e);
174-
if let Some(result) = players.remove(id) {
175-
result.shutdown();
176-
}
168+
player.shutdown();
177169
}
178170
}
179171
}
@@ -202,16 +194,14 @@ impl Egress {
202194

203195
let Some(player) = players.get(&id) else {
204196
// expected to still happen infrequently
205-
debug!("Player not found for id {id:?}");
197+
warn!("Player not found for id {id:?}");
206198
return;
207199
};
208200

209201
// todo: handle error; kick player if cannot send (buffer full)
210202
if let Err(e) = player.send(ordered) {
211203
warn!("Failed to send data to player: {:?}", e);
212-
if let Some(result) = players.remove(&id) {
213-
result.shutdown();
214-
}
204+
player.shutdown();
215205
}
216206

217207
drop(players);
@@ -230,4 +220,17 @@ impl Egress {
230220

231221
player.enable_receive_broadcasts();
232222
}
223+
224+
#[instrument(skip_all)]
225+
pub fn handle_shutdown(&self, pkt: &ArchivedShutdown) {
226+
let player_registry = self.player_registry;
227+
let players = player_registry.pin();
228+
let Ok(stream) = rkyv::deserialize::<u64, !>(&pkt.stream);
229+
230+
if let Some(result) = players.get(&stream) {
231+
result.shutdown();
232+
} else {
233+
error!("Player not found for stream {stream:?}");
234+
}
235+
}
233236
}

crates/hyperion-proxy/src/player.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,6 @@ pub fn initiate_player_connection(
115115
let mut packet_writer = PlayerPacketWriter::new(socket_writer, player_id);
116116

117117
while let Ok(outgoing_packet) = incoming_packet_receiver.recv().await {
118-
if outgoing_packet.is_shutdown() {
119-
return;
120-
}
121-
122118
if outgoing_packet.is_flush() {
123119
let time_start = std::time::Instant::now();
124120
if let Err(e) = packet_writer.flush_pending_packets().await {
@@ -175,15 +171,14 @@ pub fn initiate_player_connection(
175171
if let Err(e) = server_sender.send(disconnect).await {
176172
warn!("failed to send player disconnect to server: {e}");
177173
}
178-
179-
let map_ref = player_registry.pin();
180-
map_ref.remove(&player_id);
181-
182-
let map_ref = player_positions.pin();
183-
map_ref.remove(&player_id);
184-
185174
}
186175
}
176+
177+
let map_ref = player_registry.pin();
178+
map_ref.remove(&player_id);
179+
180+
let map_ref = player_positions.pin();
181+
map_ref.remove(&player_id);
187182
})
188183
}
189184

crates/hyperion/src/egress/player_join/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use glam::DVec3;
66
use hyperion_crafting::{Action, CraftingRegistry, RecipeBookState};
77
use hyperion_utils::EntityExt;
88
use rayon::iter::{IntoParallelIterator, ParallelIterator};
9-
use tracing::{info, instrument};
9+
use tracing::{info, instrument, warn};
1010
use valence_protocol::{
1111
ByteAngle, GameMode, Ident, PacketEncoder, RawBytes, VarInt, Velocity,
1212
game_mode::OptGameMode,
@@ -29,7 +29,6 @@ pub use list::*;
2929
use crate::{
3030
config::Config,
3131
egress::metadata::show_all,
32-
ingress::PendingRemove,
3332
net::{Compose, ConnectionId, DataBundle},
3433
simulation::{
3534
Comms, Name, Position, Uuid, Yaw,
@@ -598,7 +597,8 @@ impl Module for PlayerJoinModule {
598597
crafting_registry,
599598
config,
600599
) {
601-
entity.set(PendingRemove::new(e.to_string()));
600+
warn!("player_join_world error: {e:?}");
601+
compose.io_buf().shutdown(stream_id, world);
602602
}
603603
},
604604
);

crates/hyperion/src/ingress/mod.rs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,10 @@ use crate::{
3838
util::{TracingExt, mojang::MojangClient},
3939
};
4040

41+
/// This marks players who have already been disconnected and about to be destructed. This component should not be
42+
/// added to an entity to disconnect a player. Use [`crate::net::IoBuf::shutdown`] instead.
4143
#[derive(Component, Debug)]
42-
pub struct PendingRemove {
43-
pub reason: String,
44-
}
45-
46-
impl PendingRemove {
47-
#[must_use]
48-
pub fn new(reason: impl Into<String>) -> Self {
49-
Self {
50-
reason: reason.into(),
51-
}
52-
}
53-
}
44+
pub struct PendingRemove;
5445

5546
fn process_handshake(
5647
login_state: &mut PacketState,
@@ -335,13 +326,11 @@ impl Module for IngressModule {
335326
for disconnect in recv.player_disconnect.lock().drain(..) {
336327
// will initiate the removal of entity
337328
info!("queue pending remove");
338-
let Some(id) = lookup.get(&disconnect).copied() else {
329+
let Some(entity_id) = lookup.get(&disconnect).copied() else {
339330
error!("failed to get id for disconnect stream {disconnect:?}");
340331
continue;
341332
};
342-
world
343-
.entity_from_id(*id)
344-
.set(PendingRemove::new("disconnected"));
333+
world.entity_from_id(*entity_id).add(id::<PendingRemove>());
345334
}
346335
});
347336

@@ -375,11 +364,10 @@ impl Module for IngressModule {
375364
world,
376365
&Uuid,
377366
&Compose($),
378-
&ConnectionId,
379-
&PendingRemove,
380367
)
381368
.kind(id::<flecs::pipeline::PostLoad>())
382-
.each_iter(move |it, row, (uuid, compose, io, pending_remove)| {
369+
.with(id::<PendingRemove>())
370+
.each_iter(move |it, row, (uuid, compose)| {
383371
let system = it.system();
384372
let entity = it.entity(row).expect("row must be in bounds");
385373
let uuids = &[uuid.0];
@@ -402,16 +390,6 @@ impl Module for IngressModule {
402390
if let Err(e) = compose.broadcast(&pkt, system).send() {
403391
error!("failed to send player remove packet: {e}");
404392
}
405-
406-
if !pending_remove.reason.is_empty() {
407-
let pkt = play::DisconnectS2c {
408-
reason: pending_remove.reason.clone().into_cow_text(),
409-
};
410-
411-
if let Err(e) = compose.unicast_no_compression(&pkt, *io, system) {
412-
error!("failed to send disconnect packet: {e}");
413-
}
414-
}
415393
});
416394

417395
world
@@ -485,7 +463,7 @@ impl Module for IngressModule {
485463
Ok(frame) => frame,
486464
Err(e) => {
487465
error!("failed to decode packet: {e}");
488-
entity.destruct();
466+
compose.io_buf().shutdown(io_ref, &world);
489467
break;
490468
}
491469
};
@@ -498,9 +476,7 @@ impl Module for IngressModule {
498476
PacketState::Handshake => {
499477
if process_handshake(login_state, &frame).is_err() {
500478
error!("failed to process handshake");
501-
502-
entity.destruct();
503-
479+
compose.io_buf().shutdown(io_ref, &world);
504480
break;
505481
}
506482
}
@@ -509,7 +485,7 @@ impl Module for IngressModule {
509485
process_status(login_state, system, &frame, io_ref, compose)
510486
{
511487
error!("failed to process status packet: {e}");
512-
entity.destruct();
488+
compose.io_buf().shutdown(io_ref, &world);
513489
break;
514490
}
515491
}
@@ -549,7 +525,7 @@ impl Module for IngressModule {
549525
error!("failed to send login disconnect packet: {e}");
550526
}
551527

552-
entity.destruct();
528+
compose.io_buf().shutdown(io_ref, &world);
553529
break;
554530
}
555531
}

crates/hyperion/src/net/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,4 +636,24 @@ impl IoBuf {
636636
let packet_len = u64::try_from(new_len - len - size_of::<u64>()).unwrap();
637637
buffer[len..(len + 8)].copy_from_slice(&packet_len.to_be_bytes());
638638
}
639+
640+
pub fn shutdown(&self, stream: ConnectionId, world: &World) {
641+
let buffer = self.buffer.get(world);
642+
let buffer = &mut *buffer.borrow_mut();
643+
644+
let to_send = hyperion_proto::Shutdown {
645+
stream: stream.stream_id,
646+
};
647+
648+
let to_send = ServerToProxyMessage::Shutdown(to_send);
649+
650+
let len = buffer.len();
651+
buffer.write_u64::<byteorder::BigEndian>(0x00).unwrap();
652+
653+
rkyv::api::high::to_bytes_in::<_, rkyv::rancor::Error>(&to_send, &mut *buffer).unwrap();
654+
655+
let new_len = buffer.len();
656+
let packet_len = u64::try_from(new_len - len - size_of::<u64>()).unwrap();
657+
buffer[len..(len + 8)].copy_from_slice(&packet_len.to_be_bytes());
658+
}
639659
}

0 commit comments

Comments
 (0)