composefs/repository.rs

//! Content-addressable repository for composefs objects.
//!
//! This module provides a repository abstraction for storing and retrieving
//! content-addressed objects, splitstreams, and images with fs-verity
//! verification and garbage collection support.

use std::{
    collections::HashSet,
    ffi::CStr,
    fs::{canonicalize, File},
    io::{Read, Write},
    os::fd::{AsFd, OwnedFd},
    path::{Path, PathBuf},
    sync::Arc,
    thread::available_parallelism,
};

use tokio::sync::Semaphore;

use anyhow::{bail, ensure, Context, Result};
use once_cell::sync::OnceCell;
use rustix::{
    fs::{
        flock, linkat, mkdirat, open, openat, readlinkat, statat, syncfs, AtFlags, Dir, FileType,
        FlockOperation, Mode, OFlags, CWD,
    },
    io::{Errno, Result as ErrnoResult},
};

use crate::{
    fsverity::{
        compute_verity, enable_verity_maybe_copy, ensure_verity_equal, measure_verity,
        CompareVerityError, EnableVerityError, FsVerityHashValue, FsVerityHasher,
        MeasureVerityError,
    },
    mount::{composefs_fsmount, mount_at},
    splitstream::{SplitStreamReader, SplitStreamWriter},
    util::{proc_self_fd, replace_symlinkat, ErrnoFilter},
};

/// Call openat() on the named subdirectory of "dirfd", possibly creating it first.
///
/// We assume that the directory will probably exist (i.e. we try the open first), and on ENOENT, we
/// mkdirat() and retry.
fn ensure_dir_and_openat(dirfd: impl AsFd, filename: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
    match openat(
        &dirfd,
        filename,
        flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
        0o666.into(),
    ) {
        Ok(file) => Ok(file),
        Err(Errno::NOENT) => match mkdirat(&dirfd, filename, 0o777.into()) {
            Ok(()) | Err(Errno::EXIST) => openat(
                dirfd,
                filename,
                flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
                0o666.into(),
            ),
            Err(other) => Err(other),
        },
        Err(other) => Err(other),
    }
}

/// A content-addressable repository for composefs objects.
///
/// Stores content-addressed objects, splitstreams, and images with fsverity
/// verification. Objects are stored by their fsverity digest, streams by SHA256
/// content hash, and both support named references for persistence across
/// garbage collection.
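///
/// # Example
///
/// An illustrative sketch of basic usage; the crate/module paths and the
/// `Sha256HashValue` object ID type are assumptions:
///
/// ```no_run
/// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
/// # fn main() -> anyhow::Result<()> {
/// // Open the per-user repository at ~/.var/lib/composefs
/// let repo = Repository::<Sha256HashValue>::open_user()?;
///
/// // Store a blob; the returned ID is its fs-verity digest
/// let id = repo.ensure_object(b"hello, world")?;
///
/// // Read it back by digest
/// let data = repo.read_object(&id)?;
/// assert_eq!(data, b"hello, world");
///
/// // Make sure everything hit the disk
/// repo.sync()?;
/// # Ok(())
/// # }
/// ```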
pub struct Repository<ObjectID: FsVerityHashValue> {
    repository: OwnedFd,
    objects: OnceCell<OwnedFd>,
    write_semaphore: OnceCell<Arc<Semaphore>>,
    insecure: bool,
    _data: std::marker::PhantomData<ObjectID>,
}

impl<ObjectID: FsVerityHashValue> std::fmt::Debug for Repository<ObjectID> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Repository")
            .field("repository", &self.repository)
            .field("objects", &self.objects)
            .field("insecure", &self.insecure)
            .finish_non_exhaustive()
    }
}

impl<ObjectID: FsVerityHashValue> Drop for Repository<ObjectID> {
    fn drop(&mut self) {
        flock(&self.repository, FlockOperation::Unlock).expect("repository unlock failed");
    }
}

impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
    /// Return the objects directory.
    pub fn objects_dir(&self) -> ErrnoResult<&OwnedFd> {
        self.objects
            .get_or_try_init(|| ensure_dir_and_openat(&self.repository, "objects", OFlags::PATH))
    }

    /// Return a shared semaphore for limiting concurrent object writes.
    ///
    /// This semaphore is lazily initialized with `available_parallelism()` permits,
    /// and shared across all operations on this repository. Use this to limit
    /// concurrent I/O when processing multiple files or layers in parallel.
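    ///
    /// # Example
    ///
    /// An illustrative sketch that bounds the number of concurrent object writes;
    /// the module paths, object ID type and data source are assumptions:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # async fn example(repo: Arc<Repository<Sha256HashValue>>, chunks: Vec<Vec<u8>>) -> anyhow::Result<()> {
    /// let mut tasks = Vec::new();
    /// for chunk in chunks {
    ///     // Take a permit before starting each write so that at most
    ///     // `available_parallelism()` writes run at once.
    ///     let permit = repo.write_semaphore().acquire_owned().await?;
    ///     let repo = Arc::clone(&repo);
    ///     tasks.push(tokio::spawn(async move {
    ///         let id = repo.ensure_object_async(chunk).await;
    ///         drop(permit); // release the permit once the write is done
    ///         id
    ///     }));
    /// }
    /// for task in tasks {
    ///     task.await??;
    /// }
    /// # Ok(())
    /// # }
    /// ```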
    pub fn write_semaphore(&self) -> Arc<Semaphore> {
        self.write_semaphore
            .get_or_init(|| {
                let max_concurrent = available_parallelism().map(|n| n.get()).unwrap_or(4);
                Arc::new(Semaphore::new(max_concurrent))
            })
            .clone()
    }

    /// Open a repository at the target directory and path.
    pub fn open_path(dirfd: impl AsFd, path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();

        // O_PATH isn't enough because flock()
        let repository = openat(dirfd, path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())
            .with_context(|| format!("Cannot open composefs repository at {}", path.display()))?;

        flock(&repository, FlockOperation::LockShared)
            .context("Cannot lock composefs repository")?;

        Ok(Self {
            repository,
            objects: OnceCell::new(),
            write_semaphore: OnceCell::new(),
            insecure: false,
            _data: std::marker::PhantomData,
        })
    }

    /// Open the default user-owned composefs repository.
    pub fn open_user() -> Result<Self> {
        let home = std::env::var("HOME").with_context(|| "$HOME must be set when in user mode")?;

        Self::open_path(CWD, PathBuf::from(home).join(".var/lib/composefs"))
    }

    /// Open the default system-global composefs repository.
    pub fn open_system() -> Result<Self> {
        Self::open_path(CWD, PathBuf::from("/sysroot/composefs"))
    }

    fn ensure_dir(&self, dir: impl AsRef<Path>) -> ErrnoResult<()> {
        mkdirat(&self.repository, dir.as_ref(), 0o755.into()).or_else(|e| match e {
            Errno::EXIST => Ok(()),
            _ => Err(e),
        })
    }

    /// Asynchronously ensures an object exists in the repository.
    ///
    /// Same as `ensure_object` but runs the operation on a blocking thread pool
    /// to avoid blocking async tasks. Returns the fsverity digest of the object.
    ///
    /// For performance reasons, this function does *not* call fsync() or similar.  After you're
    /// done with everything, call `Repository::sync_async()`.
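    ///
    /// # Example
    ///
    /// A minimal sketch; the module paths and the `Sha256HashValue` object ID type
    /// are assumptions:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # async fn example() -> anyhow::Result<()> {
    /// let repo = Arc::new(Repository::<Sha256HashValue>::open_user()?);
    ///
    /// // Store several blobs without syncing each one individually...
    /// let a = repo.ensure_object_async(b"first blob".to_vec()).await?;
    /// let b = repo.ensure_object_async(b"second blob".to_vec()).await?;
    ///
    /// // ...then flush everything to disk in one go.
    /// repo.sync_async().await?;
    /// println!("stored {a:?} and {b:?}");
    /// # Ok(())
    /// # }
    /// ```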
    pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
        let self_ = Arc::clone(self);
        tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
    }

    /// Create an O_TMPFILE in the objects directory for streaming writes.
    ///
    /// Returns the file descriptor for writing. The caller should write data to this fd,
    /// then call `spawn_finalize_object_tmpfile()` to compute the verity digest,
    /// enable fs-verity, and link the file into the objects directory.
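    ///
    /// # Example
    ///
    /// A sketch of the streaming-write workflow; the module paths, object ID type
    /// and data source are assumptions:
    ///
    /// ```no_run
    /// # use std::{io::Write, sync::Arc};
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # async fn example(repo: Arc<Repository<Sha256HashValue>>, data: &[u8]) -> anyhow::Result<()> {
    /// // 1. Create an anonymous tmpfile in the objects directory
    /// let fd = repo.create_object_tmpfile()?;
    ///
    /// // 2. Stream the content into it
    /// let mut file = std::fs::File::from(fd);
    /// file.write_all(data)?;
    ///
    /// // 3. Enable fs-verity and link it into place on a blocking thread
    /// let id = repo
    ///     .spawn_finalize_object_tmpfile(file.into(), data.len() as u64)
    ///     .await??;
    /// println!("stored as {id:?}");
    /// # Ok(())
    /// # }
    /// ```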
    pub fn create_object_tmpfile(&self) -> Result<OwnedFd> {
        let objects_dir = self.objects_dir()?;
        let fd = openat(
            objects_dir,
            ".",
            OFlags::RDWR | OFlags::TMPFILE | OFlags::CLOEXEC,
            Mode::from_raw_mode(0o644),
        )?;
        Ok(fd)
    }

    /// Spawn a background task that finalizes a tmpfile as an object.
    ///
    /// The task computes the fs-verity digest by reading the file, enables verity,
    /// and links the file into the objects directory.
    ///
    /// Returns a handle that resolves to the ObjectID (fs-verity digest).
    ///
    /// # Arguments
    /// * `tmpfile_fd` - The O_TMPFILE file descriptor with data already written
    /// * `size` - The exact size in bytes of the data written to the tmpfile
    pub fn spawn_finalize_object_tmpfile(
        self: &Arc<Self>,
        tmpfile_fd: OwnedFd,
        size: u64,
    ) -> tokio::task::JoinHandle<Result<ObjectID>> {
        let self_ = Arc::clone(self);
        tokio::task::spawn_blocking(move || self_.finalize_object_tmpfile(tmpfile_fd.into(), size))
    }

    /// Finalize a tmpfile as an object.
    ///
    /// This method should be called from a blocking context (e.g., `spawn_blocking`)
    /// as it performs synchronous I/O operations.
    ///
    /// This method:
    /// 1. Re-opens the file as read-only
    /// 2. Enables fs-verity on the file (kernel computes digest)
    /// 3. Reads the digest from the kernel
    /// 4. Checks if object already exists (deduplication)
    /// 5. Links the file into the objects directory
    ///
    /// By letting the kernel compute the digest during verity enable, we avoid
    /// reading the file an extra time in userspace.
    pub fn finalize_object_tmpfile(&self, file: File, size: u64) -> Result<ObjectID> {
        // Re-open as read-only via /proc/self/fd (required for verity enable)
        let fd_path = proc_self_fd(&file);
        let ro_fd = open(&*fd_path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())?;

        // Must close writable fd before enabling verity
        drop(file);

        // Get objects_dir early since we may need it for verity copy
        let objects_dir = self.objects_dir()?;

        // Enable verity - the kernel reads the file and computes the digest.
        // Use enable_verity_maybe_copy to handle the case where forked processes
        // have inherited writable fds to this file.
        let (ro_fd, verity_enabled) =
            match enable_verity_maybe_copy::<ObjectID>(objects_dir, ro_fd.as_fd()) {
                Ok(None) => (ro_fd, true),
                Ok(Some(new_fd)) => (new_fd, true),
                Err(EnableVerityError::FilesystemNotSupported) if self.insecure => (ro_fd, false),
                Err(EnableVerityError::AlreadyEnabled) => (ro_fd, true),
                Err(other) => return Err(other).context("Enabling verity on tmpfile")?,
            };

        // Get the digest - either from kernel (fast) or compute in userspace (fallback)
        let id: ObjectID = if verity_enabled {
            measure_verity(&ro_fd).context("Measuring verity digest")?
        } else {
            // Insecure mode: compute digest in userspace from ro_fd
            let mut reader = std::io::BufReader::new(File::from(ro_fd.try_clone()?));
            Self::compute_verity_digest(&mut reader)?
        };

        // Check if object already exists
        let path = id.to_object_pathname();

        match statat(objects_dir, &path, AtFlags::empty()) {
            Ok(stat) if stat.st_size as u64 == size => {
                // Object already exists with correct size, skip storage
                return Ok(id);
            }
            _ => {}
        }

        // Ensure parent directory exists
        let parent_dir = id.to_object_dir();
        let _ = mkdirat(objects_dir, &parent_dir, Mode::from_raw_mode(0o755));

        // Link the file into the objects directory
        match linkat(
            CWD,
            proc_self_fd(&ro_fd),
            objects_dir,
            &path,
            AtFlags::SYMLINK_FOLLOW,
        ) {
            Ok(()) => Ok(id),
            Err(Errno::EXIST) => Ok(id), // Race: another task created it
            Err(e) => Err(e).context("Linking tmpfile into objects directory")?,
        }
    }

    /// Compute fs-verity digest in userspace by reading from a buffered source.
    /// Used as fallback when kernel verity is not available (insecure mode).
    fn compute_verity_digest(reader: &mut impl std::io::BufRead) -> Result<ObjectID> {
        let mut hasher = FsVerityHasher::<ObjectID>::new();

        loop {
            let buf = reader.fill_buf()?;
            if buf.is_empty() {
                break;
            }
            // add_block expects at most one block at a time
            let chunk_size = buf.len().min(FsVerityHasher::<ObjectID>::BLOCK_SIZE);
            hasher.add_block(&buf[..chunk_size]);
            reader.consume(chunk_size);
        }

        Ok(hasher.digest())
    }

    /// Store an object with a pre-computed fs-verity ID.
    ///
    /// This is an internal helper that stores data assuming the caller has already
    /// computed the correct fs-verity digest. The digest is verified after storage.
    fn store_object_with_id(&self, data: &[u8], id: &ObjectID) -> Result<()> {
        let dirfd = self.objects_dir()?;
        let path = id.to_object_pathname();

        // the usual case is that the file will already exist
        match openat(
            dirfd,
            &path,
            OFlags::RDONLY | OFlags::CLOEXEC,
            Mode::empty(),
        ) {
            Ok(fd) => {
                // measure the existing file to ensure that it's correct
                // TODO: try to replace file if it's broken?
                match ensure_verity_equal(&fd, id) {
                    Ok(()) => {}
                    Err(CompareVerityError::Measure(MeasureVerityError::VerityMissing))
                        if self.insecure =>
                    {
                        match enable_verity_maybe_copy::<ObjectID>(dirfd, fd.as_fd()) {
                            Ok(Some(fd)) => ensure_verity_equal(&fd, id)?,
                            Ok(None) => ensure_verity_equal(&fd, id)?,
                            Err(other) => Err(other)?,
                        }
                    }
                    Err(CompareVerityError::Measure(
                        MeasureVerityError::FilesystemNotSupported,
                    )) if self.insecure => {}
                    Err(other) => Err(other)?,
                }
                return Ok(());
            }
            Err(Errno::NOENT) => {
                // in this case we'll create the file
            }
            Err(other) => {
                return Err(other).context("Checking for existing object in repository")?;
            }
        }

        let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)?;
        let mut file = File::from(fd);
        file.write_all(data)?;
        // We can't enable verity with an open writable fd, so re-open and close the old one.
        let ro_fd = open(
            proc_self_fd(&file),
            OFlags::RDONLY | OFlags::CLOEXEC,
            Mode::empty(),
        )?;
        // NB: We should do fdatasync() or fsync() here, but doing this for each file forces the
        // creation of a massive number of journal commits and is a performance disaster.  We need
        // to coordinate this at a higher level.  See .write_stream().
        drop(file);

        let ro_fd = match enable_verity_maybe_copy::<ObjectID>(dirfd, ro_fd.as_fd()) {
            Ok(maybe_fd) => {
                let ro_fd = maybe_fd.unwrap_or(ro_fd);
                match ensure_verity_equal(&ro_fd, id) {
                    Ok(()) => ro_fd,
                    Err(CompareVerityError::Measure(
                        MeasureVerityError::VerityMissing
                        | MeasureVerityError::FilesystemNotSupported,
                    )) if self.insecure => ro_fd,
                    Err(other) => Err(other).context("Double-checking verity digest")?,
                }
            }
            Err(EnableVerityError::FilesystemNotSupported) if self.insecure => ro_fd,
            Err(other) => Err(other).context("Enabling verity digest")?,
        };

        match linkat(
            CWD,
            proc_self_fd(&ro_fd),
            dirfd,
            path,
            AtFlags::SYMLINK_FOLLOW,
        ) {
            Ok(()) => {}
            Err(Errno::EXIST) => {
                // TODO: strictly, we should measure the newly-appeared file
            }
            Err(other) => {
                return Err(other).context("Linking created object file");
            }
        }

        Ok(())
    }

    /// Given a blob of data, store it in the repository.
    ///
    /// For performance reasons, this function does *not* call fsync() or similar.  After you're
    /// done with everything, call `Repository::sync()`.
    pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
        let id: ObjectID = compute_verity(data);
        self.store_object_with_id(data, &id)?;
        Ok(id)
    }

    fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result<OwnedFd> {
        let fd = self.openat(filename, OFlags::RDONLY)?;
        match ensure_verity_equal(&fd, expected_verity) {
            Ok(()) => {}
            Err(CompareVerityError::Measure(
                MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported,
            )) if self.insecure => {}
            Err(other) => Err(other)?,
        }
        Ok(fd)
    }

    /// By default, fs-verity is required to be enabled on the target
    /// filesystem. Setting this to `true` disables verification of digests,
    /// so that an instance of [`Self`] can be used on a filesystem
    /// without fs-verity support.
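    ///
    /// # Example
    ///
    /// A sketch of opening a repository on a filesystem without fs-verity support;
    /// the path, module paths and object ID type are assumptions:
    ///
    /// ```no_run
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # fn main() -> anyhow::Result<()> {
    /// let mut repo = Repository::<Sha256HashValue>::open_path(rustix::fs::CWD, "/tmp/repo")?;
    /// repo.set_insecure(true);
    /// // Digests are still computed (in userspace), but no longer enforced
    /// // by the kernel via fs-verity.
    /// let _id = repo.ensure_object(b"some data")?;
    /// # Ok(())
    /// # }
    /// ```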
    pub fn set_insecure(&mut self, insecure: bool) -> &mut Self {
        self.insecure = insecure;
        self
    }

    /// Creates a SplitStreamWriter for writing a split stream.
    /// You should write the data to the returned object and then pass it to .write_stream() to
    /// store the result.
    pub fn create_stream(self: &Arc<Self>, content_type: u64) -> SplitStreamWriter<ObjectID> {
        SplitStreamWriter::new(self, content_type)
    }

    fn format_object_path(id: &ObjectID) -> String {
        format!("objects/{}", id.to_object_pathname())
    }

    fn format_stream_path(content_identifier: &str) -> String {
        format!("streams/{content_identifier}")
    }

    /// Check if the provided splitstream is present in the repository;
    /// if so, return its fsverity digest.
    pub fn has_stream(&self, content_identifier: &str) -> Result<Option<ObjectID>> {
        let stream_path = Self::format_stream_path(content_identifier);

        match readlinkat(&self.repository, &stream_path, []) {
            Ok(target) => {
                let bytes = target.as_bytes();
                ensure!(
                    bytes.starts_with(b"../"),
                    "stream symlink has incorrect prefix"
                );
                Ok(Some(ObjectID::from_object_pathname(bytes)?))
            }
            Err(Errno::NOENT) => Ok(None),
            Err(err) => Err(err)?,
        }
    }

    /// Write the given splitstream to the repository with the provided content identifier and
    /// optional reference name.
    ///
    /// This call contains an internal barrier that guarantees that, in the event of a crash, either:
    ///  - the named stream (by `content_identifier`) will not be available; or
    ///  - the stream and all of its linked data will be available
    ///
    /// In other words: it will not be possible to boot a system which contained a stream named
    /// `content_identifier` but is missing linked streams or objects from that stream.
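    ///
    /// # Example
    ///
    /// An illustrative sketch; the content identifier, reference name and content
    /// type are placeholders, and populating the stream depends on the
    /// `SplitStreamWriter` API:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # fn example(repo: Arc<Repository<Sha256HashValue>>) -> anyhow::Result<()> {
    /// let writer = repo.create_stream(0 /* placeholder content type */);
    /// // ... populate the writer here before storing it ...
    /// let object_id = repo.write_stream(writer, "sha256:<content digest>", Some("my-ref"))?;
    /// println!("stream stored as {object_id:?}");
    /// # Ok(())
    /// # }
    /// ```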
    pub fn write_stream(
        &self,
        writer: SplitStreamWriter<ObjectID>,
        content_identifier: &str,
        reference: Option<&str>,
    ) -> Result<ObjectID> {
        let object_id = writer.done()?;

        // Right now we have:
        //   - all of the linked external objects and streams; and
        //   - the binary data of this splitstream itself
        //
        // in the filesystem, but not yet guaranteed to be synced to disk.  This is OK because
        // nobody knows that the binary data of the splitstream is a splitstream yet: it could just
        // as well be a random data file contained in an OS image or something.
        //
        // We need to make sure that all of that makes it to the disk before the splitstream is
        // visible as a splitstream.
        self.sync()?;

        let stream_path = Self::format_stream_path(content_identifier);
        let object_path = Self::format_object_path(&object_id);
        self.symlink(&stream_path, &object_path)?;

        if let Some(name) = reference {
            let reference_path = format!("streams/refs/{name}");
            self.symlink(&reference_path, &stream_path)?;
        }

        Ok(object_id)
    }

    /// Register an already-stored object as a named stream.
    ///
    /// This is useful when using `SplitStreamBuilder` which stores the splitstream
    /// directly via `finish()`. After calling `finish()`, call this method to
    /// sync all data to disk and create the stream symlink.
    ///
    /// This method ensures atomicity: the stream symlink is only created after
    /// all objects have been synced to disk.
    pub async fn register_stream(
        self: &Arc<Self>,
        object_id: &ObjectID,
        content_identifier: &str,
        reference: Option<&str>,
    ) -> Result<()> {
        self.sync_async().await?;

        let stream_path = Self::format_stream_path(content_identifier);
        let object_path = Self::format_object_path(object_id);
        self.symlink(&stream_path, &object_path)?;

        if let Some(name) = reference {
            let reference_path = format!("streams/refs/{name}");
            self.symlink(&reference_path, &stream_path)?;
        }

        Ok(())
    }

    /// Async version of `write_stream` for use with parallel object storage.
    ///
    /// This method awaits any pending parallel object storage tasks before
    /// finalizing the stream. Use this when you've called `write_external_parallel()`
    /// on the writer.
    pub async fn write_stream_async(
        self: &Arc<Self>,
        writer: SplitStreamWriter<ObjectID>,
        content_identifier: &str,
        reference: Option<&str>,
    ) -> Result<ObjectID> {
        let object_id = writer.done_async().await?;

        self.sync_async().await?;

        let stream_path = Self::format_stream_path(content_identifier);
        let object_path = Self::format_object_path(&object_id);
        self.symlink(&stream_path, &object_path)?;

        if let Some(name) = reference {
            let reference_path = format!("streams/refs/{name}");
            self.symlink(&reference_path, &stream_path)?;
        }

        Ok(object_id)
    }

    /// Check if a splitstream with the given name exists in the repository's "refs".
    pub fn has_named_stream(&self, name: &str) -> Result<bool> {
        let stream_path = format!("streams/refs/{name}");

        Ok(statat(&self.repository, &stream_path, AtFlags::empty())
            .filter_errno(Errno::NOENT)
            .with_context(|| format!("Looking for stream {name} in repository"))?
            .map(|s| FileType::from_raw_mode(s.st_mode).is_symlink())
            .unwrap_or(false))
    }

    /// Assign the given name to a stream.  The stream must already exist.  After this operation it
    /// will be possible to refer to the stream by its new name 'refs/{name}'.
    pub fn name_stream(&self, content_identifier: &str, name: &str) -> Result<()> {
        let stream_path = Self::format_stream_path(content_identifier);
        let reference_path = format!("streams/refs/{name}");
        self.symlink(&reference_path, &stream_path)?;
        Ok(())
    }

    /// Ensures that the stream with a given content identifier digest exists in the repository.
    ///
    /// This tries to find the stream by the content identifier.  If the stream is already in the
    /// repository, the object ID (fs-verity digest) is read from the symlink.  If the stream is
    /// not already in the repository, a `SplitStreamWriter` is created and passed to `callback`.
    /// On return, the object ID of the stream will be calculated and it will be written to disk
    /// (if it wasn't already created by someone else in the meantime).
    ///
    /// In both cases, if `reference` is provided, it is used to provide a fixed name for the
    /// object.  Any object that doesn't have a fixed reference to it is subject to garbage
    /// collection.  It is an error if this reference already exists.
    ///
    /// On success, the object ID of the new object is returned.  It is expected that this object
    /// ID will be used when referring to the stream from other linked streams.
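    ///
    /// # Example
    ///
    /// An illustrative sketch; the identifiers and content type are placeholders, and
    /// the callback body depends on the `SplitStreamWriter` API:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # fn example(repo: Arc<Repository<Sha256HashValue>>) -> anyhow::Result<()> {
    /// let object_id = repo.ensure_stream(
    ///     "sha256:<content digest>",
    ///     0, // placeholder content type
    ///     |writer| {
    ///         // Only called if the stream isn't already in the repository.
    ///         // ... populate `writer` here ...
    ///         Ok(())
    ///     },
    ///     Some("my-ref"),
    /// )?;
    /// println!("stream object: {object_id:?}");
    /// # Ok(())
    /// # }
    /// ```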
    pub fn ensure_stream(
        self: &Arc<Self>,
        content_identifier: &str,
        content_type: u64,
        callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<()>,
        reference: Option<&str>,
    ) -> Result<ObjectID> {
        let stream_path = Self::format_stream_path(content_identifier);

        let object_id = match self.has_stream(content_identifier)? {
            Some(id) => id,
            None => {
                let mut writer = self.create_stream(content_type);
                callback(&mut writer)?;
                self.write_stream(writer, content_identifier, reference)?
            }
        };

        if let Some(name) = reference {
            let reference_path = format!("streams/refs/{name}");
            self.symlink(&reference_path, &stream_path)?;
        }

        Ok(object_id)
    }

    /// Open a splitstream with the given name.
    pub fn open_stream(
        &self,
        content_identifier: &str,
        verity: Option<&ObjectID>,
        expected_content_type: Option<u64>,
    ) -> Result<SplitStreamReader<ObjectID>> {
        let file = File::from(if let Some(verity_hash) = verity {
            self.open_object(verity_hash)
                .with_context(|| format!("Opening object '{verity_hash:?}'"))?
        } else {
            let filename = Self::format_stream_path(content_identifier);
            self.openat(&filename, OFlags::RDONLY)
                .with_context(|| format!("Opening ref '{filename}'"))?
        });

        SplitStreamReader::new(file, expected_content_type)
    }

    /// Given an object identifier (a digest), return a read-only file descriptor
    /// for its contents. The fsverity digest is verified (if the repository is not in `insecure` mode).
    pub fn open_object(&self, id: &ObjectID) -> Result<OwnedFd> {
        self.open_with_verity(&Self::format_object_path(id), id)
    }

    /// Read the contents of an object into a Vec
    pub fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
        let mut data = vec![];
        File::from(self.open_object(id)?).read_to_end(&mut data)?;
        Ok(data)
    }

    /// Merges a splitstream into a single continuous stream.
    ///
    /// Opens the named splitstream, resolves all object references, and writes
    /// the complete merged content to the provided writer. Optionally verifies
    /// the splitstream's fsverity digest matches the expected value.
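    ///
    /// # Example
    ///
    /// A sketch that reassembles a stored splitstream into a regular file; the
    /// identifier and output path are placeholders:
    ///
    /// ```no_run
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # fn example(repo: &Repository<Sha256HashValue>) -> anyhow::Result<()> {
    /// let mut output = std::fs::File::create("/tmp/merged.bin")?;
    /// repo.merge_splitstream("sha256:<content digest>", None, None, &mut output)?;
    /// # Ok(())
    /// # }
    /// ```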
    pub fn merge_splitstream(
        &self,
        content_identifier: &str,
        verity: Option<&ObjectID>,
        expected_content_type: Option<u64>,
        output: &mut impl Write,
    ) -> Result<()> {
        let mut split_stream =
            self.open_stream(content_identifier, verity, expected_content_type)?;
        split_stream.cat(self, output)
    }

    /// Write `data` into the repository as an image with the given `name`.
    ///
    /// The fsverity digest is returned.
    ///
    /// # Integrity
    ///
    /// This function is not safe for untrusted users.
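    ///
    /// # Example
    ///
    /// A sketch; `erofs_bytes` stands in for an already-built composefs (EROFS)
    /// image, and the module paths and object ID type are assumptions:
    ///
    /// ```no_run
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # fn example(repo: &Repository<Sha256HashValue>, erofs_bytes: &[u8]) -> anyhow::Result<()> {
    /// let image_id = repo.write_image(Some("my-image"), erofs_bytes)?;
    /// println!("image stored as {}", image_id.to_hex());
    /// # Ok(())
    /// # }
    /// ```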
    pub fn write_image(&self, name: Option<&str>, data: &[u8]) -> Result<ObjectID> {
        let object_id = self.ensure_object(data)?;

        let object_path = Self::format_object_path(&object_id);
        let image_path = format!("images/{}", object_id.to_hex());

        self.symlink(&image_path, &object_path)?;

        if let Some(reference) = name {
            let ref_path = format!("images/refs/{reference}");
            self.symlink(&ref_path, &image_path)?;
        }

        Ok(object_id)
    }

    /// Import the data from the provided reader into the repository as an image.
    ///
    /// The fsverity digest is returned.
    ///
    /// # Integrity
    ///
    /// This function is not safe for untrusted users.
    pub fn import_image<R: Read>(&self, name: &str, image: &mut R) -> Result<ObjectID> {
        let mut data = vec![];
        image.read_to_end(&mut data)?;
        self.write_image(Some(name), &data)
    }

    /// Returns the fd of the image and whether or not verity should be
    /// enabled when mounting it.
    fn open_image(&self, name: &str) -> Result<(OwnedFd, bool)> {
        let image = self
            .openat(&format!("images/{name}"), OFlags::RDONLY)
            .with_context(|| format!("Opening ref 'images/{name}'"))?;

        if name.contains("/") {
            return Ok((image, true));
        }

        // A name with no slashes in it is taken to be a sha256 fs-verity digest
        match measure_verity::<ObjectID>(&image) {
            Ok(found) if found == FsVerityHashValue::from_hex(name)? => Ok((image, true)),
            Ok(_) => bail!("fs-verity content mismatch"),
            Err(MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported)
                if self.insecure =>
            {
                Ok((image, false))
            }
            Err(other) => Err(other)?,
        }
    }

    /// Create a detached mount of an image. This file descriptor can then
    /// be attached via e.g. `move_mount`.
    pub fn mount(&self, name: &str) -> Result<OwnedFd> {
        let (image, enable_verity) = self.open_image(name)?;
        Ok(composefs_fsmount(
            image,
            name,
            self.objects_dir()?,
            enable_verity,
        )?)
    }

    /// Mount the image with the provided name at the target path.
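    ///
    /// # Example
    ///
    /// A sketch; the reference name and mountpoint are placeholders, and mounting
    /// typically requires elevated privileges:
    ///
    /// ```no_run
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # fn example(repo: &Repository<Sha256HashValue>) -> anyhow::Result<()> {
    /// repo.mount_at("refs/my-image", "/mnt/image")?;
    /// # Ok(())
    /// # }
    /// ```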
    pub fn mount_at(&self, name: &str, mountpoint: impl AsRef<Path>) -> Result<()> {
        Ok(mount_at(
            self.mount(name)?,
            CWD,
            &canonicalize(mountpoint)?,
        )?)
    }

    /// Creates a relative symlink within the repository.
    ///
    /// Computes the correct relative path from the symlink location to the target,
    /// creating any necessary intermediate directories. Atomically replaces any
    /// existing symlink at the specified name.
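    ///
    /// # Example
    ///
    /// A sketch showing the relative link that gets created; the names are
    /// placeholders:
    ///
    /// ```no_run
    /// # use composefs::{fsverity::Sha256HashValue, repository::Repository};
    /// # fn example(repo: &Repository<Sha256HashValue>) -> anyhow::Result<()> {
    /// // Creates streams/refs/my-ref -> ../<stream name>, creating the
    /// // intermediate "refs" directory as needed.
    /// repo.symlink("streams/refs/my-ref", "streams/<stream name>")?;
    /// # Ok(())
    /// # }
    /// ```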
    pub fn symlink(&self, name: impl AsRef<Path>, target: impl AsRef<Path>) -> ErrnoResult<()> {
        let name = name.as_ref();

        let mut symlink_components = name.parent().unwrap().components().peekable();
        let mut target_components = target.as_ref().components().peekable();

        let mut symlink_ancestor = PathBuf::new();

        // remove common leading components
        while symlink_components.peek() == target_components.peek() {
            symlink_ancestor.push(symlink_components.next().unwrap());
            target_components.next().unwrap();
        }

        let mut relative = PathBuf::new();
        // prepend a "../" for each ancestor of the symlink
        // and create those ancestors as we do so
        for symlink_component in symlink_components {
            symlink_ancestor.push(symlink_component);
            self.ensure_dir(&symlink_ancestor)?;
            relative.push("..");
        }

        // now build the relative path from the remaining components of the target
        for target_component in target_components {
            relative.push(target_component);
        }

        // Atomically replace existing symlink
        replace_symlinkat(&relative, &self.repository, name)
    }

    fn read_symlink_hashvalue(dirfd: &OwnedFd, name: &CStr) -> Result<ObjectID> {
        let link_content = readlinkat(dirfd, name, [])?;
        Ok(ObjectID::from_object_pathname(link_content.to_bytes())?)
    }

    fn walk_symlinkdir(fd: OwnedFd, objects: &mut HashSet<ObjectID>) -> Result<()> {
        for item in Dir::read_from(&fd)? {
            let entry = item?;
            // NB: the underlying filesystem must support returning filetype via direntry
            // that's a reasonable assumption, since it must also support fsverity...
            match entry.file_type() {
                FileType::Directory => {
                    let filename = entry.file_name();
                    if filename != c"." && filename != c".." {
                        let dirfd = openat(&fd, filename, OFlags::RDONLY, Mode::empty())?;
                        Self::walk_symlinkdir(dirfd, objects)?;
                    }
                }
                FileType::Symlink => {
                    objects.insert(Self::read_symlink_hashvalue(&fd, entry.file_name())?);
                }
                _ => {
                    bail!("Unexpected file type encountered");
                }
            }
        }

        Ok(())
    }

    /// Open the provided path in the repository.
    fn openat(&self, name: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
        // Unconditionally add CLOEXEC as we always want it.
        openat(
            &self.repository,
            name,
            flags | OFlags::CLOEXEC,
            Mode::empty(),
        )
    }

    fn gc_category(&self, category: &str) -> Result<HashSet<ObjectID>> {
        let mut objects = HashSet::new();

        let Some(category_fd) = self
            .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
            .filter_errno(Errno::NOENT)
            .with_context(|| format!("Opening {category} dir in repository"))?
        else {
            return Ok(objects);
        };

        if let Some(refs) = openat(
            &category_fd,
            "refs",
            OFlags::RDONLY | OFlags::DIRECTORY,
            Mode::empty(),
        )
        .filter_errno(Errno::NOENT)
        .with_context(|| format!("Opening {category}/refs dir in repository"))?
        {
            Self::walk_symlinkdir(refs, &mut objects)?;
        }

        for item in Dir::read_from(&category_fd)? {
            let entry = item?;
            let filename = entry.file_name();
            if filename != c"refs" && filename != c"." && filename != c".." {
                if entry.file_type() != FileType::Symlink {
                    bail!("category directory contains non-symlink");
                }

                // TODO: we need to sort this out.  the symlink itself might be a sha256 content ID
                // (as for splitstreams), not an object/ to be preserved.
                continue;

                /*
                let mut value = Sha256HashValue::EMPTY;
                hex::decode_to_slice(filename.to_bytes(), &mut value)?;

                if !objects.contains(&value) {
                    println!("rm {}/{:?}", category, filename);
                }
                */
            }
        }

        Ok(objects)
    }

    /// Given an image, return the set of all objects referenced by it.
    pub fn objects_for_image(&self, name: &str) -> Result<HashSet<ObjectID>> {
        let (image, _) = self.open_image(name)?;
        let mut data = vec![];
        std::fs::File::from(image).read_to_end(&mut data)?;
        Ok(crate::erofs::reader::collect_objects(&data)?)
    }

    /// Makes sure all content is written to the repository.
    ///
    /// This is currently just syncfs() on the repository's root directory because we don't have
    /// any better options at present.  This blocks until the data is written out.
    pub fn sync(&self) -> Result<()> {
        syncfs(&self.repository)?;
        Ok(())
    }

    /// Makes sure all content is written to the repository.
    ///
    /// This is currently just syncfs() on the repository's root directory because we don't have
    /// any better options at present.  This won't return until the data is written out.
    pub async fn sync_async(self: &Arc<Self>) -> Result<()> {
        let self_ = Arc::clone(self);
        tokio::task::spawn_blocking(move || self_.sync()).await?
    }

    /// Perform a garbage collection operation.
    ///
    /// # Locking
    ///
    /// An exclusive lock is held for the duration of this operation.
    pub fn gc(&self) -> Result<()> {
        flock(&self.repository, FlockOperation::LockExclusive)?;

        let mut objects = HashSet::new();

        for ref object in self.gc_category("images")? {
            println!("{object:?} lives as an image");
            objects.insert(object.clone());
            objects.extend(self.objects_for_image(&object.to_hex())?);
        }

        /* TODO
        for object in self.gc_category("streams")? {
            println!("{object:?} lives as a stream");
            objects.insert(object.clone());

            let mut split_stream = self.open_stream(&object.to_hex(), None, None)?;
            split_stream.get_object_refs(|id| {
                println!("   with {id:?}");
                objects.insert(id.clone());
            })?;
        }
        */

        for first_byte in 0x0..=0xff {
            let dirfd = match self.openat(
                &format!("objects/{first_byte:02x}"),
                OFlags::RDONLY | OFlags::DIRECTORY,
            ) {
                Ok(fd) => fd,
                Err(Errno::NOENT) => continue,
                Err(e) => Err(e)?,
            };
            for item in Dir::new(dirfd)? {
                let entry = item?;
                let filename = entry.file_name();
                if filename != c"." && filename != c".." {
                    let id =
                        ObjectID::from_object_dir_and_basename(first_byte, filename.to_bytes())?;
                    if !objects.contains(&id) {
                        println!("rm objects/{first_byte:02x}/{filename:?}");
                    } else {
                        println!("# objects/{first_byte:02x}/{filename:?} lives");
                    }
                }
            }
        }

        Ok(flock(&self.repository, FlockOperation::LockShared)?) // XXX: finally { } ?
    }

    // fn fsck(&self) -> Result<()> {
    //     unimplemented!()
    // }
}