object_store_opendal/service/
mod.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use object_store::ObjectStore;
22use object_store::path::Path as ObjectStorePath;
23use opendal::Error;
24use opendal::ErrorKind;
25use opendal::raw::oio::BatchDeleter;
26use opendal::raw::oio::MultipartWriter;
27use opendal::raw::*;
28use opendal::*;
29
30mod core;
31mod deleter;
32mod error;
33mod lister;
34mod reader;
35mod writer;
36
37use deleter::ObjectStoreDeleter;
38use error::parse_error;
39use lister::ObjectStoreLister;
40use reader::ObjectStoreReader;
41use writer::ObjectStoreWriter;
42
43use crate::service::core::format_metadata as parse_metadata;
44use crate::service::core::parse_op_stat;
45
46pub const OBJECT_STORE_SCHEME: &str = "object_store";
47
48#[derive(Default)]
50pub struct ObjectStoreBuilder {
51 store: Option<Arc<dyn ObjectStore + 'static>>,
52}
53
54impl Debug for ObjectStoreBuilder {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 let mut d = f.debug_struct("ObjectStoreBuilder");
57 d.finish_non_exhaustive()
58 }
59}
60
61impl ObjectStoreBuilder {
62 pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
64 Self { store: Some(store) }
65 }
66}
67
68impl Builder for ObjectStoreBuilder {
69 type Config = ();
70
71 fn build(self) -> Result<impl Service> {
72 let store = self.store.ok_or_else(|| {
73 Error::new(ErrorKind::ConfigInvalid, "object store is required")
74 .with_context("service", OBJECT_STORE_SCHEME)
75 })?;
76
77 Ok(ObjectStoreService {
78 store,
79 info: ServiceInfo::new(OBJECT_STORE_SCHEME, "/", "object_store"),
80 capability: Capability {
81 stat: true,
82 stat_with_if_match: true,
83 stat_with_if_unmodified_since: true,
84 read: true,
85 read_with_suffix: true,
86 write: true,
87 delete: true,
88 list: true,
89 list_with_limit: true,
90 list_with_start_after: true,
91 delete_with_version: false,
92 ..Default::default()
93 },
94 })
95 }
96}
97
98pub struct ObjectStoreService {
100 store: Arc<dyn ObjectStore + 'static>,
101 info: ServiceInfo,
102 capability: Capability,
103}
104
105impl Debug for ObjectStoreService {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 let mut d = f.debug_struct("ObjectStoreBackend");
108 d.finish_non_exhaustive()
109 }
110}
111
112impl Service for ObjectStoreService {
113 type Reader = oio::StreamReader<ObjectStoreReader>;
114 type Writer = MultipartWriter<ObjectStoreWriter>;
115 type Lister = ObjectStoreLister;
116 type Deleter = BatchDeleter<ObjectStoreDeleter>;
117 type Copier = ();
118
119 fn info(&self) -> ServiceInfo {
120 self.info.clone()
121 }
122
123 fn capability(&self) -> Capability {
124 self.capability
125 }
126
127 async fn create_dir(
128 &self,
129 _: &OperationContext,
130 _: &str,
131 _: OpCreateDir,
132 ) -> Result<RpCreateDir> {
133 Err(Error::new(
134 ErrorKind::Unsupported,
135 "operation is not supported",
136 ))
137 }
138
139 async fn stat(&self, _ctx: &OperationContext, path: &str, args: OpStat) -> Result<RpStat> {
140 let path = ObjectStorePath::from(path);
141 let opts = parse_op_stat(&args)?;
142 let result = self
143 .store
144 .get_opts(&path, opts)
145 .await
146 .map_err(parse_error)?;
147 let metadata = parse_metadata(&result.meta);
148 Ok(RpStat::new(metadata))
149 }
150
151 fn read(&self, _ctx: &OperationContext, path: &str, args: OpRead) -> Result<Self::Reader> {
152 Ok(oio::StreamReader::new(ObjectStoreReader::new(
153 self.store.clone(),
154 path,
155 args,
156 )))
157 }
158
159 fn write(&self, ctx: &OperationContext, path: &str, args: OpWrite) -> Result<Self::Writer> {
160 let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
161 Ok(MultipartWriter::new(ctx.executor().clone(), writer, 10))
162 }
163
164 fn delete(&self, _ctx: &OperationContext) -> Result<Self::Deleter> {
165 let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
166 Ok(deleter)
167 }
168
169 fn list(&self, _ctx: &OperationContext, path: &str, args: OpList) -> Result<Self::Lister> {
170 let lister = ObjectStoreLister::new(self.store.clone(), path, args)?;
171 Ok(lister)
172 }
173
174 fn copy(
175 &self,
176 _: &OperationContext,
177 _: &str,
178 _: &str,
179 _: OpCopy,
180 _: OpCopier,
181 ) -> Result<Self::Copier> {
182 Err(Error::new(
183 ErrorKind::Unsupported,
184 "operation is not supported",
185 ))
186 }
187
188 async fn rename(
189 &self,
190 _: &OperationContext,
191 _: &str,
192 _: &str,
193 _: OpRename,
194 ) -> Result<RpRename> {
195 Err(Error::new(
196 ErrorKind::Unsupported,
197 "operation is not supported",
198 ))
199 }
200
201 async fn presign(&self, _: &OperationContext, _: &str, _: OpPresign) -> Result<RpPresign> {
202 Err(Error::new(
203 ErrorKind::Unsupported,
204 "operation is not supported",
205 ))
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212 use object_store::memory::InMemory;
213 use opendal::Buffer;
214 use opendal::raw::oio::{Delete, List, Read, ReadStream, Write};
215
216 fn test_ctx() -> OperationContext {
217 OperationContext::new()
218 }
219
220 #[tokio::test]
221 async fn test_object_store_backend_builder() {
222 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
223 let builder = ObjectStoreBuilder::new(store);
224
225 let backend = builder.build().expect("build should succeed");
226 assert_eq!(backend.info().scheme(), OBJECT_STORE_SCHEME);
227 }
228
229 #[tokio::test]
230 async fn test_object_store_backend_info() {
231 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
232 let backend = ObjectStoreBuilder::new(store)
233 .build()
234 .expect("build should succeed");
235
236 let info = backend.info();
237 assert_eq!(info.scheme(), "object_store");
238 assert_eq!(info.name(), "object_store".into());
239 assert_eq!(info.root(), "/".into());
240
241 let cap = backend.capability();
242 assert!(cap.stat);
243 assert!(cap.read);
244 assert!(cap.write);
245 assert!(cap.delete);
246 assert!(cap.list);
247 }
248
249 #[tokio::test]
250 async fn test_object_store_backend_basic_operations() {
251 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
252 let backend = ObjectStoreBuilder::new(store.clone())
253 .build()
254 .expect("build should succeed");
255 let ctx = test_ctx();
256
257 let path = "test_file.txt";
258 let content = b"Hello, world!";
259
260 let mut writer = backend
262 .write(&ctx, path, OpWrite::default())
263 .expect("write should succeed");
264
265 writer
266 .write(Buffer::from(&content[..]))
267 .await
268 .expect("write content should succeed");
269 writer.close().await.expect("close should succeed");
270
271 let stat_result = backend
273 .stat(&ctx, path, OpStat::default())
274 .await
275 .expect("stat should succeed");
276
277 assert_eq!(
278 stat_result.into_metadata().content_length(),
279 content.len() as u64
280 );
281
282 let reader = backend
284 .read(&ctx, path, OpRead::default())
285 .expect("read should succeed");
286
287 let (_, mut stream) = reader
288 .open(BytesRange::default())
289 .await
290 .expect("open should succeed");
291 let buf = stream.read().await.expect("read should succeed");
292 assert_eq!(buf.to_vec(), content);
293 }
294
295 #[tokio::test]
296 async fn test_object_store_backend_multipart_upload() {
297 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
298 let backend = ObjectStoreBuilder::new(store.clone())
299 .build()
300 .expect("build should succeed");
301 let ctx = test_ctx();
302
303 let path = "test_file.txt";
304 let content =
305 b"Hello, multipart upload! This is a test content for multipart upload functionality.";
306 let content_len = content.len();
307
308 let mut writer = backend
310 .write(&ctx, path, OpWrite::default())
311 .expect("write should succeed");
312
313 let chunk_size = 20;
315 for chunk in content.chunks(chunk_size) {
316 writer
317 .write(Buffer::from(chunk))
318 .await
319 .expect("write chunk should succeed");
320 }
321
322 writer.close().await.expect("close should succeed");
323
324 let stat_result = backend
326 .stat(&ctx, path, OpStat::default())
327 .await
328 .expect("stat should succeed");
329
330 assert_eq!(
331 stat_result.into_metadata().content_length(),
332 content_len as u64
333 );
334
335 let reader = backend
337 .read(&ctx, path, OpRead::default())
338 .expect("read should succeed");
339
340 let (_, mut stream) = reader
341 .open(BytesRange::default())
342 .await
343 .expect("open should succeed");
344 let buf = stream.read_all().await.expect("read should succeed");
345 assert_eq!(buf.to_vec(), content);
346 }
347
348 #[tokio::test]
349 async fn test_object_store_backend_list() {
350 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
351 let backend = ObjectStoreBuilder::new(store.clone())
352 .build()
353 .expect("build should succeed");
354 let ctx = test_ctx();
355
356 let files = vec![
358 ("dir1/file1.txt", b"content1"),
359 ("dir1/file2.txt", b"content2"),
360 ("dir2/file3.txt", b"content3"),
361 ];
362
363 for (path, content) in &files {
364 let mut writer = backend
365 .write(&ctx, path, OpWrite::default())
366 .expect("write should succeed");
367 writer
368 .write(Buffer::from(&content[..]))
369 .await
370 .expect("write content should succeed");
371 writer.close().await.expect("close should succeed");
372 }
373
374 let mut lister = backend
376 .list(&ctx, "dir1/", OpList::default())
377 .expect("list should succeed");
378
379 let mut entries = Vec::new();
380 while let Some(entry) = lister.next().await.expect("next should succeed") {
381 entries.push(entry);
382 }
383
384 assert_eq!(entries.len(), 2);
385 assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
386 assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
387 }
388
389 #[tokio::test]
390 async fn test_object_store_backend_delete() {
391 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
392 let backend = ObjectStoreBuilder::new(store)
393 .build()
394 .expect("build should succeed");
395 let ctx = test_ctx();
396
397 let path = "test_delete.txt";
398 let content = b"To be deleted";
399
400 let mut writer = backend
402 .write(&ctx, path, OpWrite::default())
403 .expect("write should succeed");
404 writer
405 .write(Buffer::from(&content[..]))
406 .await
407 .expect("write content should succeed");
408 writer.close().await.expect("close should succeed");
409
410 backend
412 .stat(&ctx, path, OpStat::default())
413 .await
414 .expect("file should exist");
415
416 let mut deleter = backend.delete(&ctx).expect("delete should succeed");
418 deleter
419 .delete(path, OpDelete::default())
420 .await
421 .expect("delete should succeed");
422 deleter.close().await.expect("close should succeed");
423
424 let result = backend.stat(&ctx, path, OpStat::default()).await;
426 assert!(result.is_err());
427 }
428
429 #[tokio::test]
430 async fn test_object_store_backend_error_handling() {
431 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
432 let backend = ObjectStoreBuilder::new(store)
433 .build()
434 .expect("build should succeed");
435 let ctx = test_ctx();
436
437 let result = backend
439 .stat(&ctx, "non_existent.txt", OpStat::default())
440 .await;
441 assert!(result.is_err());
442
443 let reader = backend
445 .read(&ctx, "non_existent.txt", OpRead::default())
446 .expect("read should create reader");
447 let result = reader.read(BytesRange::from(0..1)).await;
448 assert!(result.is_err());
449
450 let result = backend.list(&ctx, "non_existent_dir/", OpList::default());
452 if let Ok(mut lister) = result {
454 let entry = lister.next().await.expect("next should succeed");
455 assert!(entry.is_none());
456 }
457 }
458}