object_store_opendal/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::Stream;
19use object_store::ObjectMeta;
20use opendal::Metadata;
21use std::future::IntoFuture;
22
/// Conditionally adds the `Send` marker trait to the wrapped type.
/// This only takes effect when the `send_wrapper` feature is enabled.
25#[cfg(not(feature = "send_wrapper"))]
26use noop_wrapper::NoopWrapper as SendWrapper;
27#[cfg(feature = "send_wrapper")]
28use send_wrapper::SendWrapper;
29
30/// Format `opendal::Error` to `object_store::Error`.
31pub fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::Error {
32    use opendal::ErrorKind;
33    match err.kind() {
34        ErrorKind::NotFound => object_store::Error::NotFound {
35            path: path.to_string(),
36            source: Box::new(err),
37        },
38        ErrorKind::Unsupported => object_store::Error::NotSupported {
39            source: Box::new(err),
40        },
41        ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
42            path: path.to_string(),
43            source: Box::new(err),
44        },
45        ErrorKind::ConditionNotMatch => object_store::Error::Precondition {
46            path: path.to_string(),
47            source: Box::new(err),
48        },
49        kind => object_store::Error::Generic {
50            store: kind.into_static(),
51            source: Box::new(err),
52        },
53    }
54}
55
56/// Format `opendal::Metadata` to `object_store::ObjectMeta`.
57pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
58    ObjectMeta {
59        location: path.into(),
60        last_modified: meta.last_modified().unwrap_or_default(),
61        size: meta.content_length() as usize,
62        e_tag: meta.etag().map(|x| x.to_string()),
63        version: meta.version().map(|x| x.to_string()),
64    }
65}
66
/// Make given future `Send`.
///
/// The conversion wraps the future in this file's `SendWrapper` alias, so
/// whether the result actually gains `Send` depends on the `send_wrapper`
/// feature selecting the real wrapper rather than the no-op one.
pub trait IntoSendFuture {
    /// The wrapped type produced by [`IntoSendFuture::into_send`].
    type Output;

    /// Consume `self` and return it in wrapped form.
    fn into_send(self) -> Self::Output;
}
73
74impl<T> IntoSendFuture for T
75where
76    T: IntoFuture,
77{
78    type Output = SendWrapper<T::IntoFuture>;
79
80    fn into_send(self) -> Self::Output {
81        SendWrapper::new(self.into_future())
82    }
83}
84
/// Make given Stream `Send`.
///
/// The conversion wraps the stream in this file's `SendWrapper` alias, so
/// whether the result actually gains `Send` depends on the `send_wrapper`
/// feature selecting the real wrapper rather than the no-op one.
pub trait IntoSendStream {
    /// The wrapped type produced by [`IntoSendStream::into_send`].
    type Output;

    /// Consume `self` and return it in wrapped form.
    fn into_send(self) -> Self::Output;
}
91
92impl<T> IntoSendStream for T
93where
94    T: Stream,
95{
96    type Output = SendWrapper<T>;
97    fn into_send(self) -> Self::Output {
98        SendWrapper::new(self)
99    }
100}
101
102#[cfg(not(feature = "send_wrapper"))]
103mod noop_wrapper {
104    use std::pin::Pin;
105    use std::task::Context;
106    use std::task::Poll;
107
108    use futures::Future;
109    use futures::Stream;
110    use pin_project::pin_project;
111
112    #[pin_project]
113    pub struct NoopWrapper<T> {
114        #[pin]
115        item: T,
116    }
117
118    impl<T> Future for NoopWrapper<T>
119    where
120        T: Future,
121    {
122        type Output = T::Output;
123
124        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
125            let this = self.project();
126            this.item.poll(cx)
127        }
128    }
129
130    impl<T> Stream for NoopWrapper<T>
131    where
132        T: Stream,
133    {
134        type Item = T::Item;
135
136        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137            let this = self.project();
138            this.item.poll_next(cx)
139        }
140    }
141
142    impl<T> NoopWrapper<T> {
143        pub fn new(item: T) -> Self {
144            Self { item }
145        }
146    }
147}