1use anyhow::Result;
4use core::fmt::{Debug, Display};
5use futures_util::{Future, FutureExt};
6use ostree::gio;
7use ostree::prelude::{CancellableExt, CancellableExtManual};
8
9pub async fn run_with_cancellable<F, R>(f: F, cancellable: &gio::Cancellable) -> Result<R>
11where
12 F: Future<Output = Result<R>>,
13{
14 let notify = std::sync::Arc::new(tokio::sync::Notify::new());
16 let notify2 = notify.clone();
17 cancellable.connect_cancelled(move |_| notify2.notify_one());
18 cancellable.set_error_if_cancelled()?;
19 tokio::select! {
22 r = f => r,
23 _ = notify.notified() => {
24 Err(anyhow::anyhow!("Operation was cancelled"))
25 }
26 }
27}
28
29struct CancelOnDrop(gio::Cancellable);
30
31impl Drop for CancelOnDrop {
32 fn drop(&mut self) {
33 self.0.cancel();
34 }
35}
36
37pub fn spawn_blocking_cancellable<F, R>(f: F) -> tokio::task::JoinHandle<R>
45where
46 F: FnOnce(&gio::Cancellable) -> R + Send + 'static,
47 R: Send + 'static,
48{
49 tokio::task::spawn_blocking(move || {
50 let dropper = CancelOnDrop(gio::Cancellable::new());
51 f(&dropper.0)
52 })
53}
54
55pub(crate) fn flatten_anyhow<T, E>(r: std::result::Result<Result<T>, E>) -> Result<T>
58where
59 E: Display + Debug + Send + Sync + 'static,
60{
61 match r {
62 Ok(x) => x,
63 Err(e) => Err(anyhow::anyhow!(e)),
64 }
65}
66
67pub fn spawn_blocking_cancellable_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
69where
70 F: FnOnce(&gio::Cancellable) -> Result<T> + Send + 'static,
71 T: Send + 'static,
72{
73 spawn_blocking_cancellable(f).map(flatten_anyhow)
74}
75
76pub fn spawn_blocking_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
78where
79 F: FnOnce() -> Result<T> + Send + 'static,
80 T: Send + 'static,
81{
82 tokio::task::spawn_blocking(f).map(flatten_anyhow)
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Cancelling the cancellable while the wrapped future is still pending
    /// must make `run_with_cancellable` resolve to an error.
    #[tokio::test]
    async fn test_cancellable() {
        let cancellable = ostree::gio::Cancellable::new();

        // Fires the cancellable after a short delay.
        let canceller = {
            let trigger = cancellable.clone();
            async move {
                tokio::time::sleep(Duration::from_millis(200)).await;
                trigger.cancel();
            }
        };
        // Sleeps far longer than the canceller's delay, so it only finishes
        // early if cancellation interrupts it.
        let slow_work = async move {
            tokio::time::sleep(Duration::from_secs(200)).await;
            Ok(())
        };
        let guarded = run_with_cancellable(slow_work, &cancellable);
        let (_, outcome) = tokio::join!(canceller, guarded);
        assert!(outcome.is_err());
    }
}