composefs_oci/
skopeo.rs

1//! Container image pulling and registry interaction via skopeo/containers-image-proxy.
2//!
3//! This module provides functionality to pull container images from various registries and import them
4//! into composefs repositories. It uses the containers-image-proxy library to interface with skopeo
5//! for image operations, handling authentication, transport protocols, and image manifest processing.
6//!
7//! The main entry point is the `pull()` function which downloads an image, processes its layers
8//! asynchronously with parallelism control, and stores them in the composefs repository with proper
9//! fs-verity integration. It supports various image formats and compression types.
10
11use std::{cmp::Reverse, process::Command, thread::available_parallelism};
12
13use std::{iter::zip, sync::Arc};
14
15use anyhow::{bail, Context, Result};
16use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
17use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
20use rustix::process::geteuid;
21use tokio::{
22    io::{AsyncReadExt, BufReader},
23    sync::Semaphore,
24};
25
26use composefs::{fsverity::FsVerityHashValue, repository::Repository};
27
28use crate::{config_identifier, layer_identifier, tar::split_async, ContentAndVerity};
29
/// Packs an 8-byte ASCII tag into the `u64` content type identifier that is stored in the
/// splitstream file (little-endian, so the bytes appear in tag order).
const fn content_type_tag(tag: &[u8; 8]) -> u64 {
    u64::from_le_bytes(*tag)
}

/// Content type identifier for splitstreams holding a tar layer (`ocilayer`).
pub(crate) const TAR_LAYER_CONTENT_TYPE: u64 = content_type_tag(b"ocilayer");
/// Content type identifier for splitstreams holding an OCI image config (`ociconfg`).
pub(crate) const OCI_CONFIG_CONTENT_TYPE: u64 = content_type_tag(b"ociconfg");
33
/// State shared by every step of pulling a single image into a repository.
struct ImageOp<ObjectID: FsVerityHashValue> {
    /// Repository that layer and config streams are looked up in and written to.
    repo: Arc<Repository<ObjectID>>,
    /// Proxy used to open the image and to fetch its manifest and blobs.
    proxy: ImageProxy,
    /// Handle for the image being pulled, as returned by `proxy.open_image()`.
    img: OpenedImage,
    /// Progress display used for status lines and the per-layer download bars.
    progress: MultiProgress,
}
40
41impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
42    async fn new(
43        repo: &Arc<Repository<ObjectID>>,
44        imgref: &str,
45        img_proxy_config: Option<ImageProxyConfig>,
46    ) -> Result<Self> {
47        // See https://github.com/containers/skopeo/issues/2563
48        let skopeo_cmd = if imgref.starts_with("containers-storage:") && !geteuid().is_root() {
49            let mut cmd = Command::new("podman");
50            cmd.args(["unshare", "skopeo"]);
51            Some(cmd)
52        } else {
53            None
54        };
55
56        // See https://github.com/containers/skopeo/issues/2750
57        let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") {
58            &format!("containers-storage:{hash}") // yay temporary lifetime extension!
59        } else {
60            imgref
61        };
62
63        let config = match img_proxy_config {
64            Some(mut conf) => {
65                if conf.skopeo_cmd.is_none() {
66                    conf.skopeo_cmd = skopeo_cmd;
67                }
68
69                conf
70            }
71
72            None => {
73                ImageProxyConfig {
74                    skopeo_cmd,
75                    // auth_anonymous: true, debug: true, insecure_skip_tls_verification: None,
76                    ..ImageProxyConfig::default()
77                }
78            }
79        };
80
81        let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
82        let img = proxy.open_image(imgref).await.context("Opening image")?;
83        let progress = MultiProgress::new();
84        Ok(ImageOp {
85            repo: Arc::clone(repo),
86            proxy,
87            img,
88            progress,
89        })
90    }
91
92    pub async fn ensure_layer(&self, diff_id: &str, descriptor: &Descriptor) -> Result<ObjectID> {
93        // We need to use the per_manifest descriptor to download the compressed layer but it gets
94        // stored in the repository via the per_config descriptor.  Our return value is the
95        // fsverity digest for the corresponding splitstream.
96        let content_id = layer_identifier(diff_id);
97
98        if let Some(layer_id) = self.repo.has_stream(&content_id)? {
99            self.progress
100                .println(format!("Already have layer {diff_id}"))?;
101            Ok(layer_id)
102        } else {
103            // Otherwise, we need to fetch it...
104            let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
105
106            // See https://github.com/containers/containers-image-proxy-rs/issues/71
107            let blob_reader = blob_reader.take(descriptor.size());
108
109            let bar = self.progress.add(ProgressBar::new(descriptor.size()));
110            bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
111                .unwrap()
112                .progress_chars("##-"));
113            let progress = bar.wrap_async_read(blob_reader);
114            self.progress.println(format!("Fetching layer {diff_id}"))?;
115
116            let object_id = match descriptor.media_type() {
117                MediaType::ImageLayer => {
118                    split_async(
119                        BufReader::new(progress),
120                        self.repo.clone(),
121                        TAR_LAYER_CONTENT_TYPE,
122                    )
123                    .await?
124                }
125                MediaType::ImageLayerGzip => {
126                    split_async(
127                        BufReader::new(GzipDecoder::new(BufReader::new(progress))),
128                        self.repo.clone(),
129                        TAR_LAYER_CONTENT_TYPE,
130                    )
131                    .await?
132                }
133                MediaType::ImageLayerZstd => {
134                    split_async(
135                        BufReader::new(ZstdDecoder::new(BufReader::new(progress))),
136                        self.repo.clone(),
137                        TAR_LAYER_CONTENT_TYPE,
138                    )
139                    .await?
140                }
141                other => bail!("Unsupported layer media type {other:?}"),
142            };
143
144            // skopeo is doing data checksums for us to make sure the content we received is equal
145            // to the claimed diff_id. We trust it, but we need to check it by awaiting the driver.
146            driver.await?;
147
148            // Sync and register the stream with its content identifier
149            self.repo
150                .register_stream(&object_id, &content_id, None)
151                .await?;
152
153            Ok(object_id)
154        }
155    }
156
157    pub async fn ensure_config(
158        self: &Arc<Self>,
159        manifest_layers: &[Descriptor],
160        descriptor: &Descriptor,
161    ) -> Result<ContentAndVerity<ObjectID>> {
162        let config_digest: &str = descriptor.digest().as_ref();
163        let content_id = config_identifier(config_digest);
164
165        if let Some(config_id) = self.repo.has_stream(&content_id)? {
166            // We already got this config?  Nice.
167            self.progress
168                .println(format!("Already have container config {config_digest}"))?;
169            Ok((config_digest.to_string(), config_id))
170        } else {
171            // We need to add the config to the repo.  We need to parse the config and make sure we
172            // have all of the layers first.
173            //
174            self.progress
175                .println(format!("Fetching config {config_digest}"))?;
176
177            let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
178            let config = async move {
179                let mut s = Vec::new();
180                config.read_to_end(&mut s).await?;
181                anyhow::Ok(s)
182            };
183            let (config, driver) = tokio::join!(config, driver);
184            let _: () = driver?;
185            let raw_config = config?;
186            let config = ImageConfiguration::from_reader(&raw_config[..])?;
187
188            // We want to sort the layers based on size so we can get started on the big layers
189            // first.  The last thing we want is to start on the biggest layer right at the end.
190            let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
191            layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
192
193            // Bound the number of tasks to the available parallelism.
194            let threads = available_parallelism()?;
195            let sem = Arc::new(Semaphore::new(threads.into()));
196            let mut entries = vec![];
197            for (mld, diff_id) in layers {
198                let diff_id_ = diff_id.clone();
199                let self_ = Arc::clone(self);
200                let permit = Arc::clone(&sem).acquire_owned().await?;
201                let descriptor = mld.clone();
202                let future = tokio::spawn(async move {
203                    let _permit = permit;
204                    self_.ensure_layer(&diff_id_, &descriptor).await
205                });
206                entries.push((diff_id, future));
207            }
208
209            let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE);
210
211            // Collect the results.
212            for (diff_id, future) in entries {
213                splitstream.add_named_stream_ref(diff_id, &future.await??);
214            }
215
216            // NB: We trust that skopeo has verified that raw_config has the correct digest
217            splitstream.write_inline(&raw_config);
218
219            let config_id = self.repo.write_stream(splitstream, &content_id, None)?;
220            Ok((config_digest.to_string(), config_id))
221        }
222    }
223
224    pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
225        let (_manifest_digest, raw_manifest) = self
226            .proxy
227            .fetch_manifest_raw_oci(&self.img)
228            .await
229            .context("Fetching manifest")?;
230
231        // We need to add the manifest to the repo.  We need to parse the manifest and make
232        // sure we have the config first (which will also pull in the layers).
233        let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
234        let config_descriptor = manifest.config();
235        let layers = manifest.layers();
236        self.ensure_config(layers, config_descriptor)
237            .await
238            .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
239    }
240}
241
242/// Pull the target image, and add the provided tag. If this is a mountable
243/// image (i.e. not an artifact), it is *not* unpacked by default.
244pub async fn pull<ObjectID: FsVerityHashValue>(
245    repo: &Arc<Repository<ObjectID>>,
246    imgref: &str,
247    reference: Option<&str>,
248    img_proxy_config: Option<ImageProxyConfig>,
249) -> Result<(String, ObjectID)> {
250    let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?);
251    let (sha256, id) = op
252        .pull()
253        .await
254        .with_context(|| format!("Unable to pull container image {imgref}"))?;
255
256    if let Some(name) = reference {
257        repo.name_stream(&sha256, name)?;
258    }
259    Ok((sha256, id))
260}