Skip to content

Commit 752f03f

Browse files
: proxy mesh process test (#809)
Summary: this self-bootstrapping program spawns a `ProxyActor` on a remote process, which then spawns a `TestActor` on another remote process, creating a 3-level process hierarchy. Differential Revision: D79930332
1 parent 6ceefca commit 752f03f

File tree

2 files changed

+183
-1
lines changed

2 files changed

+183
-1
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
279279
}
280280

281281
/// Open a port on this ActorMesh.
282-
pub(crate) fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
282+
pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
283283
self.proc_mesh.client().open_port()
284284
}
285285

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::env;
10+
use std::fmt;
11+
use std::path::PathBuf;
12+
use std::sync::Arc;
13+
use std::sync::OnceLock;
14+
15+
use anyhow::Result;
16+
use async_trait::async_trait;
17+
use clap::Parser;
18+
use hyperactor::Actor;
19+
use hyperactor::Context;
20+
use hyperactor::Handler;
21+
use hyperactor::Named;
22+
use hyperactor::PortRef;
23+
use hyperactor_mesh::Mesh;
24+
use hyperactor_mesh::ProcMesh;
25+
use hyperactor_mesh::RootActorMesh;
26+
use hyperactor_mesh::alloc::AllocSpec;
27+
use hyperactor_mesh::alloc::Allocator;
28+
use hyperactor_mesh::alloc::ProcessAllocator;
29+
use ndslice::extent;
30+
use serde::Deserialize;
31+
use serde::Serialize;
32+
use tokio::process::Command;
33+
34+
#[derive(Parser)]
35+
struct Args {
36+
/// Run bootstrap logic
37+
#[arg(long)]
38+
bootstrap: bool,
39+
}
40+
41+
// -- TestActor
42+
43+
#[derive(Debug)]
44+
#[hyperactor::export(
45+
spawn = true,
46+
handlers = [
47+
Echo,
48+
],
49+
)]
50+
pub struct TestActor;
51+
52+
#[async_trait]
53+
impl Actor for TestActor {
54+
type Params = ();
55+
56+
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
57+
Ok(Self)
58+
}
59+
}
60+
61+
#[derive(Debug, Serialize, Deserialize, Named, Clone)]
62+
pub struct Echo(pub String, pub PortRef<String>);
63+
64+
#[async_trait]
65+
impl Handler<Echo> for TestActor {
66+
async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
67+
let Echo(message, reply_port) = message;
68+
reply_port.send(cx, message)?;
69+
Ok(())
70+
}
71+
}
72+
73+
// -- ProxyActor
74+
75+
#[hyperactor::export(
76+
spawn = true,
77+
handlers = [
78+
Echo,
79+
],
80+
)]
81+
pub struct ProxyActor {
82+
#[allow(dead_code)]
83+
proc_mesh: Arc<ProcMesh>,
84+
actor_mesh: RootActorMesh<'static, TestActor>,
85+
}
86+
87+
impl fmt::Debug for ProxyActor {
88+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89+
f.debug_struct("ProxyActor")
90+
.field("proc_mesh", &"...")
91+
.field("actor_mesh", &"...")
92+
.finish()
93+
}
94+
}
95+
96+
#[async_trait]
97+
impl Actor for ProxyActor {
98+
type Params = String;
99+
100+
async fn new(exe_path: Self::Params) -> anyhow::Result<Self, anyhow::Error> {
101+
let mut cmd = Command::new(PathBuf::from(&exe_path));
102+
cmd.arg("--bootstrap");
103+
let mut allocator = ProcessAllocator::new(cmd);
104+
105+
let alloc = allocator
106+
.allocate(AllocSpec {
107+
extent: extent! { replica = 1 },
108+
constraints: Default::default(),
109+
})
110+
.await
111+
.unwrap();
112+
let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
113+
let leaked: &'static Arc<ProcMesh> = Box::leak(Box::new(proc_mesh));
114+
let actor_mesh: RootActorMesh<'static, TestActor> =
115+
leaked.spawn("echo", &()).await.unwrap();
116+
Ok(Self {
117+
proc_mesh: Arc::clone(leaked),
118+
actor_mesh,
119+
})
120+
}
121+
}
122+
123+
#[async_trait]
124+
impl Handler<Echo> for ProxyActor {
125+
async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
126+
let actor = self.actor_mesh.get(0).unwrap();
127+
128+
let (tx, mut rx) = cx.open_port();
129+
actor.send(cx, Echo(message.0, tx.bind()))?;
130+
message.1.send(cx, rx.recv().await.unwrap())?;
131+
132+
Ok(())
133+
}
134+
}
135+
136+
async fn run_client(exe_path: PathBuf) -> Result<(), anyhow::Error> {
137+
let mut cmd = Command::new(PathBuf::from(&exe_path));
138+
cmd.arg("--bootstrap");
139+
140+
let mut allocator = ProcessAllocator::new(cmd);
141+
let alloc = allocator
142+
.allocate(AllocSpec {
143+
extent: extent! { replica = 1 },
144+
constraints: Default::default(),
145+
})
146+
.await
147+
.unwrap();
148+
149+
let mut proc_mesh = ProcMesh::allocate(alloc).await?;
150+
let actor_mesh: RootActorMesh<'_, ProxyActor> = proc_mesh
151+
.spawn("proxy", &exe_path.to_str().unwrap().to_string())
152+
.await?;
153+
let proxy_actor = actor_mesh.get(0).unwrap();
154+
let (tx, mut rx) = actor_mesh.open_port::<String>();
155+
proxy_actor.send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))?;
156+
157+
let msg = rx.recv().await?;
158+
println!("{}", msg);
159+
assert_eq!(msg, "hello!");
160+
161+
let mut alloc = proc_mesh.events().unwrap().into_alloc();
162+
alloc.stop_and_wait().await?;
163+
drop(alloc);
164+
165+
Ok(())
166+
}
167+
168+
#[tokio::main]
169+
async fn main() -> Result<(), anyhow::Error> {
170+
let args = Args::parse();
171+
if args.bootstrap {
172+
hyperactor_mesh::bootstrap_or_die().await;
173+
} else {
174+
let exe_path: PathBuf = env::current_exe().unwrap_or_else(|e| {
175+
eprintln!("Failed to get current executable path: {}", e);
176+
std::process::exit(1);
177+
});
178+
run_client(exe_path).await?;
179+
}
180+
181+
Ok(())
182+
}

0 commit comments

Comments
 (0)