object_store_opendal/service/
mod.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use object_store::ObjectStore;
22use object_store::path::Path as ObjectStorePath;
23use opendal::Error;
24use opendal::ErrorKind;
25use opendal::raw::oio::BatchDeleter;
26use opendal::raw::oio::MultipartWriter;
27use opendal::raw::*;
28use opendal::*;
29
30mod core;
31mod deleter;
32mod error;
33mod lister;
34mod reader;
35mod writer;
36
37use deleter::ObjectStoreDeleter;
38use error::parse_error;
39use lister::ObjectStoreLister;
40use reader::ObjectStoreReader;
41use writer::ObjectStoreWriter;
42
43use crate::service::core::format_metadata as parse_metadata;
44use crate::service::core::parse_op_stat;
45
46pub const OBJECT_STORE_SCHEME: &str = "object_store";
47
48#[derive(Default)]
50pub struct ObjectStoreBuilder {
51 store: Option<Arc<dyn ObjectStore + 'static>>,
52}
53
54impl Debug for ObjectStoreBuilder {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 let mut d = f.debug_struct("ObjectStoreBuilder");
57 d.finish_non_exhaustive()
58 }
59}
60
61impl ObjectStoreBuilder {
62 pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
64 Self { store: Some(store) }
65 }
66}
67
68impl Builder for ObjectStoreBuilder {
69 type Config = ();
70
71 fn build(self) -> Result<impl Access> {
72 let store = self.store.ok_or_else(|| {
73 Error::new(ErrorKind::ConfigInvalid, "object store is required")
74 .with_context("service", OBJECT_STORE_SCHEME)
75 })?;
76
77 Ok(ObjectStoreService { store })
78 }
79}
80
81pub struct ObjectStoreService {
83 store: Arc<dyn ObjectStore + 'static>,
84}
85
86impl Debug for ObjectStoreService {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 let mut d = f.debug_struct("ObjectStoreBackend");
89 d.finish_non_exhaustive()
90 }
91}
92
93impl Access for ObjectStoreService {
94 type Reader = ObjectStoreReader;
95 type Writer = MultipartWriter<ObjectStoreWriter>;
96 type Lister = ObjectStoreLister;
97 type Deleter = BatchDeleter<ObjectStoreDeleter>;
98
99 fn info(&self) -> Arc<AccessorInfo> {
100 let info = AccessorInfo::default();
101 info.set_scheme(OBJECT_STORE_SCHEME)
102 .set_root("/")
103 .set_name("object_store")
104 .set_native_capability(Capability {
105 stat: true,
106 stat_with_if_match: true,
107 stat_with_if_unmodified_since: true,
108 read: true,
109 write: true,
110 delete: true,
111 list: true,
112 list_with_limit: true,
113 list_with_start_after: true,
114 delete_with_version: false,
115 ..Default::default()
116 });
117 Arc::new(info)
118 }
119
120 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
121 let path = ObjectStorePath::from(path);
122 let opts = parse_op_stat(&args)?;
123 let result = self
124 .store
125 .get_opts(&path, opts)
126 .await
127 .map_err(parse_error)?;
128 let metadata = parse_metadata(&result.meta);
129 Ok(RpStat::new(metadata))
130 }
131
132 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
133 let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
134 Ok((reader.rp(), reader))
135 }
136
137 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
138 let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
139 Ok((
140 RpWrite::default(),
141 MultipartWriter::new(self.info(), writer, 10),
142 ))
143 }
144
145 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
146 let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
147 Ok((RpDelete::default(), deleter))
148 }
149
150 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
151 let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
152 Ok((RpList::default(), lister))
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use object_store::memory::InMemory;
160 use opendal::Buffer;
161 use opendal::raw::oio::{Delete, List, Read, Write};
162
163 #[tokio::test]
164 async fn test_object_store_backend_builder() {
165 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
166 let builder = ObjectStoreBuilder::new(store);
167
168 let backend = builder.build().expect("build should succeed");
169 assert!(backend.info().scheme() == OBJECT_STORE_SCHEME);
170 }
171
172 #[tokio::test]
173 async fn test_object_store_backend_info() {
174 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
175 let backend = ObjectStoreBuilder::new(store)
176 .build()
177 .expect("build should succeed");
178
179 let info = backend.info();
180 assert_eq!(info.scheme(), "object_store");
181 assert_eq!(info.name(), "object_store".into());
182 assert_eq!(info.root(), "/".into());
183
184 let cap = info.native_capability();
185 assert!(cap.stat);
186 assert!(cap.read);
187 assert!(cap.write);
188 assert!(cap.delete);
189 assert!(cap.list);
190 }
191
192 #[tokio::test]
193 async fn test_object_store_backend_basic_operations() {
194 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
195 let backend = ObjectStoreBuilder::new(store.clone())
196 .build()
197 .expect("build should succeed");
198
199 let path = "test_file.txt";
200 let content = b"Hello, world!";
201
202 let (_, mut writer) = backend
204 .write(path, OpWrite::default())
205 .await
206 .expect("write should succeed");
207
208 writer
209 .write(Buffer::from(&content[..]))
210 .await
211 .expect("write content should succeed");
212 writer.close().await.expect("close should succeed");
213
214 let stat_result = backend
216 .stat(path, OpStat::default())
217 .await
218 .expect("stat should succeed");
219
220 assert_eq!(
221 stat_result.into_metadata().content_length(),
222 content.len() as u64
223 );
224
225 let (_, mut reader) = backend
227 .read(path, OpRead::default())
228 .await
229 .expect("read should succeed");
230
231 let buf = reader.read().await.expect("read should succeed");
232 assert_eq!(buf.to_vec(), content);
233 }
234
235 #[tokio::test]
236 async fn test_object_store_backend_multipart_upload() {
237 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
238 let backend = ObjectStoreBuilder::new(store.clone())
239 .build()
240 .expect("build should succeed");
241
242 let path = "test_file.txt";
243 let content =
244 b"Hello, multipart upload! This is a test content for multipart upload functionality.";
245 let content_len = content.len();
246
247 let (_, mut writer) = backend
249 .write(path, OpWrite::default())
250 .await
251 .expect("write should succeed");
252
253 let chunk_size = 20;
255 for chunk in content.chunks(chunk_size) {
256 writer
257 .write(Buffer::from(chunk))
258 .await
259 .expect("write chunk should succeed");
260 }
261
262 writer.close().await.expect("close should succeed");
263
264 let stat_result = backend
266 .stat(path, OpStat::default())
267 .await
268 .expect("stat should succeed");
269
270 assert_eq!(
271 stat_result.into_metadata().content_length(),
272 content_len as u64
273 );
274
275 let (_, mut reader) = backend
277 .read(path, OpRead::default())
278 .await
279 .expect("read should succeed");
280
281 let buf = reader.read().await.expect("read should succeed");
282 assert_eq!(buf.to_vec(), content);
283 }
284
285 #[tokio::test]
286 async fn test_object_store_backend_list() {
287 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
288 let backend = ObjectStoreBuilder::new(store.clone())
289 .build()
290 .expect("build should succeed");
291
292 let files = vec![
294 ("dir1/file1.txt", b"content1"),
295 ("dir1/file2.txt", b"content2"),
296 ("dir2/file3.txt", b"content3"),
297 ];
298
299 for (path, content) in &files {
300 let (_, mut writer) = backend
301 .write(path, OpWrite::default())
302 .await
303 .expect("write should succeed");
304 writer
305 .write(Buffer::from(&content[..]))
306 .await
307 .expect("write content should succeed");
308 writer.close().await.expect("close should succeed");
309 }
310
311 let (_, mut lister) = backend
313 .list("dir1/", OpList::default())
314 .await
315 .expect("list should succeed");
316
317 let mut entries = Vec::new();
318 while let Some(entry) = lister.next().await.expect("next should succeed") {
319 entries.push(entry);
320 }
321
322 assert_eq!(entries.len(), 2);
323 assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
324 assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
325 }
326
327 #[tokio::test]
328 async fn test_object_store_backend_delete() {
329 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
330 let backend = ObjectStoreBuilder::new(store)
331 .build()
332 .expect("build should succeed");
333
334 let path = "test_delete.txt";
335 let content = b"To be deleted";
336
337 let (_, mut writer) = backend
339 .write(path, OpWrite::default())
340 .await
341 .expect("write should succeed");
342 writer
343 .write(Buffer::from(&content[..]))
344 .await
345 .expect("write content should succeed");
346 writer.close().await.expect("close should succeed");
347
348 backend
350 .stat(path, OpStat::default())
351 .await
352 .expect("file should exist");
353
354 let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
356 deleter
357 .delete(path, OpDelete::default())
358 .await
359 .expect("delete should succeed");
360 deleter.close().await.expect("close should succeed");
361
362 let result = backend.stat(path, OpStat::default()).await;
364 assert!(result.is_err());
365 }
366
367 #[tokio::test]
368 async fn test_object_store_backend_error_handling() {
369 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
370 let backend = ObjectStoreBuilder::new(store)
371 .build()
372 .expect("build should succeed");
373
374 let result = backend.stat("non_existent.txt", OpStat::default()).await;
376 assert!(result.is_err());
377
378 let result = backend.read("non_existent.txt", OpRead::default()).await;
380 assert!(result.is_err());
381
382 let result = backend.list("non_existent_dir/", OpList::default()).await;
384 if let Ok((_, mut lister)) = result {
386 let entry = lister.next().await.expect("next should succeed");
387 assert!(entry.is_none());
388 }
389 }
390}