object_store_opendal/
utils.rs1use futures::Stream;
19use object_store::ObjectMeta;
20use opendal::Metadata;
21use std::future::IntoFuture;
22
23use crate::timestamp_to_datetime;
24#[cfg(not(feature = "send_wrapper"))]
27use noop_wrapper::NoopWrapper as SendWrapper;
28#[cfg(feature = "send_wrapper")]
29use send_wrapper::SendWrapper;
30
31pub fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::Error {
33 use opendal::ErrorKind;
34 match err.kind() {
35 ErrorKind::NotFound => object_store::Error::NotFound {
36 path: path.to_string(),
37 source: Box::new(err),
38 },
39 ErrorKind::Unsupported => object_store::Error::NotSupported {
40 source: Box::new(err),
41 },
42 ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
43 path: path.to_string(),
44 source: Box::new(err),
45 },
46 ErrorKind::ConditionNotMatch => object_store::Error::Precondition {
47 path: path.to_string(),
48 source: Box::new(err),
49 },
50 kind => object_store::Error::Generic {
51 store: kind.into_static(),
52 source: Box::new(err),
53 },
54 }
55}
56
57pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
59 ObjectMeta {
60 location: path.into(),
61 last_modified: meta
62 .last_modified()
63 .and_then(timestamp_to_datetime)
64 .unwrap_or_default(),
65 size: meta.content_length(),
66 e_tag: meta.etag().map(|x| x.to_string()),
67 version: meta.version().map(|x| x.to_string()),
68 }
69}
70
71pub trait IntoSendFuture {
73 type Output;
74
75 fn into_send(self) -> Self::Output;
76}
77
78impl<T> IntoSendFuture for T
79where
80 T: IntoFuture,
81{
82 type Output = SendWrapper<T::IntoFuture>;
83
84 fn into_send(self) -> Self::Output {
85 SendWrapper::new(self.into_future())
86 }
87}
88
89pub trait IntoSendStream {
91 type Output;
92
93 fn into_send(self) -> Self::Output;
94}
95
96impl<T> IntoSendStream for T
97where
98 T: Stream,
99{
100 type Output = SendWrapper<T>;
101 fn into_send(self) -> Self::Output {
102 SendWrapper::new(self)
103 }
104}
105
106#[cfg(not(feature = "send_wrapper"))]
107mod noop_wrapper {
108 use std::pin::Pin;
109 use std::task::Context;
110 use std::task::Poll;
111
112 use futures::Future;
113 use futures::Stream;
114 use pin_project::pin_project;
115
116 #[pin_project]
117 pub struct NoopWrapper<T> {
118 #[pin]
119 item: T,
120 }
121
122 impl<T> Future for NoopWrapper<T>
123 where
124 T: Future,
125 {
126 type Output = T::Output;
127
128 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129 let this = self.project();
130 this.item.poll(cx)
131 }
132 }
133
134 impl<T> Stream for NoopWrapper<T>
135 where
136 T: Stream,
137 {
138 type Item = T::Item;
139
140 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141 let this = self.project();
142 this.item.poll_next(cx)
143 }
144 }
145
146 impl<T> NoopWrapper<T> {
147 pub fn new(item: T) -> Self {
148 Self { item }
149 }
150 }
151}