Skip to content

Commit acbbcfd

Browse files
authored
Merge pull request #1613 from microsoft/ab-sched-refactor
scheduler: cleanup: simplify logic and cleanup
2 parents 9ba22d0 + c2e4a67 commit acbbcfd

File tree

25 files changed

+463
-590
lines changed

25 files changed

+463
-590
lines changed

src/catnap/linux/transport.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl SharedCatnapTransport {
8989
options: TcpSocketOptions::new(config)?,
9090
}));
9191
let mut me2 = me.clone();
92-
runtime.insert_io_polling_coroutine(
92+
runtime.schedule_polling_coroutine(
9393
"bgc::catnap::transport::epoll",
9494
Box::pin(async move { me2.poll().await }.fuse()),
9595
)?;

src/catnap/win/overlapped.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -474,9 +474,7 @@ mod tests {
474474
})
475475
.fuse();
476476

477-
let server_task = runtime
478-
.insert_nonpolling_coroutine("ioc_server", Box::pin(server))
479-
.unwrap();
477+
let server_task = runtime.schedule_coroutine("ioc_server", Box::pin(server)).unwrap();
480478
post_completion(&iocp, overlapped.as_mut().marshal(), COMPLETION_KEY)?;
481479

482480
iocp.process_events()?;
@@ -584,7 +582,7 @@ mod tests {
584582
);
585583

586584
let mut runtime = SharedDemiRuntime::default();
587-
let server_task = runtime.insert_nonpolling_coroutine("ioc_server", server).unwrap();
585+
let server_task = runtime.schedule_coroutine("ioc_server", server).unwrap();
588586

589587
let mut wait_for_state = |state| -> Result<(), Fail> {
590588
while server_state_view.load(Ordering::Relaxed) < state {
@@ -689,9 +687,7 @@ mod tests {
689687
.fuse();
690688

691689
let mut runtime = SharedDemiRuntime::default();
692-
let server_task = runtime
693-
.insert_nonpolling_coroutine("ioc_server", Box::pin(server))
694-
.unwrap();
690+
let server_task = runtime.schedule_coroutine("ioc_server", Box::pin(server)).unwrap();
695691

696692
ensure_eq!(
697693
server_state_view.load(Ordering::Relaxed),
@@ -702,7 +698,6 @@ mod tests {
702698
let iocp_ref = unsafe { &mut *iocp.get() };
703699
iocp_ref.process_events()?;
704700

705-
// Poll the runtime again, which
706701
let result = loop {
707702
// Move time forward, which should time out the operation.
708703
runtime.advance_clock(Instant::now());

src/catnap/win/transport.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl SharedCatnapTransport {
7878
runtime: runtime.clone(),
7979
}));
8080

81-
runtime.insert_io_polling_coroutine(
81+
runtime.schedule_polling_coroutine(
8282
"bgc::catnap::transport::epoll",
8383
Box::pin({
8484
let mut me = me.clone();

src/demikernel/libos/network/libos.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
162162
let coroutine = Box::pin(self.clone().accept_coroutine(qd).fuse());
163163
self.runtime
164164
.clone()
165-
.insert_nonpolling_coroutine("ioc::network::libos::accept", coroutine)
165+
.schedule_coroutine("ioc::network::libos::accept", coroutine)
166166
};
167167

168168
queue.accept(coroutine_constructor)
@@ -213,7 +213,7 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
213213
let coroutine = Box::pin(self.clone().connect_coroutine(qd, remote).fuse());
214214
self.runtime
215215
.clone()
216-
.insert_nonpolling_coroutine("ioc::network::libos::connect", coroutine)
216+
.schedule_coroutine("ioc::network::libos::connect", coroutine)
217217
};
218218

219219
queue.connect(coroutine_constructor)
@@ -253,7 +253,7 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
253253
let coroutine = Box::pin(self.clone().close_coroutine(qd).fuse());
254254
self.runtime
255255
.clone()
256-
.insert_nonpolling_coroutine("ioc::network::libos::close", coroutine)
256+
.schedule_coroutine("ioc::network::libos::close", coroutine)
257257
};
258258

259259
queue.close(coroutine_constructor)
@@ -322,7 +322,7 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
322322
let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, None).fuse());
323323
self.runtime
324324
.clone()
325-
.insert_nonpolling_coroutine("ioc::network::libos::push", coroutine)
325+
.schedule_coroutine("ioc::network::libos::push", coroutine)
326326
};
327327

328328
queue.push(coroutine_constructor)
@@ -370,7 +370,7 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
370370
let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, Some(remote)).fuse());
371371
self.runtime
372372
.clone()
373-
.insert_nonpolling_coroutine("ioc::network::libos::pushto", coroutine)
373+
.schedule_coroutine("ioc::network::libos::pushto", coroutine)
374374
};
375375

376376
queue.push(coroutine_constructor)
@@ -390,7 +390,7 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
390390
let coroutine = Box::pin(self.clone().pop_coroutine(qd, size).fuse());
391391
self.runtime
392392
.clone()
393-
.insert_nonpolling_coroutine("ioc::network::libos::pop", coroutine)
393+
.schedule_coroutine("ioc::network::libos::pop", coroutine)
394394
};
395395

396396
queue.pop(coroutine_constructor)

src/inetstack/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ impl SharedInetStack {
8383
runtime: runtime.clone(),
8484
layer4_endpoint,
8585
}));
86-
runtime.insert_io_polling_coroutine("bgc::inetstack::poll", Box::pin(me.clone().poll().fuse()))?;
86+
runtime.schedule_polling_coroutine("bgc::inetstack::poll", Box::pin(me.clone().poll().fuse()))?;
8787
Ok(me)
8888
}
8989

90-
/// Scheduler will poll all futures that are ready to make progress.
90+
/// Scheduler will run all futures that are ready to make progress.
9191
/// Then ask the runtime to receive new data which we will forward to the engine to parse and
9292
/// route to the correct protocol.
9393
pub async fn poll(mut self) {

src/inetstack/protocols/layer3/arp/peer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl SharedArpPeer {
8282
recv_queue: AsyncQueue::<DemiBuffer>::default(),
8383
}));
8484
// This is a future returned by the async function.
85-
runtime.insert_nonpolling_coroutine("bgc::inetstack::arp::background", Box::pin(peer.clone().poll().fuse()))?;
85+
runtime.schedule_coroutine("bgc::inetstack::arp::background", Box::pin(peer.clone().poll().fuse()))?;
8686
Ok(peer.clone())
8787
}
8888

src/inetstack/protocols/layer3/arp/tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ fn arp_immediate_reply() -> Result<()> {
5252
// Move clock forward and poll the engine.
5353
now += Duration::from_micros(1);
5454
engine.advance_clock(now);
55-
engine.get_runtime().poll_background_tasks();
55+
engine.get_runtime().run_background_tasks();
5656

5757
// Check if the ARP cache outputs a reply message.
5858
let mut buffers = engine.pop_expected_frames(1);
@@ -120,7 +120,7 @@ fn arp_cache_update() -> Result<()> {
120120
// Move clock forward and poll the engine.
121121
now += Duration::from_micros(1);
122122
engine.advance_clock(now);
123-
engine.get_runtime().poll_background_tasks();
123+
engine.get_runtime().run_background_tasks();
124124

125125
// Check if the ARP cache has been updated.
126126
let cache = engine.get_transport().export_arp_cache();
@@ -157,10 +157,10 @@ fn arp_cache_timeout() -> Result<()> {
157157
let _qt = engine
158158
.get_runtime()
159159
.clone()
160-
.insert_nonpolling_coroutine("arp query", coroutine)?;
160+
.schedule_coroutine("arp query", coroutine)?;
161161

162162
for _ in 0..(ARP_RETRY_COUNT + 1) {
163-
engine.get_runtime().poll_foreground_tasks();
163+
engine.get_runtime().run_foreground_tasks();
164164
// Check if the ARP cache outputs a reply message.
165165
let buffers = engine.pop_expected_frames(1);
166166
crate::ensure_eq!(buffers.len(), 1);

src/inetstack/protocols/layer3/icmpv4/peer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ impl SharedIcmpv4Peer {
9999
rng,
100100
inflight: HashMap::<(u16, u16), InflightRequest>::new(),
101101
}));
102-
runtime
103-
.insert_nonpolling_coroutine("bgc::inetstack::icmp::background", Box::pin(peer.clone().poll().fuse()))?;
102+
runtime.schedule_coroutine("bgc::inetstack::icmp::background", Box::pin(peer.clone().poll().fuse()))?;
104103
Ok(peer)
105104
}
106105

src/inetstack/protocols/layer4/tcp/established/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl SharedEstablishedSocket {
173173
me.receive(header, data);
174174
}
175175
let me2 = me.clone();
176-
runtime.insert_nonpolling_coroutine(
176+
runtime.schedule_coroutine(
177177
"bgc::inetstack::tcp::established::background",
178178
Box::pin(async move { me2.background().await }.fuse()),
179179
)?;

src/inetstack/protocols/layer4/tcp/passive_open.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl SharedPassiveSocket {
191191
.fuse();
192192
match self
193193
.runtime
194-
.insert_nonpolling_coroutine("bgc::inetstack::tcp::passiveopen::background", Box::pin(future))
194+
.schedule_coroutine("bgc::inetstack::tcp::passiveopen::background", Box::pin(future))
195195
{
196196
Ok(qt) => qt,
197197
Err(e) => {

0 commit comments

Comments
 (0)