ostree_ext/
generic_decompress.rs

1//! This module primarily contains the `Decompressor` struct which is
2//! used to decompress a stream based on its OCI media type.
3//!
4//! It also contains the `ReadWithGetInnerMut` trait and related
5//! concrete implementations thereof.  These provide a means for each
6//! specific decompressor to give mutable access to the inner reader.
7//!
8//! For example, the GzipDecompressor would give the underlying
9//! compressed stream.
10//!
11//! We need a common way to access this stream so that we can flush
12//! the data during cleanup.
13//!
14//! See: <https://github.com/bootc-dev/bootc/issues/1407>
15
16use std::io::Read;
17
18use crate::oci_spec::image as oci_image;
19
/// The legacy MIME type returned by the skopeo/(containers/storage) code
/// when we have a local, uncompressed, docker-formatted image.
/// TODO: change the skopeo code so that it shields us from this type.
23const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";
24
/// A [`Read`] implementation that can also expose the reader it wraps.
///
/// Each concrete decompressor in this module implements this trait so that
/// the stream underneath it can be reached through a single, uniform method
/// (for example to read out whatever is left in it during cleanup).
trait ReadWithGetInnerMut: Read + Send + 'static {
    /// Returns mutable access to the reader wrapped by this value.
    fn get_inner_mut(&mut self) -> &mut dyn Read;
}
29
30// TransparentDecompressor
31
/// Pass-through "decompressor" used for layers that are not compressed.
///
/// Every read is forwarded to the wrapped reader unchanged.
struct TransparentDecompressor<R: Read + Send + 'static>(R);

impl<R> Read for TransparentDecompressor<R>
where
    R: Read + Send + 'static,
{
    /// Reads straight from the wrapped reader into `buf`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let Self(source) = self;
        source.read(buf)
    }
}
39
40impl<R: Read + Send + 'static> ReadWithGetInnerMut for TransparentDecompressor<R> {
41    fn get_inner_mut(&mut self) -> &mut dyn Read {
42        &mut self.0
43    }
44}
45
46// GzipDecompressor
47
48struct GzipDecompressor<R: std::io::BufRead>(flate2::bufread::GzDecoder<R>);
49
50impl<R: std::io::BufRead + Send + 'static> Read for GzipDecompressor<R> {
51    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
52        self.0.read(buf)
53    }
54}
55
56impl<R: std::io::BufRead + Send + 'static> ReadWithGetInnerMut for GzipDecompressor<R> {
57    fn get_inner_mut(&mut self) -> &mut dyn Read {
58        self.0.get_mut()
59    }
60}
61
62// ZstdDecompressor
63
64struct ZstdDecompressor<'a, R: std::io::BufRead>(zstd::stream::read::Decoder<'a, R>);
65
66impl<'a: 'static, R: std::io::BufRead + Send + 'static> Read for ZstdDecompressor<'a, R> {
67    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
68        self.0.read(buf)
69    }
70}
71
72impl<'a: 'static, R: std::io::BufRead + Send + 'static> ReadWithGetInnerMut
73    for ZstdDecompressor<'a, R>
74{
75    fn get_inner_mut(&mut self) -> &mut dyn Read {
76        self.0.get_mut()
77    }
78}
79
80pub(crate) struct Decompressor {
81    inner: Box<dyn ReadWithGetInnerMut>,
82    finished: bool,
83}
84
85impl Read for Decompressor {
86    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
87        self.inner.read(buf)
88    }
89}
90
91impl Drop for Decompressor {
92    fn drop(&mut self) {
93        if self.finished {
94            return;
95        }
96
97        // Ideally we should not get here; users should call
98        // `finish()` to clean up the stream.  But in reality there's
99        // codepaths that can and will short-circuit error out while
100        // processing the stream, and the Decompressor will get
101        // dropped before it's finished in those cases.  We'll give
102        // best-effort to clean things up nonetheless.  If things go
103        // wrong, then panic, because we're in a bad state and it's
104        // likely that we end up with a broken pipe error or a
105        // deadlock.
106        self._finish()
107            .expect("Failed to flush pipe while dropping Decompressor")
108    }
109}
110
111impl Decompressor {
112    /// Create a decompressor for this MIME type, given a stream of input.
113    pub(crate) fn new(
114        media_type: &oci_image::MediaType,
115        src: impl Read + Send + 'static,
116    ) -> anyhow::Result<Self> {
117        let r: Box<dyn ReadWithGetInnerMut> = match media_type {
118            oci_image::MediaType::ImageLayerZstd => {
119                Box::new(ZstdDecompressor(zstd::stream::read::Decoder::new(src)?))
120            }
121            oci_image::MediaType::ImageLayerGzip => Box::new(GzipDecompressor(
122                flate2::bufread::GzDecoder::new(std::io::BufReader::new(src)),
123            )),
124            oci_image::MediaType::ImageLayer => Box::new(TransparentDecompressor(src)),
125            oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
126                Box::new(TransparentDecompressor(src))
127            }
128            o => anyhow::bail!("Unhandled layer type: {}", o),
129        };
130        Ok(Self {
131            inner: r,
132            finished: false,
133        })
134    }
135
136    pub(crate) fn finish(mut self) -> anyhow::Result<()> {
137        self._finish()
138    }
139
140    fn _finish(&mut self) -> anyhow::Result<()> {
141        self.finished = true;
142
143        // We need to make sure to flush out the decompressor and/or
144        // tar stream here.  For tar, we might not read through the
145        // entire stream, because the archive has zero-block-markers
146        // at the end; or possibly because the final entry is filtered
147        // in filter_tar so we don't advance to read the data.  For
148        // decompressor, zstd:chunked layers will have
149        // metadata/skippable frames at the end of the stream.  That
150        // data isn't relevant to the tar stream, but if we don't read
151        // it here then on the skopeo proxy we'll block trying to
152        // write the end of the stream.  That in turn will block our
153        // client end trying to call FinishPipe, and we end up
154        // deadlocking ourselves through skopeo.
155        //
156        // https://github.com/bootc-dev/bootc/issues/1204
157
158        let mut sink = std::io::sink();
159        let n = std::io::copy(self.inner.get_inner_mut(), &mut sink)?;
160
161        if n > 0 {
162            tracing::debug!("Read extra {n} bytes at end of decompressor stream");
163        }
164
165        Ok(())
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// A reader whose every read fails with `BrokenPipe`.
    struct BrokenPipe;

    impl Read for BrokenPipe {
        fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
            Err(std::io::ErrorKind::BrokenPipe.into())
        }
    }

    /// Dropping an unfinished decompressor whose source cannot be read
    /// must panic.
    #[test]
    #[should_panic(expected = "Failed to flush pipe while dropping Decompressor")]
    fn test_drop_decompressor_with_finish_error_should_panic() {
        let d = Decompressor::new(&oci_image::MediaType::ImageLayer, BrokenPipe).unwrap();
        drop(d);
    }

    /// Dropping an unfinished decompressor over an empty source succeeds.
    #[test]
    fn test_drop_decompressor_with_successful_finish() {
        let d = Decompressor::new(&oci_image::MediaType::ImageLayer, std::io::empty()).unwrap();
        drop(d);
    }

    /// Dropping a gzip decompressor that never saw any data succeeds.
    #[test]
    fn test_drop_decompressor_with_incomplete_gzip_data() {
        let d =
            Decompressor::new(&oci_image::MediaType::ImageLayerGzip, std::io::empty()).unwrap();
        drop(d);
    }

    /// Dropping a zstd decompressor that never saw any data succeeds.
    #[test]
    fn test_drop_decompressor_with_incomplete_zstd_data() {
        let d =
            Decompressor::new(&oci_image::MediaType::ImageLayerZstd, std::io::empty()).unwrap();
        drop(d);
    }

    /// Input that is not gzip is reported as a read error, and the
    /// decompressor can still be dropped afterwards.
    #[test]
    fn test_gzip_decompressor_with_garbage_input() {
        let garbage = b"This is not valid gzip data";
        let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, &garbage[..]).unwrap();
        let mut scratch = [0u8; 32];
        let err = d.read(&mut scratch).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
        assert_eq!(err.to_string(), "invalid gzip header");
        drop(d);
    }

    /// Input that is not zstd is reported as a read error, and the
    /// decompressor can still be dropped afterwards.
    #[test]
    fn test_zstd_decompressor_with_garbage_input() {
        let garbage = b"This is not valid zstd data";
        let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, &garbage[..]).unwrap();
        let mut scratch = [0u8; 32];
        let err = d.read(&mut scratch).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::Other);
        assert_eq!(err.to_string(), "Unknown frame descriptor");
        drop(d);
    }
}