composefs_oci/
tar.rs

1//! TAR archive processing and split stream conversion.
2//!
3//! This module handles the conversion of tar archives (container image layers) into composefs split streams.
4//! It provides both synchronous and asynchronous tar processing, intelligently deciding whether to store
5//! file content inline in the split stream or externally in the object store based on file size.
6//!
7//! Key components include the `split()` and `split_async()` functions for converting tar streams,
8//! `get_entry()` for reading back tar entries from split streams, and comprehensive support for
9//! tar format features including GNU long names, PAX extensions, and various file types.
10//! The `TarEntry` and `TarItem` types represent processed tar entries in composefs format.
11
12use std::{
13    cell::RefCell,
14    collections::BTreeMap,
15    ffi::{OsStr, OsString},
16    fmt,
17    fs::File,
18    io::Read,
19    os::unix::prelude::{OsStrExt, OsStringExt},
20    path::PathBuf,
21    sync::Arc,
22};
23
24use anyhow::{bail, ensure, Result};
25use bytes::Bytes;
26use rustix::fs::makedev;
27use tar::{EntryType, Header, PaxExtensions};
28use tokio::{
29    io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt},
30    sync::mpsc,
31};
32
33use composefs::{
34    dumpfile,
35    fsverity::FsVerityHashValue,
36    repository::Repository,
37    splitstream::{SplitStreamBuilder, SplitStreamData, SplitStreamReader, SplitStreamWriter},
38    tree::{LeafContent, RegularFile, Stat},
39    util::{read_exactish, read_exactish_async},
40    INLINE_CONTENT_MAX,
41};
42
43fn read_header<R: Read>(reader: &mut R) -> Result<Option<Header>> {
44    let mut header = Header::new_gnu();
45    if read_exactish(reader, header.as_mut_bytes())? {
46        Ok(Some(header))
47    } else {
48        Ok(None)
49    }
50}
51
52async fn read_header_async(reader: &mut (impl AsyncRead + Unpin)) -> Result<Option<Header>> {
53    let mut header = Header::new_gnu();
54    if read_exactish_async(reader, header.as_mut_bytes()).await? {
55        Ok(Some(header))
56    } else {
57        Ok(None)
58    }
59}
60
61/// Splits the tar file from tar_stream into a Split Stream.  The store_data function is
62/// responsible for ensuring that "external data" is in the composefs repository and returns the
63/// fsverity hash value of that data.
64pub fn split(
65    tar_stream: &mut impl Read,
66    writer: &mut SplitStreamWriter<impl FsVerityHashValue>,
67) -> Result<()> {
68    while let Some(header) = read_header(tar_stream)? {
69        // the header always gets stored as inline data
70        writer.write_inline(header.as_bytes());
71
72        if header.as_bytes() == &[0u8; 512] {
73            continue;
74        }
75
76        // read the corresponding data, if there is any
77        let actual_size = header.entry_size()? as usize;
78        let storage_size = actual_size.next_multiple_of(512);
79        let mut buffer = vec![0u8; storage_size];
80        tar_stream.read_exact(&mut buffer)?;
81
82        if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
83            // non-empty regular file: store the data external and the trailing padding inline
84            writer.write_external(&buffer[..actual_size])?;
85            writer.write_inline(&buffer[actual_size..]);
86        } else {
87            // else: store the data inline in the split stream
88            writer.write_inline(&buffer);
89        }
90    }
91    Ok(())
92}
93
94/// Receive data from channel, write to tmpfile, compute verity, and store object.
95///
96/// This runs in a blocking task to avoid blocking the async runtime.
97fn receive_and_finalize_object<ObjectID: FsVerityHashValue>(
98    rx: mpsc::Receiver<Bytes>,
99    size: u64,
100    repo: &Repository<ObjectID>,
101) -> Result<ObjectID> {
102    use std::io::Write;
103
104    // Create tmpfile in the blocking context
105    let tmpfile_fd = repo.create_object_tmpfile()?;
106    let mut tmpfile = std::io::BufWriter::new(File::from(tmpfile_fd));
107
108    // Receive chunks and write to tmpfile
109    let mut rx = rx;
110    while let Some(chunk) = rx.blocking_recv() {
111        tmpfile.write_all(&chunk)?;
112    }
113
114    // Flush and get the File back
115    let tmpfile = tmpfile.into_inner()?;
116
117    // Finalize: enable verity, get digest, link into objects/
118    repo.finalize_object_tmpfile(tmpfile, size)
119}
120
/// Asynchronously splits a tar archive into a composefs split stream.
///
/// Similar to `split()` but processes the tar stream asynchronously with parallel
/// object storage. Large files are streamed to O_TMPFILE via a channel, and their
/// fs-verity digests are computed in background blocking tasks. This avoids blocking
/// the async runtime while allowing multiple files to be processed concurrently.
///
/// Concurrency is bounded by the repository's shared write semaphore
/// (`Repository::write_semaphore()`), so multiple concurrent `split_async()`
/// calls share one global limit on in-flight object writes.
///
/// Files larger than `INLINE_CONTENT_MAX` are stored externally in the object store,
/// while smaller files and metadata are stored inline in the split stream.
///
/// # Arguments
/// * `tar_stream` - The async buffered tar stream to read from
/// * `repo` - The repository for creating tmpfiles and storing objects
/// * `content_type` - The content type identifier for the splitstream
///
/// Returns the fs-verity object ID of the stored splitstream.
pub async fn split_async<ObjectID: FsVerityHashValue>(
    mut tar_stream: impl AsyncBufRead + Unpin,
    repo: Arc<Repository<ObjectID>>,
    content_type: u64,
) -> Result<ObjectID> {
    // Use the repository's shared semaphore to limit concurrent object storage
    let semaphore = repo.write_semaphore();

    let mut builder = SplitStreamBuilder::new(repo.clone(), content_type);

    while let Some(header) = read_header_async(&mut tar_stream).await? {
        // The header always gets stored as inline data
        builder.push_inline(header.as_bytes());

        // All-zero blocks are end-of-archive markers: no payload follows.
        if header.as_bytes() == &[0u8; 512] {
            continue;
        }

        // Read the corresponding data, if there is any
        let actual_size = header.entry_size()? as usize;
        let storage_size = actual_size.next_multiple_of(512);

        if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
            // Large file: stream to O_TMPFILE via channel to avoid blocking async runtime

            // Acquire permit before starting
            let permit = semaphore.clone().acquire_owned().await?;

            // Create a channel for streaming data to the blocking task.
            // Buffer a few chunks to allow async/blocking to run concurrently.
            let (tx, rx) = mpsc::channel::<Bytes>(4);

            // Spawn blocking task that receives data, writes to tmpfile, computes verity
            let repo_clone = repo.clone();
            let handle = tokio::task::spawn_blocking(move || {
                let result = receive_and_finalize_object(rx, actual_size as u64, &repo_clone);
                drop(permit); // Release permit when done
                result
            });

            // Send data chunks to the blocking task using fill_buf to avoid extra copies
            let mut remaining = actual_size;
            while remaining > 0 {
                let chunk = tar_stream.fill_buf().await?;
                if chunk.is_empty() {
                    bail!("unexpected EOF reading tar entry");
                }
                let chunk_size = std::cmp::min(remaining, chunk.len());
                // If send fails, the receiver dropped (task panicked/errored)
                if tx
                    .send(Bytes::copy_from_slice(&chunk[..chunk_size]))
                    .await
                    .is_err()
                {
                    // NOTE(review): on this break the remaining payload bytes
                    // are NOT consumed from tar_stream, so subsequent header
                    // reads are misaligned. The failed task's error surfaces
                    // in builder.finish(), but confirm no bogus entries can be
                    // processed in the meantime.
                    break;
                }
                tar_stream.consume(chunk_size);
                remaining -= chunk_size;
            }
            drop(tx); // Close channel to signal EOF

            // Push external entry to builder (will be resolved at finish())
            builder.push_external(handle, actual_size as u64);

            // Read and push padding inline (must come after external ref)
            let padding_size = storage_size - actual_size;
            if padding_size > 0 {
                let mut padding = vec![0u8; padding_size];
                tar_stream.read_exact(&mut padding).await?;
                builder.push_inline(&padding);
            }
        } else {
            // Small file or non-regular entry: buffer and write inline
            let mut buffer = vec![0u8; storage_size];
            tar_stream.read_exact(&mut buffer).await?;
            builder.push_inline(&buffer);
        }
    }

    // Finalize: await all handles, build stream, store it
    builder.finish().await
}
222
/// Represents the content type of a tar entry.
///
/// Tar entries can be directories, regular files/symlinks/devices (leaf nodes), or hardlinks
/// to existing files. This enum captures the different types of content that can appear in a tar archive.
#[derive(Debug)]
pub enum TarItem<ObjectID: FsVerityHashValue> {
    /// A directory entry.
    Directory,
    /// A leaf node (regular file, symlink, device, or fifo).
    Leaf(LeafContent<ObjectID>),
    /// A hardlink pointing to another path (absolute, '/'-prefixed, as
    /// produced by `path_from_tar()`).
    Hardlink(OsString),
}
236
/// Represents a complete tar entry extracted from a split stream.
///
/// Contains the full metadata and content for a single file or directory from a tar archive,
/// including its path, stat information (permissions, ownership, timestamps), and the actual content.
#[derive(Debug)]
pub struct TarEntry<ObjectID: FsVerityHashValue> {
    /// The absolute ('/'-prefixed) path of the entry, as produced by
    /// `path_from_tar()`.
    pub path: PathBuf,
    /// File metadata (mode, uid, gid, mtime, xattrs).
    pub stat: Stat,
    /// The content or type of this entry.
    pub item: TarItem<ObjectID>,
}
250
251impl<ObjectID: FsVerityHashValue> fmt::Display for TarEntry<ObjectID> {
252    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
253        match self.item {
254            TarItem::Hardlink(ref target) => dumpfile::write_hardlink(fmt, &self.path, target),
255            TarItem::Directory => dumpfile::write_directory(fmt, &self.path, &self.stat, 1),
256            TarItem::Leaf(ref content) => {
257                dumpfile::write_leaf(fmt, &self.path, &self.stat, content, 1)
258            }
259        }
260    }
261}
262
/// Builds an absolute path from the available tar name sources.
///
/// Precedence: PAX `path` extension, then GNU long name, then the short name
/// from the classic header. A single trailing '/' (used by directories) is
/// stripped, and a leading '/' is always prepended.
fn path_from_tar(pax: Option<Box<[u8]>>, gnu: Vec<u8>, short: &[u8]) -> PathBuf {
    let name: &[u8] = match &pax {
        Some(pax_name) => pax_name,
        None if !gnu.is_empty() => &gnu,
        None => short,
    };

    let mut bytes = Vec::with_capacity(name.len() + 1);
    bytes.push(b'/');
    bytes.extend_from_slice(name);

    // Directories are stored with a trailing '/': drop at most one.
    if bytes.last() == Some(&b'/') {
        bytes.pop();
    }

    PathBuf::from(OsString::from_vec(bytes))
}
279
/// Picks the symlink target from the available tar name sources.
///
/// Precedence: PAX `linkpath` extension, then GNU long link, then the short
/// link name from the classic header. Unlike `path_from_tar()`, the target is
/// taken verbatim (no leading '/' and no trailing-slash stripping).
fn symlink_target_from_tar(pax: Option<Box<[u8]>>, gnu: Vec<u8>, short: &[u8]) -> Box<OsStr> {
    let target: &[u8] = match &pax {
        Some(pax_name) => pax_name,
        None if !gnu.is_empty() => &gnu,
        None => short,
    };
    Box::from(OsStr::from_bytes(target))
}
289
290/// Reads and parses the next tar entry from a split stream.
291///
292/// Decodes tar headers and data from a composefs split stream, handling both inline and
293/// external content storage. Supports GNU long name/link extensions, PAX headers, and
294/// extended attributes. Returns `None` when the end of the archive is reached.
295///
296/// Returns the parsed tar entry, or `None` if the end of the stream is reached.
297pub fn get_entry<ObjectID: FsVerityHashValue>(
298    reader: &mut SplitStreamReader<ObjectID>,
299) -> Result<Option<TarEntry<ObjectID>>> {
300    // We don't have a way to drive the standard tar crate that lets us feed it random bits of
301    // header data while continuing to handle the external references as references.  That means we
302    // have to do the header interpretation ourselves, including handling of PAX/GNU extensions for
303    // xattrs and long filenames.
304    //
305    // We try to use as much of the tar crate as possible to help us with this.
306    let mut gnu_longlink: Vec<u8> = vec![];
307    let mut gnu_longname: Vec<u8> = vec![];
308    let mut pax_longlink: Option<Box<[u8]>> = None;
309    let mut pax_longname: Option<Box<[u8]>> = None;
310    let mut xattrs = BTreeMap::new();
311
312    let mut buf = [0u8; 512];
313    loop {
314        if !reader.read_inline_exact(&mut buf)? || buf == [0u8; 512] {
315            return Ok(None);
316        }
317
318        let header = tar::Header::from_byte_slice(&buf);
319
320        let size = header.entry_size()?;
321        let stored_size = size.next_multiple_of(512);
322
323        let item = match reader.read_exact(size as usize, stored_size as usize)? {
324            SplitStreamData::External(id) => match header.entry_type() {
325                EntryType::Regular | EntryType::Continuous => {
326                    ensure!(
327                        size as usize > INLINE_CONTENT_MAX,
328                        "Splitstream incorrectly stored a small ({size} byte) file external"
329                    );
330                    TarItem::Leaf(LeafContent::Regular(RegularFile::External(id, size)))
331                }
332                _ => bail!("Unsupported external-chunked entry {header:?} {id:?}"),
333            },
334            SplitStreamData::Inline(content) => match header.entry_type() {
335                EntryType::GNULongLink => {
336                    gnu_longlink.extend(content);
337                    gnu_longlink.pop_if(|x| *x == b'\0');
338
339                    continue;
340                }
341                EntryType::GNULongName => {
342                    gnu_longname.extend(content);
343                    gnu_longname.pop_if(|x| *x == b'\0');
344                    continue;
345                }
346                EntryType::XGlobalHeader => {
347                    todo!();
348                }
349                EntryType::XHeader => {
350                    for item in PaxExtensions::new(&content) {
351                        let extension = item?;
352                        let key = extension.key()?;
353                        let value = Box::from(extension.value_bytes());
354
355                        if key == "path" {
356                            pax_longname = Some(value);
357                        } else if key == "linkpath" {
358                            pax_longlink = Some(value);
359                        } else if let Some(xattr) = key.strip_prefix("SCHILY.xattr.") {
360                            xattrs.insert(Box::from(OsStr::new(xattr)), value);
361                        }
362                    }
363                    continue;
364                }
365                EntryType::Directory => TarItem::Directory,
366                EntryType::Regular | EntryType::Continuous => {
367                    ensure!(
368                        content.len() <= INLINE_CONTENT_MAX,
369                        "Splitstream incorrectly stored a large ({} byte) file inline",
370                        content.len()
371                    );
372                    TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(content)))
373                }
374                EntryType::Link => TarItem::Hardlink({
375                    let Some(link_name) = header.link_name_bytes() else {
376                        bail!("link without a name?")
377                    };
378                    OsString::from(path_from_tar(pax_longlink, gnu_longlink, &link_name))
379                }),
380                EntryType::Symlink => TarItem::Leaf(LeafContent::Symlink({
381                    let Some(link_name) = header.link_name_bytes() else {
382                        bail!("symlink without a name?")
383                    };
384                    symlink_target_from_tar(pax_longlink, gnu_longlink, &link_name)
385                })),
386                EntryType::Block => TarItem::Leaf(LeafContent::BlockDevice(
387                    match (header.device_major()?, header.device_minor()?) {
388                        (Some(major), Some(minor)) => makedev(major, minor),
389                        _ => bail!("Device entry without device numbers?"),
390                    },
391                )),
392                EntryType::Char => TarItem::Leaf(LeafContent::CharacterDevice(
393                    match (header.device_major()?, header.device_minor()?) {
394                        (Some(major), Some(minor)) => makedev(major, minor),
395                        _ => bail!("Device entry without device numbers?"),
396                    },
397                )),
398                EntryType::Fifo => TarItem::Leaf(LeafContent::Fifo),
399                _ => {
400                    todo!("Unsupported entry {:?}", header);
401                }
402            },
403        };
404
405        return Ok(Some(TarEntry {
406            path: path_from_tar(pax_longname, gnu_longname, &header.path_bytes()),
407            stat: Stat {
408                st_uid: header.uid()? as u32,
409                st_gid: header.gid()? as u32,
410                st_mode: header.mode()?,
411                st_mtim_sec: header.mtime()? as i64,
412                xattrs: RefCell::new(xattrs),
413            },
414            item,
415        }));
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use crate::TAR_LAYER_CONTENT_TYPE;
422
423    use super::*;
424    use composefs::{
425        fsverity::Sha256HashValue, generic_tree::LeafContent, repository::Repository,
426        splitstream::SplitStreamReader,
427    };
428    use std::{io::Cursor, path::Path, sync::Arc};
429    use tar::Builder;
430
431    use once_cell::sync::Lazy;
432    use std::sync::Mutex;
433
    // Holds every TempDir created by create_test_repository() for the lifetime
    // of the test process: dropping a TempDir deletes its directory on disk,
    // which would invalidate the repository fd a test is still using.
    static TEST_TEMPDIRS: Lazy<Mutex<Vec<tempfile::TempDir>>> =
        Lazy::new(|| Mutex::new(Vec::new()));
436
437    pub(crate) fn create_test_repository() -> Result<Arc<Repository<Sha256HashValue>>> {
438        // Create a temporary repository for testing and store it in static
439        let tempdir = tempfile::TempDir::new().unwrap();
440        let fd = rustix::fs::open(
441            tempdir.path(),
442            rustix::fs::OFlags::CLOEXEC | rustix::fs::OFlags::PATH,
443            0.into(),
444        )?;
445
446        // Store tempdir in static to keep it alive
447        {
448            let mut guard = TEST_TEMPDIRS.lock().unwrap();
449            guard.push(tempdir);
450        }
451
452        let mut repo = Repository::open_path(&fd, ".").unwrap();
453        repo.set_insecure(true);
454
455        Ok(Arc::new(repo))
456    }
457
458    /// Helper method to append a file to a tar builder with sensible defaults
459    fn append_file(
460        builder: &mut Builder<&mut Vec<u8>>,
461        path: &str,
462        content: &[u8],
463    ) -> Result<tar::Header> {
464        let mut header = tar::Header::new_gnu();
465        header.set_mode(0o644);
466        header.set_uid(1000);
467        header.set_gid(1000);
468        header.set_mtime(1234567890);
469        header.set_size(content.len() as u64);
470        header.set_entry_type(tar::EntryType::Regular);
471        builder.append_data(&mut header, path, content)?;
472        Ok(header)
473    }
474
475    /// Helper method to process tar data through split/get_entry pipeline
476    fn read_all_via_splitstream(tar_data: Vec<u8>) -> Result<Vec<TarEntry<Sha256HashValue>>> {
477        let mut tar_cursor = Cursor::new(tar_data);
478        let repo = create_test_repository()?;
479        let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE);
480
481        split(&mut tar_cursor, &mut writer)?;
482        let object_id = writer.done()?;
483
484        let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
485            repo.open_object(&object_id)?.into(),
486            Some(TAR_LAYER_CONTENT_TYPE),
487        )?;
488
489        let mut entries = Vec::new();
490        while let Some(entry) = get_entry(&mut reader)? {
491            entries.push(entry);
492        }
493        Ok(entries)
494    }
495
496    #[test]
497    fn test_empty_tar() {
498        let mut tar_data = Vec::new();
499        {
500            let mut builder = Builder::new(&mut tar_data);
501            builder.finish().unwrap();
502        }
503
504        let mut tar_cursor = Cursor::new(tar_data);
505        let repo = create_test_repository().unwrap();
506        let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE);
507
508        split(&mut tar_cursor, &mut writer).unwrap();
509        let object_id = writer.done().unwrap();
510
511        let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
512            repo.open_object(&object_id).unwrap().into(),
513            Some(TAR_LAYER_CONTENT_TYPE),
514        )
515        .unwrap();
516        assert!(get_entry(&mut reader).unwrap().is_none());
517    }
518
519    #[test]
520    fn test_single_small_file() {
521        let mut tar_data = Vec::new();
522        let original_header = {
523            let mut builder = Builder::new(&mut tar_data);
524
525            // Add one small regular file
526            let content = b"Hello, World!";
527            let header = append_file(&mut builder, "hello.txt", content).unwrap();
528
529            builder.finish().unwrap();
530            header
531        };
532
533        let mut tar_cursor = Cursor::new(tar_data);
534        let repo = create_test_repository().unwrap();
535        let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE);
536
537        split(&mut tar_cursor, &mut writer).unwrap();
538        let object_id = writer.done().unwrap();
539
540        let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
541            repo.open_object(&object_id).unwrap().into(),
542            Some(TAR_LAYER_CONTENT_TYPE),
543        )
544        .unwrap();
545
546        // Should have exactly one entry
547        let entry = get_entry(&mut reader)
548            .unwrap()
549            .expect("Should have one entry");
550        assert_eq!(entry.path, PathBuf::from("/hello.txt"));
551        assert!(matches!(
552            entry.item,
553            TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(_)))
554        ));
555
556        // Use the helper to compare header and stat
557        assert_header_stat_equal(&original_header, &entry.stat, "hello.txt");
558
559        if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) = entry.item {
560            assert_eq!(content.as_ref(), b"Hello, World!");
561        }
562
563        // Should be no more entries
564        assert!(get_entry(&mut reader).unwrap().is_none());
565    }
566
567    #[test]
568    fn test_inline_threshold() {
569        let mut tar_data = Vec::new();
570        let (threshold_header, over_threshold_header) = {
571            let mut builder = Builder::new(&mut tar_data);
572
573            // File exactly at the threshold should be inline
574            let threshold_content = vec![b'X'; INLINE_CONTENT_MAX];
575            let header1 =
576                append_file(&mut builder, "threshold_file.txt", &threshold_content).unwrap();
577
578            // File just over threshold should be external
579            let over_threshold_content = vec![b'Y'; INLINE_CONTENT_MAX + 1];
580            let header2 = append_file(
581                &mut builder,
582                "over_threshold_file.txt",
583                &over_threshold_content,
584            )
585            .unwrap();
586
587            builder.finish().unwrap();
588            (header1, header2)
589        };
590
591        let mut tar_cursor = Cursor::new(tar_data);
592        let repo = create_test_repository().unwrap();
593        let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE);
594
595        split(&mut tar_cursor, &mut writer).unwrap();
596        let object_id = writer.done().unwrap();
597
598        let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
599            repo.open_object(&object_id).unwrap().into(),
600            Some(TAR_LAYER_CONTENT_TYPE),
601        )
602        .unwrap();
603        let mut entries = Vec::new();
604
605        while let Some(entry) = get_entry(&mut reader).unwrap() {
606            entries.push(entry);
607        }
608
609        assert_eq!(entries.len(), 2);
610
611        // First file should be inline
612        assert_eq!(entries[0].path, PathBuf::from("/threshold_file.txt"));
613        assert_header_stat_equal(&threshold_header, &entries[0].stat, "threshold_file.txt");
614        if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
615            entries[0].item
616        {
617            assert_eq!(content.len(), INLINE_CONTENT_MAX);
618            assert_eq!(content[0], b'X');
619        } else {
620            panic!("Expected inline regular file for threshold file");
621        }
622
623        // Second file should be external
624        assert_eq!(entries[1].path, PathBuf::from("/over_threshold_file.txt"));
625        assert_header_stat_equal(
626            &over_threshold_header,
627            &entries[1].stat,
628            "over_threshold_file.txt",
629        );
630        if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(_, size))) = entries[1].item
631        {
632            assert_eq!(size, (INLINE_CONTENT_MAX + 1) as u64);
633        } else {
634            panic!("Expected external regular file for over-threshold file");
635        }
636    }
637
638    #[test]
639    fn test_round_trip_simple() {
640        // Create a simple tar with various file types
641        let mut original_tar = Vec::new();
642        let (small_header, large_header) = {
643            let mut builder = Builder::new(&mut original_tar);
644
645            // Add a small file
646            let small_content = b"Small file content";
647            let header1 = append_file(&mut builder, "small.txt", small_content).unwrap();
648
649            // Add a large file
650            let large_content = vec![b'L'; INLINE_CONTENT_MAX + 100];
651            let header2 = append_file(&mut builder, "large.txt", &large_content).unwrap();
652
653            builder.finish().unwrap();
654            (header1, header2)
655        };
656
657        // Split the tar
658        let mut tar_cursor = Cursor::new(original_tar.clone());
659        let repo = create_test_repository().unwrap();
660        let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE);
661        split(&mut tar_cursor, &mut writer).unwrap();
662        let object_id = writer.done().unwrap();
663
664        // Read back entries and compare with original headers
665        let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
666            repo.open_object(&object_id).unwrap().into(),
667            Some(TAR_LAYER_CONTENT_TYPE),
668        )
669        .unwrap();
670        let mut entries = Vec::new();
671
672        while let Some(entry) = get_entry(&mut reader).unwrap() {
673            entries.push(entry);
674        }
675
676        assert_eq!(entries.len(), 2, "Should have exactly 2 entries");
677
678        // Compare small file
679        assert_eq!(entries[0].path, PathBuf::from("/small.txt"));
680        assert_header_stat_equal(&small_header, &entries[0].stat, "small.txt");
681
682        if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
683            entries[0].item
684        {
685            assert_eq!(content.as_ref(), b"Small file content");
686        } else {
687            panic!("Expected inline regular file for small.txt");
688        }
689
690        // Compare large file
691        assert_eq!(entries[1].path, PathBuf::from("/large.txt"));
692        assert_header_stat_equal(&large_header, &entries[1].stat, "large.txt");
693
694        if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(ref id, size))) =
695            entries[1].item
696        {
697            assert_eq!(size, (INLINE_CONTENT_MAX + 100) as u64);
698            // Verify the external content matches
699            use std::io::Read;
700            let mut external_data = Vec::new();
701            std::fs::File::from(repo.open_object(id).unwrap())
702                .read_to_end(&mut external_data)
703                .unwrap();
704            let expected_content = vec![b'L'; INLINE_CONTENT_MAX + 100];
705            assert_eq!(
706                external_data, expected_content,
707                "External file content should match"
708            );
709        } else {
710            panic!("Expected external regular file for large.txt");
711        }
712    }
713
714    #[test]
715    fn test_special_filename_cases() {
716        let mut tar_data = Vec::new();
717        {
718            let mut builder = Builder::new(&mut tar_data);
719
720            // Test file with special characters
721            let content1 = b"Special chars content";
722            append_file(&mut builder, "file-with_special.chars@123", content1).unwrap();
723
724            // Test file with long filename
725            let long_name = "a".repeat(100);
726            let content2 = b"Long filename content";
727            append_file(&mut builder, &long_name, content2).unwrap();
728
729            builder.finish().unwrap();
730        };
731
732        let entries = read_all_via_splitstream(tar_data).unwrap();
733        assert_eq!(entries.len(), 2);
734
735        // Verify special characters filename
736        assert_eq!(
737            entries[0].path,
738            PathBuf::from("/file-with_special.chars@123")
739        );
740        assert_eq!(
741            entries[0].path.file_name().unwrap(),
742            "file-with_special.chars@123"
743        );
744
745        // Verify long filename
746        let expected_long_path = format!("/{}", "a".repeat(100));
747        assert_eq!(entries[1].path, PathBuf::from(expected_long_path));
748        assert_eq!(entries[1].path.file_name().unwrap(), &*"a".repeat(100));
749    }
750
751    #[test]
752    fn test_gnu_long_filename_reproduction() {
753        // Create a very long path that will definitely trigger GNU long name extensions
754        let very_long_path = format!(
755            "very/long/path/that/exceeds/the/normal/tar/header/limit/{}",
756            "x".repeat(120)
757        );
758        let content = b"Content for very long path";
759
760        // Use append_data to create a tar with a very long filename that triggers GNU extensions
761        let mut tar_data = Vec::new();
762        {
763            let mut builder = Builder::new(&mut tar_data);
764            append_file(&mut builder, &very_long_path, content).unwrap();
765            builder.finish().unwrap();
766        };
767
768        let entries = read_all_via_splitstream(tar_data).unwrap();
769        assert_eq!(entries.len(), 1);
770        let abspath = format!("/{very_long_path}");
771        assert_eq!(entries[0].path, Path::new(&abspath));
772    }
773
774    #[test]
775    fn test_gnu_longlink() {
776        let very_long_path = format!(
777            "very/long/path/that/exceeds/the/normal/tar/header/limit/{}",
778            "x".repeat(120)
779        );
780
781        // Use append_data to create a tar with a very long filename that triggers GNU extensions
782        let mut tar_data = Vec::new();
783        {
784            let mut builder = Builder::new(&mut tar_data);
785            let mut header = tar::Header::new_gnu();
786            header.set_mode(0o777);
787            header.set_entry_type(EntryType::Symlink);
788            header.set_size(0);
789            header.set_uid(0);
790            header.set_gid(0);
791            builder
792                .append_link(&mut header, "long-symlink", &very_long_path)
793                .unwrap();
794            builder.finish().unwrap();
795        };
796
797        let entries = read_all_via_splitstream(tar_data).unwrap();
798        assert_eq!(entries.len(), 1);
799        match &entries[0].item {
800            TarItem::Leaf(LeafContent::Symlink(ref target)) => {
801                assert_eq!(&**target, OsStr::new(&very_long_path));
802            }
803            _ => unreachable!(),
804        };
805    }
806
807    /// Compare a tar::Header with a composefs Stat structure for equality
808    fn assert_header_stat_equal(header: &tar::Header, stat: &Stat, msg_prefix: &str) {
809        assert_eq!(
810            header.mode().unwrap(),
811            stat.st_mode,
812            "{msg_prefix}: mode mismatch"
813        );
814        assert_eq!(
815            header.uid().unwrap() as u32,
816            stat.st_uid,
817            "{msg_prefix}: uid mismatch"
818        );
819        assert_eq!(
820            header.gid().unwrap() as u32,
821            stat.st_gid,
822            "{msg_prefix}: gid mismatch"
823        );
824        assert_eq!(
825            header.mtime().unwrap() as i64,
826            stat.st_mtim_sec,
827            "{msg_prefix}: mtime mismatch"
828        );
829    }
830
831    /// Benchmark for tar split processing via Repository API.
832    ///
833    /// Run with: cargo test --release --lib -p composefs-oci bench_tar_split -- --ignored --nocapture
834    #[test]
835    #[ignore]
836    fn bench_tar_split() {
837        use std::time::Instant;
838
839        // Configuration: 10000 files of 200KB each = 2GB total
840        const NUM_FILES: usize = 10000;
841        const FILE_SIZE: usize = 200 * 1024; // 200KB
842        const ITERATIONS: usize = 3;
843
844        println!("\n=== Tar Split Benchmark ===");
845        println!(
846            "Configuration: {} files of {}KB each, {} iterations",
847            NUM_FILES,
848            FILE_SIZE / 1024,
849            ITERATIONS
850        );
851
852        // Generate deterministic test data
853        fn generate_test_data(size: usize, seed: u8) -> Vec<u8> {
854            (0..size)
855                .map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
856                .collect()
857        }
858
859        // Build a tar archive in memory with many large files
860        let mut tar_data = Vec::new();
861        {
862            let mut builder = Builder::new(&mut tar_data);
863            for i in 0..NUM_FILES {
864                let content = generate_test_data(FILE_SIZE, i as u8);
865                let filename = format!("file_{:04}.bin", i);
866                append_file(&mut builder, &filename, &content).unwrap();
867            }
868            builder.finish().unwrap();
869        }
870
871        let tar_size = tar_data.len();
872        println!(
873            "Tar archive size: {} bytes ({:.2} MB)",
874            tar_size,
875            tar_size as f64 / (1024.0 * 1024.0)
876        );
877
878        let rt = tokio::runtime::Builder::new_multi_thread()
879            .enable_all()
880            .build()
881            .unwrap();
882
883        let mut times = Vec::with_capacity(ITERATIONS);
884        for i in 0..ITERATIONS {
885            let repo = create_test_repository().unwrap();
886            let tar_data_clone = tar_data.clone();
887
888            let start = Instant::now();
889            rt.block_on(async {
890                split_async(&tar_data_clone[..], repo, TAR_LAYER_CONTENT_TYPE).await
891            })
892            .unwrap();
893            let elapsed = start.elapsed();
894            times.push(elapsed);
895            println!("Iteration {}: {:?}", i + 1, elapsed);
896        }
897
898        let total: std::time::Duration = times.iter().sum();
899        let avg = total / ITERATIONS as u32;
900        println!("\n=== Summary ===");
901        println!(
902            "Average: {:?}  ({:.2} MB/s)",
903            avg,
904            (tar_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64()
905        );
906    }
907
908    /// Test that split_async produces correct output for mixed content.
909    #[tokio::test]
910    async fn test_split_streaming_roundtrip() {
911        // Create a tar with a mix of small (inline) and large (external) files
912        let mut tar_data = Vec::new();
913        {
914            let mut builder = Builder::new(&mut tar_data);
915
916            // Small file (should be inline)
917            let small_content = b"Small file content";
918            append_file(&mut builder, "small.txt", small_content).unwrap();
919
920            // Large file (should be external/streamed)
921            let large_content = vec![b'L'; INLINE_CONTENT_MAX + 100];
922            append_file(&mut builder, "large.txt", &large_content).unwrap();
923
924            // Another small file
925            let small2_content = b"Another small file";
926            append_file(&mut builder, "small2.txt", small2_content).unwrap();
927
928            builder.finish().unwrap();
929        }
930
931        let repo = create_test_repository().unwrap();
932
933        // Use split_async which returns the object_id directly
934        let object_id = split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
935            .await
936            .unwrap();
937
938        // Read back and verify
939        let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
940            repo.open_object(&object_id).unwrap().into(),
941            Some(TAR_LAYER_CONTENT_TYPE),
942        )
943        .unwrap();
944
945        let mut entries = Vec::new();
946        while let Some(entry) = get_entry(&mut reader).unwrap() {
947            entries.push(entry);
948        }
949
950        assert_eq!(entries.len(), 3, "Should have 3 entries");
951
952        // Verify small file (inline)
953        assert_eq!(entries[0].path, PathBuf::from("/small.txt"));
954        if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
955            entries[0].item
956        {
957            assert_eq!(content.as_ref(), b"Small file content");
958        } else {
959            panic!("Expected inline regular file for small.txt");
960        }
961
962        // Verify large file (external)
963        assert_eq!(entries[1].path, PathBuf::from("/large.txt"));
964        if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(ref id, size))) =
965            entries[1].item
966        {
967            assert_eq!(size, (INLINE_CONTENT_MAX + 100) as u64);
968            // Verify the external content matches
969            let mut external_data = Vec::new();
970            std::fs::File::from(repo.open_object(id).unwrap())
971                .read_to_end(&mut external_data)
972                .unwrap();
973            let expected_content = vec![b'L'; INLINE_CONTENT_MAX + 100];
974            assert_eq!(
975                external_data, expected_content,
976                "External file content should match"
977            );
978        } else {
979            panic!("Expected external regular file for large.txt");
980        }
981
982        // Verify second small file (inline)
983        assert_eq!(entries[2].path, PathBuf::from("/small2.txt"));
984        if let TarItem::Leaf(LeafContent::Regular(RegularFile::Inline(ref content))) =
985            entries[2].item
986        {
987            assert_eq!(content.as_ref(), b"Another small file");
988        } else {
989            panic!("Expected inline regular file for small2.txt");
990        }
991    }
992
993    /// Test split_async with multiple large files.
994    #[tokio::test]
995    async fn test_split_streaming_multiple_large_files() {
996        let mut tar_data = Vec::new();
997        {
998            let mut builder = Builder::new(&mut tar_data);
999
1000            // Three large files to test parallel streaming
1001            for i in 0..3 {
1002                let content = vec![(i + 0x41) as u8; INLINE_CONTENT_MAX + 1000]; // 'A', 'B', 'C'
1003                let filename = format!("file{}.bin", i);
1004                append_file(&mut builder, &filename, &content).unwrap();
1005            }
1006
1007            builder.finish().unwrap();
1008        }
1009
1010        let repo = create_test_repository().unwrap();
1011
1012        let object_id = split_async(&tar_data[..], repo.clone(), TAR_LAYER_CONTENT_TYPE)
1013            .await
1014            .unwrap();
1015
1016        // Read back and verify
1017        let mut reader: SplitStreamReader<Sha256HashValue> = SplitStreamReader::new(
1018            repo.open_object(&object_id).unwrap().into(),
1019            Some(TAR_LAYER_CONTENT_TYPE),
1020        )
1021        .unwrap();
1022
1023        let mut entries = Vec::new();
1024        while let Some(entry) = get_entry(&mut reader).unwrap() {
1025            entries.push(entry);
1026        }
1027
1028        assert_eq!(entries.len(), 3, "Should have 3 entries");
1029
1030        for (i, entry) in entries.iter().enumerate() {
1031            let expected_path = format!("/file{}.bin", i);
1032            assert_eq!(entry.path, PathBuf::from(&expected_path));
1033
1034            if let TarItem::Leaf(LeafContent::Regular(RegularFile::External(ref id, size))) =
1035                entry.item
1036            {
1037                assert_eq!(size, (INLINE_CONTENT_MAX + 1000) as u64);
1038                let mut external_data = Vec::new();
1039                std::fs::File::from(repo.open_object(id).unwrap())
1040                    .read_to_end(&mut external_data)
1041                    .unwrap();
1042                let expected_content = vec![(i + 0x41) as u8; INLINE_CONTENT_MAX + 1000];
1043                assert_eq!(
1044                    external_data, expected_content,
1045                    "External file {} content should match",
1046                    i
1047                );
1048            } else {
1049                panic!("Expected external regular file for file{}.bin", i);
1050            }
1051        }
1052    }
1053}