ostree_ext/
tokio_util.rs

1//! Helpers for bridging GLib async/mainloop with Tokio.
2
3use anyhow::Result;
4use core::fmt::{Debug, Display};
5use futures_util::{Future, FutureExt};
6use ostree::gio;
7use ostree::prelude::{CancellableExt, CancellableExtManual};
8
9/// Call a faillible future, while monitoring `cancellable` and return an error if cancelled.
10pub async fn run_with_cancellable<F, R>(f: F, cancellable: &gio::Cancellable) -> Result<R>
11where
12    F: Future<Output = Result<R>>,
13{
14    // Bridge GCancellable to a tokio notification
15    let notify = std::sync::Arc::new(tokio::sync::Notify::new());
16    let notify2 = notify.clone();
17    cancellable.connect_cancelled(move |_| notify2.notify_one());
18    cancellable.set_error_if_cancelled()?;
19    // See https://blog.yoshuawuyts.com/futures-concurrency-3/ on why
20    // `select!` is a trap in general, but I believe this case is safe.
21    tokio::select! {
22       r = f => r,
23       _ = notify.notified() => {
24           Err(anyhow::anyhow!("Operation was cancelled"))
25       }
26    }
27}
28
29struct CancelOnDrop(gio::Cancellable);
30
31impl Drop for CancelOnDrop {
32    fn drop(&mut self) {
33        self.0.cancel();
34    }
35}
36
37/// Wrapper for [`tokio::task::spawn_blocking`] which provides a [`gio::Cancellable`] that will be triggered on drop.
38///
39/// This function should be used in a Rust/tokio native `async fn`, but that want to invoke
40/// GLib style blocking APIs that use `GCancellable`.  The cancellable will be triggered when this
41/// future is dropped, which helps bound thread usage.
42///
43/// This is in a sense the inverse of [`run_with_cancellable`].
44pub fn spawn_blocking_cancellable<F, R>(f: F) -> tokio::task::JoinHandle<R>
45where
46    F: FnOnce(&gio::Cancellable) -> R + Send + 'static,
47    R: Send + 'static,
48{
49    tokio::task::spawn_blocking(move || {
50        let dropper = CancelOnDrop(gio::Cancellable::new());
51        f(&dropper.0)
52    })
53}
54
55/// Flatten a nested Result<Result<T>>, defaulting to converting the error type to an `anyhow::Error`.
56/// See https://doc.rust-lang.org/std/result/enum.Result.html#method.flatten
57pub(crate) fn flatten_anyhow<T, E>(r: std::result::Result<Result<T>, E>) -> Result<T>
58where
59    E: Display + Debug + Send + Sync + 'static,
60{
61    match r {
62        Ok(x) => x,
63        Err(e) => Err(anyhow::anyhow!(e)),
64    }
65}
66
67/// A wrapper around [`spawn_blocking_cancellable`] that flattens nested results.
68pub fn spawn_blocking_cancellable_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
69where
70    F: FnOnce(&gio::Cancellable) -> Result<T> + Send + 'static,
71    T: Send + 'static,
72{
73    spawn_blocking_cancellable(f).map(flatten_anyhow)
74}
75
76/// A wrapper around [`tokio::task::spawn_blocking`] that flattens nested results.
77pub fn spawn_blocking_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
78where
79    F: FnOnce() -> Result<T> + Send + 'static,
80    T: Send + 'static,
81{
82    tokio::task::spawn_blocking(f).map(flatten_anyhow)
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88
89    #[tokio::test]
90    async fn test_cancellable() {
91        let cancellable = ostree::gio::Cancellable::new();
92
93        let cancellable_copy = cancellable.clone();
94        let s = async move {
95            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
96            cancellable_copy.cancel();
97        };
98        let r = async move {
99            tokio::time::sleep(std::time::Duration::from_secs(200)).await;
100            Ok(())
101        };
102        let r = run_with_cancellable(r, &cancellable);
103        let (_, r) = tokio::join!(s, r);
104        assert!(r.is_err());
105    }
106}