object_store_opendal/
utils.rs1use futures::Stream;
19use object_store::ObjectMeta;
20use opendal::Metadata;
21use std::future::IntoFuture;
22
23#[cfg(not(feature = "send_wrapper"))]
26use noop_wrapper::NoopWrapper as SendWrapper;
27#[cfg(feature = "send_wrapper")]
28use send_wrapper::SendWrapper;
29
30pub fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::Error {
32 use opendal::ErrorKind;
33 match err.kind() {
34 ErrorKind::NotFound => object_store::Error::NotFound {
35 path: path.to_string(),
36 source: Box::new(err),
37 },
38 ErrorKind::Unsupported => object_store::Error::NotSupported {
39 source: Box::new(err),
40 },
41 ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
42 path: path.to_string(),
43 source: Box::new(err),
44 },
45 ErrorKind::ConditionNotMatch => object_store::Error::Precondition {
46 path: path.to_string(),
47 source: Box::new(err),
48 },
49 kind => object_store::Error::Generic {
50 store: kind.into_static(),
51 source: Box::new(err),
52 },
53 }
54}
55
56pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
58 ObjectMeta {
59 location: path.into(),
60 last_modified: meta.last_modified().unwrap_or_default(),
61 size: meta.content_length() as usize,
62 e_tag: meta.etag().map(|x| x.to_string()),
63 version: meta.version().map(|x| x.to_string()),
64 }
65}
66
67pub trait IntoSendFuture {
69 type Output;
70
71 fn into_send(self) -> Self::Output;
72}
73
74impl<T> IntoSendFuture for T
75where
76 T: IntoFuture,
77{
78 type Output = SendWrapper<T::IntoFuture>;
79
80 fn into_send(self) -> Self::Output {
81 SendWrapper::new(self.into_future())
82 }
83}
84
85pub trait IntoSendStream {
87 type Output;
88
89 fn into_send(self) -> Self::Output;
90}
91
92impl<T> IntoSendStream for T
93where
94 T: Stream,
95{
96 type Output = SendWrapper<T>;
97 fn into_send(self) -> Self::Output {
98 SendWrapper::new(self)
99 }
100}
101
102#[cfg(not(feature = "send_wrapper"))]
103mod noop_wrapper {
104 use std::pin::Pin;
105 use std::task::Context;
106 use std::task::Poll;
107
108 use futures::Future;
109 use futures::Stream;
110 use pin_project::pin_project;
111
112 #[pin_project]
113 pub struct NoopWrapper<T> {
114 #[pin]
115 item: T,
116 }
117
118 impl<T> Future for NoopWrapper<T>
119 where
120 T: Future,
121 {
122 type Output = T::Output;
123
124 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
125 let this = self.project();
126 this.item.poll(cx)
127 }
128 }
129
130 impl<T> Stream for NoopWrapper<T>
131 where
132 T: Stream,
133 {
134 type Item = T::Item;
135
136 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 let this = self.project();
138 this.item.poll_next(cx)
139 }
140 }
141
142 impl<T> NoopWrapper<T> {
143 pub fn new(item: T) -> Self {
144 Self { item }
145 }
146 }
147}