ostree_ext/container/
unencapsulate.rs1use crate::container::store::LayerProgress;
35
36use super::*;
37use containers_image_proxy::{ImageProxy, OpenedImage};
38use fn_error_context::context;
39use futures_util::{Future, FutureExt};
40use oci_spec::image::{self as oci_image, Digest};
41use std::sync::{Arc, Mutex};
42use tokio::{
43 io::{AsyncBufRead, AsyncRead},
44 sync::watch::{Receiver, Sender},
45};
46use tracing::instrument;
47
48type Progress = tokio::sync::watch::Sender<u64>;
49
50#[pin_project::pin_project]
52#[derive(Debug)]
53pub(crate) struct ProgressReader<T> {
54 #[pin]
55 pub(crate) reader: T,
56 #[pin]
57 pub(crate) progress: Arc<Mutex<Progress>>,
58}
59
60impl<T: AsyncRead> ProgressReader<T> {
61 pub(crate) fn new(reader: T) -> (Self, Receiver<u64>) {
62 let (progress, r) = tokio::sync::watch::channel(1);
63 let progress = Arc::new(Mutex::new(progress));
64 (ProgressReader { reader, progress }, r)
65 }
66}
67
68impl<T: AsyncRead> AsyncRead for ProgressReader<T> {
69 fn poll_read(
70 self: std::pin::Pin<&mut Self>,
71 cx: &mut std::task::Context<'_>,
72 buf: &mut tokio::io::ReadBuf<'_>,
73 ) -> std::task::Poll<std::io::Result<()>> {
74 let this = self.project();
75 let len = buf.filled().len();
76 match this.reader.poll_read(cx, buf) {
77 v @ std::task::Poll::Ready(Ok(_)) => {
78 let progress = this.progress.lock().unwrap();
79 let state = {
80 let mut state = *progress.borrow();
81 let newlen = buf.filled().len();
82 debug_assert!(newlen >= len);
83 let read = (newlen - len) as u64;
84 state += read;
85 state
86 };
87 let _ = progress.send(state);
89 v
90 }
91 o => o,
92 }
93 }
94}
95
96async fn fetch_manifest_impl(
97 proxy: &mut ImageProxy,
98 imgref: &OstreeImageReference,
99) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
100 let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
101 let (digest, manifest) = proxy.fetch_manifest(oi).await?;
102 proxy.close_image(oi).await?;
103 Ok((manifest, oci_image::Digest::from_str(digest.as_str())?))
104}
105
106#[context("Fetching manifest")]
108pub async fn fetch_manifest(
109 imgref: &OstreeImageReference,
110) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
111 let mut proxy = ImageProxy::new().await?;
112 fetch_manifest_impl(&mut proxy, imgref).await
113}
114
115#[context("Fetching manifest and config")]
117pub async fn fetch_manifest_and_config(
118 imgref: &OstreeImageReference,
119) -> Result<(
120 oci_image::ImageManifest,
121 oci_image::Digest,
122 oci_image::ImageConfiguration,
123)> {
124 let proxy = ImageProxy::new().await?;
125 let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
126 let (digest, manifest) = proxy.fetch_manifest(oi).await?;
127 let digest = oci_image::Digest::from_str(&digest)?;
128 let config = proxy.fetch_config(oi).await?;
129 Ok((manifest, digest, config))
130}
131
132#[derive(Debug)]
134pub struct Import {
135 pub ostree_commit: String,
137 pub image_digest: Digest,
139
140 pub deprecated_warning: Option<String>,
142}
143
144pub(crate) async fn join_fetch<T: std::fmt::Debug>(
160 worker: impl Future<Output = Result<T>>,
161 driver: impl Future<Output = Result<()>>,
162) -> Result<T> {
163 let (worker, driver) = tokio::join!(worker, driver);
164 match (worker, driver) {
165 (Ok(t), Ok(())) => Ok(t),
166 (Err(worker), Err(driver)) => {
167 let text = driver.root_cause().to_string();
168 if text.ends_with("broken pipe") {
169 tracing::trace!("Ignoring broken pipe failure from driver");
170 Err(worker)
171 } else {
172 Err(worker.context(format!("proxy failure: {text} and client error")))
173 }
174 }
175 (Ok(_), Err(driver)) => Err(driver),
176 (Err(worker), Ok(())) => Err(worker),
177 }
178}
179
180#[context("Importing {}", imgref)]
182#[instrument(level = "debug", skip(repo))]
183pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result<Import> {
184 let importer = super::store::ImageImporter::new(repo, imgref, Default::default()).await?;
185 importer.unencapsulate().await
186}
187
188pub(crate) async fn fetch_layer<'a>(
190 proxy: &'a ImageProxy,
191 img: &OpenedImage,
192 manifest: &oci_image::ImageManifest,
193 layer: &'a oci_image::Descriptor,
194 progress: Option<&'a Sender<Option<store::LayerProgress>>>,
195 layer_info: Option<&Vec<containers_image_proxy::ConvertedLayerInfo>>,
196 transport_src: Transport,
197) -> Result<(
198 Box<dyn AsyncBufRead + Send + Unpin>,
199 impl Future<Output = Result<()>> + 'a,
200 oci_image::MediaType,
201)> {
202 use futures_util::future::Either;
203 tracing::debug!("fetching {}", layer.digest());
204 let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
205 let (blob, driver, size);
206 let mut media_type: oci_image::MediaType;
207 match transport_src {
208 Transport::ContainerStorage | Transport::DockerDaemon => {
212 let layer_info = layer_info.ok_or_else(|| {
213 anyhow!("skopeo too old to pull from containers-storage or docker-daemon")
214 })?;
215 let n_layers = layer_info.len();
216 let layer_blob = layer_info.get(layer_index).ok_or_else(|| {
217 anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
218 })?;
219 size = layer_blob.size;
220 media_type = layer_blob.media_type.clone();
221
222 if transport_src == Transport::DockerDaemon {
225 if let oci_image::MediaType::Other(t) = &media_type {
226 if t.as_str() == "application/vnd.docker.image.rootfs.diff.tar.gzip" {
227 media_type = oci_image::MediaType::Other(
228 "application/vnd.docker.image.rootfs.diff.tar".to_string(),
229 );
230 }
231 }
232 }
233
234 (blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?;
235 }
236 _ => {
237 size = layer.size();
238 media_type = layer.media_type().clone();
239 (blob, driver) = proxy.get_blob(img, layer.digest(), size).await?;
240 }
241 };
242
243 let driver = async { driver.await.map_err(Into::into) };
244
245 if let Some(progress) = progress {
246 let (readprogress, mut readwatch) = ProgressReader::new(blob);
247 let readprogress = tokio::io::BufReader::new(readprogress);
248 let readproxy = async move {
249 while let Ok(()) = readwatch.changed().await {
250 let fetched = readwatch.borrow_and_update();
251 let status = LayerProgress {
252 layer_index,
253 fetched: *fetched,
254 total: size,
255 };
256 progress.send_replace(Some(status));
257 }
258 };
259 let reader = Box::new(readprogress);
260 let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
261 Ok((reader, Either::Left(driver), media_type))
262 } else {
263 Ok((Box::new(blob), Either::Right(driver), media_type))
264 }
265}