Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions src/catnap/linux/active_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ impl ActiveSocketData {
// Put the buffer back and try again later.
self.send_queue.push_front(Outgoing { addr, buffer, result });
} else {
let cause = format!("failed to send on socket: {:?}", errno);
error!("poll_send(): {}", cause);
result.set(Some(Err(Fail::new(errno, &cause))));
error!("poll_send(): failed on socket: {:?}", errno);
result.set(Some(Err(Fail::new(errno, "send failed on socket"))));
}
},
}
Expand Down Expand Up @@ -126,9 +125,8 @@ impl ActiveSocketData {
Err(e) => {
let errno = get_libc_err(e);
if !DemiRuntime::should_retry(errno) {
let cause = format!("failed to receive on socket: {:?}", errno);
error!("poll_recv(): {}", cause);
self.recv_queue.push(Err(Fail::new(errno, &cause)));
error!("poll_recv(): failed on socket: {:?}", errno);
self.recv_queue.push(Err(Fail::new(errno, "receive failed on socket")));
}
},
}
Expand All @@ -153,21 +151,22 @@ impl ActiveSocketData {
}
}

/// Pops data from the socket. Blocks until some data is found but does not wait until the buf has reached [size].
/// Blocks until some data is found but does not wait until the buf has reached [size].
pub async fn pop(
&mut self,
size: usize,
timeout: Option<Duration>,
) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
let (addr, mut buffer) = self.recv_queue.pop(timeout).await??;
// Figure out how much data we got.
let bytes_read = min(buffer.len(), size);

// Trim the buffer and leave for next read if we got more than expected.
if let Ok(remainder) = buffer.split_back(bytes_read) {
if !remainder.is_empty() {
self.push_front(remainder, addr.clone());
self.push_front(remainder, addr);
}
}

Ok((addr, buffer))
}

Expand Down
10 changes: 4 additions & 6 deletions src/catnap/linux/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,23 @@ impl SharedSocketData {
self.set_socket_data(SocketData::Active(ActiveSocketData::new(socket)));
}

/// Gets a reference to the actual Socket for reading the socket's metadata (mostly the raw file descriptor).
pub fn get_socket<'a>(&'a self) -> &'a Socket {
pub fn get_socket(&self) -> &Socket {
let _self = self.as_ref();
match _self {
SocketData::Inactive(Some(socket)) => socket,
SocketData::Active(data) => data.socket(),
SocketData::Passive(data) => data.socket(),
_ => panic!("Should have data"),
_ => panic!("should have data"),
}
}

/// Gets a mutable reference to the actual Socket for I/O operations.
pub fn get_mut_socket<'a>(&'a mut self) -> &'a mut Socket {
pub fn get_socket_mut(&mut self) -> &mut Socket {
let _self = self.as_mut();
match _self {
SocketData::Inactive(Some(socket)) => socket,
SocketData::Active(data) => data.socket_mut(),
SocketData::Passive(data) => data.socket_mut(),
_ => panic!("Should have data"),
_ => panic!("should have data"),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/catnap/linux/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl SharedCatnapTransport {

/// Internal function to get the Socket from the metadata structure, given the socket descriptor.
fn socket_from_sd(&mut self, sd: &SockDesc) -> &mut Socket {
self.data_from_sd(sd).get_mut_socket()
self.data_from_sd(sd).get_socket_mut()
}

/// Internal function to get the metadata for the socket, given the socket descriptor.
Expand Down
57 changes: 26 additions & 31 deletions src/demikernel/libos/network/libos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
}

if (typ != Type::STREAM) && (typ != Type::DGRAM) {
let cause: String = format!("socket type not supported (type={:?})", typ);
error!("socket(): {}", cause);
return Err(Fail::new(libc::ENOTSUP, &cause));
error!("socket(): socket type not supported (type={:?})", typ);
return Err(Fail::new(libc::ENOTSUP, "socket type not supported"));
}

let queue: SharedNetworkQueue<T> = SharedNetworkQueue::new(domain, typ, &mut self.transport)?;
Expand Down Expand Up @@ -109,24 +108,22 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
// We only support the wildcard address for UDP sockets.
// FIXME: https://github.com/demikernel/demikernel/issues/189
if *socket_addrv4.ip() == Ipv4Addr::UNSPECIFIED && self.get_shared_queue(&qd)?.qtype() != QType::UdpSocket {
let cause: String = format!("cannot bind to wildcard address (qd={:?})", qd);
error!("bind(): {}", cause);
return Err(Fail::new(libc::ENOTSUP, &cause));
error!("bind(): cannot bind to wildcard address (qd={:?})", qd);
return Err(Fail::new(libc::ENOTSUP, "cannot bind to wildcard address"));
}

// We only support the wildcard address for UDP sockets.
// FIXME: https://github.com/demikernel/demikernel/issues/582
if socket_addr.port() == 0 && self.get_shared_queue(&qd)?.qtype() != QType::UdpSocket {
let cause: String = format!("cannot bind to port 0 (qd={:?})", qd);
error!("bind(): {}", cause);
return Err(Fail::new(libc::ENOTSUP, &cause));
error!("bind(): cannot bind to port 0 (qd={:?})", qd);
return Err(Fail::new(libc::ENOTSUP, "cannot bind to port 0"));
}

if self.runtime.is_addr_in_use(socket_addrv4) {
let cause: String = format!("address is already bound to a socket (qd={:?}", qd);
error!("bind(): {}", &cause);
return Err(Fail::new(libc::EADDRINUSE, &cause));
error!("bind(): address is already bound to a socket (qd={:?}", qd);
return Err(Fail::new(libc::EADDRINUSE, "address is already bound to a socket"));
}

self.get_shared_queue(&qd)?.bind(socket_addr)?;
// Insert into address to queue descriptor table.
self.runtime
Expand All @@ -142,12 +139,10 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {

// We use this API for testing, so we must check again.
if !((backlog > 0) && (backlog <= SOMAXCONN as usize)) {
let cause: String = format!("invalid backlog length: {:?}", backlog);
warn!("{}", cause);
return Err(Fail::new(libc::EINVAL, &cause));
warn!("invalid backlog length: {:?}", backlog);
return Err(Fail::new(libc::EINVAL, "invalid backlog length"));
}

// Issue listen operation.
self.get_shared_queue(&qd)?.listen(backlog)
}

Expand Down Expand Up @@ -303,23 +298,23 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
/// coroutine that asynchronously runs the push and any synchronous multi-queue functionality before the push
/// begins.
pub fn push(&mut self, qd: QDesc, sga: &demi_sgarray_t) -> Result<QToken, Fail> {
let bufs = clone_sgarray(sga)?;
if bufs.is_empty() {
let cause = "zero-length list of buffers";
warn!("push(): {}", cause);
return Err(Fail::new(libc::EINVAL, &cause));
let buffers = clone_sgarray(sga)?;

if buffers.is_empty() {
warn!("push(): buffers cannot be empty");
return Err(Fail::new(libc::EINVAL, "buffers cannot be empty"));
}
for buf in bufs.iter() {
if buf.is_empty() {
let cause = "zero-length buffer";
warn!("push(): {}", cause);
return Err(Fail::new(libc::EINVAL, &cause));

for buffer in buffers.iter() {
if buffer.is_empty() {
warn!("push(): empty buffer");
return Err(Fail::new(libc::EINVAL, "empty buffer"));
};
}

let mut queue: SharedNetworkQueue<T> = self.get_shared_queue(&qd)?;
let coroutine_constructor = || -> Result<QToken, Fail> {
let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, None).fuse());
let coroutine = Box::pin(self.clone().push_coroutine(qd, buffers, None).fuse());
self.runtime
.clone()
.schedule_coroutine("ioc::network::libos::push", coroutine)
Expand Down Expand Up @@ -360,14 +355,14 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
pub fn pushto(&mut self, qd: QDesc, sga: &demi_sgarray_t, remote: SocketAddr) -> Result<QToken, Fail> {
trace!("pushto() qd={:?}", qd);

let bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = clone_sgarray(sga)?;
if bufs.is_empty() {
return Err(Fail::new(libc::EINVAL, "zero buffers to send"));
let buffers: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = clone_sgarray(sga)?;
if buffers.is_empty() {
return Err(Fail::new(libc::EINVAL, "buffers cannot be empty"));
}

let mut queue: SharedNetworkQueue<T> = self.get_shared_queue(&qd)?;
let coroutine_constructor = || -> Result<QToken, Fail> {
let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, Some(remote)).fuse());
let coroutine = Box::pin(self.clone().push_coroutine(qd, buffers, Some(remote)).fuse());
self.runtime
.clone()
.schedule_coroutine("ioc::network::libos::pushto", coroutine)
Expand Down
2 changes: 1 addition & 1 deletion src/demikernel/libos/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl NetworkLibOSWrapper {

/// Marks a socket as a passive one.
pub fn listen(&mut self, sockqd: QDesc, mut backlog: usize) -> Result<(), Fail> {
// Truncate backlog length.
// Limit backlog length.
if backlog > SOMAXCONN as usize {
debug!(
"listen(): backlog length is too large, truncating (qd={:?}, backlog={:?})",
Expand Down
5 changes: 2 additions & 3 deletions src/demikernel/libos/network/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ impl<T: NetworkTransport> SharedNetworkQueue<T> {

pub fn set_socket_option(&mut self, option: SocketOption) -> Result<(), Fail> {
if self.state_machine.ensure_not_closing().is_err() {
let cause = "cannot set socket-level options when socket is closing";
warn!("set_socket_option(): {}", cause);
return Err(Fail::new(libc::EBUSY, cause));
warn!("set_socket_option(): cannot set socket options when closing");
return Err(Fail::new(libc::EBUSY, "cannot set socket options when closing"));
}

self.transport.clone().set_socket_option(&mut self.socket, option)
Expand Down
59 changes: 29 additions & 30 deletions src/runtime/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,66 +37,65 @@ pub trait DemiMemoryAllocator {
}
}

/// Converts a list of DemiBuffers into a scatter-gather array.
pub fn into_sgarray(bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>) -> Result<demi_sgarray_t, Fail> {
// Check the sizes before allocating anything.
if bufs.is_empty() {
let cause = "cannot allocate a zero element scatter-gather array";
error!("into_sgarray(): {}", cause);
return Err(Fail::new(libc::EINVAL, &cause));
pub fn into_sgarray(buffers: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>) -> Result<demi_sgarray_t, Fail> {
if buffers.is_empty() {
error!("into_sgarray(): buffers is empty");
return Err(Fail::new(libc::EINVAL, "buffers is empty"));
}
if bufs.len() > DEMI_SGARRAY_MAXLEN {
let cause = format!("cannot allocate a {} element scatter-gather array", bufs.len());
error!("into_sgarray(): {}", cause);
return Err(Fail::new(libc::EINVAL, &cause));

if buffers.len() > DEMI_SGARRAY_MAXLEN {
error!(
"into_sgarray(): too many buffers: {}, max: {}",
buffers.len(),
DEMI_SGARRAY_MAXLEN
);
return Err(Fail::new(libc::EINVAL, "too many buffers"));
}

// Create a scatter-gather segment to expose the DemiBuffers to the user.
let mut sga: demi_sgarray_t = demi_sgarray_t::default();
sga.num_segments = bufs.len() as u32;
let mut sga: demi_sgarray_t = demi_sgarray_t {
num_segments: buffers.len() as u32,
..Default::default()
};

for (i, buf) in bufs.into_iter().enumerate() {
sga.segments[i].data_buf_ptr = buf.as_ptr() as *mut c_void;
sga.segments[i].data_len_bytes = buf.len() as u32;
sga.segments[i].reserved_metadata_ptr = buf.into_raw().as_ptr() as *mut c_void;
for (i, buffer) in buffers.into_iter().enumerate() {
sga.segments[i].data_buf_ptr = buffer.as_ptr() as *mut c_void;
sga.segments[i].data_len_bytes = buffer.len() as u32;
sga.segments[i].reserved_metadata_ptr = buffer.into_raw().as_ptr() as *mut c_void;
}

// Create and return a new scatter-gather array (which inherits the DemiBuffer's reference).
Ok(sga)
}

/// Allocates a scatter-gather array.
pub fn sgaalloc<M: DemiMemoryAllocator>(size: usize, mem_alloc: &M) -> Result<demi_sgarray_t, Fail> {
// Check the sizes before allocating anything.
// We can't allocate a zero-sized buffer.
if size == 0 {
let cause = "cannot allocate a zero-sized buffer";
error!("sgaalloc(): {}", cause);
return Err(Fail::new(libc::EINVAL, cause));
error!("sgaalloc(): cannot allocate zero-sized buffer");
return Err(Fail::new(libc::EINVAL, "cannot allocate zero-sized buffer"));
}

// First allocate the underlying DemiBuffer.
if size > mem_alloc.max_buffer_size_bytes() * DEMI_SGARRAY_MAXLEN {
return Err(Fail::new(libc::EINVAL, "size too large for a single demi_sgaseg_t"));
}

// Calculate the number of DemiBuffers to allocate.
let max_buffer_size_bytes: usize = mem_alloc.max_buffer_size_bytes();
let remainder: usize = size % max_buffer_size_bytes;
let len: usize = (size - remainder) / max_buffer_size_bytes;
let mut bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = ArrayVec::new();

for _ in 0..len {
bufs.push(mem_alloc.allocate_demi_buffer(max_buffer_size_bytes)?);
}

// If there is any remaining length, allocate a partial buffer.
if remainder > 0 {
bufs.push(mem_alloc.allocate_demi_buffer(remainder)?);
}

into_sgarray(bufs)
}

/// Releases a scatter-gather array.
pub fn sgafree(sga: demi_sgarray_t) -> Result<(), Fail> {
// Check arguments.
if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 {
return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count"));
}
Expand All @@ -105,13 +104,13 @@ pub fn sgafree(sga: demi_sgarray_t) -> Result<(), Fail> {
let buf: DemiBuffer = convert_sgaseg_to_demi_buffer(&sga.segments[i])?;
drop(buf);
}

Ok(())
}

/// Clones a scatter-gather array. The [sga_buf] field must point to the first DemiBuffer in the chain and the elements
/// of [segments] must be the rest of the chain.
/// The [sga_buf] field must point to the first DemiBuffer in the chain and the elements of [segments] must be the rest
/// of the chain.
pub fn clone_sgarray(sga: &demi_sgarray_t) -> Result<ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>, Fail> {
// Check arguments.
if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 || sga.num_segments == 0 {
return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count"));
}
Expand Down
3 changes: 1 addition & 2 deletions src/runtime/scheduler/page/page_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ impl WakerPageRef {

unsafe {
let base_ptr: *mut u8 = self.0.as_ptr().cast();
let ptr = NonNull::new_unchecked(base_ptr.add(ix));
ptr
NonNull::new_unchecked(base_ptr.add(ix))
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/runtime/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ mod tests {

impl DummyCoroutine {
pub fn new(val: usize) -> Self {
let f = Self { val };
f
Self { val }
}
}
impl Future for DummyCoroutine {
Expand Down
Loading