1use std::{cmp::Reverse, process::Command, thread::available_parallelism};
12
13use std::{iter::zip, sync::Arc};
14
15use anyhow::{bail, Context, Result};
16use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
17use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
18use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
19use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
20use rustix::process::geteuid;
21use tokio::{
22 io::{AsyncReadExt, BufReader},
23 sync::Semaphore,
24};
25
26use composefs::{fsverity::FsVerityHashValue, repository::Repository};
27
28use crate::{config_identifier, layer_identifier, tar::split_async, ContentAndVerity};
29
30pub(crate) const TAR_LAYER_CONTENT_TYPE: u64 = u64::from_le_bytes(*b"ocilayer");
32pub(crate) const OCI_CONFIG_CONTENT_TYPE: u64 = u64::from_le_bytes(*b"ociconfg");
33
34struct ImageOp<ObjectID: FsVerityHashValue> {
35 repo: Arc<Repository<ObjectID>>,
36 proxy: ImageProxy,
37 img: OpenedImage,
38 progress: MultiProgress,
39}
40
41impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
42 async fn new(
43 repo: &Arc<Repository<ObjectID>>,
44 imgref: &str,
45 img_proxy_config: Option<ImageProxyConfig>,
46 ) -> Result<Self> {
47 let skopeo_cmd = if imgref.starts_with("containers-storage:") && !geteuid().is_root() {
49 let mut cmd = Command::new("podman");
50 cmd.args(["unshare", "skopeo"]);
51 Some(cmd)
52 } else {
53 None
54 };
55
56 let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") {
58 &format!("containers-storage:{hash}") } else {
60 imgref
61 };
62
63 let config = match img_proxy_config {
64 Some(mut conf) => {
65 if conf.skopeo_cmd.is_none() {
66 conf.skopeo_cmd = skopeo_cmd;
67 }
68
69 conf
70 }
71
72 None => {
73 ImageProxyConfig {
74 skopeo_cmd,
75 ..ImageProxyConfig::default()
77 }
78 }
79 };
80
81 let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
82 let img = proxy.open_image(imgref).await.context("Opening image")?;
83 let progress = MultiProgress::new();
84 Ok(ImageOp {
85 repo: Arc::clone(repo),
86 proxy,
87 img,
88 progress,
89 })
90 }
91
92 pub async fn ensure_layer(&self, diff_id: &str, descriptor: &Descriptor) -> Result<ObjectID> {
93 let content_id = layer_identifier(diff_id);
97
98 if let Some(layer_id) = self.repo.has_stream(&content_id)? {
99 self.progress
100 .println(format!("Already have layer {diff_id}"))?;
101 Ok(layer_id)
102 } else {
103 let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
105
106 let blob_reader = blob_reader.take(descriptor.size());
108
109 let bar = self.progress.add(ProgressBar::new(descriptor.size()));
110 bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
111 .unwrap()
112 .progress_chars("##-"));
113 let progress = bar.wrap_async_read(blob_reader);
114 self.progress.println(format!("Fetching layer {diff_id}"))?;
115
116 let object_id = match descriptor.media_type() {
117 MediaType::ImageLayer => {
118 split_async(
119 BufReader::new(progress),
120 self.repo.clone(),
121 TAR_LAYER_CONTENT_TYPE,
122 )
123 .await?
124 }
125 MediaType::ImageLayerGzip => {
126 split_async(
127 BufReader::new(GzipDecoder::new(BufReader::new(progress))),
128 self.repo.clone(),
129 TAR_LAYER_CONTENT_TYPE,
130 )
131 .await?
132 }
133 MediaType::ImageLayerZstd => {
134 split_async(
135 BufReader::new(ZstdDecoder::new(BufReader::new(progress))),
136 self.repo.clone(),
137 TAR_LAYER_CONTENT_TYPE,
138 )
139 .await?
140 }
141 other => bail!("Unsupported layer media type {other:?}"),
142 };
143
144 driver.await?;
147
148 self.repo
150 .register_stream(&object_id, &content_id, None)
151 .await?;
152
153 Ok(object_id)
154 }
155 }
156
157 pub async fn ensure_config(
158 self: &Arc<Self>,
159 manifest_layers: &[Descriptor],
160 descriptor: &Descriptor,
161 ) -> Result<ContentAndVerity<ObjectID>> {
162 let config_digest: &str = descriptor.digest().as_ref();
163 let content_id = config_identifier(config_digest);
164
165 if let Some(config_id) = self.repo.has_stream(&content_id)? {
166 self.progress
168 .println(format!("Already have container config {config_digest}"))?;
169 Ok((config_digest.to_string(), config_id))
170 } else {
171 self.progress
175 .println(format!("Fetching config {config_digest}"))?;
176
177 let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
178 let config = async move {
179 let mut s = Vec::new();
180 config.read_to_end(&mut s).await?;
181 anyhow::Ok(s)
182 };
183 let (config, driver) = tokio::join!(config, driver);
184 let _: () = driver?;
185 let raw_config = config?;
186 let config = ImageConfiguration::from_reader(&raw_config[..])?;
187
188 let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
191 layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
192
193 let threads = available_parallelism()?;
195 let sem = Arc::new(Semaphore::new(threads.into()));
196 let mut entries = vec![];
197 for (mld, diff_id) in layers {
198 let diff_id_ = diff_id.clone();
199 let self_ = Arc::clone(self);
200 let permit = Arc::clone(&sem).acquire_owned().await?;
201 let descriptor = mld.clone();
202 let future = tokio::spawn(async move {
203 let _permit = permit;
204 self_.ensure_layer(&diff_id_, &descriptor).await
205 });
206 entries.push((diff_id, future));
207 }
208
209 let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE);
210
211 for (diff_id, future) in entries {
213 splitstream.add_named_stream_ref(diff_id, &future.await??);
214 }
215
216 splitstream.write_inline(&raw_config);
218
219 let config_id = self.repo.write_stream(splitstream, &content_id, None)?;
220 Ok((config_digest.to_string(), config_id))
221 }
222 }
223
224 pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
225 let (_manifest_digest, raw_manifest) = self
226 .proxy
227 .fetch_manifest_raw_oci(&self.img)
228 .await
229 .context("Fetching manifest")?;
230
231 let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
234 let config_descriptor = manifest.config();
235 let layers = manifest.layers();
236 self.ensure_config(layers, config_descriptor)
237 .await
238 .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
239 }
240}
241
242pub async fn pull<ObjectID: FsVerityHashValue>(
245 repo: &Arc<Repository<ObjectID>>,
246 imgref: &str,
247 reference: Option<&str>,
248 img_proxy_config: Option<ImageProxyConfig>,
249) -> Result<(String, ObjectID)> {
250 let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?);
251 let (sha256, id) = op
252 .pull()
253 .await
254 .with_context(|| format!("Unable to pull container image {imgref}"))?;
255
256 if let Some(name) = reference {
257 repo.name_stream(&sha256, name)?;
258 }
259 Ok((sha256, id))
260}