Skip to main content

object_store_opendal/service/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use object_store::ObjectStore;
22use object_store::path::Path as ObjectStorePath;
23use opendal::Error;
24use opendal::ErrorKind;
25use opendal::raw::oio::BatchDeleter;
26use opendal::raw::oio::MultipartWriter;
27use opendal::raw::*;
28use opendal::*;
29
30mod core;
31mod deleter;
32mod error;
33mod lister;
34mod reader;
35mod writer;
36
37use deleter::ObjectStoreDeleter;
38use error::parse_error;
39use lister::ObjectStoreLister;
40use reader::ObjectStoreReader;
41use writer::ObjectStoreWriter;
42
43use crate::service::core::format_metadata as parse_metadata;
44use crate::service::core::parse_op_stat;
45
46pub const OBJECT_STORE_SCHEME: &str = "object_store";
47
48/// ObjectStore backend builder
49#[derive(Default)]
50pub struct ObjectStoreBuilder {
51    store: Option<Arc<dyn ObjectStore + 'static>>,
52}
53
54impl Debug for ObjectStoreBuilder {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        let mut d = f.debug_struct("ObjectStoreBuilder");
57        d.finish_non_exhaustive()
58    }
59}
60
61impl ObjectStoreBuilder {
62    /// Set the object store instance
63    pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
64        Self { store: Some(store) }
65    }
66}
67
68impl Builder for ObjectStoreBuilder {
69    type Config = ();
70
71    fn build(self) -> Result<impl Service> {
72        let store = self.store.ok_or_else(|| {
73            Error::new(ErrorKind::ConfigInvalid, "object store is required")
74                .with_context("service", OBJECT_STORE_SCHEME)
75        })?;
76
77        Ok(ObjectStoreService {
78            store,
79            info: ServiceInfo::new(OBJECT_STORE_SCHEME, "/", "object_store"),
80            capability: Capability {
81                stat: true,
82                stat_with_if_match: true,
83                stat_with_if_unmodified_since: true,
84                read: true,
85                read_with_suffix: true,
86                write: true,
87                delete: true,
88                list: true,
89                list_with_limit: true,
90                list_with_start_after: true,
91                delete_with_version: false,
92                ..Default::default()
93            },
94        })
95    }
96}
97
98/// ObjectStore backend
99pub struct ObjectStoreService {
100    store: Arc<dyn ObjectStore + 'static>,
101    info: ServiceInfo,
102    capability: Capability,
103}
104
105impl Debug for ObjectStoreService {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        let mut d = f.debug_struct("ObjectStoreBackend");
108        d.finish_non_exhaustive()
109    }
110}
111
112impl Service for ObjectStoreService {
113    type Reader = oio::StreamReader<ObjectStoreReader>;
114    type Writer = MultipartWriter<ObjectStoreWriter>;
115    type Lister = ObjectStoreLister;
116    type Deleter = BatchDeleter<ObjectStoreDeleter>;
117    type Copier = ();
118
119    fn info(&self) -> ServiceInfo {
120        self.info.clone()
121    }
122
123    fn capability(&self) -> Capability {
124        self.capability
125    }
126
127    async fn create_dir(
128        &self,
129        _: &OperationContext,
130        _: &str,
131        _: OpCreateDir,
132    ) -> Result<RpCreateDir> {
133        Err(Error::new(
134            ErrorKind::Unsupported,
135            "operation is not supported",
136        ))
137    }
138
139    async fn stat(&self, _ctx: &OperationContext, path: &str, args: OpStat) -> Result<RpStat> {
140        let path = ObjectStorePath::from(path);
141        let opts = parse_op_stat(&args)?;
142        let result = self
143            .store
144            .get_opts(&path, opts)
145            .await
146            .map_err(parse_error)?;
147        let metadata = parse_metadata(&result.meta);
148        Ok(RpStat::new(metadata))
149    }
150
151    fn read(&self, _ctx: &OperationContext, path: &str, args: OpRead) -> Result<Self::Reader> {
152        Ok(oio::StreamReader::new(ObjectStoreReader::new(
153            self.store.clone(),
154            path,
155            args,
156        )))
157    }
158
159    fn write(&self, ctx: &OperationContext, path: &str, args: OpWrite) -> Result<Self::Writer> {
160        let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
161        Ok(MultipartWriter::new(ctx.executor().clone(), writer, 10))
162    }
163
164    fn delete(&self, _ctx: &OperationContext) -> Result<Self::Deleter> {
165        let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
166        Ok(deleter)
167    }
168
169    fn list(&self, _ctx: &OperationContext, path: &str, args: OpList) -> Result<Self::Lister> {
170        let lister = ObjectStoreLister::new(self.store.clone(), path, args)?;
171        Ok(lister)
172    }
173
174    fn copy(
175        &self,
176        _: &OperationContext,
177        _: &str,
178        _: &str,
179        _: OpCopy,
180        _: OpCopier,
181    ) -> Result<Self::Copier> {
182        Err(Error::new(
183            ErrorKind::Unsupported,
184            "operation is not supported",
185        ))
186    }
187
188    async fn rename(
189        &self,
190        _: &OperationContext,
191        _: &str,
192        _: &str,
193        _: OpRename,
194    ) -> Result<RpRename> {
195        Err(Error::new(
196            ErrorKind::Unsupported,
197            "operation is not supported",
198        ))
199    }
200
201    async fn presign(&self, _: &OperationContext, _: &str, _: OpPresign) -> Result<RpPresign> {
202        Err(Error::new(
203            ErrorKind::Unsupported,
204            "operation is not supported",
205        ))
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212    use object_store::memory::InMemory;
213    use opendal::Buffer;
214    use opendal::raw::oio::{Delete, List, Read, ReadStream, Write};
215
216    fn test_ctx() -> OperationContext {
217        OperationContext::new()
218    }
219
220    #[tokio::test]
221    async fn test_object_store_backend_builder() {
222        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
223        let builder = ObjectStoreBuilder::new(store);
224
225        let backend = builder.build().expect("build should succeed");
226        assert_eq!(backend.info().scheme(), OBJECT_STORE_SCHEME);
227    }
228
229    #[tokio::test]
230    async fn test_object_store_backend_info() {
231        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
232        let backend = ObjectStoreBuilder::new(store)
233            .build()
234            .expect("build should succeed");
235
236        let info = backend.info();
237        assert_eq!(info.scheme(), "object_store");
238        assert_eq!(info.name(), "object_store".into());
239        assert_eq!(info.root(), "/".into());
240
241        let cap = backend.capability();
242        assert!(cap.stat);
243        assert!(cap.read);
244        assert!(cap.write);
245        assert!(cap.delete);
246        assert!(cap.list);
247    }
248
249    #[tokio::test]
250    async fn test_object_store_backend_basic_operations() {
251        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
252        let backend = ObjectStoreBuilder::new(store.clone())
253            .build()
254            .expect("build should succeed");
255        let ctx = test_ctx();
256
257        let path = "test_file.txt";
258        let content = b"Hello, world!";
259
260        // Test write
261        let mut writer = backend
262            .write(&ctx, path, OpWrite::default())
263            .expect("write should succeed");
264
265        writer
266            .write(Buffer::from(&content[..]))
267            .await
268            .expect("write content should succeed");
269        writer.close().await.expect("close should succeed");
270
271        // Test stat
272        let stat_result = backend
273            .stat(&ctx, path, OpStat::default())
274            .await
275            .expect("stat should succeed");
276
277        assert_eq!(
278            stat_result.into_metadata().content_length(),
279            content.len() as u64
280        );
281
282        // Test read
283        let reader = backend
284            .read(&ctx, path, OpRead::default())
285            .expect("read should succeed");
286
287        let (_, mut stream) = reader
288            .open(BytesRange::default())
289            .await
290            .expect("open should succeed");
291        let buf = stream.read().await.expect("read should succeed");
292        assert_eq!(buf.to_vec(), content);
293    }
294
295    #[tokio::test]
296    async fn test_object_store_backend_multipart_upload() {
297        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
298        let backend = ObjectStoreBuilder::new(store.clone())
299            .build()
300            .expect("build should succeed");
301        let ctx = test_ctx();
302
303        let path = "test_file.txt";
304        let content =
305            b"Hello, multipart upload! This is a test content for multipart upload functionality.";
306        let content_len = content.len();
307
308        // Test multipart upload with multiple chunks
309        let mut writer = backend
310            .write(&ctx, path, OpWrite::default())
311            .expect("write should succeed");
312
313        // Write content in chunks to simulate multipart upload
314        let chunk_size = 20;
315        for chunk in content.chunks(chunk_size) {
316            writer
317                .write(Buffer::from(chunk))
318                .await
319                .expect("write chunk should succeed");
320        }
321
322        writer.close().await.expect("close should succeed");
323
324        // Verify the uploaded file
325        let stat_result = backend
326            .stat(&ctx, path, OpStat::default())
327            .await
328            .expect("stat should succeed");
329
330        assert_eq!(
331            stat_result.into_metadata().content_length(),
332            content_len as u64
333        );
334
335        // Read back and verify content
336        let reader = backend
337            .read(&ctx, path, OpRead::default())
338            .expect("read should succeed");
339
340        let (_, mut stream) = reader
341            .open(BytesRange::default())
342            .await
343            .expect("open should succeed");
344        let buf = stream.read_all().await.expect("read should succeed");
345        assert_eq!(buf.to_vec(), content);
346    }
347
348    #[tokio::test]
349    async fn test_object_store_backend_list() {
350        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
351        let backend = ObjectStoreBuilder::new(store.clone())
352            .build()
353            .expect("build should succeed");
354        let ctx = test_ctx();
355
356        // Create multiple files
357        let files = vec![
358            ("dir1/file1.txt", b"content1"),
359            ("dir1/file2.txt", b"content2"),
360            ("dir2/file3.txt", b"content3"),
361        ];
362
363        for (path, content) in &files {
364            let mut writer = backend
365                .write(&ctx, path, OpWrite::default())
366                .expect("write should succeed");
367            writer
368                .write(Buffer::from(&content[..]))
369                .await
370                .expect("write content should succeed");
371            writer.close().await.expect("close should succeed");
372        }
373
374        // List directory
375        let mut lister = backend
376            .list(&ctx, "dir1/", OpList::default())
377            .expect("list should succeed");
378
379        let mut entries = Vec::new();
380        while let Some(entry) = lister.next().await.expect("next should succeed") {
381            entries.push(entry);
382        }
383
384        assert_eq!(entries.len(), 2);
385        assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
386        assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
387    }
388
389    #[tokio::test]
390    async fn test_object_store_backend_delete() {
391        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
392        let backend = ObjectStoreBuilder::new(store)
393            .build()
394            .expect("build should succeed");
395        let ctx = test_ctx();
396
397        let path = "test_delete.txt";
398        let content = b"To be deleted";
399
400        // Write file
401        let mut writer = backend
402            .write(&ctx, path, OpWrite::default())
403            .expect("write should succeed");
404        writer
405            .write(Buffer::from(&content[..]))
406            .await
407            .expect("write content should succeed");
408        writer.close().await.expect("close should succeed");
409
410        // Verify file exists
411        backend
412            .stat(&ctx, path, OpStat::default())
413            .await
414            .expect("file should exist");
415
416        // Delete file
417        let mut deleter = backend.delete(&ctx).expect("delete should succeed");
418        deleter
419            .delete(path, OpDelete::default())
420            .await
421            .expect("delete should succeed");
422        deleter.close().await.expect("close should succeed");
423
424        // Verify file is deleted
425        let result = backend.stat(&ctx, path, OpStat::default()).await;
426        assert!(result.is_err());
427    }
428
429    #[tokio::test]
430    async fn test_object_store_backend_error_handling() {
431        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
432        let backend = ObjectStoreBuilder::new(store)
433            .build()
434            .expect("build should succeed");
435        let ctx = test_ctx();
436
437        // Test stat on non-existent file
438        let result = backend
439            .stat(&ctx, "non_existent.txt", OpStat::default())
440            .await;
441        assert!(result.is_err());
442
443        // Test read on non-existent file
444        let reader = backend
445            .read(&ctx, "non_existent.txt", OpRead::default())
446            .expect("read should create reader");
447        let result = reader.read(BytesRange::from(0..1)).await;
448        assert!(result.is_err());
449
450        // Test list on non-existent directory
451        let result = backend.list(&ctx, "non_existent_dir/", OpList::default());
452        // This should succeed but return empty results
453        if let Ok(mut lister) = result {
454            let entry = lister.next().await.expect("next should succeed");
455            assert!(entry.is_none());
456        }
457    }
458}