bootc_lib/
progress_jsonl.rs

//! Output progress data using the json-lines format. For more information
//! see <https://jsonlines.org/>.
3
4use anyhow::Result;
5use canon_json::CanonJsonSerialize;
6use schemars::JsonSchema;
7use serde::Serialize;
8use std::borrow::Cow;
9use std::os::fd::{FromRawFd, OwnedFd, RawFd};
10use std::str::FromStr;
11use std::sync::Arc;
12use std::time::Instant;
13use tokio::io::{AsyncWriteExt, BufWriter};
14use tokio::net::unix::pipe::Sender;
15use tokio::sync::Mutex;
16
/// Maximum number of times per second that a *lossy* event (one sent via
/// `ProgressWriter::send_lossy`) will be written. Required events are never
/// throttled.
const REFRESH_HZ: u16 = 5;

/// Semantic version of the progress protocol, reported in the `Start` event
/// that is written before any other event on the stream.
const API_VERSION: &str = "0.1.0";
22
/// An incremental update to e.g. a container image layer download.
/// The first time a given "subtask" name is seen, a new progress bar should be created.
/// If bytes == bytes_total, then the subtask is considered complete.
// Wire format: field names are serialized in camelCase per `rename_all` below
// (e.g. `bytes_cached` -> `bytesCached`). `#[serde(borrow)]` lets
// deserialization borrow the strings from the input instead of allocating
// when possible.
// The `///` comments in this struct also become descriptions in the schema
// generated by the `JsonSchema` derive.
#[derive(
    Debug, serde::Serialize, serde::Deserialize, Default, Clone, JsonSchema, PartialEq, Eq,
)]
#[serde(rename_all = "camelCase")]
pub struct SubTaskBytes<'t> {
    /// A machine readable type for the task (used for i18n).
    /// (e.g., "ostree_chunk", "ostree_derived")
    #[serde(borrow)]
    pub subtask: Cow<'t, str>,
    /// A human readable description of the task if i18n is not available.
    /// (e.g., "OSTree Chunk:", "Derived Layer:")
    #[serde(borrow)]
    pub description: Cow<'t, str>,
    /// A human and machine readable identifier for the task
    /// (e.g., ostree chunk/layer hash).
    #[serde(borrow)]
    pub id: Cow<'t, str>,
    /// The number of bytes fetched by a previous run (e.g., zstd_chunked).
    pub bytes_cached: u64,
    /// Updated byte level progress
    pub bytes: u64,
    /// Total number of bytes
    pub bytes_total: u64,
}
50
/// Marks the beginning and end of a discrete step.
// Wire format: field names are serialized in camelCase per `rename_all` below.
// `#[serde(borrow)]` lets deserialization borrow the strings from the input
// instead of allocating when possible.
// The `///` comments in this struct also become descriptions in the schema
// generated by the `JsonSchema` derive.
#[derive(
    Debug, serde::Serialize, serde::Deserialize, Default, Clone, JsonSchema, PartialEq, Eq,
)]
#[serde(rename_all = "camelCase")]
pub struct SubTaskStep<'t> {
    /// A machine readable type for the task (used for i18n).
    /// (e.g., "ostree_chunk", "ostree_derived")
    #[serde(borrow)]
    pub subtask: Cow<'t, str>,
    /// A human readable description of the task if i18n is not available.
    /// (e.g., "OSTree Chunk:", "Derived Layer:")
    #[serde(borrow)]
    pub description: Cow<'t, str>,
    /// A human and machine readable identifier for the task
    /// (e.g., ostree chunk/layer hash).
    #[serde(borrow)]
    pub id: Cow<'t, str>,
    /// Starts as false when beginning to execute and turns true when completed.
    pub completed: bool,
}
72
/// An event emitted as JSON.
// Wire format: each event is one JSON object, internally tagged with a "type"
// key that holds the variant name (PascalCase). Variant fields are renamed to
// camelCase (e.g. `bytes_total` -> `bytesTotal`).
// The `///` comments in this enum also become descriptions in the schema
// generated by the `JsonSchema` derive.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(
    tag = "type",
    rename_all = "PascalCase",
    rename_all_fields = "camelCase"
)]
pub enum Event<'t> {
    /// Always the first event on a progress stream; carries the protocol version.
    Start {
        /// The semantic version of the progress protocol.
        #[serde(borrow)]
        version: Cow<'t, str>,
    },
    /// An incremental update to a container image layer download
    ProgressBytes {
        /// A machine readable type (e.g., pulling) for the task (used for i18n
        /// and UI customization).
        #[serde(borrow)]
        task: Cow<'t, str>,
        /// A human readable description of the task if i18n is not available.
        #[serde(borrow)]
        description: Cow<'t, str>,
        /// A human and machine readable unique identifier for the task
        /// (e.g., the image name). For tasks that only happen once,
        /// it can be set to the same value as task.
        #[serde(borrow)]
        id: Cow<'t, str>,
        /// The number of bytes fetched by a previous run.
        bytes_cached: u64,
        /// The number of bytes already fetched.
        bytes: u64,
        /// Total number of bytes. If zero, then this should be considered "unspecified".
        bytes_total: u64,
        /// The number of steps fetched by a previous run.
        steps_cached: u64,
        // NOTE(review): "initial position" reads oddly next to `bytes`
        // ("already fetched"); presumably this is the current step count —
        // confirm.
        /// The initial position of progress.
        steps: u64,
        /// The total number of steps (e.g. container image layers, RPMs)
        steps_total: u64,
        /// The currently running subtasks.
        subtasks: Vec<SubTaskBytes<'t>>,
    },
    /// An incremental update with discrete steps
    ProgressSteps {
        /// A machine readable type (e.g., pulling) for the task (used for i18n
        /// and UI customization).
        #[serde(borrow)]
        task: Cow<'t, str>,
        /// A human readable description of the task if i18n is not available.
        #[serde(borrow)]
        description: Cow<'t, str>,
        /// A human and machine readable unique identifier for the task
        /// (e.g., the image name). For tasks that only happen once,
        /// it can be set to the same value as task.
        #[serde(borrow)]
        id: Cow<'t, str>,
        /// The number of steps fetched by a previous run.
        steps_cached: u64,
        // NOTE(review): as in `ProgressBytes`, presumably the current step
        // count rather than an "initial" position — confirm.
        /// The initial position of progress.
        steps: u64,
        /// The total number of steps (e.g. container image layers, RPMs)
        steps_total: u64,
        /// The currently running subtasks.
        subtasks: Vec<SubTaskStep<'t>>,
    },
}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub(crate) struct RawProgressFd(RawFd);
142
143impl FromStr for RawProgressFd {
144    type Err = anyhow::Error;
145
146    fn from_str(s: &str) -> Result<Self> {
147        let fd = s.parse::<u32>()?;
148        // Sanity check
149        if matches!(fd, 0..=2) {
150            anyhow::bail!("Cannot use fd {fd} for progress JSON")
151        }
152        Ok(Self(fd.try_into()?))
153    }
154}
155
156#[derive(Debug)]
157struct ProgressWriterInner {
158    /// true if we sent the initial Start message
159    sent_start: bool,
160    last_write: Option<std::time::Instant>,
161    fd: BufWriter<Sender>,
162}
163
164#[derive(Clone, Debug, Default)]
165pub(crate) struct ProgressWriter {
166    inner: Arc<Mutex<Option<ProgressWriterInner>>>,
167}
168
169impl TryFrom<OwnedFd> for ProgressWriter {
170    type Error = anyhow::Error;
171
172    fn try_from(value: OwnedFd) -> Result<Self> {
173        let value = Sender::from_owned_fd(value)?;
174        Ok(Self::from(value))
175    }
176}
177
178impl From<Sender> for ProgressWriter {
179    fn from(value: Sender) -> Self {
180        let inner = ProgressWriterInner {
181            sent_start: false,
182            last_write: None,
183            fd: BufWriter::new(value),
184        };
185        Self {
186            inner: Arc::new(Some(inner).into()),
187        }
188    }
189}
190
191impl TryFrom<RawProgressFd> for ProgressWriter {
192    type Error = anyhow::Error;
193
194    #[allow(unsafe_code)]
195    fn try_from(fd: RawProgressFd) -> Result<Self> {
196        unsafe { OwnedFd::from_raw_fd(fd.0) }.try_into()
197    }
198}
199
200impl ProgressWriter {
201    /// Serialize the target value as a single line of JSON and write it.
202    async fn send_impl_inner<T: Serialize>(inner: &mut ProgressWriterInner, v: T) -> Result<()> {
203        // canon_json is guaranteed not to output newlines here
204        let buf = v.to_canon_json_vec()?;
205        inner.fd.write_all(&buf).await?;
206        // We always end in a newline
207        inner.fd.write_all(b"\n").await?;
208        // And flush to ensure the remote side sees updates immediately
209        inner.fd.flush().await?;
210        Ok(())
211    }
212
213    /// Serialize the target object to JSON as a single line
214    pub(crate) async fn send_impl<T: Serialize>(&self, v: T, required: bool) -> Result<()> {
215        let mut guard = self.inner.lock().await;
216        // Check if we have an inner value; if not, nothing to do.
217        let Some(inner) = guard.as_mut() else {
218            return Ok(());
219        };
220
221        // If this is our first message, emit the Start message
222        if !inner.sent_start {
223            inner.sent_start = true;
224            let start = Event::Start {
225                version: API_VERSION.into(),
226            };
227            Self::send_impl_inner(inner, &start).await?;
228        }
229
230        // For messages that can be dropped, if we already sent an update within this cycle, discard this one.
231        // TODO: Also consider querying the pipe buffer and also dropping if we can't do this write.
232        let now = Instant::now();
233        if !required {
234            const REFRESH_MS: u32 = 1000 / REFRESH_HZ as u32;
235            if let Some(elapsed) = inner.last_write.map(|w| now.duration_since(w)) {
236                if elapsed.as_millis() < REFRESH_MS.into() {
237                    return Ok(());
238                }
239            }
240        }
241
242        Self::send_impl_inner(inner, &v).await?;
243        // Update the last write time
244        inner.last_write = Some(now);
245        Ok(())
246    }
247
248    /// Send an event.
249    pub(crate) async fn send(&self, event: Event<'_>) {
250        if let Err(e) = self.send_impl(event, true).await {
251            eprintln!("Failed to write to jsonl: {e}");
252            // Stop writing to fd but let process continue
253            // SAFETY: Propagating panics from the mutex here is intentional
254            let _ = self.inner.lock().await.take();
255        }
256    }
257
258    /// Send an event that can be dropped.
259    pub(crate) async fn send_lossy(&self, event: Event<'_>) {
260        if let Err(e) = self.send_impl(event, false).await {
261            eprintln!("Failed to write to jsonl: {e}");
262            // Stop writing to fd but let process continue
263            // SAFETY: Propagating panics from the mutex here is intentional
264            let _ = self.inner.lock().await.take();
265        }
266    }
267
268    /// Flush remaining data and return the underlying file.
269    #[allow(dead_code)]
270    pub(crate) async fn into_inner(self) -> Result<Option<Sender>> {
271        // SAFETY: Propagating panics from the mutex here is intentional
272        let mut mutex = self.inner.lock().await;
273        if let Some(inner) = mutex.take() {
274            Ok(Some(inner.fd.into_inner()))
275        } else {
276            Ok(None)
277        }
278    }
279}
280
#[cfg(test)]
mod test {
    use tokio::io::{AsyncBufReadExt, BufReader};

    use super::*;

    #[tokio::test]
    async fn test_jsonl() -> Result<()> {
        let testvalues = [
            Event::ProgressSteps {
                task: "sometask".into(),
                description: "somedesc".into(),
                id: "someid".into(),
                steps_cached: 0,
                steps: 0,
                steps_total: 3,
                subtasks: Vec::new(),
            },
            Event::ProgressBytes {
                task: "sometask".into(),
                description: "somedesc".into(),
                id: "someid".into(),
                bytes_cached: 0,
                bytes: 11,
                bytes_total: 42,
                steps_cached: 0,
                steps: 0,
                steps_total: 3,
                subtasks: Vec::new(),
            },
        ];
        let (send, recv) = tokio::net::unix::pipe::pipe()?;
        let testvalues_sender = testvalues.iter().cloned();
        let sender = async move {
            let w = ProgressWriter::try_from(send)?;
            for value in testvalues_sender {
                w.send(value).await;
            }
            anyhow::Ok(())
        };
        let testvalues = &testvalues;
        let receiver = async move {
            let tf = BufReader::new(recv);
            // The writer always emits a Start event before anything else.
            let mut expected = std::iter::once(Event::Start {
                version: API_VERSION.into(),
            })
            .chain(testvalues.iter().cloned());
            let mut lines = tf.lines();
            while let Some(line) = lines.next_line().await? {
                let found: Event = serde_json::from_str(&line)?;
                let expected_value = expected
                    .next()
                    .expect("received more events than were sent");
                assert_eq!(found, expected_value);
            }
            // `send` only logs write failures, so an empty or truncated stream
            // would otherwise pass silently. Require every event to arrive.
            assert!(
                expected.next().is_none(),
                "stream ended before all expected events were received"
            );
            anyhow::Ok(())
        };
        tokio::try_join!(sender, receiver)?;
        Ok(())
    }

    #[tokio::test]
    async fn test_into_inner() -> Result<()> {
        // A default writer has no output target.
        assert!(ProgressWriter::default().into_inner().await?.is_none());

        // Keep the read end alive so writes do not fail with EPIPE.
        let (send, _recv) = tokio::net::unix::pipe::pipe()?;
        let w = ProgressWriter::from(send);
        let w2 = w.clone();
        w.send(Event::ProgressSteps {
            task: "sometask".into(),
            description: "somedesc".into(),
            id: "someid".into(),
            steps_cached: 0,
            steps: 1,
            steps_total: 3,
            subtasks: Vec::new(),
        })
        .await;
        assert!(w.into_inner().await?.is_some());
        // Clones share state, so the pipe has already been taken.
        assert!(w2.into_inner().await?.is_none());
        Ok(())
    }
}