object_store_opendal/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::Stream;
19use object_store::ObjectMeta;
20use opendal::Metadata;
21use std::future::IntoFuture;
22
23use crate::timestamp_to_datetime;
24/// Conditionally add the `Send` marker trait for the wrapped type.
25/// Only take effect when the `send_wrapper` feature is enabled.
26#[cfg(not(feature = "send_wrapper"))]
27use noop_wrapper::NoopWrapper as SendWrapper;
28#[cfg(feature = "send_wrapper")]
29use send_wrapper::SendWrapper;
30
31/// Format `opendal::Error` to `object_store::Error`.
32pub fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::Error {
33    use opendal::ErrorKind;
34    match err.kind() {
35        ErrorKind::NotFound => object_store::Error::NotFound {
36            path: path.to_string(),
37            source: Box::new(err),
38        },
39        ErrorKind::Unsupported => object_store::Error::NotSupported {
40            source: Box::new(err),
41        },
42        ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
43            path: path.to_string(),
44            source: Box::new(err),
45        },
46        ErrorKind::ConditionNotMatch => object_store::Error::Precondition {
47            path: path.to_string(),
48            source: Box::new(err),
49        },
50        kind => object_store::Error::Generic {
51            store: kind.into_static(),
52            source: Box::new(err),
53        },
54    }
55}
56
57/// Format `opendal::Metadata` to `object_store::ObjectMeta`.
58pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
59    ObjectMeta {
60        location: path.into(),
61        last_modified: meta
62            .last_modified()
63            .and_then(timestamp_to_datetime)
64            .unwrap_or_default(),
65        size: meta.content_length(),
66        e_tag: meta.etag().map(|x| x.to_string()),
67        version: meta.version().map(|x| x.to_string()),
68    }
69}
70
71/// Make given future `Send`.
72pub trait IntoSendFuture {
73    type Output;
74
75    fn into_send(self) -> Self::Output;
76}
77
78impl<T> IntoSendFuture for T
79where
80    T: IntoFuture,
81{
82    type Output = SendWrapper<T::IntoFuture>;
83
84    fn into_send(self) -> Self::Output {
85        SendWrapper::new(self.into_future())
86    }
87}
88
89/// Make given Stream `Send`.
90pub trait IntoSendStream {
91    type Output;
92
93    fn into_send(self) -> Self::Output;
94}
95
96impl<T> IntoSendStream for T
97where
98    T: Stream,
99{
100    type Output = SendWrapper<T>;
101    fn into_send(self) -> Self::Output {
102        SendWrapper::new(self)
103    }
104}
105
106#[cfg(not(feature = "send_wrapper"))]
107mod noop_wrapper {
108    use std::pin::Pin;
109    use std::task::Context;
110    use std::task::Poll;
111
112    use futures::Future;
113    use futures::Stream;
114    use pin_project::pin_project;
115
116    #[pin_project]
117    pub struct NoopWrapper<T> {
118        #[pin]
119        item: T,
120    }
121
122    impl<T> Future for NoopWrapper<T>
123    where
124        T: Future,
125    {
126        type Output = T::Output;
127
128        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129            let this = self.project();
130            this.item.poll(cx)
131        }
132    }
133
134    impl<T> Stream for NoopWrapper<T>
135    where
136        T: Stream,
137    {
138        type Item = T::Item;
139
140        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141            let this = self.project();
142            this.item.poll_next(cx)
143        }
144    }
145
146    impl<T> NoopWrapper<T> {
147        pub fn new(item: T) -> Self {
148            Self { item }
149        }
150    }
151}