use std::{
    collections::{BTreeMap, HashMap},
    fs::File,
    hash::Hash,
    io::{BufRead, BufReader, Read, Seek, SeekFrom, Take, Write},
    mem::size_of,
    mem::MaybeUninit,
    ops::Range,
    sync::Arc,
};

use anyhow::{bail, ensure, Context, Error, Result};
use rustix::{
    buffer::spare_capacity,
    io::{pread, read},
};
use tokio::task::JoinHandle;
use zerocopy::{
    little_endian::{I64, U16, U64},
    FromBytes, Immutable, IntoBytes, KnownLayout,
};
use zstd::stream::{read::Decoder, write::Encoder};

use crate::{fsverity::FsVerityHashValue, repository::Repository, util::read_exactish};

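// On-disk layout, as written by `SplitStreamWriter::done()`:
//
//     SplitstreamHeader | SplitstreamInfo | stream refs | object refs
//         | named refs | zstd-compressed content stream
//
// Section boundaries are absolute file offsets stored as little-endian
// `FileRange`s.  Within the compressed stream, every chunk is prefixed by a
// little-endian i64 instruction word: a negative value -n announces n bytes
// of inline data, while a non-negative value is an index into the object
// refs table (see `flush_inline()` and `write_reference()`).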
const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream";
const LG_BLOCKSIZE: u8 = 12;

#[derive(Debug, Clone, Copy, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
struct FileRange {
    start: U64,
    end: U64,
}

#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
struct SplitstreamHeader {
    pub magic: [u8; 11],
    pub version: u8,
    pub _flags: U16,
    pub algorithm: u8,
    pub lg_blocksize: u8,
    pub info: FileRange,
}

#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C)]
struct SplitstreamInfo {
    pub stream_refs: FileRange,
    pub object_refs: FileRange,
    pub stream: FileRange,
    pub named_refs: FileRange,
    pub content_type: U64,
    pub stream_size: U64,
}

impl FileRange {
    fn len(&self) -> Result<u64> {
        self.end
            .get()
            .checked_sub(self.start.get())
            .context("Negative-sized range in splitstream")
    }
}

impl From<Range<u64>> for FileRange {
    fn from(value: Range<u64>) -> Self {
        Self {
            start: U64::from(value.start),
            end: U64::from(value.end),
        }
    }
}

fn read_range(file: &mut File, range: FileRange) -> Result<Vec<u8>> {
    let size: usize = (range.len()?.try_into())
        .context("Unable to allocate buffer for implausibly large splitstream section")?;
    let mut buffer = Vec::with_capacity(size);
    if size > 0 {
        pread(file, spare_capacity(&mut buffer), range.start.get())
            .context("Unable to read section from splitstream file")?;
    }
    ensure!(
        buffer.len() == size,
        "Incomplete read from splitstream file"
    );
    Ok(buffer)
}

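/// A `Vec` that stores each distinct item exactly once.
///
/// Items keep the index at which they were first inserted; a side `HashMap`
/// provides O(1) duplicate detection and index lookup.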
struct UniqueVec<T: Clone + Hash + Eq> {
    items: Vec<T>,
    index: HashMap<T, usize>,
}

impl<T: Clone + Hash + Eq + std::fmt::Debug> std::fmt::Debug for UniqueVec<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UniqueVec")
            .field("items", &self.items)
            .field("index", &self.index)
            .finish()
    }
}

impl<T: Clone + Hash + Eq + IntoBytes + Immutable> UniqueVec<T> {
    fn as_bytes(&self) -> &[u8] {
        self.items.as_bytes()
    }
}

impl<T: Clone + Hash + Eq> UniqueVec<T> {
    fn new() -> Self {
        Self {
            items: Vec::new(),
            index: HashMap::new(),
        }
    }

    fn get(&self, item: &T) -> Option<usize> {
        self.index.get(item).copied()
    }

    fn ensure(&mut self, item: &T) -> usize {
        self.get(item).unwrap_or_else(|| {
            let idx = self.items.len();
            self.index.insert(item.clone(), idx);
            self.items.push(item.clone());
            idx
        })
    }

    fn get_by_index(&self, idx: usize) -> Option<&T> {
        self.items.get(idx)
    }
}

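/// A single entry queued in a [`SplitStreamBuilder`]: either inline bytes
/// carried directly, or an external object whose fs-verity ID is still being
/// computed on another task.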
pub enum SplitStreamEntry<ObjectID: FsVerityHashValue> {
    Inline(Vec<u8>),
    External {
        handle: JoinHandle<Result<ObjectID>>,
        size: u64,
    },
}

impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamEntry<ObjectID> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SplitStreamEntry::Inline(data) => {
                f.debug_struct("Inline").field("len", &data.len()).finish()
            }
            SplitStreamEntry::External { size, .. } => f
                .debug_struct("External")
                .field("size", size)
                .finish_non_exhaustive(),
        }
    }
}

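/// Collects splitstream entries while external objects are still being
/// written in the background, then serializes everything in order.
///
/// A minimal usage sketch (hedged: `repo` and `payload` are assumed to
/// exist; the spawned task mirrors what `write_external_async` does):
///
/// ```ignore
/// let mut builder = SplitStreamBuilder::new(Arc::clone(&repo), 0);
/// builder.push_inline(b"some header bytes");
/// let size = payload.len() as u64;
/// let repo2 = Arc::clone(&repo);
/// builder.push_external(
///     tokio::spawn(async move { repo2.ensure_object_async(payload).await }),
///     size,
/// );
/// let object_id = builder.finish().await?;
/// ```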
pub struct SplitStreamBuilder<ObjectID: FsVerityHashValue> {
    repo: Arc<Repository<ObjectID>>,
    entries: Vec<SplitStreamEntry<ObjectID>>,
    total_external_size: u64,
    content_type: u64,
    stream_refs: UniqueVec<ObjectID>,
    named_refs: BTreeMap<Box<str>, usize>,
}

impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamBuilder<ObjectID> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SplitStreamBuilder")
            .field("repo", &self.repo)
            .field("entries", &self.entries)
            .field("total_external_size", &self.total_external_size)
            .field("content_type", &self.content_type)
            .finish_non_exhaustive()
    }
}

impl<ObjectID: FsVerityHashValue> SplitStreamBuilder<ObjectID> {
    pub fn new(repo: Arc<Repository<ObjectID>>, content_type: u64) -> Self {
        Self {
            repo,
            entries: Vec::new(),
            total_external_size: 0,
            content_type,
            stream_refs: UniqueVec::new(),
            named_refs: Default::default(),
        }
    }

    pub fn push_inline(&mut self, data: &[u8]) {
        if data.is_empty() {
            return;
        }
        if let Some(SplitStreamEntry::Inline(ref mut existing)) = self.entries.last_mut() {
            existing.extend_from_slice(data);
        } else {
            self.entries.push(SplitStreamEntry::Inline(data.to_vec()));
        }
    }

    pub fn push_external(&mut self, handle: JoinHandle<Result<ObjectID>>, size: u64) {
        self.total_external_size += size;
        self.entries
            .push(SplitStreamEntry::External { handle, size });
    }

    pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) {
        let idx = self.stream_refs.ensure(verity);
        self.named_refs.insert(Box::from(name), idx);
    }

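    /// Awaits every pending external object, then replays all entries, in
    /// their original order, into a [`SplitStreamWriter`] and writes the
    /// result to the repository.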
    pub async fn finish(self) -> Result<ObjectID> {
        let mut resolved_entries: Vec<ResolvedEntry<ObjectID>> =
            Vec::with_capacity(self.entries.len());

        for entry in self.entries {
            match entry {
                SplitStreamEntry::Inline(data) => {
                    resolved_entries.push(ResolvedEntry::Inline(data));
                }
                SplitStreamEntry::External { handle, size } => {
                    let id = handle.await??;
                    resolved_entries.push(ResolvedEntry::External { id, size });
                }
            }
        }

        let mut writer = SplitStreamWriter::new(&self.repo, self.content_type);

        for (name, idx) in &self.named_refs {
            let verity = self
                .stream_refs
                .get_by_index(*idx)
                .expect("named ref index out of bounds");
            writer.add_named_stream_ref(name, verity);
        }

        for entry in resolved_entries {
            match entry {
                ResolvedEntry::Inline(data) => {
                    writer.write_inline(&data);
                }
                ResolvedEntry::External { id, size } => {
                    writer.add_external_size(size);
                    writer.write_reference(id)?;
                }
            }
        }

        tokio::task::spawn_blocking(move || writer.done()).await?
    }
}

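/// A [`SplitStreamEntry`] after its `JoinHandle` has been awaited: the
/// external object ID is now known.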
#[derive(Debug)]
enum ResolvedEntry<ObjectID: FsVerityHashValue> {
    Inline(Vec<u8>),
    External { id: ObjectID, size: u64 },
}

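/// Serializes a splitstream: inline data is coalesced and zstd-compressed,
/// while external data is stored as repository objects and referenced by
/// index.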
pub struct SplitStreamWriter<ObjectID: FsVerityHashValue> {
    repo: Arc<Repository<ObjectID>>,
    stream_refs: UniqueVec<ObjectID>,
    object_refs: UniqueVec<ObjectID>,
    named_refs: BTreeMap<Box<str>, usize>,
    inline_buffer: Vec<u8>,
    total_size: u64,
    writer: Encoder<'static, Vec<u8>>,
    content_type: u64,
}

impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<ObjectID> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SplitStreamWriter")
            .field("repo", &self.repo)
            .field("inline_buffer", &self.inline_buffer)
            .finish()
    }
}

impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
    pub fn new(repo: &Arc<Repository<ObjectID>>, content_type: u64) -> Self {
        let writer = Encoder::new(vec![], 0).unwrap();

        Self {
            repo: Arc::clone(repo),
            content_type,
            inline_buffer: vec![],
            stream_refs: UniqueVec::new(),
            object_refs: UniqueVec::new(),
            named_refs: Default::default(),
            total_size: 0,
            writer,
        }
    }

    pub fn add_object_ref(&mut self, verity: &ObjectID) -> usize {
        self.object_refs.ensure(verity)
    }

    pub fn lookup_object_ref(&self, verity: &ObjectID) -> Option<usize> {
        self.object_refs.get(verity)
    }

    pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) {
        let idx = self.stream_refs.ensure(verity);
        self.named_refs.insert(Box::from(name), idx);
    }

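    // Chunks are framed by a little-endian i64 instruction word: flushing
    // 100 buffered inline bytes writes -100 followed by those bytes, while
    // an external chunk writes the non-negative index of its entry in the
    // object refs table (see `write_reference`).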
    fn flush_inline(&mut self) -> Result<()> {
        let size = self.inline_buffer.len();
        if size > 0 {
            let instruction = -i64::try_from(size).expect("implausibly large inline chunk");
            self.writer.write_all(I64::new(instruction).as_bytes())?;
            self.writer.write_all(&self.inline_buffer)?;
            self.inline_buffer.clear();
        }
        Ok(())
    }

    pub fn write_inline(&mut self, data: &[u8]) {
        self.total_size += data.len() as u64;
        self.inline_buffer.extend(data);
    }

    pub fn add_external_size(&mut self, size: u64) {
        self.total_size += size;
    }

    pub fn write_reference(&mut self, id: ObjectID) -> Result<()> {
        self.flush_inline()?;

        let index = self.add_object_ref(&id);
        let instruction = i64::try_from(index).expect("implausibly large external index");
        self.writer.write_all(I64::from(instruction).as_bytes())?;
        Ok(())
    }

    pub fn write_external(&mut self, data: &[u8]) -> Result<()> {
        self.total_size += data.len() as u64;
        let id = self.repo.ensure_object(data)?;
        self.write_reference(id)
    }

    pub async fn write_external_async(&mut self, data: Vec<u8>) -> Result<()> {
        self.total_size += data.len() as u64;
        let id = self.repo.ensure_object_async(data).await?;
        self.write_reference(id)
    }

    fn write_named_refs(named_refs: BTreeMap<Box<str>, usize>) -> Result<Vec<u8>> {
        let mut encoder = Encoder::new(vec![], 0)?;

        for (name, idx) in &named_refs {
            write!(&mut encoder, "{idx}:{name}\0")?;
        }

        Ok(encoder.finish()?)
    }

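    /// Finalizes the stream: flushes buffered inline data, lays out the
    /// header, info table, and reference sections at 8-byte-aligned offsets,
    /// appends the compressed content stream, and stores the whole file as a
    /// repository object, returning its fs-verity ID.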
    pub fn done(mut self) -> Result<ObjectID> {
        self.flush_inline()?;
        let stream = self.writer.finish()?;

        let header_start = 0u64;
        let header_end = header_start + size_of::<SplitstreamHeader>() as u64;

        let info_start = header_end;
        let info_end = info_start + size_of::<SplitstreamInfo>() as u64;
        assert_eq!(info_start % 8, 0);

        let stream_refs_size = self.stream_refs.as_bytes().len();
        let stream_refs_start = info_end;
        let stream_refs_end = stream_refs_start + stream_refs_size as u64;
        assert_eq!(stream_refs_start % 8, 0);

        let object_refs_size = self.object_refs.as_bytes().len();
        let object_refs_start = stream_refs_end;
        let object_refs_end = object_refs_start + object_refs_size as u64;
        assert_eq!(object_refs_start % 8, 0);

        let named_refs =
            Self::write_named_refs(self.named_refs).context("Formatting named references")?;
        let named_refs_start = object_refs_end;
        let named_refs_end = named_refs_start + named_refs.len() as u64;
        assert_eq!(named_refs_start % 8, 0);

        let stream_start = named_refs_end;
        let stream_end = stream_start + stream.len() as u64;

        let mut buf = vec![];

        assert_eq!(buf.len() as u64, header_start);
        buf.extend_from_slice(
            SplitstreamHeader {
                magic: SPLITSTREAM_MAGIC,
                version: 0,
                _flags: U16::ZERO,
                algorithm: ObjectID::ALGORITHM,
                lg_blocksize: LG_BLOCKSIZE,
                info: (info_start..info_end).into(),
            }
            .as_bytes(),
        );
        assert_eq!(buf.len() as u64, header_end);

        assert_eq!(buf.len() as u64, info_start);
        buf.extend_from_slice(
            SplitstreamInfo {
                stream_refs: (stream_refs_start..stream_refs_end).into(),
                object_refs: (object_refs_start..object_refs_end).into(),
                stream: (stream_start..stream_end).into(),
                named_refs: (named_refs_start..named_refs_end).into(),
                content_type: self.content_type.into(),
                stream_size: self.total_size.into(),
            }
            .as_bytes(),
        );
        assert_eq!(buf.len() as u64, info_end);

        assert_eq!(buf.len() as u64, stream_refs_start);
        buf.extend_from_slice(self.stream_refs.as_bytes());
        assert_eq!(buf.len() as u64, stream_refs_end);

        assert_eq!(buf.len() as u64, object_refs_start);
        buf.extend_from_slice(self.object_refs.as_bytes());
        assert_eq!(buf.len() as u64, object_refs_end);

        assert_eq!(buf.len() as u64, named_refs_start);
        buf.extend_from_slice(&named_refs);
        assert_eq!(buf.len() as u64, named_refs_end);

        assert_eq!(buf.len() as u64, stream_start);
        buf.extend_from_slice(&stream);
        assert_eq!(buf.len() as u64, stream_end);

        self.repo.ensure_object(&buf)
    }

    pub async fn done_async(self) -> Result<ObjectID> {
        tokio::task::spawn_blocking(move || self.done()).await?
    }
}

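/// One chunk of splitstream content: either bytes stored inline in the
/// stream itself, or the ID of an external repository object.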
#[derive(Debug)]
pub enum SplitStreamData<ObjectID: FsVerityHashValue> {
    Inline(Box<[u8]>),
    External(ObjectID),
}

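/// Reads a splitstream back: the header and reference sections are parsed
/// eagerly, then the zstd-compressed content stream is decoded on demand.
///
/// A minimal usage sketch (hedged: it mirrors the calls used in the tests
/// below, where streams are opened through a `Repository`):
///
/// ```ignore
/// let mut reader = repo.open_stream("some-stream", Some(&stream_id), None)?;
/// let mut output = Vec::new();
/// reader.cat(&repo, &mut output)?; // inline + external chunks, reassembled
/// ```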
pub struct SplitStreamReader<ObjectID: FsVerityHashValue> {
    decoder: Decoder<'static, BufReader<Take<File>>>,
    inline_bytes: usize,
    pub content_type: u64,
    pub total_size: u64,
    object_refs: Vec<ObjectID>,
    named_refs: HashMap<Box<str>, ObjectID>,
}

impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamReader<ObjectID> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SplitStreamReader")
            .field("refs", &self.object_refs)
            .field("inline_bytes", &self.inline_bytes)
            .finish()
    }
}

fn read_into_vec(reader: &mut impl Read, vec: &mut Vec<u8>, size: usize) -> Result<()> {
    vec.resize(size, 0u8);
    reader.read_exact(vec.as_mut_slice())?;
    Ok(())
}

enum ChunkType<ObjectID: FsVerityHashValue> {
    Eof,
    Inline,
    External(ObjectID),
}

impl<ObjectID: FsVerityHashValue> SplitStreamReader<ObjectID> {
    pub fn new(mut file: File, expected_content_type: Option<u64>) -> Result<Self> {
        let header = SplitstreamHeader::read_from_io(&mut file)
            .map_err(|e| Error::msg(format!("Error reading splitstream header: {e:?}")))?;

        if header.magic != SPLITSTREAM_MAGIC {
            bail!("Invalid splitstream header magic value");
        }

        if header.version != 0 {
            bail!("Invalid splitstream version {}", header.version);
        }

        if header.algorithm != ObjectID::ALGORITHM {
            bail!("Invalid splitstream fs-verity algorithm type");
        }

        if header.lg_blocksize != LG_BLOCKSIZE {
            bail!("Invalid splitstream fs-verity block size");
        }

        let info_bytes = read_range(&mut file, header.info)?;
        let (info, _) = SplitstreamInfo::ref_from_prefix(&info_bytes)
            .map_err(|e| Error::msg(format!("Error reading splitstream metadata: {e:?}")))?;

        let content_type: u64 = info.content_type.into();
        if let Some(expected) = expected_content_type {
            ensure!(content_type == expected, "Invalid splitstream content type");
        }

        let total_size: u64 = info.stream_size.into();

        let stream_refs_bytes = read_range(&mut file, info.stream_refs)?;
        let stream_refs = <[ObjectID]>::ref_from_bytes(&stream_refs_bytes)
            .map_err(|e| Error::msg(format!("Error reading splitstream references: {e:?}")))?;

        let object_refs_bytes = read_range(&mut file, info.object_refs)?;
        let object_refs = <[ObjectID]>::ref_from_bytes(&object_refs_bytes)
            .map_err(|e| Error::msg(format!("Error reading object references: {e:?}")))?;

        let named_refs_bytes = read_range(&mut file, info.named_refs)?;
        let named_refs = Self::read_named_references(&named_refs_bytes, stream_refs)
            .map_err(|e| Error::msg(format!("Error reading splitstream mappings: {e:?}")))?;

        file.seek(SeekFrom::Start(info.stream.start.get()))
            .context("Unable to seek to start of splitstream content")?;
        let decoder = Decoder::new(file.take(info.stream.len()?))
            .context("Unable to decode zstd-compressed content in splitstream")?;

        Ok(Self {
            decoder,
            inline_bytes: 0,
            content_type,
            total_size,
            object_refs: object_refs.to_vec(),
            named_refs,
        })
    }

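    // The named references section (see `write_named_refs`) is a
    // zstd-compressed sequence of "<index>:<name>\0" entries, where the
    // index points into the stream refs table.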
    fn read_named_references(
        section: &[u8],
        references: &[ObjectID],
    ) -> Result<HashMap<Box<str>, ObjectID>> {
        let mut map = HashMap::new();
        let mut buffer = vec![];

        let mut reader = BufReader::new(
            Decoder::new(section).context("Creating zstd decoder for named references section")?,
        );

        loop {
            reader
                .read_until(b'\0', &mut buffer)
                .context("Reading named references section")?;

            let Some(item) = buffer.strip_suffix(b"\0") else {
                ensure!(
                    buffer.is_empty(),
                    "Trailing junk in named references section"
                );
                return Ok(map);
            };

            let (idx_str, name) = std::str::from_utf8(item)
                .context("Reading named references section")?
                .split_once(":")
                .context("Named reference doesn't contain a colon")?;

            let idx: usize = idx_str
                .parse()
                .context("Named reference contains a non-integer index")?;
            let object_id = references
                .get(idx)
                .context("Named reference out of bounds")?;

            map.insert(Box::from(name), object_id.clone());
            buffer.clear();
        }
    }

    pub fn iter_named_refs(&self) -> impl Iterator<Item = (&str, &ObjectID)> {
        self.named_refs.iter().map(|(name, id)| (name.as_ref(), id))
    }

    pub fn into_named_refs(self) -> HashMap<Box<str>, ObjectID> {
        self.named_refs
    }

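    /// Positions the reader at usable content.  If no inline bytes are
    /// pending, this reads the next instruction word, returning `Eof` or an
    /// external reference where appropriate; otherwise it verifies that at
    /// least `expected_bytes` of inline data are available.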
    fn ensure_chunk(
        &mut self,
        eof_ok: bool,
        ext_ok: bool,
        expected_bytes: usize,
    ) -> Result<ChunkType<ObjectID>> {
        if self.inline_bytes == 0 {
            let mut value = I64::ZERO;

            if !read_exactish(&mut self.decoder, value.as_mut_bytes())? {
                ensure!(eof_ok, "Unexpected EOF in splitstream");
                return Ok(ChunkType::Eof);
            }

            match value.get() {
                n if n < 0i64 => {
                    self.inline_bytes = (n.unsigned_abs().try_into())
                        .context("Splitstream inline section is too large")?;
                }
                n => {
                    ensure!(ext_ok, "Unexpected external reference in splitstream");
                    let idx = usize::try_from(n)
                        .context("Splitstream external reference is too large")?;
                    let id: &ObjectID = (self.object_refs.get(idx))
                        .context("Splitstream external reference is out of range")?;
                    return Ok(ChunkType::External(id.clone()));
                }
            }
        }

        if self.inline_bytes < expected_bytes {
            bail!("Unexpectedly small inline content when parsing splitstream");
        }

        Ok(ChunkType::Inline)
    }

    pub fn read_inline_exact(&mut self, buffer: &mut [u8]) -> Result<bool> {
        if let ChunkType::Inline = self.ensure_chunk(true, false, buffer.len())? {
            self.inline_bytes -= buffer.len();
            self.decoder.read_exact(buffer)?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    fn discard_padding(&mut self, size: usize) -> Result<()> {
        let mut buf = [0u8; 512];
        assert!(size <= 512);
        self.ensure_chunk(false, false, size)?;
        self.decoder.read_exact(&mut buf[0..size])?;
        self.inline_bytes -= size;
        Ok(())
    }

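    /// Reads the next chunk, which occupies `stored_size` bytes in the
    /// stream but whose real content is only `actual_size` bytes: inline
    /// data is truncated, and an external chunk has its trailing inline
    /// padding discarded.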
    pub fn read_exact(
        &mut self,
        actual_size: usize,
        stored_size: usize,
    ) -> Result<SplitStreamData<ObjectID>> {
        if let ChunkType::External(id) = self.ensure_chunk(false, true, stored_size)? {
            if actual_size < stored_size {
                self.discard_padding(stored_size - actual_size)?;
            }
            Ok(SplitStreamData::External(id))
        } else {
            let mut content = vec![];
            read_into_vec(&mut self.decoder, &mut content, stored_size)?;
            content.truncate(actual_size);
            self.inline_bytes -= stored_size;
            Ok(SplitStreamData::Inline(content.into()))
        }
    }

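    /// Reassembles the original stream into `output`, copying inline chunks
    /// from the decoder and fetching external chunks from the repository.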
    pub fn cat(&mut self, repo: &Repository<ObjectID>, output: &mut impl Write) -> Result<()> {
        let mut buffer = vec![];

        loop {
            match self.ensure_chunk(true, true, 0)? {
                ChunkType::Eof => break Ok(()),
                ChunkType::Inline => {
                    read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?;
                    self.inline_bytes = 0;
                    output.write_all(&buffer)?;
                }
                ChunkType::External(ref id) => {
                    let mut buffer = [MaybeUninit::<u8>::uninit(); 1024 * 1024];
                    let fd = repo.open_object(id)?;

                    loop {
                        let (result, _) = read(&fd, &mut buffer)?;
                        if result.is_empty() {
                            break;
                        }
                        output.write_all(result)?;
                    }
                }
            }
        }
    }

    pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> {
        for entry in &self.object_refs {
            callback(entry);
        }
        Ok(())
    }

    pub fn lookup_named_ref(&self, name: &str) -> Option<&ObjectID> {
        self.named_refs.get(name)
    }
}

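// The `Read` impl serves purely inline content: `ensure_chunk` is called
// with `ext_ok = false`, so hitting an external reference surfaces as an
// error rather than ever reaching the `unreachable!()` arm.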
impl<ObjectID: FsVerityHashValue> Read for SplitStreamReader<ObjectID> {
    fn read(&mut self, data: &mut [u8]) -> std::io::Result<usize> {
        match self.ensure_chunk(true, false, 1) {
            Ok(ChunkType::Eof) => Ok(0),
            Ok(ChunkType::Inline) => {
                let n_bytes = std::cmp::min(data.len(), self.inline_bytes);
                self.decoder.read_exact(&mut data[0..n_bytes])?;
                self.inline_bytes -= n_bytes;
                Ok(n_bytes)
            }
            Ok(ChunkType::External(..)) => unreachable!(),
            Err(e) => Err(std::io::Error::other(e)),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::fsverity::{compute_verity, Sha256HashValue};
    use crate::test::tempdir;
    use rustix::fs::{mkdirat, Mode, CWD};
    use std::io::Cursor;
    use std::path::Path;

    #[test]
    fn test_read_into_vec() -> Result<()> {
        let mut reader = Cursor::new(vec![]);
        let mut vec = Vec::new();
        let result = read_into_vec(&mut reader, &mut vec, 0);
        assert!(result.is_ok());
        assert_eq!(vec.len(), 0);

        let mut reader = Cursor::new(vec![1, 2, 3, 4, 5]);
        let mut vec = Vec::new();
        let result = read_into_vec(&mut reader, &mut vec, 3);
        assert!(result.is_ok());
        assert_eq!(vec.len(), 3);
        assert_eq!(vec, vec![1, 2, 3]);

        let mut reader = Cursor::new(vec![1, 2, 3]);
        let mut vec = Vec::new();
        let result = read_into_vec(&mut reader, &mut vec, 5);
        assert!(result.is_err());

        let mut reader = Cursor::new(vec![1, 2, 3]);
        let mut vec = Vec::new();
        let result = read_into_vec(&mut reader, &mut vec, 3);
        assert!(result.is_ok());
        assert_eq!(vec.len(), 3);
        assert_eq!(vec, vec![1, 2, 3]);

        let mut reader = Cursor::new(vec![1, 2, 3, 4, 5]);
        let mut vec = Vec::with_capacity(10);
        let result = read_into_vec(&mut reader, &mut vec, 4);
        assert!(result.is_ok());
        assert_eq!(vec.len(), 4);
        assert_eq!(vec, vec![1, 2, 3, 4]);
        assert_eq!(vec.capacity(), 10);

        let mut reader = Cursor::new(vec![1, 2, 3]);
        let mut vec = vec![9, 9, 9];
        let result = read_into_vec(&mut reader, &mut vec, 2);
        assert!(result.is_ok());
        assert_eq!(vec.len(), 2);
        assert_eq!(vec, vec![1, 2]);

        Ok(())
    }

    fn create_test_repo(path: &Path) -> Result<Arc<Repository<Sha256HashValue>>> {
        mkdirat(CWD, path, Mode::from_raw_mode(0o755))?;
        let mut repo = Repository::open_path(CWD, path)?;
        repo.set_insecure(true);
        Ok(Arc::new(repo))
    }

    fn generate_test_data(size: usize, seed: u8) -> Vec<u8> {
        (0..size)
            .map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
            .collect()
    }

    #[test]
    fn test_splitstream_inline_only() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let inline1 = generate_test_data(32, 0xAB);
        let inline2 = generate_test_data(48, 0xCD);

        let mut writer = repo.create_stream(0);
        writer.write_inline(&inline1);
        writer.write_inline(&inline2);
        let stream_id = repo.write_stream(writer, "test-inline", None)?;

        let mut reader = repo.open_stream("test-inline", Some(&stream_id), None)?;
        let mut output = Vec::new();
        reader.cat(&repo, &mut output)?;

        let mut expected = inline1.clone();
        expected.extend(&inline2);
        assert_eq!(output, expected, "inline-only roundtrip must be exact");
        Ok(())
    }

    #[test]
    fn test_splitstream_large_external() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let large_content = generate_test_data(128 * 1024, 0x42);

        let expected_digest: Sha256HashValue = compute_verity(&large_content);

        let mut writer = repo.create_stream(0);
        writer.write_external(&large_content)?;
        let stream_id = repo.write_stream(writer, "test-external", None)?;

        let mut reader = repo.open_stream("test-external", Some(&stream_id), None)?;
        let mut refs = Vec::new();
        reader.get_object_refs(|id| refs.push(id.clone()))?;
        assert_eq!(refs.len(), 1);
        assert_eq!(
            refs[0], expected_digest,
            "external object must have correct fs-verity digest"
        );

        let mut reader = repo.open_stream("test-external", Some(&stream_id), None)?;
        let mut output = Vec::new();
        reader.cat(&repo, &mut output)?;

        assert_eq!(output.len(), large_content.len());
        assert_eq!(
            output, large_content,
            "large external content must roundtrip exactly"
        );
        Ok(())
    }

    #[test]
    fn test_splitstream_mixed_content() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let header = generate_test_data(512, 0x01);
        let file_content = generate_test_data(64 * 1024, 0x02);
        let trailer = generate_test_data(1024, 0x03);

        let expected_digest: Sha256HashValue = compute_verity(&file_content);

        let mut writer = repo.create_stream(0);
        writer.write_inline(&header);
        writer.write_external(&file_content)?;
        writer.write_inline(&trailer);
        let stream_id = repo.write_stream(writer, "test-mixed", None)?;

        let mut reader = repo.open_stream("test-mixed", Some(&stream_id), None)?;
        let mut refs = Vec::new();
        reader.get_object_refs(|id| refs.push(id.clone()))?;
        assert_eq!(refs.len(), 1);
        assert_eq!(
            refs[0], expected_digest,
            "external object must have correct fs-verity digest"
        );

        let mut reader = repo.open_stream("test-mixed", Some(&stream_id), None)?;
        let mut output = Vec::new();
        reader.cat(&repo, &mut output)?;

        let mut expected = header.clone();
        expected.extend(&file_content);
        expected.extend(&trailer);

        assert_eq!(output.len(), expected.len());
        assert_eq!(output, expected, "mixed content must roundtrip exactly");
        Ok(())
    }

    #[test]
    fn test_splitstream_multiple_externals() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let file1 = generate_test_data(32 * 1024, 0x10);
        let file2 = generate_test_data(256 * 1024, 0x20);
        let file3 = generate_test_data(8 * 1024, 0x30);
        let separator = generate_test_data(64, 0xFF);

        let expected_digest1: Sha256HashValue = compute_verity(&file1);
        let expected_digest2: Sha256HashValue = compute_verity(&file2);
        let expected_digest3: Sha256HashValue = compute_verity(&file3);

        let mut writer = repo.create_stream(0);
        writer.write_external(&file1)?;
        writer.write_inline(&separator);
        writer.write_external(&file2)?;
        writer.write_inline(&separator);
        writer.write_external(&file3)?;
        let stream_id = repo.write_stream(writer, "test-multi", None)?;

        let mut reader = repo.open_stream("test-multi", Some(&stream_id), None)?;
        let mut refs = Vec::new();
        reader.get_object_refs(|id| refs.push(id.clone()))?;
        assert_eq!(refs.len(), 3);
        assert_eq!(refs[0], expected_digest1, "file1 digest mismatch");
        assert_eq!(refs[1], expected_digest2, "file2 digest mismatch");
        assert_eq!(refs[2], expected_digest3, "file3 digest mismatch");

        let mut reader = repo.open_stream("test-multi", Some(&stream_id), None)?;
        let mut output = Vec::new();
        reader.cat(&repo, &mut output)?;

        let mut expected = file1.clone();
        expected.extend(&separator);
        expected.extend(&file2);
        expected.extend(&separator);
        expected.extend(&file3);

        assert_eq!(output.len(), expected.len());
        assert_eq!(
            output, expected,
            "multiple externals must roundtrip exactly"
        );
        Ok(())
    }

    #[test]
    fn test_splitstream_deduplication() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let repeated_chunk = generate_test_data(64 * 1024, 0xDE);
        let unique_chunk = generate_test_data(32 * 1024, 0xAD);

        let repeated_digest: Sha256HashValue = compute_verity(&repeated_chunk);
        let unique_digest: Sha256HashValue = compute_verity(&unique_chunk);

        let mut writer = repo.create_stream(0);
        writer.write_external(&repeated_chunk)?;
        writer.write_external(&unique_chunk)?;
        writer.write_external(&repeated_chunk)?;
        writer.write_external(&repeated_chunk)?;
        let stream_id = repo.write_stream(writer, "test-dedup", None)?;

        let mut reader = repo.open_stream("test-dedup", Some(&stream_id), None)?;
        let mut refs = Vec::new();
        reader.get_object_refs(|id| refs.push(id.clone()))?;
        assert_eq!(refs.len(), 2, "should only have 2 unique object refs");
        assert_eq!(
            refs[0], repeated_digest,
            "first ref should be repeated chunk"
        );
        assert_eq!(refs[1], unique_digest, "second ref should be unique chunk");

        let mut reader = repo.open_stream("test-dedup", Some(&stream_id), None)?;
        let mut output = Vec::new();
        reader.cat(&repo, &mut output)?;

        let mut expected = repeated_chunk.clone();
        expected.extend(&unique_chunk);
        expected.extend(&repeated_chunk);
        expected.extend(&repeated_chunk);

        assert_eq!(output.len(), expected.len());
        assert_eq!(
            output, expected,
            "deduplicated content must still roundtrip exactly"
        );
        Ok(())
    }

    #[test]
    fn test_splitstream_get_object_refs() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let chunk1 = generate_test_data(16 * 1024, 0x11);
        let chunk2 = generate_test_data(24 * 1024, 0x22);
        let inline_data = generate_test_data(128, 0x33);

        let expected_digest1: Sha256HashValue = compute_verity(&chunk1);
        let expected_digest2: Sha256HashValue = compute_verity(&chunk2);

        let mut writer = repo.create_stream(0);
        writer.write_inline(&inline_data);
        writer.write_external(&chunk1)?;
        writer.write_external(&chunk2)?;
        let stream_id = repo.write_stream(writer, "test-refs", None)?;

        let mut reader = repo.open_stream("test-refs", Some(&stream_id), None)?;

        let mut refs = Vec::new();
        reader.get_object_refs(|id| refs.push(id.clone()))?;

        assert_eq!(refs.len(), 2);
        assert_eq!(refs[0], expected_digest1, "chunk1 digest mismatch");
        assert_eq!(refs[1], expected_digest2, "chunk2 digest mismatch");

        let obj1 = repo.read_object(&refs[0])?;
        let obj2 = repo.read_object(&refs[1])?;

        assert_eq!(obj1, chunk1, "first external reference must match");
        assert_eq!(obj2, chunk2, "second external reference must match");

        Ok(())
    }

    #[test]
    fn test_splitstream_boundary_sizes() -> Result<()> {
        let sizes = [4095, 4096, 4097, 65535, 65536, 65537];

        for size in sizes {
            let tmp = tempdir();
            let repo = create_test_repo(&tmp.path().join("repo"))?;
            let data = generate_test_data(size, size as u8);

            let expected_digest: Sha256HashValue = compute_verity(&data);

            let mut writer = repo.create_stream(0);
            writer.write_external(&data)?;
            let stream_id = repo.write_stream(writer, "test-boundary", None)?;

            let mut reader = repo.open_stream("test-boundary", Some(&stream_id), None)?;
            let mut refs = Vec::new();
            reader.get_object_refs(|id| refs.push(id.clone()))?;
            assert_eq!(refs.len(), 1);
            assert_eq!(
                refs[0], expected_digest,
                "size {} must have correct digest",
                size
            );

            let mut reader = repo.open_stream("test-boundary", Some(&stream_id), None)?;
            let mut output = Vec::new();
            reader.cat(&repo, &mut output)?;

            assert_eq!(
                output.len(),
                data.len(),
                "size {} must roundtrip with correct length",
                size
            );
            assert_eq!(output, data, "size {} must roundtrip exactly", size);
        }

        Ok(())
    }

    #[test]
    fn test_splitstream_content_type() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;
        let content_type = 0xDEADBEEF_u64;

        let mut writer = repo.create_stream(content_type);
        writer.write_inline(b"test data");
        let stream_id = repo.write_stream(writer, "test-ctype", None)?;

        let reader = repo.open_stream("test-ctype", Some(&stream_id), Some(content_type))?;
        assert_eq!(reader.content_type, content_type);

        Ok(())
    }

    #[test]
    fn test_splitstream_total_size_tracking() -> Result<()> {
        let tmp = tempdir();
        let repo = create_test_repo(&tmp.path().join("repo"))?;

        let inline_data = generate_test_data(100, 0x01);
        let external_data = generate_test_data(1000, 0x02);

        let mut writer = repo.create_stream(0);
        writer.write_inline(&inline_data);
        writer.write_external(&external_data)?;
        let stream_id = repo.write_stream(writer, "test-size", None)?;

        let reader = repo.open_stream("test-size", Some(&stream_id), None)?;
        assert_eq!(reader.total_size, 1100, "total size should be tracked");

        Ok(())
    }
}