// composefs/splitstream.rs

//! Split Stream file format implementation.
//!
//! This module implements the Split Stream format for efficiently storing
//! and transferring data with inline content and external object references,
//! supporting compression and content deduplication.

/* Implementation of the Split Stream file format
 *
 * NB: This format is documented in `doc/splitstream.md`.  Please keep the docs up to date!!
 */
11
12use std::{
13    collections::{BTreeMap, HashMap},
14    fs::File,
15    hash::Hash,
16    io::{BufRead, BufReader, Read, Seek, SeekFrom, Take, Write},
17    mem::size_of,
18    mem::MaybeUninit,
19    ops::Range,
20    sync::Arc,
21};
22
23use anyhow::{bail, ensure, Context, Error, Result};
24use rustix::{
25    buffer::spare_capacity,
26    io::{pread, read},
27};
28use tokio::task::JoinHandle;
29use zerocopy::{
30    little_endian::{I64, U16, U64},
31    FromBytes, Immutable, IntoBytes, KnownLayout,
32};
33use zstd::stream::{read::Decoder, write::Encoder};
34
35use crate::{fsverity::FsVerityHashValue, repository::Repository, util::read_exactish};
36
37const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream";
38const LG_BLOCKSIZE: u8 = 12; // TODO: hard-coded 4k.  make this generic later...
39
// Nearly everything in the file is located at an offset indicated by a FileRange.
// This is a half-open interval [start, end) of byte offsets, stored little-endian
// so the struct can be read/written directly from the on-disk representation.
#[derive(Debug, Clone, Copy, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct FileRange {
    start: U64, // inclusive start offset, in bytes from the beginning of the file
    end: U64,   // exclusive end offset
}
46
// The only exception is the header: it is a fixed sized and comes at the start (offset 0).
// All multi-byte fields are little-endian (via the zerocopy U16/U64 wrappers), so the
// struct can be round-tripped to disk with as_bytes()/read_from_io().
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct SplitstreamHeader {
    pub magic: [u8; 11],  // Contains SPLITSTREAM_MAGIC
    pub version: u8,      // must always be 0
    pub _flags: U16,      // is currently always 0 (but ignored)
    pub algorithm: u8,    // kernel fs-verity algorithm identifier (1 = sha256, 2 = sha512)
    pub lg_blocksize: u8, // log2 of the fs-verity block size (12 = 4k, 16 = 64k)
    pub info: FileRange,  // can be used to expand/move the info section in the future
}
57
// The info block can be located anywhere, indicated by the "info" FileRange in the header.
// It in turn locates the remaining sections of the file.  Readers tolerate an info section
// larger than this struct for forward compatibility (see SplitStreamReader::new).
#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
struct SplitstreamInfo {
    pub stream_refs: FileRange, // location of the stream references array
    pub object_refs: FileRange, // location of the object references array
    pub stream: FileRange,      // location of the zstd-compressed stream within the file
    pub named_refs: FileRange,  // location of the compressed named references
    pub content_type: U64,      // user can put whatever magic identifier they want there
    pub stream_size: U64,       // total uncompressed size of inline chunks and external chunks
}
68
69impl FileRange {
70    fn len(&self) -> Result<u64> {
71        self.end
72            .get()
73            .checked_sub(self.start.get())
74            .context("Negative-sized range in splitstream")
75    }
76}
77
78impl From<Range<u64>> for FileRange {
79    fn from(value: Range<u64>) -> Self {
80        Self {
81            start: U64::from(value.start),
82            end: U64::from(value.end),
83        }
84    }
85}
86
87fn read_range(file: &mut File, range: FileRange) -> Result<Vec<u8>> {
88    let size: usize = (range.len()?.try_into())
89        .context("Unable to allocate buffer for implausibly large splitstream section")?;
90    let mut buffer = Vec::with_capacity(size);
91    if size > 0 {
92        pread(file, spare_capacity(&mut buffer), range.start.get())
93            .context("Unable to read section from splitstream file")?;
94    }
95    ensure!(
96        buffer.len() == size,
97        "Incomplete read from splitstream file"
98    );
99    Ok(buffer)
100}
101
/// An array of objects with the following properties:
///   - each item appears only once
///   - efficient insertion and lookup of indexes of existing items
///   - insertion order is maintained, indexes are stable across modification
///   - can do .as_bytes() for items that are IntoBytes + Immutable
struct UniqueVec<T: Clone + Hash + Eq> {
    // items in insertion order; an item's position here is its stable index
    items: Vec<T>,
    // reverse lookup: item -> its index in `items`
    index: HashMap<T, usize>,
}
111
112impl<T: Clone + Hash + Eq + std::fmt::Debug> std::fmt::Debug for UniqueVec<T> {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("UniqueVec")
115            .field("items", &self.items)
116            .field("index", &self.index)
117            .finish()
118    }
119}
120
121impl<T: Clone + Hash + Eq + IntoBytes + Immutable> UniqueVec<T> {
122    fn as_bytes(&self) -> &[u8] {
123        self.items.as_bytes()
124    }
125}
126
127impl<T: Clone + Hash + Eq> UniqueVec<T> {
128    fn new() -> Self {
129        Self {
130            items: Vec::new(),
131            index: HashMap::new(),
132        }
133    }
134
135    fn get(&self, item: &T) -> Option<usize> {
136        self.index.get(item).copied()
137    }
138
139    fn ensure(&mut self, item: &T) -> usize {
140        self.get(item).unwrap_or_else(|| {
141            let idx = self.items.len();
142            self.index.insert(item.clone(), idx);
143            self.items.push(item.clone());
144            idx
145        })
146    }
147
148    /// Get an item by its index.
149    fn get_by_index(&self, idx: usize) -> Option<&T> {
150        self.items.get(idx)
151    }
152}
153
/// An entry in the split stream being built.
///
/// Used by `SplitStreamBuilder` to collect entries before serialization.
pub enum SplitStreamEntry<ObjectID: FsVerityHashValue> {
    /// Inline data (headers, small files, padding)
    Inline(Vec<u8>),
    /// External reference - will be resolved to ObjectID when the handle completes
    External {
        /// Background task that will return the ObjectID
        handle: JoinHandle<Result<ObjectID>>,
        /// Size of the external object in bytes
        size: u64,
    },
}
168
169impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamEntry<ObjectID> {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        match self {
172            SplitStreamEntry::Inline(data) => {
173                f.debug_struct("Inline").field("len", &data.len()).finish()
174            }
175            SplitStreamEntry::External { size, .. } => f
176                .debug_struct("External")
177                .field("size", size)
178                .finish_non_exhaustive(),
179        }
180    }
181}
182
/// Builder for constructing a split stream with parallel object storage.
///
/// This builder collects entries (inline data and pending external object handles),
/// then serializes them all at once when `finish()` is called. This approach:
/// - Allows all external handles to be awaited in parallel
/// - Enables proper deduplication of ObjectIDs
/// - Writes the stream in one clean pass after all IDs are known
///
/// # Example
/// ```ignore
/// let mut builder = SplitStreamBuilder::new(repo.clone(), content_type);
/// builder.push_inline(header_bytes);
/// builder.push_external(storage_handle, file_size);
/// builder.push_inline(padding);
/// let object_id = builder.finish().await?;
/// ```
pub struct SplitStreamBuilder<ObjectID: FsVerityHashValue> {
    repo: Arc<Repository<ObjectID>>, // repository the finished stream is stored into
    entries: Vec<SplitStreamEntry<ObjectID>>, // ordered inline/external entries
    total_external_size: u64,        // running sum of all push_external() sizes
    content_type: u64,               // user-chosen identifier written to the info section
    stream_refs: UniqueVec<ObjectID>, // deduplicated stream references (targets of named refs)
    named_refs: BTreeMap<Box<str>, usize>, // name -> index into stream_refs
}
207
208impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamBuilder<ObjectID> {
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct("SplitStreamBuilder")
211            .field("repo", &self.repo)
212            .field("entries", &self.entries)
213            .field("total_external_size", &self.total_external_size)
214            .field("content_type", &self.content_type)
215            .finish_non_exhaustive()
216    }
217}
218
impl<ObjectID: FsVerityHashValue> SplitStreamBuilder<ObjectID> {
    /// Create a new split stream builder.
    ///
    /// The builder starts empty: add content with `push_inline()` and
    /// `push_external()`, then serialize and store it with `finish()`.
    pub fn new(repo: Arc<Repository<ObjectID>>, content_type: u64) -> Self {
        Self {
            repo,
            entries: Vec::new(),
            total_external_size: 0,
            content_type,
            stream_refs: UniqueVec::new(),
            named_refs: Default::default(),
        }
    }

    /// Append inline data to the stream.
    ///
    /// Adjacent inline data will be coalesced to avoid fragmentation.
    pub fn push_inline(&mut self, data: &[u8]) {
        if data.is_empty() {
            return;
        }
        // Coalesce with the previous inline entry if possible
        if let Some(SplitStreamEntry::Inline(ref mut existing)) = self.entries.last_mut() {
            existing.extend_from_slice(data);
        } else {
            self.entries.push(SplitStreamEntry::Inline(data.to_vec()));
        }
    }

    /// Append an external object being stored in background.
    ///
    /// The handle should resolve to the ObjectID when the storage completes.
    pub fn push_external(&mut self, handle: JoinHandle<Result<ObjectID>>, size: u64) {
        self.total_external_size += size;
        self.entries
            .push(SplitStreamEntry::External { handle, size });
    }

    /// Add an externally-referenced stream with the given name.
    ///
    /// The name has no meaning beyond the scope of this file: it is meant to be used to link to
    /// associated data when reading the file back again. For example, for OCI config files, this
    /// might refer to a layer splitstream via its DiffId.
    pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) {
        let idx = self.stream_refs.ensure(verity);
        self.named_refs.insert(Box::from(name), idx);
    }

    /// Finalize: await all handles, build the splitstream, store it.
    ///
    /// This method:
    /// 1. Awaits all external handles to get ObjectIDs
    /// 2. Builds a UniqueVec<ObjectID> for deduplication
    /// 3. Creates a SplitStreamWriter and replays all entries
    /// 4. Stores the final splitstream in the repository
    ///
    /// Returns the fs-verity object ID of the stored splitstream.
    pub async fn finish(self) -> Result<ObjectID> {
        // First pass: await all handles to collect ObjectIDs
        // We need to preserve the order of entries, so we process them in sequence
        // (the tasks themselves were already spawned and may run in parallel).
        let mut resolved_entries: Vec<ResolvedEntry<ObjectID>> =
            Vec::with_capacity(self.entries.len());

        for entry in self.entries {
            match entry {
                SplitStreamEntry::Inline(data) => {
                    resolved_entries.push(ResolvedEntry::Inline(data));
                }
                SplitStreamEntry::External { handle, size } => {
                    // First `?` propagates a JoinError (panicked/cancelled task),
                    // the second propagates the storage operation's own error.
                    let id = handle.await??;
                    resolved_entries.push(ResolvedEntry::External { id, size });
                }
            }
        }

        // Second pass: build the splitstream using SplitStreamWriter
        // This gives us proper deduplication through UniqueVec
        let mut writer = SplitStreamWriter::new(&self.repo, self.content_type);

        // Copy over stream refs and named refs
        for (name, idx) in &self.named_refs {
            let verity = self
                .stream_refs
                .get_by_index(*idx)
                .expect("named ref index out of bounds");
            writer.add_named_stream_ref(name, verity);
        }

        // Replay all entries
        // Note: write_inline tracks total_size internally, but write_reference doesn't,
        // so we manually add the external size to the writer's total_size.
        for entry in resolved_entries {
            match entry {
                ResolvedEntry::Inline(data) => {
                    writer.write_inline(&data);
                }
                ResolvedEntry::External { id, size } => {
                    // Add size before writing reference (write_reference doesn't track size)
                    writer.add_external_size(size);
                    writer.write_reference(id)?;
                }
            }
        }

        // Finalize and store.  done() does compression and file I/O, so run it
        // on the blocking thread pool rather than the async executor.
        tokio::task::spawn_blocking(move || writer.done()).await?
    }
}
326
/// Internal type for resolved entries after awaiting handles.
///
/// Mirrors `SplitStreamEntry`, but with each background task already resolved
/// to its concrete `ObjectID`.
#[derive(Debug)]
enum ResolvedEntry<ObjectID: FsVerityHashValue> {
    /// Inline data carried over unchanged
    Inline(Vec<u8>),
    /// External reference with its resolved object ID and uncompressed size
    External { id: ObjectID, size: u64 },
}
333
/// Writer for creating split stream format files with inline content and external object references.
pub struct SplitStreamWriter<ObjectId: FsVerityHashValue> {
    repo: Arc<Repository<ObjectId>>,      // repository the finished stream is stored into
    stream_refs: UniqueVec<ObjectId>,     // deduplicated stream references (targets of named refs)
    object_refs: UniqueVec<ObjectId>,     // deduplicated external object references
    named_refs: BTreeMap<Box<str>, usize>, // index into stream_refs
    inline_buffer: Vec<u8>,               // pending inline data, flushed before each external ref
    total_size: u64,                      // uncompressed size of all inline + external content
    writer: Encoder<'static, Vec<u8>>,    // zstd encoder accumulating the stream section
    content_type: u64,                    // user-chosen identifier written to the info section
}
345
346impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<ObjectID> {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        // writer doesn't impl Debug
349        f.debug_struct("SplitStreamWriter")
350            .field("repo", &self.repo)
351            .field("inline_content", &self.inline_buffer)
352            .finish()
353    }
354}
355
impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
    /// Create a new split stream writer.
    pub fn new(repo: &Arc<Repository<ObjectID>>, content_type: u64) -> Self {
        // NB: writing the zstd header into a Vec<u8> can't fail, so this unwrap is safe
        let writer = Encoder::new(vec![], 0).unwrap();

        Self {
            repo: Arc::clone(repo),
            content_type,
            inline_buffer: vec![],
            stream_refs: UniqueVec::new(),
            object_refs: UniqueVec::new(),
            named_refs: Default::default(),
            total_size: 0,
            writer,
        }
    }

    /// Add an externally-referenced object.
    ///
    /// This establishes a link to an object (ie: raw data file) from this stream.  The link is
    /// given a unique index number, which is returned.  Once assigned, this index won't change.
    /// The same index can be used to find the linked object when reading the file back.
    ///
    /// This is the primary mechanism by which splitstreams reference split external content.
    ///
    /// You usually won't need to call this yourself: if you want to add split external content to
    /// the stream, call `.write_external()` or `.write_external_async()`.
    pub fn add_object_ref(&mut self, verity: &ObjectID) -> usize {
        self.object_refs.ensure(verity)
    }

    /// Find the index of a previously referenced object.
    ///
    /// Finds the previously-assigned index for a linked object, or None if the object wasn't
    /// previously linked.
    pub fn lookup_object_ref(&self, verity: &ObjectID) -> Option<usize> {
        self.object_refs.get(verity)
    }

    /// Add an externally-referenced stream with the given name.
    ///
    /// The name has no meaning beyond the scope of this file: it is meant to be used to link to
    /// associated data when reading the file back again.  For example, for OCI config files, this
    /// might refer to a layer splitstream via its DiffId.
    ///
    /// This establishes a link between the two splitstreams and is considered when performing
    /// garbage collection: the named stream will be kept alive by this stream.
    pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) {
        let idx = self.stream_refs.ensure(verity);
        self.named_refs.insert(Box::from(name), idx);
    }

    // flush any buffered inline data into the compressed stream
    fn flush_inline(&mut self) -> Result<()> {
        let size = self.inline_buffer.len();
        if size > 0 {
            // Inline chunk: stored as negative LE i64 number of bytes (non-zero!)
            // SAFETY: naive - fails on -i64::MIN but we know size was unsigned
            let instruction = -i64::try_from(size).expect("implausibly large inline chunk");
            self.writer.write_all(I64::new(instruction).as_bytes())?;
            self.writer.write_all(&self.inline_buffer)?;
            self.inline_buffer.clear();
        }
        Ok(())
    }

    /// Write inline data to the stream.
    ///
    /// The data is buffered (and coalesced with adjacent inline writes) until the
    /// next external reference or until the stream is finalized.
    pub fn write_inline(&mut self, data: &[u8]) {
        // SAFETY: We'd have to write a lot of data to get here...
        self.total_size += data.len() as u64;
        self.inline_buffer.extend(data);
    }

    /// Add to the total external size tracked by this writer.
    ///
    /// This is used by `SplitStreamBuilder` when replaying external entries,
    /// since `write_reference` doesn't track size on its own.
    pub fn add_external_size(&mut self, size: u64) {
        self.total_size += size;
    }

    /// Write a reference to an external object that has already been stored.
    ///
    /// This is the common implementation for `.write_external()` and `.write_external_async()`,
    /// and is also used by `SplitStreamBuilder` when replaying resolved entries.
    ///
    /// Note: This does NOT add to total_size - the caller must do that if needed.
    pub fn write_reference(&mut self, id: ObjectID) -> Result<()> {
        // Flush any buffered inline data before we store the external reference.
        self.flush_inline()?;

        // External chunk: non-negative LE i64 index into object_refs array
        let index = self.add_object_ref(&id);
        let instruction = i64::try_from(index).expect("implausibly large external index");
        self.writer.write_all(I64::from(instruction).as_bytes())?;
        Ok(())
    }

    /// Write externally-split data to the stream.
    ///
    /// The data is stored in the repository and a reference is written to the stream.
    pub fn write_external(&mut self, data: &[u8]) -> Result<()> {
        self.total_size += data.len() as u64;
        let id = self.repo.ensure_object(data)?;
        self.write_reference(id)
    }

    /// Asynchronously write externally-split data to the stream.
    ///
    /// The data is stored in the repository asynchronously and a reference is written to the stream.
    /// This method awaits the storage operation before returning.
    pub async fn write_external_async(&mut self, data: Vec<u8>) -> Result<()> {
        self.total_size += data.len() as u64;
        let id = self.repo.ensure_object_async(data).await?;
        self.write_reference(id)
    }

    // Serialize the named refs as a zstd-compressed sequence of
    // "{index}:{name}\0" records (index into the stream_refs array).
    fn write_named_refs(named_refs: BTreeMap<Box<str>, usize>) -> Result<Vec<u8>> {
        let mut encoder = Encoder::new(vec![], 0)?;

        for (name, idx) in &named_refs {
            write!(&mut encoder, "{idx}:{name}\0")?;
        }

        Ok(encoder.finish()?)
    }

    /// Finalizes the split stream and returns its object ID.
    ///
    /// Flushes any remaining inline content, lays out the header, info block,
    /// reference tables and compressed stream, then stores the resulting file
    /// in the repository.
    pub fn done(mut self) -> Result<ObjectID> {
        self.flush_inline()?;
        let stream = self.writer.finish()?;

        // Pre-compute the file layout: header, info, stream refs, object refs,
        // named refs, then the compressed stream, all back to back.
        let header_start = 0u64;
        let header_end = header_start + size_of::<SplitstreamHeader>() as u64;

        let info_start = header_end;
        let info_end = info_start + size_of::<SplitstreamInfo>() as u64;
        assert_eq!(info_start % 8, 0);

        let stream_refs_size = self.stream_refs.as_bytes().len();
        let stream_refs_start = info_end;
        let stream_refs_end = stream_refs_start + stream_refs_size as u64;
        assert_eq!(stream_refs_start % 8, 0);

        let object_refs_size = self.object_refs.as_bytes().len();
        let object_refs_start = stream_refs_end;
        let object_refs_end = object_refs_start + object_refs_size as u64;
        assert_eq!(object_refs_start % 8, 0);

        let named_refs =
            Self::write_named_refs(self.named_refs).context("Formatting named references")?;
        let named_refs_start = object_refs_end;
        let named_refs_end = named_refs_start + named_refs.len() as u64;
        assert_eq!(named_refs_start % 8, 0);

        // The zstd stream needs no particular alignment, so no assert here.
        let stream_start = named_refs_end;
        let stream_end = stream_start + stream.len() as u64;

        // Write the file out into a Vec<u8>, checking the layout on the way
        let mut buf = vec![];

        assert_eq!(buf.len() as u64, header_start);
        buf.extend_from_slice(
            SplitstreamHeader {
                magic: SPLITSTREAM_MAGIC,
                version: 0,
                _flags: U16::ZERO,
                algorithm: ObjectID::ALGORITHM,
                lg_blocksize: LG_BLOCKSIZE,
                info: (info_start..info_end).into(),
            }
            .as_bytes(),
        );
        assert_eq!(buf.len() as u64, header_end);

        assert_eq!(buf.len() as u64, info_start);
        buf.extend_from_slice(
            SplitstreamInfo {
                stream_refs: (stream_refs_start..stream_refs_end).into(),
                object_refs: (object_refs_start..object_refs_end).into(),
                stream: (stream_start..stream_end).into(),
                named_refs: (named_refs_start..named_refs_end).into(),
                content_type: self.content_type.into(),
                stream_size: self.total_size.into(),
            }
            .as_bytes(),
        );
        assert_eq!(buf.len() as u64, info_end);

        assert_eq!(buf.len() as u64, stream_refs_start);
        buf.extend_from_slice(self.stream_refs.as_bytes());
        assert_eq!(buf.len() as u64, stream_refs_end);

        assert_eq!(buf.len() as u64, object_refs_start);
        buf.extend_from_slice(self.object_refs.as_bytes());
        assert_eq!(buf.len() as u64, object_refs_end);

        assert_eq!(buf.len() as u64, named_refs_start);
        buf.extend_from_slice(&named_refs);
        assert_eq!(buf.len() as u64, named_refs_end);

        assert_eq!(buf.len() as u64, stream_start);
        buf.extend_from_slice(&stream);
        assert_eq!(buf.len() as u64, stream_end);

        // Store the Vec<u8> into the repository
        self.repo.ensure_object(&buf)
    }

    /// Finalizes the split stream asynchronously.
    ///
    /// This is an async-friendly version of `done()` that runs the final
    /// object storage on a blocking thread pool.
    ///
    /// Returns the fs-verity object ID of the stored splitstream.
    pub async fn done_async(self) -> Result<ObjectID> {
        tokio::task::spawn_blocking(move || self.done()).await?
    }
}
580
/// Data fragment from a split stream, either inline content or an external object reference.
///
/// Returned by `SplitStreamReader::read_exact()`.
#[derive(Debug)]
pub enum SplitStreamData<ObjectID: FsVerityHashValue> {
    /// Inline content stored directly in the stream
    Inline(Box<[u8]>),
    /// Reference to an external object
    External(ObjectID),
}
589
/// Reader for parsing split stream format files with inline content and external object references.
pub struct SplitStreamReader<ObjectID: FsVerityHashValue> {
    // zstd decoder positioned over the stream section of the file
    decoder: Decoder<'static, BufReader<Take<File>>>,
    // number of bytes remaining in the current inline chunk
    inline_bytes: usize,
    /// The content_type ID given when the splitstream was constructed
    pub content_type: u64,
    /// The total size of the original/merged stream, in bytes
    pub total_size: u64,
    // external object references, indexed by the non-negative stream instructions
    object_refs: Vec<ObjectID>,
    // name -> stream reference, decoded from the named refs section
    named_refs: HashMap<Box<str>, ObjectID>,
}
601
602impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamReader<ObjectID> {
603    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604        // decoder doesn't impl Debug
605        f.debug_struct("SplitStreamReader")
606            .field("refs", &self.object_refs)
607            .field("inline_bytes", &self.inline_bytes)
608            .finish()
609    }
610}
611
612/// Using the provided [`vec`] as a buffer, read exactly [`size`]
613/// bytes of content from [`reader`] into it. Any existing content
614/// in [`vec`] will be discarded; however its capacity will be reused,
615/// making this function suitable for use in loops.
616fn read_into_vec(reader: &mut impl Read, vec: &mut Vec<u8>, size: usize) -> Result<()> {
617    vec.resize(size, 0u8);
618    reader.read_exact(vec.as_mut_slice())?;
619    Ok(())
620}
621
// Result of ensure_chunk(): what kind of chunk is next in the stream.
enum ChunkType<ObjectID: FsVerityHashValue> {
    // end of the stream was reached
    Eof,
    // inline data is available to be read from the decoder
    Inline,
    // the next chunk is a reference to this external object
    External(ObjectID),
}
627
628impl<ObjectID: FsVerityHashValue> SplitStreamReader<ObjectID> {
    /// Creates a new split stream reader from the provided file.
    ///
    /// Validates the fixed header (magic, version, fs-verity algorithm and block
    /// size), reads the info section and the reference tables, then positions a
    /// zstd decoder over the compressed stream content.  If
    /// `expected_content_type` is given, the file's content type must match it.
    pub fn new(mut file: File, expected_content_type: Option<u64>) -> Result<Self> {
        let header = SplitstreamHeader::read_from_io(&mut file)
            .map_err(|e| Error::msg(format!("Error reading splitstream header: {e:?}")))?;

        if header.magic != SPLITSTREAM_MAGIC {
            bail!("Invalid splitstream header magic value");
        }

        if header.version != 0 {
            bail!("Invalid splitstream version {}", header.version);
        }

        if header.algorithm != ObjectID::ALGORITHM {
            bail!("Invalid splitstream fs-verity algorithm type");
        }

        if header.lg_blocksize != LG_BLOCKSIZE {
            bail!("Invalid splitstream fs-verity block size");
        }

        let info_bytes = read_range(&mut file, header.info)?;
        // NB: We imagine that `info` might grow in the future, so for forward-compatibility we
        // allow that it is larger than we expect it to be.  If we ever expand the info section
        // then we will also need to come up with a mechanism for a smaller info section for
        // backwards-compatibility.
        let (info, _) = SplitstreamInfo::ref_from_prefix(&info_bytes)
            .map_err(|e| Error::msg(format!("Error reading splitstream metadata: {e:?}")))?;

        let content_type: u64 = info.content_type.into();
        if let Some(expected) = expected_content_type {
            ensure!(content_type == expected, "Invalid splitstream content type");
        }

        let total_size: u64 = info.stream_size.into();

        // The reference tables are stored as raw arrays of ObjectID values.
        let stream_refs_bytes = read_range(&mut file, info.stream_refs)?;
        let stream_refs = <[ObjectID]>::ref_from_bytes(&stream_refs_bytes)
            .map_err(|e| Error::msg(format!("Error reading splitstream references: {e:?}")))?;

        let object_refs_bytes = read_range(&mut file, info.object_refs)?;
        let object_refs = <[ObjectID]>::ref_from_bytes(&object_refs_bytes)
            .map_err(|e| Error::msg(format!("Error reading object references: {e:?}")))?;

        let named_refs_bytes = read_range(&mut file, info.named_refs)?;
        let named_refs = Self::read_named_references(&named_refs_bytes, stream_refs)
            .map_err(|e| Error::msg(format!("Error reading splitstream mappings: {e:?}")))?;

        // Limit the decoder to the stream section so trailing file content
        // can't be misinterpreted as compressed data.
        file.seek(SeekFrom::Start(info.stream.start.get()))
            .context("Unable to seek to start of splitstream content")?;
        let decoder = Decoder::new(file.take(info.stream.len()?))
            .context("Unable to decode zstd-compressed content in splitstream")?;

        Ok(Self {
            decoder,
            inline_bytes: 0,
            content_type,
            total_size,
            object_refs: object_refs.to_vec(),
            named_refs,
        })
    }
693
    // Decode the named references section: a zstd-compressed sequence of
    // "{index}:{name}\0" records, where index points into `references`.
    // Returns a name -> ObjectID map.  See write_named_refs() for the writer side.
    fn read_named_references<ObjectId: FsVerityHashValue>(
        section: &[u8],
        references: &[ObjectId],
    ) -> Result<HashMap<Box<str>, ObjectId>> {
        let mut map = HashMap::new();
        let mut buffer = vec![];

        let mut reader = BufReader::new(
            Decoder::new(section).context("Creating zstd decoder for named references section")?,
        );

        loop {
            // read_until() includes the delimiter on success; at EOF it returns
            // whatever remains without a trailing NUL.
            reader
                .read_until(b'\0', &mut buffer)
                .context("Reading named references section")?;

            // No trailing NUL means EOF: the buffer must be empty (clean end),
            // otherwise there was an unterminated record.
            let Some(item) = buffer.strip_suffix(b"\0") else {
                ensure!(
                    buffer.is_empty(),
                    "Trailing junk in named references section"
                );
                return Ok(map);
            };

            // Split only on the first colon: names may themselves contain colons.
            let (idx_str, name) = std::str::from_utf8(item)
                .context("Reading named references section")?
                .split_once(":")
                .context("Named reference doesn't contain a colon")?;

            let idx: usize = idx_str
                .parse()
                .context("Named reference contains a non-integer index")?;
            let object_id = references
                .get(idx)
                .context("Named reference out of bounds")?;

            map.insert(Box::from(name), object_id.clone());
            buffer.clear();
        }
    }
734
735    /// Iterate the list of named references defined on this split stream.
736    pub fn iter_named_refs(&self) -> impl Iterator<Item = (&str, &ObjectID)> {
737        self.named_refs.iter().map(|(name, id)| (name.as_ref(), id))
738    }
739
    /// Steal the "named refs" table from this splitstream, destructing it in the process.
    ///
    /// Useful when only the name -> ObjectID mapping is needed and the stream
    /// content itself won't be read.
    pub fn into_named_refs(self) -> HashMap<Box<str>, ObjectID> {
        self.named_refs
    }
744
    // Advance to the next chunk if the current inline chunk is exhausted, and
    // classify what comes next.
    //
    // - `eof_ok`: whether a clean end-of-stream is acceptable here (otherwise EOF
    //   is an error)
    // - `ext_ok`: whether an external reference is acceptable here (otherwise it
    //   is an error)
    // - `expected_bytes`: minimum number of inline bytes the caller is about to
    //   consume; fails if the inline chunk is smaller
    //
    // On Ok(ChunkType::Inline), at least `expected_bytes` bytes can be read from
    // `self.decoder` and `self.inline_bytes` reflects what remains in the chunk.
    fn ensure_chunk(
        &mut self,
        eof_ok: bool,
        ext_ok: bool,
        expected_bytes: usize,
    ) -> Result<ChunkType<ObjectID>> {
        if self.inline_bytes == 0 {
            // Read the next 8-byte instruction word (little-endian i64).
            let mut value = I64::ZERO;

            if !read_exactish(&mut self.decoder, value.as_mut_bytes())? {
                ensure!(eof_ok, "Unexpected EOF in splitstream");
                return Ok(ChunkType::Eof);
            }

            // Negative values: (non-empty) inline data
            // Non-negative values: index into object_refs array
            match value.get() {
                n if n < 0i64 => {
                    self.inline_bytes = (n.unsigned_abs().try_into())
                        .context("Splitstream inline section is too large")?;
                }
                n => {
                    ensure!(ext_ok, "Unexpected external reference in splitstream");
                    let idx = usize::try_from(n)
                        .context("Splitstream external reference is too large")?;
                    let id: &ObjectID = (self.object_refs.get(idx))
                        .context("Splitstream external reference is out of range")?;
                    return Ok(ChunkType::External(id.clone()));
                }
            }
        }

        if self.inline_bytes < expected_bytes {
            bail!("Unexpectedly small inline content when parsing splitstream");
        }

        Ok(ChunkType::Inline)
    }
783
784    /// Reads the exact number of inline bytes
785    /// Assumes that the data cannot be split across chunks
786    pub fn read_inline_exact(&mut self, buffer: &mut [u8]) -> Result<bool> {
787        if let ChunkType::Inline = self.ensure_chunk(true, false, buffer.len())? {
788            // SAFETY: ensure_chunk() already verified the number of bytes for us
789            self.inline_bytes -= buffer.len();
790            self.decoder.read_exact(buffer)?;
791            Ok(true)
792        } else {
793            Ok(false)
794        }
795    }
796
797    fn discard_padding(&mut self, size: usize) -> Result<()> {
798        let mut buf = [0u8; 512];
799        assert!(size <= 512);
800        self.ensure_chunk(false, false, size)?;
801        self.decoder.read_exact(&mut buf[0..size])?;
802        self.inline_bytes -= size;
803        Ok(())
804    }
805
806    /// Reads an exact amount of data, which may be inline or external.
807    ///
808    /// The stored_size is the size as recorded in the stream (including any padding),
809    /// while actual_size is the actual content size without padding.
810    /// Returns either inline content or an external object reference.
811    pub fn read_exact(
812        &mut self,
813        actual_size: usize,
814        stored_size: usize,
815    ) -> Result<SplitStreamData<ObjectID>> {
816        if let ChunkType::External(id) = self.ensure_chunk(false, true, stored_size)? {
817            // ...and the padding
818            if actual_size < stored_size {
819                self.discard_padding(stored_size - actual_size)?;
820            }
821            Ok(SplitStreamData::External(id))
822        } else {
823            let mut content = vec![];
824            read_into_vec(&mut self.decoder, &mut content, stored_size)?;
825            content.truncate(actual_size);
826            self.inline_bytes -= stored_size;
827            Ok(SplitStreamData::Inline(content.into()))
828        }
829    }
830
831    /// Concatenates the entire split stream content to the output writer.
832    ///
833    /// Inline content is written directly, while external references are resolved
834    /// using the provided load_data callback function.
835    pub fn cat(&mut self, repo: &Repository<ObjectID>, output: &mut impl Write) -> Result<()> {
836        let mut buffer = vec![];
837
838        loop {
839            match self.ensure_chunk(true, true, 0)? {
840                ChunkType::Eof => break Ok(()),
841                ChunkType::Inline => {
842                    read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?;
843                    self.inline_bytes = 0;
844                    output.write_all(&buffer)?;
845                }
846                ChunkType::External(ref id) => {
847                    let mut buffer = [MaybeUninit::<u8>::uninit(); 1024 * 1024];
848                    let fd = repo.open_object(id)?;
849
850                    loop {
851                        let (result, _) = read(&fd, &mut buffer)?;
852                        if result.is_empty() {
853                            break;
854                        }
855                        output.write_all(result)?;
856                    }
857                }
858            }
859        }
860    }
861
862    /// Traverses the split stream and calls the callback for each object reference.
863    ///
864    /// This includes both references from the digest map and external references in the stream.
865    pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> {
866        for entry in &self.object_refs {
867            callback(entry);
868        }
869        Ok(())
870    }
871
    /// Looks up a named reference
    ///
    /// Returns None if no such reference exists
    pub fn lookup_named_ref(&self, name: &str) -> Option<&ObjectID> {
        // Simple delegation to the in-memory table built when the stream
        // header was parsed; no I/O happens here.
        self.named_refs.get(name)
    }
878}
879
880impl<ObjectID: FsVerityHashValue> Read for SplitStreamReader<ObjectID> {
881    fn read(&mut self, data: &mut [u8]) -> std::io::Result<usize> {
882        match self.ensure_chunk(true, false, 1) {
883            Ok(ChunkType::Eof) => Ok(0),
884            Ok(ChunkType::Inline) => {
885                let n_bytes = std::cmp::min(data.len(), self.inline_bytes);
886                self.decoder.read_exact(&mut data[0..n_bytes])?;
887                self.inline_bytes -= n_bytes;
888                Ok(n_bytes)
889            }
890            Ok(ChunkType::External(..)) => unreachable!(),
891            Err(e) => Err(std::io::Error::other(e)),
892        }
893    }
894}
895
896#[cfg(test)]
897mod tests {
898    use super::*;
899    use crate::fsverity::{compute_verity, Sha256HashValue};
900    use crate::test::tempdir;
901    use rustix::fs::{mkdirat, Mode, CWD};
902    use std::io::Cursor;
903    use std::path::Path;
904
905    #[test]
906    fn test_read_into_vec() -> Result<()> {
907        // Test with an empty reader
908        let mut reader = Cursor::new(vec![]);
909        let mut vec = Vec::new();
910        let result = read_into_vec(&mut reader, &mut vec, 0);
911        assert!(result.is_ok());
912        assert_eq!(vec.len(), 0);
913
914        // Test with a reader that has some data
915        let mut reader = Cursor::new(vec![1, 2, 3, 4, 5]);
916        let mut vec = Vec::new();
917        let result = read_into_vec(&mut reader, &mut vec, 3);
918        assert!(result.is_ok());
919        assert_eq!(vec.len(), 3);
920        assert_eq!(vec, vec![1, 2, 3]);
921
922        // Test reading more than the reader has
923        let mut reader = Cursor::new(vec![1, 2, 3]);
924        let mut vec = Vec::new();
925        let result = read_into_vec(&mut reader, &mut vec, 5);
926        assert!(result.is_err());
927
928        // Test reading exactly what the reader has
929        let mut reader = Cursor::new(vec![1, 2, 3]);
930        let mut vec = Vec::new();
931        let result = read_into_vec(&mut reader, &mut vec, 3);
932        assert!(result.is_ok());
933        assert_eq!(vec.len(), 3);
934        assert_eq!(vec, vec![1, 2, 3]);
935
936        // Test reading into a vector with existing capacity
937        let mut reader = Cursor::new(vec![1, 2, 3, 4, 5]);
938        let mut vec = Vec::with_capacity(10);
939        let result = read_into_vec(&mut reader, &mut vec, 4);
940        assert!(result.is_ok());
941        assert_eq!(vec.len(), 4);
942        assert_eq!(vec, vec![1, 2, 3, 4]);
943        assert_eq!(vec.capacity(), 10);
944
945        // Test reading into a vector with existing data
946        let mut reader = Cursor::new(vec![1, 2, 3]);
947        let mut vec = vec![9, 9, 9];
948        let result = read_into_vec(&mut reader, &mut vec, 2);
949        assert!(result.is_ok());
950        assert_eq!(vec.len(), 2);
951        assert_eq!(vec, vec![1, 2]);
952
953        Ok(())
954    }
955
956    /// Create a test repository in insecure mode (no fs-verity required).
957    fn create_test_repo(path: &Path) -> Result<Arc<Repository<Sha256HashValue>>> {
958        mkdirat(CWD, path, Mode::from_raw_mode(0o755))?;
959        let mut repo = Repository::open_path(CWD, path)?;
960        repo.set_insecure(true);
961        Ok(Arc::new(repo))
962    }
963
964    /// Generate deterministic test data of a given size.
965    fn generate_test_data(size: usize, seed: u8) -> Vec<u8> {
966        (0..size)
967            .map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
968            .collect()
969    }
970
971    #[test]
972    fn test_splitstream_inline_only() -> Result<()> {
973        let tmp = tempdir();
974        let repo = create_test_repo(&tmp.path().join("repo"))?;
975
976        let inline1 = generate_test_data(32, 0xAB);
977        let inline2 = generate_test_data(48, 0xCD);
978
979        let mut writer = repo.create_stream(0);
980        writer.write_inline(&inline1);
981        writer.write_inline(&inline2);
982        let stream_id = repo.write_stream(writer, "test-inline", None)?;
983
984        // Read it back via cat()
985        let mut reader = repo.open_stream("test-inline", Some(&stream_id), None)?;
986        let mut output = Vec::new();
987        reader.cat(&repo, &mut output)?;
988
989        let mut expected = inline1.clone();
990        expected.extend(&inline2);
991        assert_eq!(output, expected, "inline-only roundtrip must be exact");
992        Ok(())
993    }
994
995    #[test]
996    fn test_splitstream_large_external() -> Result<()> {
997        let tmp = tempdir();
998        let repo = create_test_repo(&tmp.path().join("repo"))?;
999
1000        // 128KB of data
1001        let large_content = generate_test_data(128 * 1024, 0x42);
1002
1003        // Compute expected fs-verity digest for this content
1004        let expected_digest: Sha256HashValue = compute_verity(&large_content);
1005
1006        let mut writer = repo.create_stream(0);
1007        writer.write_external(&large_content)?;
1008        let stream_id = repo.write_stream(writer, "test-external", None)?;
1009
1010        // Verify the object reference matches the expected digest
1011        let mut reader = repo.open_stream("test-external", Some(&stream_id), None)?;
1012        let mut refs = Vec::new();
1013        reader.get_object_refs(|id| refs.push(id.clone()))?;
1014        assert_eq!(refs.len(), 1);
1015        assert_eq!(
1016            refs[0], expected_digest,
1017            "external object must have correct fs-verity digest"
1018        );
1019
1020        // Verify roundtrip
1021        let mut reader = repo.open_stream("test-external", Some(&stream_id), None)?;
1022        let mut output = Vec::new();
1023        reader.cat(&repo, &mut output)?;
1024
1025        assert_eq!(output.len(), large_content.len());
1026        assert_eq!(
1027            output, large_content,
1028            "large external content must roundtrip exactly"
1029        );
1030        Ok(())
1031    }
1032
1033    #[test]
1034    fn test_splitstream_mixed_content() -> Result<()> {
1035        let tmp = tempdir();
1036        let repo = create_test_repo(&tmp.path().join("repo"))?;
1037
1038        // Simulate a tar-like structure: header (inline) + file content (external) + trailer
1039        let header = generate_test_data(512, 0x01);
1040        let file_content = generate_test_data(64 * 1024, 0x02);
1041        let trailer = generate_test_data(1024, 0x03);
1042
1043        // Compute expected digest for the external content
1044        let expected_digest: Sha256HashValue = compute_verity(&file_content);
1045
1046        let mut writer = repo.create_stream(0);
1047        writer.write_inline(&header);
1048        writer.write_external(&file_content)?;
1049        writer.write_inline(&trailer);
1050        let stream_id = repo.write_stream(writer, "test-mixed", None)?;
1051
1052        // Verify the external object has the correct digest
1053        let mut reader = repo.open_stream("test-mixed", Some(&stream_id), None)?;
1054        let mut refs = Vec::new();
1055        reader.get_object_refs(|id| refs.push(id.clone()))?;
1056        assert_eq!(refs.len(), 1);
1057        assert_eq!(
1058            refs[0], expected_digest,
1059            "external object must have correct fs-verity digest"
1060        );
1061
1062        // Verify roundtrip
1063        let mut reader = repo.open_stream("test-mixed", Some(&stream_id), None)?;
1064        let mut output = Vec::new();
1065        reader.cat(&repo, &mut output)?;
1066
1067        let mut expected = header.clone();
1068        expected.extend(&file_content);
1069        expected.extend(&trailer);
1070
1071        assert_eq!(output.len(), expected.len());
1072        assert_eq!(output, expected, "mixed content must roundtrip exactly");
1073        Ok(())
1074    }
1075
1076    #[test]
1077    fn test_splitstream_multiple_externals() -> Result<()> {
1078        let tmp = tempdir();
1079        let repo = create_test_repo(&tmp.path().join("repo"))?;
1080
1081        let file1 = generate_test_data(32 * 1024, 0x10);
1082        let file2 = generate_test_data(256 * 1024, 0x20);
1083        let file3 = generate_test_data(8 * 1024, 0x30);
1084        let separator = generate_test_data(64, 0xFF);
1085
1086        // Compute expected digests
1087        let expected_digest1: Sha256HashValue = compute_verity(&file1);
1088        let expected_digest2: Sha256HashValue = compute_verity(&file2);
1089        let expected_digest3: Sha256HashValue = compute_verity(&file3);
1090
1091        let mut writer = repo.create_stream(0);
1092        writer.write_external(&file1)?;
1093        writer.write_inline(&separator);
1094        writer.write_external(&file2)?;
1095        writer.write_inline(&separator);
1096        writer.write_external(&file3)?;
1097        let stream_id = repo.write_stream(writer, "test-multi", None)?;
1098
1099        // Verify the object references have correct digests
1100        let mut reader = repo.open_stream("test-multi", Some(&stream_id), None)?;
1101        let mut refs = Vec::new();
1102        reader.get_object_refs(|id| refs.push(id.clone()))?;
1103        assert_eq!(refs.len(), 3);
1104        assert_eq!(refs[0], expected_digest1, "file1 digest mismatch");
1105        assert_eq!(refs[1], expected_digest2, "file2 digest mismatch");
1106        assert_eq!(refs[2], expected_digest3, "file3 digest mismatch");
1107
1108        // Verify roundtrip
1109        let mut reader = repo.open_stream("test-multi", Some(&stream_id), None)?;
1110        let mut output = Vec::new();
1111        reader.cat(&repo, &mut output)?;
1112
1113        let mut expected = file1.clone();
1114        expected.extend(&separator);
1115        expected.extend(&file2);
1116        expected.extend(&separator);
1117        expected.extend(&file3);
1118
1119        assert_eq!(output.len(), expected.len());
1120        assert_eq!(
1121            output, expected,
1122            "multiple externals must roundtrip exactly"
1123        );
1124        Ok(())
1125    }
1126
1127    #[test]
1128    fn test_splitstream_deduplication() -> Result<()> {
1129        let tmp = tempdir();
1130        let repo = create_test_repo(&tmp.path().join("repo"))?;
1131
1132        // Same chunk appearing multiple times should be deduplicated
1133        let repeated_chunk = generate_test_data(64 * 1024, 0xDE);
1134        let unique_chunk = generate_test_data(32 * 1024, 0xAD);
1135
1136        // Compute expected digests
1137        let repeated_digest: Sha256HashValue = compute_verity(&repeated_chunk);
1138        let unique_digest: Sha256HashValue = compute_verity(&unique_chunk);
1139
1140        let mut writer = repo.create_stream(0);
1141        writer.write_external(&repeated_chunk)?;
1142        writer.write_external(&unique_chunk)?;
1143        writer.write_external(&repeated_chunk)?; // duplicate
1144        writer.write_external(&repeated_chunk)?; // another duplicate
1145        let stream_id = repo.write_stream(writer, "test-dedup", None)?;
1146
1147        // Verify deduplication: only 2 unique objects should be referenced
1148        let mut reader = repo.open_stream("test-dedup", Some(&stream_id), None)?;
1149        let mut refs = Vec::new();
1150        reader.get_object_refs(|id| refs.push(id.clone()))?;
1151        assert_eq!(refs.len(), 2, "should only have 2 unique object refs");
1152        assert_eq!(
1153            refs[0], repeated_digest,
1154            "first ref should be repeated chunk"
1155        );
1156        assert_eq!(refs[1], unique_digest, "second ref should be unique chunk");
1157
1158        // Verify roundtrip still works
1159        let mut reader = repo.open_stream("test-dedup", Some(&stream_id), None)?;
1160        let mut output = Vec::new();
1161        reader.cat(&repo, &mut output)?;
1162
1163        let mut expected = repeated_chunk.clone();
1164        expected.extend(&unique_chunk);
1165        expected.extend(&repeated_chunk);
1166        expected.extend(&repeated_chunk);
1167
1168        assert_eq!(output.len(), expected.len());
1169        assert_eq!(
1170            output, expected,
1171            "deduplicated content must still roundtrip exactly"
1172        );
1173        Ok(())
1174    }
1175
1176    #[test]
1177    fn test_splitstream_get_object_refs() -> Result<()> {
1178        let tmp = tempdir();
1179        let repo = create_test_repo(&tmp.path().join("repo"))?;
1180
1181        let chunk1 = generate_test_data(16 * 1024, 0x11);
1182        let chunk2 = generate_test_data(24 * 1024, 0x22);
1183        let inline_data = generate_test_data(128, 0x33);
1184
1185        // Compute expected digests
1186        let expected_digest1: Sha256HashValue = compute_verity(&chunk1);
1187        let expected_digest2: Sha256HashValue = compute_verity(&chunk2);
1188
1189        let mut writer = repo.create_stream(0);
1190        writer.write_inline(&inline_data);
1191        writer.write_external(&chunk1)?;
1192        writer.write_external(&chunk2)?;
1193        let stream_id = repo.write_stream(writer, "test-refs", None)?;
1194
1195        let mut reader = repo.open_stream("test-refs", Some(&stream_id), None)?;
1196
1197        let mut refs = Vec::new();
1198        reader.get_object_refs(|id| refs.push(id.clone()))?;
1199
1200        // Should have 2 external references with correct digests
1201        assert_eq!(refs.len(), 2);
1202        assert_eq!(refs[0], expected_digest1, "chunk1 digest mismatch");
1203        assert_eq!(refs[1], expected_digest2, "chunk2 digest mismatch");
1204
1205        // Verify content can be read back via the digests
1206        let obj1 = repo.read_object(&refs[0])?;
1207        let obj2 = repo.read_object(&refs[1])?;
1208
1209        assert_eq!(obj1, chunk1, "first external reference must match");
1210        assert_eq!(obj2, chunk2, "second external reference must match");
1211
1212        Ok(())
1213    }
1214
1215    #[test]
1216    fn test_splitstream_boundary_sizes() -> Result<()> {
1217        // Test with sizes around common boundaries (4KB page, 64KB chunk)
1218        let sizes = [4095, 4096, 4097, 65535, 65536, 65537];
1219
1220        for size in sizes {
1221            let tmp = tempdir();
1222            let repo = create_test_repo(&tmp.path().join("repo"))?;
1223            let data = generate_test_data(size, size as u8);
1224
1225            // Compute expected digest
1226            let expected_digest: Sha256HashValue = compute_verity(&data);
1227
1228            let mut writer = repo.create_stream(0);
1229            writer.write_external(&data)?;
1230            let stream_id = repo.write_stream(writer, "test-boundary", None)?;
1231
1232            // Verify the digest
1233            let mut reader = repo.open_stream("test-boundary", Some(&stream_id), None)?;
1234            let mut refs = Vec::new();
1235            reader.get_object_refs(|id| refs.push(id.clone()))?;
1236            assert_eq!(refs.len(), 1);
1237            assert_eq!(
1238                refs[0], expected_digest,
1239                "size {} must have correct digest",
1240                size
1241            );
1242
1243            // Verify roundtrip
1244            let mut reader = repo.open_stream("test-boundary", Some(&stream_id), None)?;
1245            let mut output = Vec::new();
1246            reader.cat(&repo, &mut output)?;
1247
1248            assert_eq!(
1249                output.len(),
1250                data.len(),
1251                "size {} must roundtrip with correct length",
1252                size
1253            );
1254            assert_eq!(output, data, "size {} must roundtrip exactly", size);
1255        }
1256
1257        Ok(())
1258    }
1259
1260    #[test]
1261    fn test_splitstream_content_type() -> Result<()> {
1262        let tmp = tempdir();
1263        let repo = create_test_repo(&tmp.path().join("repo"))?;
1264        let content_type = 0xDEADBEEF_u64;
1265
1266        let mut writer = repo.create_stream(content_type);
1267        writer.write_inline(b"test data");
1268        let stream_id = repo.write_stream(writer, "test-ctype", None)?;
1269
1270        let reader = repo.open_stream("test-ctype", Some(&stream_id), Some(content_type))?;
1271        assert_eq!(reader.content_type, content_type);
1272
1273        Ok(())
1274    }
1275
1276    #[test]
1277    fn test_splitstream_total_size_tracking() -> Result<()> {
1278        let tmp = tempdir();
1279        let repo = create_test_repo(&tmp.path().join("repo"))?;
1280
1281        let inline_data = generate_test_data(100, 0x01);
1282        let external_data = generate_test_data(1000, 0x02);
1283
1284        let mut writer = repo.create_stream(0);
1285        writer.write_inline(&inline_data);
1286        writer.write_external(&external_data)?;
1287        let stream_id = repo.write_stream(writer, "test-size", None)?;
1288
1289        let reader = repo.open_stream("test-size", Some(&stream_id), None)?;
1290        assert_eq!(reader.total_size, 1100, "total size should be tracked");
1291
1292        Ok(())
1293    }
1294}