// ostree_ext/container/unencapsulate.rs

//! APIs for "unencapsulating" OSTree commits from container images
//!
//! This code only operates on container images that were created via
//! [`encapsulate`].
//!
//! # External dependency on container-image-proxy
//!
//! This code requires <https://github.com/cgwalters/container-image-proxy>
//! installed as a binary in $PATH.
//!
//! The rationale for this is that while there exist Rust crates to speak
//! the Docker distribution API, the Go library <https://github.com/containers/image/>
//! supports key things we want for production use like:
//!
//! - Image mirroring and remapping; effectively `man containers-registries.conf`
//!   For example, we need to support an administrator mirroring an ostree-container
//!   into a disconnected registry, without changing all the pull specs.
//! - Signing
//!
//! Additionally, the proxy "upconverts" manifests into OCI, so we don't need to care
//! about parsing the Docker manifest format (as used by most registries still).
//!
//! [`encapsulate`]: [`super::encapsulate()`]

// # Implementation
//
// First, we support explicitly fetching just the manifest: https://github.com/opencontainers/image-spec/blob/main/manifest.md
// This will give us information about the layers it contains, and crucially the digest (sha256) of
// the manifest is how higher level software can detect changes.
//
// Once we have the manifest, we expect it to point to a single `application/vnd.oci.image.layer.v1.tar+gzip` layer,
// which is exactly what is exported by the [`crate::tar::export`] process.
34use crate::container::store::LayerProgress;
35
36use super::*;
37use containers_image_proxy::{ImageProxy, OpenedImage};
38use fn_error_context::context;
39use futures_util::{Future, FutureExt};
40use oci_spec::image::{self as oci_image, Digest};
41use std::sync::{Arc, Mutex};
42use tokio::{
43    io::{AsyncBufRead, AsyncRead},
44    sync::watch::{Receiver, Sender},
45};
46use tracing::instrument;
47
/// Watch-channel sender on which the cumulative number of bytes read is published.
type Progress = tokio::sync::watch::Sender<u64>;
49
/// A read wrapper that updates the download progress.
///
/// Wraps an inner [`AsyncRead`] and, after each successful read, publishes the
/// running total of bytes read on a shared watch channel.
#[pin_project::pin_project]
#[derive(Debug)]
pub(crate) struct ProgressReader<T> {
    /// The inner reader whose bytes are being counted.
    #[pin]
    pub(crate) reader: T,
    /// Shared sender for the running byte total; locked on each read.
    /// NOTE(review): `#[pin]` on an `Arc<Mutex<_>>` looks unnecessary since it
    /// is never polled as a future — confirm before removing.
    #[pin]
    pub(crate) progress: Arc<Mutex<Progress>>,
}
59
60impl<T: AsyncRead> ProgressReader<T> {
61    pub(crate) fn new(reader: T) -> (Self, Receiver<u64>) {
62        let (progress, r) = tokio::sync::watch::channel(1);
63        let progress = Arc::new(Mutex::new(progress));
64        (ProgressReader { reader, progress }, r)
65    }
66}
67
impl<T: AsyncRead> AsyncRead for ProgressReader<T> {
    /// Delegates to the inner reader, then publishes the updated running
    /// byte total on the watch channel after every successful read.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.project();
        // Snapshot how much of the buffer was already filled so we can compute
        // how many bytes this particular poll added.
        let len = buf.filled().len();
        match this.reader.poll_read(cx, buf) {
            v @ std::task::Poll::Ready(Ok(_)) => {
                let progress = this.progress.lock().unwrap();
                let state = {
                    // Current running total previously published on the channel.
                    let mut state = *progress.borrow();
                    let newlen = buf.filled().len();
                    debug_assert!(newlen >= len);
                    let read = (newlen - len) as u64;
                    state += read;
                    state
                };
                // Ignore errors, if the caller disconnected from progress that's OK.
                let _ = progress.send(state);
                v
            }
            // Pending or error: propagate unchanged without touching progress.
            o => o,
        }
    }
}
95
96async fn fetch_manifest_impl(
97    proxy: &mut ImageProxy,
98    imgref: &OstreeImageReference,
99) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
100    let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
101    let (digest, manifest) = proxy.fetch_manifest(oi).await?;
102    proxy.close_image(oi).await?;
103    Ok((manifest, oci_image::Digest::from_str(digest.as_str())?))
104}
105
106/// Download the manifest for a target image and its sha256 digest.
107#[context("Fetching manifest")]
108pub async fn fetch_manifest(
109    imgref: &OstreeImageReference,
110) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
111    let mut proxy = ImageProxy::new().await?;
112    fetch_manifest_impl(&mut proxy, imgref).await
113}
114
115/// Download the manifest for a target image and its sha256 digest, as well as the image configuration.
116#[context("Fetching manifest and config")]
117pub async fn fetch_manifest_and_config(
118    imgref: &OstreeImageReference,
119) -> Result<(
120    oci_image::ImageManifest,
121    oci_image::Digest,
122    oci_image::ImageConfiguration,
123)> {
124    let proxy = ImageProxy::new().await?;
125    let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
126    let (digest, manifest) = proxy.fetch_manifest(oi).await?;
127    let digest = oci_image::Digest::from_str(&digest)?;
128    let config = proxy.fetch_config(oi).await?;
129    Ok((manifest, digest, config))
130}
131
/// The result of an import operation
#[derive(Debug)]
pub struct Import {
    /// The ostree commit that was imported
    pub ostree_commit: String,
    /// The image digest retrieved
    pub image_digest: Digest,

    /// Any deprecation warning
    pub deprecated_warning: Option<String>,
}
143
144/// Use this to process potential errors from a worker and a driver.
145/// This is really a brutal hack around the fact that an error can occur
146/// on either our side or in the proxy.  But if an error occurs on our
147/// side, then we will close the pipe, which will *also* cause the proxy
148/// to error out.
149///
150/// What we really want is for the proxy to tell us when it got an
151/// error from us closing the pipe.  Or, we could store that state
152/// on our side.  Both are slightly tricky, so we have this (again)
153/// hacky thing where we just search for `broken pipe` in the error text.
154///
155/// Or to restate all of the above - what this function does is check
156/// to see if the worker function had an error *and* if the proxy
157/// had an error, but if the proxy's error ends in `broken pipe`
158/// then it means the real only error is from the worker.
159pub(crate) async fn join_fetch<T: std::fmt::Debug>(
160    worker: impl Future<Output = Result<T>>,
161    driver: impl Future<Output = Result<()>>,
162) -> Result<T> {
163    let (worker, driver) = tokio::join!(worker, driver);
164    match (worker, driver) {
165        (Ok(t), Ok(())) => Ok(t),
166        (Err(worker), Err(driver)) => {
167            let text = driver.root_cause().to_string();
168            if text.ends_with("broken pipe") {
169                tracing::trace!("Ignoring broken pipe failure from driver");
170                Err(worker)
171            } else {
172                Err(worker.context(format!("proxy failure: {text} and client error")))
173            }
174        }
175        (Ok(_), Err(driver)) => Err(driver),
176        (Err(worker), Ok(())) => Err(worker),
177    }
178}
179
180/// Fetch a container image and import its embedded OSTree commit.
181#[context("Importing {}", imgref)]
182#[instrument(level = "debug", skip(repo))]
183pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result<Import> {
184    let importer = super::store::ImageImporter::new(repo, imgref, Default::default()).await?;
185    importer.unencapsulate().await
186}
187
188/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
189pub(crate) async fn fetch_layer<'a>(
190    proxy: &'a ImageProxy,
191    img: &OpenedImage,
192    manifest: &oci_image::ImageManifest,
193    layer: &'a oci_image::Descriptor,
194    progress: Option<&'a Sender<Option<store::LayerProgress>>>,
195    layer_info: Option<&Vec<containers_image_proxy::ConvertedLayerInfo>>,
196    transport_src: Transport,
197) -> Result<(
198    Box<dyn AsyncBufRead + Send + Unpin>,
199    impl Future<Output = Result<()>> + 'a,
200    oci_image::MediaType,
201)> {
202    use futures_util::future::Either;
203    tracing::debug!("fetching {}", layer.digest());
204    let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
205    let (blob, driver, size);
206    let mut media_type: oci_image::MediaType;
207    match transport_src {
208        // Both containers-storage and docker-daemon store layers uncompressed in their
209        // local storage, even though the manifest may indicate they are compressed.
210        // We need to use the actual media type from layer_info to avoid decompression errors.
211        Transport::ContainerStorage | Transport::DockerDaemon => {
212            let layer_info = layer_info.ok_or_else(|| {
213                anyhow!("skopeo too old to pull from containers-storage or docker-daemon")
214            })?;
215            let n_layers = layer_info.len();
216            let layer_blob = layer_info.get(layer_index).ok_or_else(|| {
217                anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
218            })?;
219            size = layer_blob.size;
220            media_type = layer_blob.media_type.clone();
221
222            // docker-daemon stores layers uncompressed even when the media type
223            // indicates gzip compression. Translate to the uncompressed variant.
224            if transport_src == Transport::DockerDaemon {
225                if let oci_image::MediaType::Other(t) = &media_type {
226                    if t.as_str() == "application/vnd.docker.image.rootfs.diff.tar.gzip" {
227                        media_type = oci_image::MediaType::Other(
228                            "application/vnd.docker.image.rootfs.diff.tar".to_string(),
229                        );
230                    }
231                }
232            }
233
234            (blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?;
235        }
236        _ => {
237            size = layer.size();
238            media_type = layer.media_type().clone();
239            (blob, driver) = proxy.get_blob(img, layer.digest(), size).await?;
240        }
241    };
242
243    let driver = async { driver.await.map_err(Into::into) };
244
245    if let Some(progress) = progress {
246        let (readprogress, mut readwatch) = ProgressReader::new(blob);
247        let readprogress = tokio::io::BufReader::new(readprogress);
248        let readproxy = async move {
249            while let Ok(()) = readwatch.changed().await {
250                let fetched = readwatch.borrow_and_update();
251                let status = LayerProgress {
252                    layer_index,
253                    fetched: *fetched,
254                    total: size,
255                };
256                progress.send_replace(Some(status));
257            }
258        };
259        let reader = Box::new(readprogress);
260        let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
261        Ok((reader, Either::Left(driver), media_type))
262    } else {
263        Ok((Box::new(blob), Either::Right(driver), media_type))
264    }
265}