diff --git a/src/catnap/linux/active_socket.rs b/src/catnap/linux/active_socket.rs index 792c42cf1..02d543ad4 100644 --- a/src/catnap/linux/active_socket.rs +++ b/src/catnap/linux/active_socket.rs @@ -90,9 +90,8 @@ impl ActiveSocketData { // Put the buffer back and try again later. self.send_queue.push_front(Outgoing { addr, buffer, result }); } else { - let cause = format!("failed to send on socket: {:?}", errno); - error!("poll_send(): {}", cause); - result.set(Some(Err(Fail::new(errno, &cause)))); + error!("poll_send(): failed on socket: {:?}", errno); + result.set(Some(Err(Fail::new(errno, "send failed on socket")))); } }, } @@ -126,9 +125,8 @@ impl ActiveSocketData { Err(e) => { let errno = get_libc_err(e); if !DemiRuntime::should_retry(errno) { - let cause = format!("failed to receive on socket: {:?}", errno); - error!("poll_recv(): {}", cause); - self.recv_queue.push(Err(Fail::new(errno, &cause))); + error!("poll_recv(): failed on socket: {:?}", errno); + self.recv_queue.push(Err(Fail::new(errno, "receive failed on socket"))); } }, } @@ -153,21 +151,22 @@ impl ActiveSocketData { } } - /// Pops data from the socket. Blocks until some data is found but does not wait until the buf has reached [size]. + /// Blocks until some data is found but does not wait until the buf has reached [size]. pub async fn pop( &mut self, size: usize, timeout: Option, ) -> Result<(Option, DemiBuffer), Fail> { let (addr, mut buffer) = self.recv_queue.pop(timeout).await??; - // Figure out how much data we got. let bytes_read = min(buffer.len(), size); + // Trim the buffer and leave for next read if we got more than expected. if let Ok(remainder) = buffer.split_back(bytes_read) { if !remainder.is_empty() { - self.push_front(remainder, addr.clone()); + self.push_front(remainder, addr); } } + Ok((addr, buffer)) } diff --git a/src/catnap/linux/socket.rs b/src/catnap/linux/socket.rs index 6538deb86..bfd641291 100644 --- a/src/catnap/linux/socket.rs +++ b/src/catnap/linux/socket.rs @@ -70,25 +70,23 @@ impl SharedSocketData { self.set_socket_data(SocketData::Active(ActiveSocketData::new(socket))); } - /// Gets a reference to the actual Socket for reading the socket's metadata (mostly the raw file descriptor). - pub fn get_socket<'a>(&'a self) -> &'a Socket { + pub fn get_socket(&self) -> &Socket { let _self = self.as_ref(); match _self { SocketData::Inactive(Some(socket)) => socket, SocketData::Active(data) => data.socket(), SocketData::Passive(data) => data.socket(), - _ => panic!("Should have data"), + _ => panic!("should have data"), } } - /// Gets a mutable reference to the actual Socket for I/O operations. - pub fn get_mut_socket<'a>(&'a mut self) -> &'a mut Socket { + pub fn get_socket_mut(&mut self) -> &mut Socket { let _self = self.as_mut(); match _self { SocketData::Inactive(Some(socket)) => socket, SocketData::Active(data) => data.socket_mut(), SocketData::Passive(data) => data.socket_mut(), - _ => panic!("Should have data"), + _ => panic!("should have data"), } } diff --git a/src/catnap/linux/transport.rs b/src/catnap/linux/transport.rs index efc8cc341..949f716c2 100644 --- a/src/catnap/linux/transport.rs +++ b/src/catnap/linux/transport.rs @@ -203,7 +203,7 @@ impl SharedCatnapTransport { /// Internal function to get the Socket from the metadata structure, given the socket descriptor. fn socket_from_sd(&mut self, sd: &SockDesc) -> &mut Socket { - self.data_from_sd(sd).get_mut_socket() + self.data_from_sd(sd).get_socket_mut() } /// Internal function to get the metadata for the socket, given the socket descriptor. diff --git a/src/demikernel/libos/network/libos.rs b/src/demikernel/libos/network/libos.rs index eeeb3c927..2bea33dca 100644 --- a/src/demikernel/libos/network/libos.rs +++ b/src/demikernel/libos/network/libos.rs @@ -74,9 +74,8 @@ impl SharedNetworkLibOS { } if (typ != Type::STREAM) && (typ != Type::DGRAM) { - let cause: String = format!("socket type not supported (type={:?})", typ); - error!("socket(): {}", cause); - return Err(Fail::new(libc::ENOTSUP, &cause)); + error!("socket(): socket type not supported (type={:?})", typ); + return Err(Fail::new(libc::ENOTSUP, "socket type not supported")); } let queue: SharedNetworkQueue = SharedNetworkQueue::new(domain, typ, &mut self.transport)?; @@ -109,24 +108,22 @@ impl SharedNetworkLibOS { // We only support the wildcard address for UDP sockets. // FIXME: https://github.com/demikernel/demikernel/issues/189 if *socket_addrv4.ip() == Ipv4Addr::UNSPECIFIED && self.get_shared_queue(&qd)?.qtype() != QType::UdpSocket { - let cause: String = format!("cannot bind to wildcard address (qd={:?})", qd); - error!("bind(): {}", cause); - return Err(Fail::new(libc::ENOTSUP, &cause)); + error!("bind(): cannot bind to wildcard address (qd={:?})", qd); + return Err(Fail::new(libc::ENOTSUP, "cannot bind to wildcard address")); } // We only support the wildcard address for UDP sockets. // FIXME: https://github.com/demikernel/demikernel/issues/582 if socket_addr.port() == 0 && self.get_shared_queue(&qd)?.qtype() != QType::UdpSocket { - let cause: String = format!("cannot bind to port 0 (qd={:?})", qd); - error!("bind(): {}", cause); - return Err(Fail::new(libc::ENOTSUP, &cause)); + error!("bind(): cannot bind to port 0 (qd={:?})", qd); + return Err(Fail::new(libc::ENOTSUP, "cannot bind to port 0")); } if self.runtime.is_addr_in_use(socket_addrv4) { - let cause: String = format!("address is already bound to a socket (qd={:?}", qd); - error!("bind(): {}", &cause); - return Err(Fail::new(libc::EADDRINUSE, &cause)); + error!("bind(): address is already bound to a socket (qd={:?}", qd); + return Err(Fail::new(libc::EADDRINUSE, "address is already bound to a socket")); } + self.get_shared_queue(&qd)?.bind(socket_addr)?; // Insert into address to queue descriptor table. self.runtime @@ -142,12 +139,10 @@ impl SharedNetworkLibOS { // We use this API for testing, so we must check again. if !((backlog > 0) && (backlog <= SOMAXCONN as usize)) { - let cause: String = format!("invalid backlog length: {:?}", backlog); - warn!("{}", cause); - return Err(Fail::new(libc::EINVAL, &cause)); + warn!("invalid backlog length: {:?}", backlog); + return Err(Fail::new(libc::EINVAL, "invalid backlog length")); } - // Issue listen operation. self.get_shared_queue(&qd)?.listen(backlog) } @@ -303,23 +298,23 @@ impl SharedNetworkLibOS { /// coroutine that asynchronously runs the push and any synchronous multi-queue functionality before the push /// begins. pub fn push(&mut self, qd: QDesc, sga: &demi_sgarray_t) -> Result { - let bufs = clone_sgarray(sga)?; - if bufs.is_empty() { - let cause = "zero-length list of buffers"; - warn!("push(): {}", cause); - return Err(Fail::new(libc::EINVAL, &cause)); + let buffers = clone_sgarray(sga)?; + + if buffers.is_empty() { + warn!("push(): buffers cannot be empty"); + return Err(Fail::new(libc::EINVAL, "buffers cannot be empty")); } - for buf in bufs.iter() { - if buf.is_empty() { - let cause = "zero-length buffer"; - warn!("push(): {}", cause); - return Err(Fail::new(libc::EINVAL, &cause)); + + for buffer in buffers.iter() { + if buffer.is_empty() { + warn!("push(): empty buffer"); + return Err(Fail::new(libc::EINVAL, "empty buffer")); }; } let mut queue: SharedNetworkQueue = self.get_shared_queue(&qd)?; let coroutine_constructor = || -> Result { - let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, None).fuse()); + let coroutine = Box::pin(self.clone().push_coroutine(qd, buffers, None).fuse()); self.runtime .clone() .schedule_coroutine("ioc::network::libos::push", coroutine) @@ -360,14 +355,14 @@ impl SharedNetworkLibOS { pub fn pushto(&mut self, qd: QDesc, sga: &demi_sgarray_t, remote: SocketAddr) -> Result { trace!("pushto() qd={:?}", qd); - let bufs: ArrayVec = clone_sgarray(sga)?; - if bufs.is_empty() { - return Err(Fail::new(libc::EINVAL, "zero buffers to send")); + let buffers: ArrayVec = clone_sgarray(sga)?; + if buffers.is_empty() { + return Err(Fail::new(libc::EINVAL, "buffers cannot be empty")); } let mut queue: SharedNetworkQueue = self.get_shared_queue(&qd)?; let coroutine_constructor = || -> Result { - let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, Some(remote)).fuse()); + let coroutine = Box::pin(self.clone().push_coroutine(qd, buffers, Some(remote)).fuse()); self.runtime .clone() .schedule_coroutine("ioc::network::libos::pushto", coroutine) diff --git a/src/demikernel/libos/network/mod.rs b/src/demikernel/libos/network/mod.rs index e3ac206eb..a48ba8950 100644 --- a/src/demikernel/libos/network/mod.rs +++ b/src/demikernel/libos/network/mod.rs @@ -120,7 +120,7 @@ impl NetworkLibOSWrapper { /// Marks a socket as a passive one. pub fn listen(&mut self, sockqd: QDesc, mut backlog: usize) -> Result<(), Fail> { - // Truncate backlog length. + // Limit backlog length. if backlog > SOMAXCONN as usize { debug!( "listen(): backlog length is too large, truncating (qd={:?}, backlog={:?})", diff --git a/src/demikernel/libos/network/queue.rs b/src/demikernel/libos/network/queue.rs index 43c3abcef..f22c05464 100644 --- a/src/demikernel/libos/network/queue.rs +++ b/src/demikernel/libos/network/queue.rs @@ -74,9 +74,8 @@ impl SharedNetworkQueue { pub fn set_socket_option(&mut self, option: SocketOption) -> Result<(), Fail> { if self.state_machine.ensure_not_closing().is_err() { - let cause = "cannot set socket-level options when socket is closing"; - warn!("set_socket_option(): {}", cause); - return Err(Fail::new(libc::EBUSY, cause)); + warn!("set_socket_option(): cannot set socket options when closing"); + return Err(Fail::new(libc::EBUSY, "cannot set socket options when closing")); } self.transport.clone().set_socket_option(&mut self.socket, option) diff --git a/src/runtime/memory/mod.rs b/src/runtime/memory/mod.rs index 66f5792e2..757a71efe 100644 --- a/src/runtime/memory/mod.rs +++ b/src/runtime/memory/mod.rs @@ -37,66 +37,65 @@ pub trait DemiMemoryAllocator { } } -/// Converts a list of DemiBuffers into a scatter-gather array. -pub fn into_sgarray(bufs: ArrayVec) -> Result { - // Check the sizes before allocating anything. - if bufs.is_empty() { - let cause = "cannot allocate a zero element scatter-gather array"; - error!("into_sgarray(): {}", cause); - return Err(Fail::new(libc::EINVAL, &cause)); +pub fn into_sgarray(buffers: ArrayVec) -> Result { + if buffers.is_empty() { + error!("into_sgarray(): buffers is empty"); + return Err(Fail::new(libc::EINVAL, "buffers is empty")); } - if bufs.len() > DEMI_SGARRAY_MAXLEN { - let cause = format!("cannot allocate a {} element scatter-gather array", bufs.len()); - error!("into_sgarray(): {}", cause); - return Err(Fail::new(libc::EINVAL, &cause)); + + if buffers.len() > DEMI_SGARRAY_MAXLEN { + error!( + "into_sgarray(): too many buffers: {}, max: {}", + buffers.len(), + DEMI_SGARRAY_MAXLEN + ); + return Err(Fail::new(libc::EINVAL, "too many buffers")); } - // Create a scatter-gather segment to expose the DemiBuffers to the user. - let mut sga: demi_sgarray_t = demi_sgarray_t::default(); - sga.num_segments = bufs.len() as u32; + let mut sga: demi_sgarray_t = demi_sgarray_t { + num_segments: buffers.len() as u32, + ..Default::default() + }; - for (i, buf) in bufs.into_iter().enumerate() { - sga.segments[i].data_buf_ptr = buf.as_ptr() as *mut c_void; - sga.segments[i].data_len_bytes = buf.len() as u32; - sga.segments[i].reserved_metadata_ptr = buf.into_raw().as_ptr() as *mut c_void; + for (i, buffer) in buffers.into_iter().enumerate() { + sga.segments[i].data_buf_ptr = buffer.as_ptr() as *mut c_void; + sga.segments[i].data_len_bytes = buffer.len() as u32; + sga.segments[i].reserved_metadata_ptr = buffer.into_raw().as_ptr() as *mut c_void; } - // Create and return a new scatter-gather array (which inherits the DemiBuffer's reference). Ok(sga) } -/// Allocates a scatter-gather array. pub fn sgaalloc(size: usize, mem_alloc: &M) -> Result { - // Check the sizes before allocating anything. - // We can't allocate a zero-sized buffer. if size == 0 { - let cause = "cannot allocate a zero-sized buffer"; - error!("sgaalloc(): {}", cause); - return Err(Fail::new(libc::EINVAL, cause)); + error!("sgaalloc(): cannot allocate zero-sized buffer"); + return Err(Fail::new(libc::EINVAL, "cannot allocate zero-sized buffer")); } // First allocate the underlying DemiBuffer. if size > mem_alloc.max_buffer_size_bytes() * DEMI_SGARRAY_MAXLEN { return Err(Fail::new(libc::EINVAL, "size too large for a single demi_sgaseg_t")); } + // Calculate the number of DemiBuffers to allocate. let max_buffer_size_bytes: usize = mem_alloc.max_buffer_size_bytes(); let remainder: usize = size % max_buffer_size_bytes; let len: usize = (size - remainder) / max_buffer_size_bytes; let mut bufs: ArrayVec = ArrayVec::new(); + for _ in 0..len { bufs.push(mem_alloc.allocate_demi_buffer(max_buffer_size_bytes)?); } + // If there is any remaining length, allocate a partial buffer. if remainder > 0 { bufs.push(mem_alloc.allocate_demi_buffer(remainder)?); } + into_sgarray(bufs) } -/// Releases a scatter-gather array. pub fn sgafree(sga: demi_sgarray_t) -> Result<(), Fail> { - // Check arguments. if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 { return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count")); } @@ -105,13 +104,13 @@ pub fn sgafree(sga: demi_sgarray_t) -> Result<(), Fail> { let buf: DemiBuffer = convert_sgaseg_to_demi_buffer(&sga.segments[i])?; drop(buf); } + Ok(()) } -/// Clones a scatter-gather array. The [sga_buf] field must point to the first DemiBuffer in the chain and the elements -/// of [segments] must be the rest of the chain. +/// The [sga_buf] field must point to the first DemiBuffer in the chain and the elements of [segments] must be the rest +/// of the chain. pub fn clone_sgarray(sga: &demi_sgarray_t) -> Result, Fail> { - // Check arguments. if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 || sga.num_segments == 0 { return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count")); } diff --git a/src/runtime/scheduler/page/page_ref.rs b/src/runtime/scheduler/page/page_ref.rs index 8f95a56a6..12000bf9e 100644 --- a/src/runtime/scheduler/page/page_ref.rs +++ b/src/runtime/scheduler/page/page_ref.rs @@ -66,8 +66,7 @@ impl WakerPageRef { unsafe { let base_ptr: *mut u8 = self.0.as_ptr().cast(); - let ptr = NonNull::new_unchecked(base_ptr.add(ix)); - ptr + NonNull::new_unchecked(base_ptr.add(ix)) } } } diff --git a/src/runtime/scheduler/scheduler.rs b/src/runtime/scheduler/scheduler.rs index e8c3c5796..9521c3fcb 100644 --- a/src/runtime/scheduler/scheduler.rs +++ b/src/runtime/scheduler/scheduler.rs @@ -147,8 +147,7 @@ mod tests { impl DummyCoroutine { pub fn new(val: usize) -> Self { - let f = Self { val }; - f + Self { val } } } impl Future for DummyCoroutine {