diff --git a/Cargo.lock b/Cargo.lock
index f9b03973814..442c9479c61 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -592,53 +592,53 @@ dependencies = [
 [[package]]
 name = "futures-channel"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
+checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
 dependencies = [
  "futures-core",
 ]

 [[package]]
 name = "futures-core"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"

 [[package]]
 name = "futures-io"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"

 [[package]]
 name = "futures-macro"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.98",
+ "syn 2.0.37",
 ]

 [[package]]
 name = "futures-sink"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
+checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"

 [[package]]
 name = "futures-task"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
+checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"

 [[package]]
 name = "futures-util"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
+checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
 dependencies = [
  "futures-core",
  "futures-io",
@@ -1378,6 +1378,7 @@ dependencies = [
  "httpdate",
  "hyper",
  "hyperlocal",
+ "indexmap 1.9.1",
  "lazy_static",
  "leaky-bucket",
  "libc",
diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs
index 15a1848c4ce..ccb0e06c82c 100644
--- a/rafs/src/fs.rs
+++ b/rafs/src/fs.rs
@@ -175,8 +175,10 @@ impl Rafs {
         }
         if self.fs_prefetch {
             // Device should be ready before any prefetch.
-            self.device.start_prefetch();
-            self.prefetch(r, prefetch_files);
+            // self.device.start_prefetch();
+            // self.prefetch(r, prefetch_files);
+            self.device.init_stream_prefetch();
+            self.start_stream_prefetch(r, prefetch_files);
         }

         self.initialized = true;
@@ -327,6 +329,7 @@ impl Rafs {
 }

 impl Rafs {
+    #[allow(unused)]
     fn prefetch(&self, reader: RafsIoReader, prefetch_files: Option<Vec<PathBuf>>) {
         let sb = self.sb.clone();
         let device = self.device.clone();
@@ -338,6 +341,18 @@ impl Rafs {
         });
     }

+    #[allow(unused)]
+    fn start_stream_prefetch(&self, reader: RafsIoReader, prefetch_files: Option<Vec<PathBuf>>) {
+        let sb = self.sb.clone();
+        let device = self.device.clone();
+        let prefetch_all = self.prefetch_all;
+        let root_ino = self.root_ino();
+
+        let _ = std::thread::spawn(move || {
+            Self::do_stream_prefetch(root_ino, reader, prefetch_files, prefetch_all, sb, device);
+        });
+    }
+
     /// for blobfs
     pub fn fetch_range_synchronous(&self, prefetches: &[BlobPrefetchRequest]) -> Result<()> {
         self.device.fetch_range_synchronous(prefetches)
@@ -347,6 +362,7 @@ impl Rafs {
         self.sb.superblock.root_ino()
     }

+    #[allow(unused)]
     fn do_prefetch(
         root_ino: u64,
         mut reader: RafsIoReader,
@@ -472,6 +488,45 @@ impl Rafs {
         }
     }

+    fn do_stream_prefetch(
+        root_ino: u64,
+        mut reader: RafsIoReader,
+        prefetch_files: Option<Vec<PathBuf>>,
+        _prefetch_all: bool,
+        sb: Arc<RafsSuper>,
+        device: BlobDevice,
+    ) {
+        // A non-empty prefetch table in the bootstrap indicates a full prefetch.
+        let inlay_prefetch_all = sb
+            .is_inlay_prefetch_all(&mut reader)
+            .map_err(|e| error!("Detect prefetch table error {}", e))
+            .unwrap_or_default();
+
+        // Nydusd has a CLI option indicating a full prefetch.
+        let startup_prefetch_all = prefetch_files
+            .as_ref()
+            .map(|f| f.len() == 1 && f[0].as_os_str() == "/")
+            .unwrap_or(false);
+
+        // User-specified prefetch files have the highest priority.
+        // Moreover, the user-specified prefetch file list overrides the on-disk prefetch table.
+        if !startup_prefetch_all && !inlay_prefetch_all {
+            // Then do file-based prefetch based on:
+            // - the prefetch list passed in by the user
+            // - or the file prefetch list in metadata
+            // TODO: change this to an iterator
+            let inodes = prefetch_files.map(|files| Self::convert_file_list(&files, &sb));
+            let res = sb.stream_prefetch_files(&device, &mut reader, root_ino, inodes);
+            match res {
+                Ok(true) => {
+                    info!("Root inode was found, but it should not trigger a full prefetch!")
+                }
+                Ok(false) => {}
+                Err(e) => error!("No file to be prefetched {:?}", e),
+            }
+        }
+    }

     fn convert_file_list(files: &[PathBuf], sb: &Arc<RafsSuper>) -> Vec<Inode> {
         let mut inodes = Vec::<Inode>::with_capacity(files.len());
diff --git a/rafs/src/metadata/direct_v6.rs b/rafs/src/metadata/direct_v6.rs
index 3330aea9451..d3056a77328 100644
--- a/rafs/src/metadata/direct_v6.rs
+++ b/rafs/src/metadata/direct_v6.rs
@@ -847,6 +847,7 @@ impl RafsInode for OndiskInodeWrapper {
                     curr_chunk_index == tail_chunk_index,
                 )
                 .ok_or_else(|| einval!("failed to get chunk information"))?;
+                // TODO: handle the case where a chunk in the middle has a different blob_index.
                 if desc.blob.blob_index() != descs.blob_index() {
                     vec.push(descs);
                     descs = BlobIoVec::new(desc.blob.clone());
diff --git a/rafs/src/metadata/md_v6.rs b/rafs/src/metadata/md_v6.rs
index a0b11d0dd00..bd7996c3a2e 100644
--- a/rafs/src/metadata/md_v6.rs
+++ b/rafs/src/metadata/md_v6.rs
@@ -151,6 +151,74 @@ impl RafsSuper {
         Ok(found_root_inode)
     }
+
+    pub(crate) fn stream_prefetch_data_v6(
+        &self,
+        device: &BlobDevice,
+        rafs_reader: &mut RafsIoReader,
+        root_ino: Inode,
+    ) -> RafsResult<bool> {
+        let hint_entries = self.meta.prefetch_table_entries as usize;
+        if hint_entries == 0 {
+            return Ok(false);
+        }
+
+        // Try to prefetch according to the list of files specified by the
+        // builder's `--prefetch-policy fs` option.
+        let mut prefetch_table = RafsV6PrefetchTable::new();
+        prefetch_table
+            .load_prefetch_table_from(rafs_reader, self.meta.prefetch_table_offset, hint_entries)
+            .map_err(|e| {
+                error!("Failed to load hint prefetch table: {:?}", e);
+                RafsError::Prefetch(format!(
+                    "Failed in loading hint prefetch table at offset {}. {:?}",
+                    self.meta.prefetch_table_offset, e
+                ))
+            })?;
+        // debug!("prefetch table contents {:?}", prefetch_table);
+
+        let mut hardlinks: HashSet<u64> = HashSet::new();
+        let mut fetched_ranges: HashMap<u32, HashSet<u32>> = HashMap::new();
+        let blob_ccis = device.get_all_blob_cci();
+
+        let mut found_root_inode = false;
+        for ino in prefetch_table.inodes {
+            // Inode number 0 is invalid; such entries are only padding added to keep
+            // the prefetch table aligned.
+            if ino == 0 {
+                break;
+            }
+            if ino as Inode == root_ino {
+                found_root_inode = true;
+            }
+            // debug!("CMDebug: hint prefetch inode {}", ino);
+
+            let ranges = self
+                .get_inode_ranges(
+                    ino as u64,
+                    &mut hardlinks,
+                    &mut fetched_ranges,
+                    device,
+                    &blob_ccis,
+                )
+                .map_err(|e| {
+                    RafsError::Prefetch(format!("Failed in get inode chunk ranges. {:?}", e))
+                })?;
+
+            // debug!("CMDebug: prefetch inode: {}, ranges: {:?}", ino, ranges);
+
+            for r in ranges {
+                device.add_stream_prefetch_range(r).map_err(|e| {
+                    RafsError::Prefetch(format!("Failed to add inode prefetch range. {:?}", e))
+                })?;
+            }
+        }
+
+        device.flush_stream_prefetch().map_err(|e| {
+            RafsError::Prefetch(format!("Failed to flush inode prefetch range. {:?}", e))
+        })?;
+
+        Ok(found_root_inode)
+    }
 }

 #[cfg(test)]
diff --git a/rafs/src/metadata/mod.rs b/rafs/src/metadata/mod.rs
index 0c12fc3dd88..af6a837a9c6 100644
--- a/rafs/src/metadata/mod.rs
+++ b/rafs/src/metadata/mod.rs
@@ -17,6 +17,8 @@ use std::path::{Component, Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
+use storage::cache::BlobCCI;
+use storage::device::{BlobIoDesc, BlobRange};
 use thiserror::Error;

 use anyhow::{bail, ensure};
@@ -962,6 +964,29 @@ impl RafsSuper {
         }
     }

+    pub fn stream_prefetch_files(
+        &self,
+        device: &BlobDevice,
+        r: &mut RafsIoReader,
+        root_ino: Inode,
+        files: Option<Vec<PathBuf>>,
+    ) -> RafsResult<bool> {
+        // Try to prefetch files according to the list specified by the `--prefetch-files` option.
+        if let Some(_files) = files {
+            unimplemented!();
+        } else if self.meta.is_v5() {
+            Err(RafsError::Prefetch(
+                "Unsupported filesystem version, prefetch disabled".to_string(),
+            ))
+        } else if self.meta.is_v6() {
+            self.stream_prefetch_data_v6(device, r, root_ino)
+        } else {
+            Err(RafsError::Prefetch(
+                "Unknown filesystem version, prefetch disabled".to_string(),
+            ))
+        }
+    }
+
     #[inline]
     fn prefetch_inode(
         device: &BlobDevice,
@@ -1020,6 +1045,127 @@ impl RafsSuper {

         Ok(())
     }
+
+    fn get_inode_ranges(
+        &self,
+        ino: u64,
+        hardlinks: &mut HashSet<u64>,
+        fetched_ranges: &mut HashMap<u32, HashSet<u32>>,
+        device: &BlobDevice,
+        blob_ccis: &[BlobCCI],
+    ) -> Result<Vec<BlobRange>> {
+        let inode = self
+            .superblock
+            .get_inode(ino, self.validate_digest)
+            .map_err(|_e| enoent!("Can't find inode"))?;
+
+        let mut ranges = Vec::new();
+
+        if inode.is_dir() {
+            let mut descendants = Vec::new();
+            let _ = inode.collect_descendants_inodes(&mut descendants)?;
+            for i in descendants.iter() {
+                Self::get_inode_ranges_inner(
+                    &i,
+                    hardlinks,
+                    fetched_ranges,
+                    &mut ranges,
+                    device,
+                    blob_ccis,
+                )?;
+            }
+        } else if !inode.is_empty_size() && inode.is_reg() {
+            // An empty regular file will also be packed into the nydus image,
+            // then it has a size of zero.
+            // Moreover, for rafs v5 a symlink has a file size of zero but a
+            // non-zero symlink size. For rafs v6, symlink size is also represented
+            // by i_size. So we have to restrain the condition here.
+            // debug!("CMDebug: 2");
+
+            Self::get_inode_ranges_inner(
+                &inode,
+                hardlinks,
+                fetched_ranges,
+                &mut ranges,
+                device,
+                blob_ccis,
+            )?;
+        }
+
+        Ok(ranges)
+    }
+
+    #[inline]
+    fn get_inode_ranges_inner(
+        inode: &Arc<dyn RafsInode>,
+        hardlinks: &mut HashSet<u64>,
+        fetched_ranges: &mut HashMap<u32, HashSet<u32>>,
+        ranges: &mut Vec<BlobRange>,
+        device: &BlobDevice,
+        blob_ccis: &[BlobCCI],
+    ) -> Result<()> {
+        // Check for duplicated hardlinks.
+        if inode.is_hardlink() {
+            if hardlinks.contains(&inode.ino()) {
+                return Ok(());
+            } else {
+                hardlinks.insert(inode.ino());
+            }
+        }
+
+        let mut bi_vecs = inode.alloc_bio_vecs(device, 0, inode.size() as usize, false)?;
+        for bi_vec in &mut bi_vecs {
+            // Each bi_vec covers a single blob, but it may not be contiguous inside,
+            // so every desc has to be checked for whether it can be merged.
+
+            let blob_idx = bi_vec.blob_index();
+
+            'vec: for BlobIoDesc { chunkinfo: ci, .. } in &mut bi_vec.bi_vec {
+                let ci_id = ci.id();
+
+                let (c_offset, c_size) =
+                    blob_ccis[blob_idx as usize].get_compressed_info(ci.as_ref())?;
+                let c_end = c_offset + c_size as u64;
+
+                // Check whether this chunk / batch chunk has already been downloaded.
+                let fetched_blob_ranges = fetched_ranges.entry(blob_idx).or_insert(HashSet::new());
+                if fetched_blob_ranges.contains(&ci_id) {
+                    continue;
+                }
+
+                // Try to merge it into an existing range.
+                // TODO: can chunks of the same blob arrive out of order? If so, how
+                // should that be handled?
+                for r in &mut *ranges {
+                    // Match the blob first.
+                    if r.blob_idx != blob_idx {
+                        continue;
+                    }
+
+                    // Then check for duplicates (for batch chunks).
+                    if r.end == c_end {
+                        // The corresponding batch chunk has already been added.
+                        continue 'vec;
+                    }
+
+                    // Then check whether it is contiguous.
+                    // TODO: further handling for special formats.
+                    if r.end == c_offset {
+                        r.end = c_end;
+                        fetched_blob_ranges.insert(ci_id);
+                        continue 'vec;
+                    }
+                }
+                // Put it into a new range.
+                ranges.push(BlobRange {
+                    blob_idx,
+                    offset: c_offset,
+                    end: c_end,
+                });
+                fetched_blob_ranges.insert(ci_id);
+            }
+        }

+        Ok(())
+    }
 }

 // For nydus-image
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index 44197ec108f..1a12bc0b4ea 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -18,6 +18,7 @@ http = { version = "0.2.8", optional = true }
 httpdate = { version = "1.0", optional = true }
 hyper = { version = "0.14.11", optional = true }
 hyperlocal = { version = "0.8.0", optional = true }
+indexmap = "1"
 lazy_static = "1.4.0"
 leaky-bucket = { version = "0.12.1", optional = true }
 libc = "0.2"
diff --git a/storage/src/backend/mod.rs b/storage/src/backend/mod.rs
index aec8db9de06..009def2219b 100644
--- a/storage/src/backend/mod.rs
+++ b/storage/src/backend/mod.rs
@@ -14,8 +14,8 @@
 //! prefetching, which is to load data into page cache.
 //! - [LocalDisk](localdisk/struct.LocalDisk.html): backend driver to access blobs on local disk.

-use std::fmt;
 use std::io::Read;
+use std::{fmt, thread};
 use std::{sync::Arc, time::Duration};

 use fuse_backend_rs::file_buf::FileVolatileSlice;
@@ -23,7 +23,9 @@ use nydus_utils::{
     metrics::{BackendMetrics, ERROR_HOLDER},
     DelayType, Delayer,
 };
+use reqwest::blocking::Response;

+use crate::backend::registry::StreamCallback;
 use crate::utils::{alloc_buf, copyv};
 use crate::StorageError;

@@ -149,6 +151,62 @@ pub trait BlobReader: Send + Sync {
         }
     }

+    fn try_stream_read(
+        &self,
+        _offset: u64,
+        _size: u64,
+        _processed: &mut u64,
+        _f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!();
+    }
+
+    /// Stream the range `[offset, offset + size)` from the backend, feeding the
+    /// data to the callback and retrying with backoff on failure.
+    fn stream_read(&self, offset: u64, size: u64, f: &mut StreamCallback) -> BackendResult<usize> {
+        let mut retry_count = self.retry_limit();
+
+        let mut delayer = Delayer::new(DelayType::BackOff, Duration::from_millis(500));
+
+        let mut processed_all = 0u64;
+        loop {
+            let begin_time = self.metrics().begin();
+
+            let mut processed = 0u64;
+            match self.try_stream_read(
+                offset + processed_all,
+                size - processed_all,
+                &mut processed,
+                f,
+            ) {
+                Ok(()) => {
+                    self.metrics().end(&begin_time, processed as usize, false);
+                    return Ok(size as usize);
+                }
+                Err(err) => {
+                    if processed > 0 {
+                        self.metrics().end(&begin_time, processed as usize, true);
+                        processed_all += processed;
+                    }
+                    if retry_count > 0 {
+                        warn!(
+                            "Read from backend failed: {:?}, retry count {}",
+                            err, retry_count
+                        );
+                        retry_count -= 1;
+                        delayer.delay();
+                    } else {
+                        ERROR_HOLDER
+                            .lock()
+                            .unwrap()
+                            .push(&format!("{:?}", err))
+                            .unwrap_or_else(|_| error!("Failed when try to hold error"));
+                        return Err(err);
+                    }
+                }
+            }
+        }
+    }
+
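// A minimal, self-contained sketch of the retry-with-backoff pattern that
// `stream_read` applies above, assuming a simplified fallible operation.
// `try_read_once` and `MAX_RETRIES` are illustrative stand-ins, not part of
// this patch.
use std::thread::sleep;
use std::time::Duration;

const MAX_RETRIES: u32 = 3;

fn try_read_once(attempt: u32) -> Result<usize, String> {
    // Pretend the backend fails on the first two attempts.
    if attempt < 2 {
        Err(format!("transient backend error on attempt {}", attempt))
    } else {
        Ok(4096)
    }
}

fn read_with_backoff() -> Result<usize, String> {
    let mut delay = Duration::from_millis(500);
    for attempt in 0..=MAX_RETRIES {
        match try_read_once(attempt) {
            Ok(n) => return Ok(n),
            Err(e) if attempt < MAX_RETRIES => {
                // Back off before the next attempt, doubling the delay each time,
                // similar in spirit to Delayer::new(DelayType::BackOff, ...).
                eprintln!("read failed: {}, retrying in {:?}", e, delay);
                sleep(delay);
                delay *= 2;
            }
            Err(e) => return Err(e),
        }
    }
    unreachable!()
}

fn main() {
    assert_eq!(read_with_backoff().unwrap(), 4096);
}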
     /// Read as much as possible data into buffer.
     fn read_all(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
         let mut off = 0usize;
@@ -274,3 +332,82 @@ impl Read for BlobBufReader {
         Ok(sz)
     }
 }
+
+struct ResponseBufReader {
+    buf: Option<Vec<u8>>,
+    pos: usize,
+    receiver: std::sync::mpsc::Receiver<Vec<u8>>,
+}
+
+impl ResponseBufReader {
+    pub fn new(mut res: Response, size: u64) -> Self {
+        let (sender, receiver) = std::sync::mpsc::channel();
+
+        // Spawn a new thread to download the stream and forward the data.
+        thread::spawn(move || {
+            let mut downloaded = 0u64;
+            while downloaded < size {
+                let count = std::cmp::min(102400, size - downloaded);
+                let mut buffer = vec![0u8; count as usize];
+                res.read_exact(&mut buffer).unwrap();
+                sender.send(buffer).unwrap(); // Send the data block to the reading thread.
+                downloaded += count;
+            }
+        });
+
+        Self {
+            buf: None,
+            pos: 0,
+            receiver,
+        }
+    }
+}
+
+impl Read for ResponseBufReader {
+    fn read(&mut self, target: &mut [u8]) -> std::io::Result<usize> {
+        let t_len = target.len();
+        let mut t_pos = 0;
+        // Read from the carried-over buffer first.
+        if let Some(buf) = self.buf.as_ref() {
+            let copy_size = std::cmp::min(t_len, buf.len() - self.pos);
+            target[..copy_size].copy_from_slice(&buf[self.pos..self.pos + copy_size]);
+            self.pos += copy_size;
+            if self.pos == buf.len() {
+                self.buf = None;
+                self.pos = 0;
+            }
+            if copy_size == t_len {
+                return Ok(t_len);
+            }
+            t_pos += copy_size;
+        }
+
+        // Then read from the channel.
+        while let Ok(data) = self.receiver.recv() {
+            // debug!(
+            //     "t_pos: {}, t_len: {}, data.len(): {}",
+            //     t_pos,
+            //     t_len,
+            //     data.len()
+            // );
+            let copy_size = std::cmp::min(t_len - t_pos, data.len());
+            target[t_pos..t_pos + copy_size].copy_from_slice(&data[..copy_size]);
+            t_pos += copy_size;
+            if copy_size == data.len() {
+                continue;
+            }
+            // Store the partially consumed block before the early exit.
+            // assert!(copy_size < res_data.len());
+            self.buf = Some(data);
+            self.pos = copy_size;
+            break;
+        }
+        if t_pos < t_len {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::Other,
+                format!("stream ended early: got {} of {} bytes", t_pos, t_len),
+            ));
+        }
+        Ok(t_len)
+    }
+}
+ } + Err(RegistryError::Request(ConnectionError::Common(e))) => { + if e.to_string().contains("self signed certificate") { + warn!("try to enable \"skip_verify: true\" option"); + } + return Err(RegistryError::Request(ConnectionError::Common(e))); + } + Err(e) => { + return Err(e); + } + }; + let status = resp.status(); + + // Handle redirect request and cache redirect url + if REDIRECTED_STATUS_CODE.contains(&status) { + if let Some(location) = resp.headers().get("location") { + let location = location.to_str().unwrap(); + let mut location = Url::parse(location) + .map_err(|e| RegistryError::Url(location.to_string(), e))?; + // Note: Some P2P proxy server supports only scheme specified origin blob server, + // so we need change scheme to `blob_url_scheme` here + if !self.state.blob_url_scheme.is_empty() { + location + .set_scheme(&self.state.blob_url_scheme) + .map_err(|_| { + RegistryError::Scheme(self.state.blob_url_scheme.clone()) + })?; + } + if !self.state.blob_redirected_host.is_empty() { + location + .set_host(Some(self.state.blob_redirected_host.as_str())) + .map_err(|e| { + error!( + "Failed to set blob redirected host to {}: {:?}", + self.state.blob_redirected_host.as_str(), + e + ); + RegistryError::Url(location.to_string(), e) + })?; + debug!("New redirected location {:?}", location.host_str()); + } + let resp_ret = self + .connection + .call::<&[u8]>( + Method::GET, + location.as_str(), + None, + None, + &mut headers, + true, + ) + .map_err(RegistryError::Request); + match resp_ret { + Ok(_resp) => { + resp = _resp; + self.state + .cached_redirect + .set(self.blob_id.clone(), location.as_str().to_string()) + } + Err(err) => { + return Err(err); + } + } + }; + } else { + resp = respond(resp, true).map_err(RegistryError::Request)?; + } + } + + let buf_reader = ResponseBufReader::new(resp, size); + + f(Box::new(buf_reader), offset, size as u64, processed) + .map_err(|e| RegistryError::Common(e.to_string()))?; + Ok(()) + } } impl BlobReader for RegistryReader { @@ -820,6 +965,19 @@ impl BlobReader for RegistryReader { }) } + fn try_stream_read( + &self, + offset: u64, + size: u64, + processed: &mut u64, + f: &mut StreamCallback, + ) -> BackendResult<()> { + self.first.handle_force(&mut || -> BackendResult<()> { + self._try_stream_read(offset, size, processed, f, true) + .map_err(BackendError::Registry) + }) + } + fn metrics(&self) -> &BackendMetrics { &self.metrics } diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index 4122384a250..31fd1250226 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -9,11 +9,13 @@ //! performance. It may be used by both the userspace `FileCacheMgr` or the `FsCacheMgr` based //! on the in-kernel fscache system. 
+use std::cmp;
 use std::collections::HashSet;
 use std::fs::File;
 use std::io::{ErrorKind, Read, Result};
 use std::mem::ManuallyDrop;
 use std::os::unix::io::{AsRawFd, RawFd};
+
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
@@ -28,8 +30,9 @@ use tokio::runtime::Runtime;

 use crate::backend::BlobReader;
 use crate::cache::state::ChunkMap;
+use crate::cache::streaming::StreamPrefetchMgr;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
-use crate::cache::{BlobCache, BlobIoMergeState};
+use crate::cache::{BlobCCI, BlobCache, BlobIoMergeState};
 use crate::device::{
     BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec,
     BlobObject, BlobPrefetchRequest,
@@ -38,6 +41,8 @@ use crate::meta::{BlobCompressionContextInfo, BlobMetaChunk};
 use crate::utils::{alloc_buf, copyv, readv, MemSliceCursor};
 use crate::{StorageError, StorageResult, RAFS_BATCH_SIZE_TO_GAP_SHIFT, RAFS_DEFAULT_CHUNK_SIZE};

+use super::ChunkDecompressState;
+
 const DOWNLOAD_META_RETRY_COUNT: u32 = 5;
 const DOWNLOAD_META_RETRY_DELAY: u64 = 400;
 const ENCRYPTION_PAGE_SIZE: usize = 4096;
@@ -128,57 +133,6 @@ impl FileCacheMeta {
     }
 }

-/// Helper struct to manage and call BlobCompressionContextInfo.
-struct BlobCCI {
-    meta: Option<Arc<BlobCompressionContextInfo>>,
-}
-
-impl BlobCCI {
-    fn new() -> Self {
-        BlobCCI { meta: None }
-    }
-
-    fn is_none(&self) -> bool {
-        self.meta.is_none()
-    }
-
-    fn set_meta(&mut self, meta: Option<Arc<BlobCompressionContextInfo>>) -> Result<&Self> {
-        if meta.is_none() {
-            return Err(einval!("failed to get blob meta info"));
-        }
-        self.meta = meta;
-        Ok(self)
-    }
-
-    fn get_compressed_offset(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<u64> {
-        Ok(chunk.compressed_offset())
-    }
-
-    fn get_compressed_size(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<u32> {
-        let size = if chunk.is_batch() {
-            self.meta
-                .as_ref()
-                .unwrap()
-                .get_compressed_size(chunk.id())?
-        } else {
-            chunk.compressed_size()
-        };
-        Ok(size)
-    }
-
-    fn get_compressed_info(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<(u64, u32)> {
-        Ok((
-            self.get_compressed_offset(chunk)?,
-            self.get_compressed_size(chunk)?,
-        ))
-    }
-
-    fn get_compressed_end(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<u64> {
-        let (offset, size) = self.get_compressed_info(chunk)?;
-        Ok(offset + size as u64)
-    }
-}
-
 pub(crate) struct FileCacheEntry {
     pub(crate) blob_id: String,
     pub(crate) blob_info: Arc<BlobInfo>,
@@ -192,6 +146,7 @@ pub(crate) struct FileCacheEntry {
     pub(crate) reader: Arc<dyn BlobReader>,
     pub(crate) runtime: Arc<Runtime>,
     pub(crate) workers: Arc<AsyncWorkerMgr>,
+    pub(crate) stream_workers: Arc<StreamPrefetchMgr>,

     pub(crate) blob_compressed_size: u64,
     pub(crate) blob_uncompressed_size: u64,
@@ -509,6 +464,43 @@ impl FileCacheEntry {
             Ok((start, end, size as usize))
         }
     }
+
+    // Callers are expected to pass a non-empty `infos`; an empty slice merely
+    // skips decompression below.
+    #[inline]
+    fn process_chunk(
+        infos: &[Arc<dyn BlobChunkInfo>],
+        c_size: u32,
+        entry: &Arc<FileCacheEntry>,
+        info_offset: &mut u64,
+        processed: &mut u64,
+        reader: &mut Box<dyn Read>,
+    ) -> Result<()> {
+        let mut chunk_buf = vec![0u8; c_size as usize];
+        reader.read_exact(chunk_buf.as_mut_slice())?;
+
+        // If there is no chunk info, skip decompression.
+        if infos.is_empty() {
+            return Ok(());
+        }
+
+        let c_offset = infos[0].compressed_offset();
+
+        *info_offset += c_size as u64;
+
+        let mut d_bufs = ChunkDecompressState::new(
+            c_offset,
+            entry.as_ref(),
+            infos.iter().map(|i| i.as_ref()).collect(),
+            chunk_buf,
+        );
+        for info in infos {
+            let d_buf = d_bufs.next().unwrap().unwrap();
+            entry.persist_chunk_data(info.as_ref(), &d_buf);
+        }
+        *processed += c_size as u64;
+
+        Ok(())
+    }
 }

 impl AsRawFd for FileCacheEntry {
@@ -594,6 +586,10 @@ impl BlobCache for FileCacheEntry {
         Ok(())
     }

+    fn init_stream_prefetch(&self, blobs: Vec<Arc<dyn BlobCache>>) {
+        self.stream_workers.init_blobs(blobs);
+    }
+
     fn stop_prefetch(&self) -> StorageResult<()> {
         loop {
             let val = self.prefetch_state.load(Ordering::Acquire);
@@ -654,6 +650,14 @@ impl BlobCache for FileCacheEntry {
         Ok(0)
     }

+    fn add_stream_prefetch_range(&self, range: crate::device::BlobRange) -> Result<()> {
+        self.stream_workers.add_prefetch_range(range)
+    }
+
+    fn flush_stream_prefetch(&self) -> Result<()> {
+        self.stream_workers.flush_waiting_queue()
+    }
+
     fn prefetch_range(&self, range: &BlobIoRange) -> Result<usize> {
         let mut pending = Vec::with_capacity(range.chunks.len());
         if !self.chunk_map.is_persist() {
@@ -770,6 +774,248 @@ impl BlobCache for FileCacheEntry {
             Ok(None)
         }
     }
+
+    fn fetch_range_compressed_stream(
+        self: Arc<Self>,
+        offset: u64,
+        size: u64,
+        prefetch: bool,
+    ) -> std::io::Result<()> {
+        let entry = self.clone(); // Risky: verify that the data is actually written.
+        let fc_meta = entry.meta.as_ref().ok_or_else(|| einval!())?;
+        let meta = fc_meta.get_blob_meta().ok_or_else(|| einval!())?;
+        let mut f = move |mut resp: Box<dyn Read>,
+                          f_offset: u64,
+                          f_size: u64,
+                          processed: &mut u64|
+              -> std::io::Result<()> {
+            let mut info_offset = 0u64;
+            let iter = StreamChunkIter::new(
+                entry.clone(),
+                meta.clone(),
+                0x100_0000, // TODO: make this configurable; 16 MiB for now.
+                f_offset,
+                f_size,
+                prefetch,
+            );
+            for (infos, c_size) in iter {
+                Self::process_chunk(
+                    &infos,
+                    c_size,
+                    &entry,
+                    &mut info_offset,
+                    processed,
+                    &mut resp,
+                )?;
+            }
+            Ok(())
+        };
+        self.reader()
+            .stream_read(offset, size, &mut f)
+            .map_err(|e| eio!(e))?;
+        Ok(())
+    }
+}
+
+struct StreamChunkIter {
+    // Staging area for the chunk infos fetched so far.
+    chunks: Vec<Arc<dyn BlobChunkInfo>>,
+    // Index of the chunk info currently being processed. Must be updated in sync
+    // when the front of `chunks` is stripped.
+    p_cur: usize,
+    // Index of the chunk info locked (marked pending) so far. Must be updated in
+    // sync when the front of `chunks` is stripped.
+    l_cur: usize,
+    // Compressed end offset covered by all chunk infos fetched so far.
+    ext_end: u64,
+    entry: Arc<FileCacheEntry>,
+    meta: Arc<BlobCompressionContextInfo>,
+    // Constant controlling how many chunk infos to lock ahead.
+    min_pending_size: u64,
+    // Offset of the request within the whole blob.
+    f_offset: u64,
+    // Size of the request.
+    f_size: u64,
+    prefetch: bool,
+}
+
+impl StreamChunkIter {
+    fn new(
+        entry: Arc<FileCacheEntry>,
+        meta: Arc<BlobCompressionContextInfo>,
+        min_pending_size: u64,
+        f_offset: u64,
+        f_size: u64,
+        prefetch: bool,
+    ) -> Self {
+        let mut iter = Self {
+            chunks: Vec::new(),
+            p_cur: 0,
+            l_cur: 0,
+            ext_end: f_offset,
+            entry,
+            meta,
+            min_pending_size,
+            f_offset,
+            f_size,
+            prefetch,
+        };
+        let _ = iter.extend();
+        iter
+    }
+
+    // Extend `chunks` with more chunk infos.
+    fn extend(&mut self) -> bool {
+        // TODO: tunable; load 16 MiB worth of chunk infos at a time.
+        let size_to_extend = cmp::min(self.f_offset + self.f_size - self.ext_end, 0x100_0000);
+
+        if size_to_extend == 0 {
+            return false;
+        }
+
+        let mut chunks_new = self
+            .meta
+            .get_chunks_compressed(self.ext_end, size_to_extend, 0, self.prefetch)
+            .unwrap();
+
+        self.chunks.append(&mut chunks_new);
+
+        self.ext_end = self
+            .chunks
+            .last()
+            .map(|i| {
+                if i.is_batch() {
+                    i.compressed_offset() + self.meta.get_compressed_size(i.id()).unwrap() as u64
+                } else {
+                    i.compressed_end()
+                }
+            })
+            .unwrap();

+        // Strip processed chunks from the front.
+        if self.p_cur > self.chunks.len() / 2 {
+            self.chunks = self.chunks.split_off(self.p_cur);
+            self.l_cur -= self.p_cur;
+            self.p_cur = 0;
+        }
+
+        true
+    }
+
+    fn mark_pending(&mut self, processed_end: u64) {
+        if self.p_cur > self.l_cur {
+            // First catch up with p_cur.
+            for i in &self.chunks[self.l_cur..self.p_cur] {
+                let _ = self.entry.chunk_map.try_mark_pending(i.as_ref());
+            }
+            self.l_cur = self.p_cur;
+        }
+
+        loop {
+            if self.l_cur == self.chunks.len() && !self.extend() {
+                return;
+            }
+
+            // Then lock ahead until min_pending_size is covered.
+            let i = &self.chunks[self.l_cur];
+            self.l_cur += 1;
+
+            self.entry
+                .chunk_map
+                .check_ready_and_mark_pending(i.as_ref())
+                .ok();
+
+            let c_end = if i.is_batch() {
+                i.compressed_offset() + self.meta.get_compressed_size(i.id()).unwrap() as u64
+            } else {
+                i.compressed_end()
+            };
+
+            if c_end >= processed_end && c_end - processed_end >= self.min_pending_size {
+                return;
+            }
+        }
+    }
+
+    fn strip_ready_chunks(
+        &self,
+        infos: Vec<Arc<dyn BlobChunkInfo>>,
+    ) -> Vec<Arc<dyn BlobChunkInfo>> {
+        // Only a batch chunk maps to multiple chunk infos; in every other case
+        // there is exactly one chunk info.
+        let mut all_ready = true;
+        for i in &infos {
+            if !self.entry.chunk_map.is_ready(i.as_ref()).unwrap() {
+                all_ready = false;
+                break;
+            }
+        }
+
+        if all_ready {
+            Vec::new()
+        } else {
+            infos
+        }
+    }
+}
+
+impl Iterator for StreamChunkIter {
+    type Item = (Vec<Arc<dyn BlobChunkInfo>>, u32);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut items: Vec<Arc<dyn BlobChunkInfo>> = Vec::new();
+        // Uncompressed and compressed size of the whole batch chunk.
+        let mut batch_d_size = u32::MAX;
+        let mut batch_c_size = u32::MAX;
+        // Compressed end of the chunks processed in this call.
+        let mut p_end = u64::MAX;
+
+        loop {
+            if self.p_cur == self.chunks.len() && !self.extend() {
+                if !items.is_empty() {
+                    error!("not enough chunks to add!");
+                }
+                return None;
+            }
+
+            let i = self.chunks[self.p_cur].clone();
+
+            if !i.is_batch() {
+                let c_size = i.compressed_size();
+                p_end = i.compressed_end();
+                items.push(i);
+                self.p_cur += 1;
+
+                self.mark_pending(p_end);
+                return Some((self.strip_ready_chunks(items), c_size));
+            }
+
+            // It is a batch chunk.
+            if items.is_empty() {
+                let batch_idx = self.meta.get_batch_index(i.id()).unwrap();
+                let ctx = self.meta.get_batch_context(batch_idx).unwrap();
+                batch_d_size = ctx.uncompressed_batch_size();
+                batch_c_size = ctx.compressed_size();
+            }
+
+            let in_batch_end = i.uncompressed_size()
+                + self
+                    .meta
+                    .get_uncompressed_offset_in_batch_buf(i.id())
+                    .unwrap();
+
+            if items.is_empty() {
+                // Update only once for the whole batch chunk.
+                p_end = i.compressed_offset() + batch_c_size as u64;
+            }
+
+            items.push(i);
+            self.p_cur += 1;
+
+            if batch_d_size == in_batch_end {
+                self.mark_pending(p_end);
+                return Some((self.strip_ready_chunks(items), batch_c_size));
+            }
+            // Not ended yet, continue.
+        }
+    }
+}

 impl BlobObject for FileCacheEntry {
diff --git a/storage/src/cache/dummycache.rs b/storage/src/cache/dummycache.rs
index 7a0465f36e2..6abc0af9da1 100644
--- a/storage/src/cache/dummycache.rs
+++ b/storage/src/cache/dummycache.rs
@@ -31,7 +31,7 @@ use crate::backend::{BlobBackend, BlobReader};
 use crate::cache::state::{ChunkMap, NoopChunkMap};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{
-    BlobChunkInfo, BlobFeatures, BlobInfo, BlobIoDesc, BlobIoVec, BlobPrefetchRequest,
+    BlobChunkInfo, BlobFeatures, BlobInfo, BlobIoDesc, BlobIoVec, BlobPrefetchRequest, BlobRange,
 };
 use crate::utils::{alloc_buf, copyv};
 use crate::{StorageError, StorageResult};
@@ -104,6 +104,8 @@ impl BlobCache for DummyCache {
         Ok(())
     }

+    fn init_stream_prefetch(&self, _blobs: Vec<Arc<dyn BlobCache>>) {}
+
     fn stop_prefetch(&self) -> StorageResult<()> {
         Ok(())
     }
@@ -121,6 +123,14 @@ impl BlobCache for DummyCache {
         Err(StorageError::Unsupported)
     }

+    fn add_stream_prefetch_range(&self, _range: BlobRange) -> Result<()> {
+        unimplemented!()
+    }
+
+    fn flush_stream_prefetch(&self) -> Result<()> {
+        unimplemented!()
+    }
+
     fn read(&self, iovec: &mut BlobIoVec, bufs: &[FileVolatileSlice]) -> Result<usize> {
         let bios = &iovec.bi_vec;
@@ -164,6 +174,15 @@ impl BlobCache for DummyCache {
             .map(|(n, _)| n)
             .map_err(|e| eother!(e))
     }
+
+    fn fetch_range_compressed_stream(
+        self: Arc<Self>,
+        _offset: u64,
+        _size: u64,
+        _prefetch: bool,
+    ) -> std::io::Result<()> {
+        unimplemented!();
+    }
 }

 /// A dummy implementation of [BlobCacheMgr](../trait.BlobCacheMgr.html), simply reporting each
diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs
index 1e38f3b3072..9a591be65b8 100644
--- a/storage/src/cache/filecache/mod.rs
+++ b/storage/src/cache/filecache/mod.rs
@@ -20,6 +20,7 @@ use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{
     BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
 };
+use crate::cache::streaming::StreamPrefetchMgr;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo};
@@ -37,6 +38,7 @@ pub struct FileCacheMgr {
     prefetch_config: Arc<AsyncPrefetchConfig>,
     runtime: Arc<Runtime>,
     worker_mgr: Arc<AsyncWorkerMgr>,
+    streaming_prefetch_mgr: Arc<StreamPrefetchMgr>,
     work_dir: String,
     validate: bool,
     disable_indexed_map: bool,
@@ -62,6 +64,7 @@ impl FileCacheMgr {
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
         let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let streaming_prefetch_mgr = StreamPrefetchMgr::new(prefetch_config.clone());

         Ok(FileCacheMgr {
             blobs: Arc::new(RwLock::new(HashMap::new())),
@@ -70,6 +73,7 @@ impl FileCacheMgr {
             prefetch_config,
             runtime,
             worker_mgr: Arc::new(worker_mgr),
+            streaming_prefetch_mgr: Arc::new(streaming_prefetch_mgr),
             work_dir: work_dir.to_owned(),
             disable_indexed_map: blob_cfg.disable_indexed_map,
             validate: config.cache_validate,
@@ -100,6 +104,7 @@ impl FileCacheMgr {
             self.prefetch_config.clone(),
             self.runtime.clone(),
             self.worker_mgr.clone(),
+            self.streaming_prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -120,7 +125,8 @@ impl FileCacheMgr {

 impl BlobCacheMgr for FileCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        AsyncWorkerMgr::start(self.worker_mgr.clone())?;
+        StreamPrefetchMgr::start(self.streaming_prefetch_mgr.clone())
     }

     fn destroy(&self) {
@@ -183,6 +189,7 @@ impl FileCacheEntry {
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
         workers: Arc<AsyncWorkerMgr>,
+        stream_workers: Arc<StreamPrefetchMgr>,
     ) -> Result<Self> {
         let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
         let is_tarfs = blob_info.features().is_tarfs();
@@ -328,6 +335,7 @@ impl FileCacheEntry {
             reader,
             runtime,
             workers,
+            stream_workers,

             blob_compressed_size,
             blob_uncompressed_size,
diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs
index 5b2285c9b0e..5510058c0dc 100644
--- a/storage/src/cache/fscache/mod.rs
+++ b/storage/src/cache/fscache/mod.rs
@@ -16,6 +16,7 @@ use tokio::runtime::Runtime;
 use crate::backend::BlobBackend;
 use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
+use crate::cache::streaming::StreamPrefetchMgr;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo, BlobObject};
@@ -35,6 +36,7 @@ pub struct FsCacheMgr {
     prefetch_config: Arc<AsyncPrefetchConfig>,
     runtime: Arc<Runtime>,
     worker_mgr: Arc<AsyncWorkerMgr>,
+    streaming_prefetch_mgr: Arc<StreamPrefetchMgr>,
     work_dir: String,
     need_validation: bool,
     blobs_check_count: Arc<AtomicU8>,
@@ -60,6 +62,7 @@ impl FsCacheMgr {
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
         let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let streaming_prefetch_mgr = StreamPrefetchMgr::new(prefetch_config.clone());

         BLOB_FACTORY.start_mgr_checker();

@@ -70,6 +73,7 @@ impl FsCacheMgr {
             prefetch_config,
             runtime,
             worker_mgr: Arc::new(worker_mgr),
+            streaming_prefetch_mgr: Arc::new(streaming_prefetch_mgr),
             work_dir: work_dir.to_owned(),
             need_validation: config.cache_validate,
             blobs_check_count: Arc::new(AtomicU8::new(0)),
@@ -96,6 +100,7 @@ impl FsCacheMgr {
             self.prefetch_config.clone(),
             self.runtime.clone(),
             self.worker_mgr.clone(),
+            self.streaming_prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -116,7 +121,8 @@ impl FsCacheMgr {

 impl BlobCacheMgr for FsCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        AsyncWorkerMgr::start(self.worker_mgr.clone())?;
+        StreamPrefetchMgr::start(self.streaming_prefetch_mgr.clone())
     }

     fn destroy(&self) {
@@ -201,6 +207,7 @@ impl FileCacheEntry {
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
         workers: Arc<AsyncWorkerMgr>,
+        stream_workers: Arc<StreamPrefetchMgr>,
     ) -> Result<Self> {
         if blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE) {
             return Err(einval!("fscache does not support Rafs v5 blobs"));
         }
@@ -279,6 +286,7 @@ impl FileCacheEntry {
             reader,
             runtime,
             workers,
+            stream_workers,

             blob_compressed_size,
             blob_uncompressed_size: blob_info.uncompressed_size(),
diff --git a/storage/src/cache/mod.rs b/storage/src/cache/mod.rs
index 7d91862b78d..2b0bde58762 100644
--- a/storage/src/cache/mod.rs
+++ b/storage/src/cache/mod.rs
@@ -30,6 +30,7 @@ use crate::backend::{BlobBackend, BlobReader};
 use crate::cache::state::ChunkMap;
 use crate::device::{
     BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoVec, BlobObject, BlobPrefetchRequest,
+    BlobRange,
 };
 use crate::meta::BlobCompressionContextInfo;
 use crate::utils::{alloc_buf, check_digest};
@@ -42,6 +43,7 @@ mod dummycache;
 mod filecache;
 #[cfg(target_os = "linux")]
 mod fscache;
+mod streaming;
 mod worker;

 pub mod state;

@@ -215,6 +217,8 @@ pub trait BlobCache: Send + Sync {
     /// It should be paired with stop_prefetch().
     fn start_prefetch(&self) -> StorageResult<()>;

+    fn init_stream_prefetch(&self, blobs: Vec<Arc<dyn BlobCache>>);
+
     /// Stop prefetching blob data in background.
     ///
     /// It should be paired with start_prefetch().
@@ -231,6 +235,10 @@ pub trait BlobCache: Send + Sync {
         bios: &[BlobIoDesc],
     ) -> StorageResult<usize>;

+    fn add_stream_prefetch_range(&self, range: BlobRange) -> Result<()>;
+
+    fn flush_stream_prefetch(&self) -> Result<()>;
+
     /// Execute filesystem data prefetch.
     fn prefetch_range(&self, _range: &BlobIoRange) -> Result<usize> {
         Err(enosys!("doesn't support prefetch_range()"))
@@ -281,7 +289,27 @@ pub trait BlobCache: Send + Sync {
             duration
         );

-        let chunks = chunks.iter().map(|v| v.as_ref()).collect();
+        let chunks: Vec<&dyn BlobChunkInfo> = chunks.iter().map(|v| v.as_ref()).collect();
+        // Temporary debug aid: dump chunk details when a specific compressed
+        // offset shows up.
+        let mut p = false;
+        for c in &chunks {
+            if c.compressed_offset() == 50365358 {
+                p = true;
+            }
+        }
+        if p {
+            for c in &chunks {
+                warn!(
+                    "read_chunks_from_backend: chunk {} {} {} {} {} {} {}",
+                    c.id(),
+                    c.compressed_offset(),
+                    c.compressed_size(),
+                    c.uncompressed_size(),
+                    c.is_compressed(),
+                    c.is_encrypted(),
+                    c.is_batch(),
+                );
+            }
+        }

         Ok(ChunkDecompressState::new(blob_offset, self, chunks, c_buf))
     }
@@ -400,6 +428,78 @@ pub trait BlobCache: Send + Sync {
     fn get_blob_meta_info(&self) -> Result<Option<Arc<BlobCompressionContextInfo>>> {
         Ok(None)
     }
+
+    fn fetch_range_compressed_stream(
+        self: Arc<Self>,
+        offset: u64,
+        size: u64,
+        prefetch: bool,
+    ) -> std::io::Result<()>;
+}
+
+/// Helper struct to manage and call BlobCompressionContextInfo.
+pub struct BlobCCI {
+    meta: Option<Arc<BlobCompressionContextInfo>>,
+}
+
+impl Default for BlobCCI {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl BlobCCI {
+    pub fn new() -> Self {
+        BlobCCI { meta: None }
+    }
+
+    pub fn is_none(&self) -> bool {
+        self.meta.is_none()
+    }
+
+    pub fn set_meta(&mut self, meta: Option<Arc<BlobCompressionContextInfo>>) -> Result<&Self> {
+        if meta.is_none() {
+            return Err(einval!("failed to get blob meta info"));
+        }
+        self.meta = meta;
+        Ok(self)
+    }
+
+    pub fn get_batch_index(&self, chunk_idx: u32) -> Result<u32> {
+        if let Some(meta) = &self.meta {
+            meta.get_batch_index(chunk_idx)
+        } else {
+            Err(einval!("failed to get blob meta info"))
+        }
+    }
+
+    pub fn get_compressed_offset(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<u64> {
+        Ok(chunk.compressed_offset())
+    }
+
+    pub fn get_compressed_size(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<u32> {
+        let size = if chunk.is_batch() {
+            self.meta
+                .as_ref()
+                .unwrap()
+                .get_compressed_size(chunk.id())?
+        } else {
+            chunk.compressed_size()
+        };
+        Ok(size)
+    }
+
+    pub fn get_compressed_info(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<(u64, u32)> {
+        Ok((
+            self.get_compressed_offset(chunk)?,
+            self.get_compressed_size(chunk)?,
+        ))
+    }
+
+    pub fn get_compressed_end(&self, chunk: &Arc<dyn BlobChunkInfo>) -> Result<u64> {
+        let (offset, size) = self.get_compressed_info(chunk)?;
+        Ok(offset + size as u64)
+    }
 }

 /// An iterator to enumerate decompressed data for chunks.
diff --git a/storage/src/cache/state/blob_state_map.rs b/storage/src/cache/state/blob_state_map.rs
index cff376f839c..9b7d130b143 100644
--- a/storage/src/cache/state/blob_state_map.rs
+++ b/storage/src/cache/state/blob_state_map.rs
@@ -146,6 +146,26 @@ where
         res
     }

+    fn try_mark_pending(&self, chunk: &dyn BlobChunkInfo) -> StorageResult<bool> {
+        let ready = self.c.is_ready(chunk).map_err(StorageError::CacheIndex)?;
+
+        if ready {
+            return Ok(false);
+        }
+
+        let index = C::get_index(chunk);
+        let mut guard = self.inflight_tracer.lock().unwrap();
+
+        if guard.get(&index).is_some() {
+            // The chunk is inflight or ready.
+            Ok(false)
+        } else {
+            // Mark the chunk as pending.
+            guard.insert(index, Arc::new(Slot::new()));
+            Ok(true)
+        }
+    }
+
     fn clear_pending(&self, chunk: &dyn BlobChunkInfo) {
         let index = C::get_index(chunk);
         let mut guard = self.inflight_tracer.lock().unwrap();
diff --git a/storage/src/cache/state/mod.rs b/storage/src/cache/state/mod.rs
index e02559de93e..240c7443264 100644
--- a/storage/src/cache/state/mod.rs
+++ b/storage/src/cache/state/mod.rs
@@ -74,7 +74,7 @@ pub trait ChunkMap: Any + Send + Sync {
     ///
     /// The function returns:
     /// - `Err(Timeout)` waiting for inflight backend IO timeouts.
-    /// - `Ok(true)` if the the chunk is ready.
+    /// - `Ok(true)` if the chunk is ready.
     /// - `Ok(false)` marks the chunk as pending, either set_ready_and_clear_pending() or
     /// clear_pending() must be called to clear the pending state.
     fn check_ready_and_mark_pending(&self, _chunk: &dyn BlobChunkInfo) -> StorageResult<bool> {
@@ -83,7 +83,16 @@ pub trait ChunkMap: Any + Send + Sync {

     /// Set the chunk to ready for use and clear the pending state.
     fn set_ready_and_clear_pending(&self, _chunk: &dyn BlobChunkInfo) -> Result<()> {
-        panic!("no support of check_ready_and_mark_pending()");
+        panic!("no support of set_ready_and_clear_pending()");
     }

+    /// Try to mark the chunk as pending if it is neither ready nor pending yet.
+    ///
+    /// The function returns:
+    /// - `Ok(true)` if the chunk was successfully turned into the pending state.
+    /// - `Ok(false)` if the chunk is already pending or ready.
+    fn try_mark_pending(&self, _chunk: &dyn BlobChunkInfo) -> StorageResult<bool> {
+        panic!("no support of try_mark_pending()");
+    }
+
     /// Clear the pending state of the chunk.
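// A runnable sketch of the state transition that `try_mark_pending` adds above:
// a chunk becomes "pending" only if it is neither ready nor already inflight.
// `InflightTracer` is an illustrative stand-in for the slot map guarded inside
// BlobStateMap, not the actual implementation.
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

struct InflightTracer {
    ready: Mutex<HashSet<u32>>,        // chunks already persisted to the cache
    inflight: Mutex<HashMap<u32, ()>>, // chunks being downloaded right now
}

impl InflightTracer {
    fn try_mark_pending(&self, index: u32) -> bool {
        if self.ready.lock().unwrap().contains(&index) {
            return false; // already ready, nothing to do
        }
        let mut guard = self.inflight.lock().unwrap();
        if guard.contains_key(&index) {
            false // someone else is already downloading it
        } else {
            guard.insert(index, ()); // we own the download now
            true
        }
    }
}

fn main() {
    let t = InflightTracer {
        ready: Mutex::new(HashSet::new()),
        inflight: Mutex::new(HashMap::new()),
    };
    assert!(t.try_mark_pending(1)); // first caller wins
    assert!(!t.try_mark_pending(1)); // second caller sees it inflight
}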
diff --git a/storage/src/cache/streaming.rs b/storage/src/cache/streaming.rs
new file mode 100644
index 00000000000..1792573c551
--- /dev/null
+++ b/storage/src/cache/streaming.rs
@@ -0,0 +1,317 @@
+use crate::cache::BlobCache;
+use crate::device::BlobRange;
+use nydus_utils::async_helper::with_runtime;
+use nydus_utils::mpmc::Channel;
+use std::collections::BTreeMap;
+use std::io::Result;
+use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use tokio::runtime::Runtime;
+
+use super::worker::AsyncPrefetchConfig;
+
+/// Asynchronous service request message.
+pub enum StreamingPrefetchMessage {
+    /// Asynchronous blob layer prefetch request with (offset, size) of blob on storage backend.
+    BlobPrefetch(Arc<dyn BlobCache>, u64, u64),
+}
+
+impl StreamingPrefetchMessage {
+    /// Create a new asynchronous blob prefetch request message.
+    pub fn new_blob_prefetch(blob_cache: Arc<dyn BlobCache>, offset: u64, size: u64) -> Self {
+        StreamingPrefetchMessage::BlobPrefetch(blob_cache, offset, size)
+    }
+}
+
+// Maximum debt: 4 MiB.
+static MAX_DEBT: u64 = 0x400000;
+// Criterion for a small task: less than 1 MiB.
+#[allow(unused)]
+static MIN_TASK_SIZE: u64 = 0x100000;
+
+// Minimum submittable task size: 512 KiB.
+static MIN_SUBMITTABLE_TASK_SIZE: u64 = 0x80000;
+
+// Maximum merge gap: 512 KiB.
+// Two discontiguous tasks can be merged if the gap between them is smaller
+// than MAX_MERGE_GAP.
+static MAX_MERGE_GAP: u64 = 0x80000;
+
+struct PrefetchBuffer {
+    // End offset of the most recently updated task (most likely the newest one),
+    // used when computing prefetch tasks. It may point at 0 or at a task that
+    // has already been removed.
+    last_modified: u64,
+    // Queue of tasks waiting to be computed.
+    buf: BTreeMap<u64, BlobRange>,
+    // Total amount of prefetch data computed so far.
+    total_processed: u64,
+    blobs: Vec<Arc<dyn BlobCache>>,
+}
+
+pub(crate) struct StreamPrefetchMgr {
+    workers: AtomicU32,
+    threads_count: u32,
+    active: AtomicBool,
+    waiting: Mutex<PrefetchBuffer>,
+    // Queue holding normal tasks.
+    new_channel: Arc<Channel<StreamingPrefetchMessage>>,
+    // Queue holding small tasks.
+    new_channel_small: Arc<Channel<StreamingPrefetchMessage>>,
+}
+
+impl StreamPrefetchMgr {
+    pub fn new(prefetch_config: Arc<AsyncPrefetchConfig>) -> Self {
+        Self {
+            threads_count: prefetch_config.threads_count as u32,
+            workers: AtomicU32::new(0),
+            active: AtomicBool::new(false),
+            waiting: Mutex::new(PrefetchBuffer {
+                last_modified: 0,
+                buf: BTreeMap::new(),
+                total_processed: 0,
+                blobs: Vec::new(),
+            }),
+            new_channel: Arc::new(Channel::new()),
+            new_channel_small: Arc::new(Channel::new()),
+        }
+    }
+
+    /// Create working threads and start the event loop.
+    pub fn start(mgr: Arc<StreamPrefetchMgr>) -> Result<()> {
+        Self::start_prefetch_workers(mgr)?;
+
+        Ok(())
+    }
+
+    pub fn init_blobs(&self, blobs: Vec<Arc<dyn BlobCache>>) {
+        let mut waiting = self.waiting.lock().unwrap();
+        waiting.blobs = blobs;
+    }
+
+    // Try to submit the ranges that are due.
+    fn submit_ranges(&self, waiting: &mut PrefetchBuffer) -> Result<()> {
+        // Walk the queue to find the first task whose debt still stays within
+        // the threshold; everything before it is due for submission.
+        let mut not_exceeded_offset = u64::MAX;
+        for end_offset in waiting.buf.keys() {
+            if waiting.total_processed - end_offset < MAX_DEBT {
+                not_exceeded_offset = *end_offset;
+                break;
+            }
+        }
+
+        // Pop the tasks that need to be submitted.
+        let to_keep = waiting.buf.split_off(&not_exceeded_offset);
+        let to_submit = std::mem::take(&mut waiting.buf);
+        waiting.buf = to_keep;
+
+        // Submit the tasks.
+        for (end_offset, r) in to_submit {
+            // Put tasks that are still too small back into the queue.
+            if r.end - r.offset < MIN_SUBMITTABLE_TASK_SIZE {
+                waiting.buf.insert(end_offset, r);
+            } else {
+                self.merge_and_send_msg(end_offset, &r, waiting)?;
+            }
+        }
+        Ok(())
+    }
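// A runnable sketch of the "debt" rule that `submit_ranges` applies above:
// buffered ranges are keyed by their end offset, and a range becomes due once
// the data computed after it (total_processed - end_offset) exceeds MAX_DEBT,
// unless it is still too small to be worth submitting. `Range` and `split_due`
// are simplified, hypothetical stand-ins for the types in this patch.
use std::collections::BTreeMap;

const MAX_DEBT: u64 = 0x40_0000; // 4 MiB
const MIN_SUBMITTABLE: u64 = 0x8_0000; // 512 KiB

#[derive(Debug, Clone)]
struct Range {
    offset: u64,
    end: u64,
}

fn split_due(buf: &mut BTreeMap<u64, Range>, total_processed: u64) -> Vec<Range> {
    // Find the first entry whose debt is still within MAX_DEBT; everything
    // before it is due for submission.
    let mut keep_from = u64::MAX;
    for end_offset in buf.keys() {
        if total_processed - end_offset < MAX_DEBT {
            keep_from = *end_offset;
            break;
        }
    }
    let to_keep = buf.split_off(&keep_from);
    let due = std::mem::replace(buf, to_keep);

    let mut out = Vec::new();
    for (end_offset, r) in due {
        if r.end - r.offset < MIN_SUBMITTABLE {
            // Too small: keep buffering in the hope of further merges.
            buf.insert(end_offset, r);
        } else {
            out.push(r);
        }
    }
    out
}

fn main() {
    let mut buf = BTreeMap::new();
    buf.insert(0x10_0000, Range { offset: 0, end: 0x10_0000 }); // a 1 MiB task
    // 8 MiB have been computed since, so the 1 MiB task owes more than MAX_DEBT.
    let due = split_due(&mut buf, 0x80_0000);
    assert_eq!(due.len(), 1);
    assert!(buf.is_empty());
}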
+    pub fn add_prefetch_range(&self, r_new: BlobRange) -> Result<()> {
+        let mut waiting = self.waiting.lock().unwrap();
+
+        // 1. Try to merge into an existing task.
+        // Check last_modified first; it may point at a key that no longer exists.
+        // Collect the merge candidate first, so the map is not mutated while it
+        // is still borrowed by the iteration.
+        let mut merge_hit = None;
+        let last_modified = waiting.buf.get_key_value(&waiting.last_modified);
+        for (end_offset, r_old) in last_modified.into_iter().chain(waiting.buf.iter()) {
+            if let Some((added_size, r_merged)) = r_old.try_merge(&r_new, MAX_MERGE_GAP) {
+                merge_hit = Some((*end_offset, added_size, r_merged));
+                break;
+            }
+        }
+
+        let merged = merge_hit.is_some();
+        if let Some((end_offset, added_size, r_merged)) = merge_hit {
+            // Remove the old entry and insert the merged one, growing from the
+            // previous end offset.
+            waiting.buf.remove(&end_offset);
+            let new_end_offset = end_offset + added_size;
+            waiting.buf.insert(new_end_offset, r_merged);
+            waiting.last_modified = new_end_offset;
+            waiting.total_processed += added_size;
+        }
+
+        // 3. Check whether old tasks need to be submitted.
+        self.submit_ranges(&mut waiting)?;
+
+        // 2. Otherwise append it as a new task.
+        if !merged {
+            let r_new_size = r_new.end - r_new.offset;
+            waiting.total_processed += r_new_size;
+            waiting.last_modified = waiting.total_processed;
+
+            let p = waiting.total_processed;
+            waiting.buf.insert(p, r_new);
+        }
+
+        Ok(())
+    }
+
+    // Recursively merge the range about to be sent with anything it touches in
+    // the waiting queue; once no further merge is possible, send it.
+    fn merge_and_send_msg(
+        &self,
+        send_e_of: u64,
+        send: &BlobRange,
+        waiting: &mut PrefetchBuffer,
+    ) -> Result<()> {
+        // Find a merge candidate first, so the map is not mutated while it is
+        // still borrowed by the iteration.
+        let mut merge_hit = None;
+        for (old_e_of, old) in waiting.buf.iter() {
+            let (prev_e_of, r_prev, r_next) = if *old_e_of < send_e_of {
+                (*old_e_of, old, send)
+            } else {
+                (send_e_of, send, old)
+            };
+            if let Some((added_size, merged)) = r_prev.try_merge(r_next, MAX_MERGE_GAP) {
+                // Grow from the previous end offset.
+                merge_hit = Some((*old_e_of, prev_e_of + added_size, merged));
+                break;
+            }
+        }
+
+        if let Some((old_e_of, new_e_of, merged)) = merge_hit {
+            // Remove the entry that was merged away.
+            waiting.buf.remove(&old_e_of);
+
+            warn!(
+                "CMDebug!!!!!!!!!!: merge_and_send_msg, new_e_of: {}",
+                new_e_of
+            );
+            // Try to merge even further.
+            return self.merge_and_send_msg(new_e_of, &merged, waiting);
+        }
+        // Nothing left to merge with; send the range as-is.
+        self.send_msg(send, &waiting.blobs)
+    }
+
+    fn send_msg(&self, r: &BlobRange, blobs: &[Arc<dyn BlobCache>]) -> Result<()> {
+        let msg = StreamingPrefetchMessage::new_blob_prefetch(
+            blobs[r.blob_idx as usize].clone(),
+            r.offset,
+            r.end - r.offset,
+        );
+        let channel = if r.end - r.offset < MIN_TASK_SIZE {
+            &self.new_channel_small
+        } else {
+            &self.new_channel
+        };
+        debug!(
+            "CMDebug: send_msg, offset: {}, size: {}",
+            r.offset,
+            r.end - r.offset
+        );
+        channel.send(msg).map_err(|_| {
+            std::io::Error::new(std::io::ErrorKind::Other, "Send prefetch message failed")
+        })
+    }
+
+    // Drain the waiting queue and submit all remaining tasks.
+    pub fn flush_waiting_queue(&self) -> Result<()> {
+        let mut waiting = self.waiting.lock().unwrap();
+        // let mut buf = std::mem::take(&mut waiting.buf);
+
+        let end_offsets = waiting.buf.keys().cloned().collect::<Vec<u64>>();
+        for end_offset in end_offsets {
+            if let Some(r) = waiting.buf.remove(&end_offset) {
+                self.merge_and_send_msg(end_offset, &r, &mut waiting)?;
+            }
+        }
+        assert!(waiting.buf.is_empty());
+
+        Ok(())
+    }
+
+    fn start_prefetch_workers(mgr: Arc<StreamPrefetchMgr>) -> Result<()> {
+        for num in 0..mgr.threads_count + 1 {
+            let mgr2 = mgr.clone();
+            let res = thread::Builder::new()
+                .name(format!("nydus_storage_worker_{}", num))
+                .spawn(move || {
+                    mgr2.grow_n(1);
+                    debug!("CMDebug: start_prefetch_workers, {}", num);
+
+                    with_runtime(|rt| {
+                        if num == 0 {
+                            rt.block_on(Self::handle_prefetch_requests_small(mgr2.clone(), rt));
+                        } else {
+                            rt.block_on(Self::handle_prefetch_requests(mgr2.clone(), rt));
+                        }
+                    });
+
+                    mgr2.shrink_n(1);
+                    info!("storage: worker thread {} exits.", num)
+                });
+
+            if let Err(e) = res {
+                error!("storage: failed to create worker thread, {:?}", e);
+                return Err(e);
+            }
+        }
+        mgr.active.store(true, Ordering::Release);
+        Ok(())
+    }
+
+    async fn handle_prefetch_requests(mgr: Arc<StreamPrefetchMgr>, rt: &Runtime) {
+        loop {
+            let msg;
+            tokio::select! {
+                Ok(m) = mgr.new_channel.recv() => msg = m,
+                Ok(m) = mgr.new_channel_small.recv() => msg = m,
+                else => break,
+            }
+            match msg {
+                StreamingPrefetchMessage::BlobPrefetch(blob_cache, offset, size) => {
+                    rt.spawn_blocking(move || {
+                        let _ = Self::handle_blob_prefetch_request(blob_cache, offset, size);
+                    });
+                }
+            }
+        }
+    }
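// A runnable sketch of the two-queue worker layout used above, with tokio's
// mpsc channels standing in for nydus_utils::mpmc::Channel: a general worker
// selects over both queues, while the real patch additionally dedicates one
// worker to the small-task queue so small requests cannot starve behind large
// ones. Assumes the tokio crate with the "full" feature; all names here are
// illustrative, not part of this patch.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (big_tx, mut big_rx) = mpsc::channel::<u64>(16);
    let (small_tx, mut small_rx) = mpsc::channel::<u64>(16);

    // General worker: takes work from either queue.
    let general = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(sz) = big_rx.recv() => println!("worker got large task of {} bytes", sz),
                Some(sz) = small_rx.recv() => println!("worker got small task of {} bytes", sz),
                else => break, // both channels closed
            }
        }
    });

    big_tx.send(0x100_0000).await.unwrap();
    small_tx.send(0x1000).await.unwrap();
    drop(big_tx);
    drop(small_tx);
    general.await.unwrap();
}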
+
+    // Dedicated handler for small tasks.
+    async fn handle_prefetch_requests_small(mgr: Arc<StreamPrefetchMgr>, rt: &Runtime) {
+        while let Ok(msg) = mgr.new_channel_small.recv().await {
+            match msg {
+                StreamingPrefetchMessage::BlobPrefetch(blob_cache, offset, size) => {
+                    rt.spawn_blocking(move || {
+                        let _ = Self::handle_blob_prefetch_request(blob_cache, offset, size);
+                    });
+                }
+            }
+        }
+    }
+
+    fn handle_blob_prefetch_request(
+        cache: Arc<dyn BlobCache>,
+        offset: u64,
+        size: u64,
+    ) -> Result<()> {
+        debug!(
+            "CMDebug: storage: prefetch blob {} offset {} size {}",
+            cache.blob_id(),
+            offset,
+            size
+        );
+        if size == 0 {
+            return Ok(());
+        }
+
+        cache.fetch_range_compressed_stream(offset, size, true)?;
+
+        Ok(())
+    }
+
+    fn shrink_n(&self, n: u32) {
+        self.workers.fetch_sub(n, Ordering::Relaxed);
+    }
+
+    fn grow_n(&self, n: u32) {
+        self.workers.fetch_add(n, Ordering::Relaxed);
+    }
+}
diff --git a/storage/src/device.rs b/storage/src/device.rs
index 3eb45d0e215..3c28021f224 100644
--- a/storage/src/device.rs
+++ b/storage/src/device.rs
@@ -41,7 +41,7 @@ use nydus_utils::compress;
 use nydus_utils::crypt::{self, Cipher, CipherContext};
 use nydus_utils::digest::{self, RafsDigest};

-use crate::cache::BlobCache;
+use crate::cache::{BlobCCI, BlobCache};
 use crate::factory::BLOB_FACTORY;

 pub(crate) const BLOB_FEATURE_INCOMPAT_MASK: u32 = 0x0000_ffff;
@@ -663,6 +663,12 @@ impl From<Arc<dyn BlobChunkInfo>> for BlobIoChunk {
     }
 }

+impl AsRef<Arc<dyn BlobChunkInfo>> for BlobIoChunk {
+    fn as_ref(&self) -> &Arc<dyn BlobChunkInfo> {
+        &self.0
+    }
+}
+
 impl BlobChunkInfo for BlobIoChunk {
     fn chunk_id(&self) -> &RafsDigest {
         self.0.chunk_id()
@@ -788,7 +794,7 @@ pub struct BlobIoVec {
     /// Total size of blob IOs to be performed.
     bi_size: u64,
     /// Array of blob IOs, these IOs should executed sequentially.
-    pub(crate) bi_vec: Vec<BlobIoDesc>,
+    pub bi_vec: Vec<BlobIoDesc>,
 }

 impl BlobIoVec {
@@ -1047,6 +1053,54 @@ pub struct BlobPrefetchRequest {
     pub len: u64,
 }

+pub struct BlobRange {
+    pub blob_idx: u32,
+    /// Compressed offset into the blob to prefetch data.
+    pub offset: u64,
+    pub end: u64,
+}
+
+impl BlobRange {
+    // Try to merge two BlobRanges. On success return the merged BlobRange,
+    // otherwise return None. Also return how much size merging `r_new` added
+    // (overlapping regions are not counted twice).
+    pub fn try_merge(&self, r_new: &BlobRange, max_gap: u64) -> Option<(u64, Self)> {
+        if self.blob_idx != r_new.blob_idx {
+            return None;
+        }
+
+        // Make sure `l` is always the left-hand range.
+        let (l, r) = if self.offset <= r_new.offset {
+            (self, r_new)
+        } else {
+            (r_new, self)
+        };
+
+        if l.end + max_gap < r.offset {
+            // Cannot merge.
+            None
+        } else {
+            // Can merge.
+            let merged = BlobRange {
+                blob_idx: r.blob_idx,
+                offset: l.offset,
+                end: l.end.max(r.end),
+            };
+            let added_size = merged.end - merged.offset - self.end + self.offset;
+            Some((added_size, merged))
+        }
+    }
+}
+
+impl Debug for BlobRange {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        f.debug_struct("BlobRange")
+            .field("blob_idx", &self.blob_idx)
+            .field("offset", &self.offset)
+            .field("end", &self.end)
+            .finish()
+    }
+}
+
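// A runnable sketch of the merge rule above, with a standalone copy of the
// range type so it compiles outside the crate. Two ranges in the same blob
// merge when the gap between them is at most `max_gap`; `added_size` is the
// growth relative to the existing range (gap bytes included, overlap not
// double-counted). `Range` here is a hypothetical stand-in for `BlobRange`.
#[derive(Debug, Clone, PartialEq)]
struct Range {
    blob_idx: u32,
    offset: u64,
    end: u64,
}

impl Range {
    fn try_merge(&self, other: &Range, max_gap: u64) -> Option<(u64, Range)> {
        if self.blob_idx != other.blob_idx {
            return None;
        }
        let (l, r) = if self.offset <= other.offset { (self, other) } else { (other, self) };
        if l.end + max_gap < r.offset {
            None // gap too large to merge
        } else {
            let merged = Range { blob_idx: l.blob_idx, offset: l.offset, end: l.end.max(r.end) };
            let added = (merged.end - merged.offset) - (self.end - self.offset);
            Some((added, merged))
        }
    }
}

fn main() {
    let a = Range { blob_idx: 0, offset: 0, end: 100 };
    let b = Range { blob_idx: 0, offset: 150, end: 300 };
    // A gap of 50 bytes <= max_gap of 64, so the ranges merge into [0, 300).
    let (added, merged) = a.try_merge(&b, 64).unwrap();
    assert_eq!(merged, Range { blob_idx: 0, offset: 0, end: 300 });
    assert_eq!(added, 200); // 300 - 100 bytes of growth relative to `a`
    // A range in a different blob never merges.
    let c = Range { blob_idx: 1, offset: 90, end: 120 };
    assert!(a.try_merge(&c, 64).is_none());
}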
 /// Trait to provide direct access to underlying uncompressed blob file.
 ///
 /// The suggested flow to make use of a `BlobObject` is as below:
@@ -1199,6 +1253,18 @@ impl BlobDevice {
         Ok(())
     }

+    pub fn add_stream_prefetch_range(&self, range: BlobRange) -> io::Result<()> {
+        let state = self.blobs.load();
+        let blob = &state[range.blob_idx as usize];
+        blob.add_stream_prefetch_range(range)
+    }
+
+    pub fn flush_stream_prefetch(&self) -> io::Result<()> {
+        // The stream prefetch manager is shared by all blobs, so flushing
+        // through the first one is sufficient.
+        let state = self.blobs.load();
+        let blob = &state[0];
+        blob.flush_stream_prefetch()
+    }
+
     /// Start the background blob data prefetch task.
     pub fn start_prefetch(&self) {
         for blob in self.blobs.load().iter() {
@@ -1206,6 +1272,15 @@ impl BlobDevice {
         }
     }

+    pub fn init_stream_prefetch(&self) {
+        let blobs: Vec<Arc<dyn BlobCache>> = self.blobs.load().to_vec();
+        if blobs.is_empty() {
+            error!("init_stream_prefetch failed: empty blobs in blob device");
+            return;
+        }
+        blobs[0].clone().init_stream_prefetch(blobs);
+    }
+
     /// Stop the background blob data prefetch task.
     pub fn stop_prefetch(&self) {
         for blob in self.blobs.load().iter() {
@@ -1310,6 +1385,21 @@ impl BlobDevice {

         None
     }
+
+    pub fn get_all_blob_cci(&self) -> Vec<BlobCCI> {
+        let state: arc_swap::Guard<Arc<Vec<Arc<dyn BlobCache>>>> = self.blobs.load();
+        let state: &Vec<Arc<dyn BlobCache>> = &state;
+
+        let mut blob_ccis = Vec::new();
+        blob_ccis.resize_with(self.blob_count, BlobCCI::new);
+
+        for (cci, cache) in blob_ccis.iter_mut().zip(state.iter()) {
+            let meta = cache.get_blob_meta_info().unwrap_or(None);
+            let _ = cci.set_meta(meta);
+        }
+
+        blob_ccis
+    }
 }

 /// Struct to execute Io requests with a single blob.
diff --git a/storage/src/meta/chunk_info_v1.rs b/storage/src/meta/chunk_info_v1.rs
index 3d20998f32b..da199a16ee6 100644
--- a/storage/src/meta/chunk_info_v1.rs
+++ b/storage/src/meta/chunk_info_v1.rs
@@ -54,6 +54,10 @@ impl BlobMetaChunkInfo for BlobChunkInfoV1Ondisk {
         self.comp_info = u64::to_le(size_low | size_high | offset);
     }

+    fn compressed_size_batch(&self, _state: &BlobCompressionContext) -> u32 {
+        self.compressed_size()
+    }
+
     fn uncompressed_offset(&self) -> u64 {
         u64::from_le(self.uncomp_info) & BLOB_CC_V1_CHUNK_UNCOMP_OFFSET_MASK
     }
diff --git a/storage/src/meta/chunk_info_v2.rs b/storage/src/meta/chunk_info_v2.rs
index 913af3243cf..de35a7b5d36 100644
--- a/storage/src/meta/chunk_info_v2.rs
+++ b/storage/src/meta/chunk_info_v2.rs
@@ -129,6 +129,15 @@ impl BlobMetaChunkInfo for BlobChunkInfoV2Ondisk {
         self.comp_info |= u64::to_le(size << CHUNK_V2_COMP_SIZE_SHIFT);
     }

+    fn compressed_size_batch(&self, meta: &BlobCompressionContext) -> u32 {
+        if self.is_batch() {
+            let batch_idx = self.get_batch_index().unwrap() as usize;
+            meta.get_batch_context(batch_idx).unwrap().compressed_size()
+        } else {
+            self.compressed_size()
+        }
+    }
+
     fn uncompressed_offset(&self) -> u64 {
         (u64::from_le(self.uncomp_info) & CHUNK_V2_UNCOMP_OFFSET_MASK)
             << CHUNK_V2_UNCOMP_OFFSET_SHIFT
diff --git a/storage/src/meta/mod.rs b/storage/src/meta/mod.rs
index 77bea8f1053..0c83c536359 100644
--- a/storage/src/meta/mod.rs
+++ b/storage/src/meta/mod.rs
@@ -1549,8 +1549,8 @@ impl BlobMetaChunkArray {
         chunk_info_array: &[T],
         start: u64,
         end: u64,
-        batch_end: u64,
-        batch_size: u64,
+        amplify_end: u64,
+        amplify_size: u64,
         prefetch: bool,
     ) -> Result<Vec<Arc<dyn BlobChunkInfo>>> {
         let mut vec = Vec::with_capacity(512);
@@ -1587,7 +1587,7 @@
             }
             if entry.get_zran_index()? != zran_last {
                 let ctx = &state.zran_info_array[entry.get_zran_index()? as usize];
-                if ctx.in_offset() + ctx.in_size() as u64 - pos > batch_size
+                if ctx.in_offset() + ctx.in_size() as u64 - pos > amplify_size
                     && entry.compressed_offset() > end
                 {
                     return Ok(vec);
@@ -1615,38 +1615,32 @@
         }
         vec.push(BlobMetaChunk::new(index, state));

-        let mut last_end = entry.compressed_end();
-        if last_end >= batch_end {
-            Ok(vec)
-        } else {
-            while index + 1 < chunk_info_array.len() {
-                index += 1;
-
-                let entry = Self::get_chunk_entry(state, chunk_info_array, index)?;
-                // Avoid read amplify if next chunk is too big.
-                if last_end >= end && entry.compressed_end() > batch_end {
-                    return Ok(vec);
-                }
-
-                vec.push(BlobMetaChunk::new(index, state));
-                last_end = entry.compressed_end();
-                if last_end >= batch_end {
-                    return Ok(vec);
-                }
+        let mut last_end = entry.compressed_end_batch(state);
+        while index + 1 < chunk_info_array.len() {
+            index += 1;
+            let entry = Self::get_chunk_entry(state, chunk_info_array, index)?;
+
+            // Stop adding chunks once entry_start >= end and entry_end > amplify_end.
+            // Batch chunks are supported here.
+            if entry.compressed_offset() >= end && entry.compressed_end_batch(state) > amplify_end {
+                return Ok(vec);
             }
-            if last_end >= end || (prefetch && !vec.is_empty()) {
-                Ok(vec)
-            } else {
-                Err(einval!(format!(
-                    "entry not found index {} chunk_info_array.len {}, last_end 0x{:x}, end 0x{:x}, blob compressed size 0x{:x}",
-                    index,
-                    chunk_info_array.len(),
-                    last_end,
-                    end,
-                    state.compressed_size,
-                )))
-            }
+            vec.push(BlobMetaChunk::new(index, state));
+            last_end = entry.compressed_end_batch(state);
+        }
+
+        if last_end >= end || (prefetch && !vec.is_empty()) {
+            Ok(vec)
+        } else {
+            Err(einval!(format!(
+                "entry not found index {} chunk_info_array.len {}, last_end 0x{:x}, end 0x{:x}, blob compressed size 0x{:x}",
+                index,
+                chunk_info_array.len(),
+                last_end,
+                end,
+                state.compressed_size,
+            )))
         }
     }
@@ -1896,11 +1890,21 @@ pub trait BlobMetaChunkInfo {
     /// Set compressed size of the chunk.
     fn set_compressed_size(&mut self, size: u32);

+    /// Get compressed size of the whole batch chunk.
+    /// If the chunk is not a batch chunk, fall back to `compressed_size`.
+    fn compressed_size_batch(&self, state: &BlobCompressionContext) -> u32;
+
     /// Get end of compressed data of the chunk.
     fn compressed_end(&self) -> u64 {
         self.compressed_offset() + self.compressed_size() as u64
     }

+    /// Get end of compressed data of the whole batch chunk.
+    /// If the chunk is not a batch chunk, fall back to `compressed_end`.
+    fn compressed_end_batch(&self, state: &BlobCompressionContext) -> u64 {
+        self.compressed_offset() + self.compressed_size_batch(state) as u64
+    }
+
     /// Get uncompressed offset of the chunk.
     fn uncompressed_offset(&self) -> u64;