Skip to content

Commit 780f074

Browse files
proxy mesh process test (#809)
Summary: Pull Request resolved: #809 this self-bootstrapping program spawns a `ProxyActor` on a remote process, which then spawns a `TestActor` on another remote process, creating a 3-level process hierarchy. Differential Revision: D79930332
1 parent 5c0302d commit 780f074

File tree

2 files changed

+188
-1
lines changed

2 files changed

+188
-1
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
279279
}
280280

281281
/// Open a port on this ActorMesh.
282-
pub(crate) fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
282+
pub fn open_port<M: Message>(&self) -> (PortHandle<M>, PortReceiver<M>) {
283283
self.proc_mesh.client().open_port()
284284
}
285285

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::env;
10+
use std::fmt;
11+
use std::path::PathBuf;
12+
use std::sync::Arc;
13+
14+
use anyhow::Result;
15+
use async_trait::async_trait;
16+
use clap::Parser;
17+
use hyperactor::Actor;
18+
use hyperactor::Context;
19+
use hyperactor::Handler;
20+
use hyperactor::Named;
21+
use hyperactor::PortRef;
22+
use hyperactor_mesh::Mesh;
23+
use hyperactor_mesh::ProcMesh;
24+
use hyperactor_mesh::RootActorMesh;
25+
use hyperactor_mesh::alloc::AllocSpec;
26+
use hyperactor_mesh::alloc::Allocator;
27+
use hyperactor_mesh::alloc::ProcessAllocator;
28+
use ndslice::extent;
29+
use serde::Deserialize;
30+
use serde::Serialize;
31+
use tokio::process::Command;
32+
33+
#[derive(Parser)]
34+
struct Args {
35+
/// Run bootstrap logic
36+
#[arg(long)]
37+
bootstrap: bool,
38+
}
39+
40+
// -- TestActor
41+
42+
#[derive(Debug)]
43+
#[hyperactor::export(
44+
spawn = true,
45+
handlers = [
46+
Echo,
47+
],
48+
)]
49+
pub struct TestActor;
50+
51+
#[async_trait]
52+
impl Actor for TestActor {
53+
type Params = ();
54+
55+
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
56+
Ok(Self)
57+
}
58+
}
59+
60+
#[derive(Debug, Serialize, Deserialize, Named, Clone)]
61+
pub struct Echo(pub String, pub PortRef<String>);
62+
63+
#[async_trait]
64+
impl Handler<Echo> for TestActor {
65+
async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
66+
let Echo(message, reply_port) = message;
67+
reply_port.send(cx, message)?;
68+
Ok(())
69+
}
70+
}
71+
72+
// -- ProxyActor
73+
74+
#[hyperactor::export(
75+
spawn = true,
76+
handlers = [
77+
Echo,
78+
],
79+
)]
80+
pub struct ProxyActor {
81+
#[allow(dead_code)]
82+
proc_mesh: Arc<ProcMesh>,
83+
actor_mesh: RootActorMesh<'static, TestActor>,
84+
}
85+
86+
impl fmt::Debug for ProxyActor {
87+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88+
f.debug_struct("ProxyActor")
89+
.field("proc_mesh", &"...")
90+
.field("actor_mesh", &"...")
91+
.finish()
92+
}
93+
}
94+
95+
#[async_trait]
96+
impl Actor for ProxyActor {
97+
type Params = String;
98+
99+
async fn new(exe_path: Self::Params) -> anyhow::Result<Self, anyhow::Error> {
100+
let mut cmd = Command::new(PathBuf::from(&exe_path));
101+
cmd.arg("--bootstrap");
102+
let mut allocator = ProcessAllocator::new(cmd);
103+
104+
let alloc = allocator
105+
.allocate(AllocSpec {
106+
extent: extent! { replica = 1 },
107+
constraints: Default::default(),
108+
})
109+
.await
110+
.unwrap();
111+
let proc_mesh = Arc::new(ProcMesh::allocate(alloc).await.unwrap());
112+
let leaked: &'static Arc<ProcMesh> = Box::leak(Box::new(proc_mesh));
113+
let actor_mesh: RootActorMesh<'static, TestActor> =
114+
leaked.spawn("echo", &()).await.unwrap();
115+
Ok(Self {
116+
proc_mesh: Arc::clone(leaked),
117+
actor_mesh,
118+
})
119+
}
120+
}
121+
122+
#[async_trait]
123+
impl Handler<Echo> for ProxyActor {
124+
async fn handle(&mut self, cx: &Context<Self>, message: Echo) -> Result<(), anyhow::Error> {
125+
let actor = self.actor_mesh.get(0).unwrap();
126+
127+
let (tx, mut rx) = cx.open_port();
128+
actor.send(cx, Echo(message.0, tx.bind()))?;
129+
message.1.send(cx, rx.recv().await.unwrap())?;
130+
131+
Ok(())
132+
}
133+
}
134+
135+
async fn run_client(exe_path: PathBuf) -> Result<(), anyhow::Error> {
136+
let mut cmd = Command::new(PathBuf::from(&exe_path));
137+
cmd.arg("--bootstrap");
138+
139+
let mut allocator = ProcessAllocator::new(cmd);
140+
let alloc = allocator
141+
.allocate(AllocSpec {
142+
extent: extent! { replica = 1 },
143+
constraints: Default::default(),
144+
})
145+
.await
146+
.unwrap();
147+
148+
let mut proc_mesh = ProcMesh::allocate(alloc).await?;
149+
let actor_mesh: RootActorMesh<'_, ProxyActor> = proc_mesh
150+
.spawn("proxy", &exe_path.to_str().unwrap().to_string())
151+
.await?;
152+
let proxy_actor = actor_mesh.get(0).unwrap();
153+
let (tx, mut rx) = actor_mesh.open_port::<String>();
154+
proxy_actor.send(proc_mesh.client(), Echo("hello!".to_owned(), tx.bind()))?;
155+
156+
let msg = rx.recv().await?;
157+
println!("{}", msg);
158+
assert_eq!(msg, "hello!");
159+
160+
let mut alloc = proc_mesh.events().unwrap().into_alloc();
161+
alloc.stop_and_wait().await?;
162+
drop(alloc);
163+
164+
Ok(())
165+
}
166+
167+
#[tokio::main]
168+
async fn main() -> Result<(), anyhow::Error> {
169+
// Logs are written to /tmp/$USER/monarch_log*.
170+
let subscriber = tracing_subscriber::fmt()
171+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
172+
.finish();
173+
tracing::subscriber::set_global_default(subscriber).expect("failed to set subscriber");
174+
175+
let args = Args::parse();
176+
if args.bootstrap {
177+
hyperactor_mesh::bootstrap_or_die().await;
178+
} else {
179+
let exe_path: PathBuf = env::current_exe().unwrap_or_else(|e| {
180+
eprintln!("Failed to get current executable path: {}", e);
181+
std::process::exit(1);
182+
});
183+
run_client(exe_path).await?;
184+
}
185+
186+
Ok(())
187+
}

0 commit comments

Comments
 (0)