object_store_opendal/service/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use object_store::ObjectStore;
22use object_store::path::Path as ObjectStorePath;
23use opendal::Error;
24use opendal::ErrorKind;
25use opendal::raw::oio::BatchDeleter;
26use opendal::raw::oio::MultipartWriter;
27use opendal::raw::*;
28use opendal::*;
29
30mod core;
31mod deleter;
32mod error;
33mod lister;
34mod reader;
35mod writer;
36
37use deleter::ObjectStoreDeleter;
38use error::parse_error;
39use lister::ObjectStoreLister;
40use reader::ObjectStoreReader;
41use writer::ObjectStoreWriter;
42
43use crate::service::core::format_metadata as parse_metadata;
44use crate::service::core::parse_op_stat;
45
46pub const OBJECT_STORE_SCHEME: &str = "object_store";
47
48/// ObjectStore backend builder
49#[derive(Default)]
50pub struct ObjectStoreBuilder {
51    store: Option<Arc<dyn ObjectStore + 'static>>,
52}
53
54impl Debug for ObjectStoreBuilder {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        let mut d = f.debug_struct("ObjectStoreBuilder");
57        d.finish_non_exhaustive()
58    }
59}
60
61impl ObjectStoreBuilder {
62    /// Set the object store instance
63    pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
64        Self { store: Some(store) }
65    }
66}
67
68impl Builder for ObjectStoreBuilder {
69    type Config = ();
70
71    fn build(self) -> Result<impl Access> {
72        let store = self.store.ok_or_else(|| {
73            Error::new(ErrorKind::ConfigInvalid, "object store is required")
74                .with_context("service", OBJECT_STORE_SCHEME)
75        })?;
76
77        Ok(ObjectStoreService { store })
78    }
79}
80
81/// ObjectStore backend
82pub struct ObjectStoreService {
83    store: Arc<dyn ObjectStore + 'static>,
84}
85
86impl Debug for ObjectStoreService {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        let mut d = f.debug_struct("ObjectStoreBackend");
89        d.finish_non_exhaustive()
90    }
91}
92
93impl Access for ObjectStoreService {
94    type Reader = ObjectStoreReader;
95    type Writer = MultipartWriter<ObjectStoreWriter>;
96    type Lister = ObjectStoreLister;
97    type Deleter = BatchDeleter<ObjectStoreDeleter>;
98    type Copier = ();
99
100    fn info(&self) -> Arc<AccessorInfo> {
101        let info = AccessorInfo::default();
102        info.set_scheme(OBJECT_STORE_SCHEME)
103            .set_root("/")
104            .set_name("object_store")
105            .set_native_capability(Capability {
106                stat: true,
107                stat_with_if_match: true,
108                stat_with_if_unmodified_since: true,
109                read: true,
110                write: true,
111                delete: true,
112                list: true,
113                list_with_limit: true,
114                list_with_start_after: true,
115                delete_with_version: false,
116                ..Default::default()
117            });
118        Arc::new(info)
119    }
120
121    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
122        let path = ObjectStorePath::from(path);
123        let opts = parse_op_stat(&args)?;
124        let result = self
125            .store
126            .get_opts(&path, opts)
127            .await
128            .map_err(parse_error)?;
129        let metadata = parse_metadata(&result.meta);
130        Ok(RpStat::new(metadata))
131    }
132
133    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
134        let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
135        Ok((reader.rp(), reader))
136    }
137
138    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
139        let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
140        Ok((
141            RpWrite::default(),
142            MultipartWriter::new(self.info(), writer, 10),
143        ))
144    }
145
146    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
147        let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
148        Ok((RpDelete::default(), deleter))
149    }
150
151    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
152        let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
153        Ok((RpList::default(), lister))
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160    use object_store::memory::InMemory;
161    use opendal::Buffer;
162    use opendal::raw::oio::{Delete, List, ReadStream, Write};
163
164    #[tokio::test]
165    async fn test_object_store_backend_builder() {
166        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
167        let builder = ObjectStoreBuilder::new(store);
168
169        let backend = builder.build().expect("build should succeed");
170        assert!(backend.info().scheme() == OBJECT_STORE_SCHEME);
171    }
172
173    #[tokio::test]
174    async fn test_object_store_backend_info() {
175        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
176        let backend = ObjectStoreBuilder::new(store)
177            .build()
178            .expect("build should succeed");
179
180        let info = backend.info();
181        assert_eq!(info.scheme(), "object_store");
182        assert_eq!(info.name(), "object_store".into());
183        assert_eq!(info.root(), "/".into());
184
185        let cap = info.native_capability();
186        assert!(cap.stat);
187        assert!(cap.read);
188        assert!(cap.write);
189        assert!(cap.delete);
190        assert!(cap.list);
191    }
192
193    #[tokio::test]
194    async fn test_object_store_backend_basic_operations() {
195        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
196        let backend = ObjectStoreBuilder::new(store.clone())
197            .build()
198            .expect("build should succeed");
199
200        let path = "test_file.txt";
201        let content = b"Hello, world!";
202
203        // Test write
204        let (_, mut writer) = backend
205            .write(path, OpWrite::default())
206            .await
207            .expect("write should succeed");
208
209        writer
210            .write(Buffer::from(&content[..]))
211            .await
212            .expect("write content should succeed");
213        writer.close().await.expect("close should succeed");
214
215        // Test stat
216        let stat_result = backend
217            .stat(path, OpStat::default())
218            .await
219            .expect("stat should succeed");
220
221        assert_eq!(
222            stat_result.into_metadata().content_length(),
223            content.len() as u64
224        );
225
226        // Test read
227        let (_, mut reader) = backend
228            .read(path, OpRead::default())
229            .await
230            .expect("read should succeed");
231
232        let buf = reader.read().await.expect("read should succeed");
233        assert_eq!(buf.to_vec(), content);
234    }
235
236    #[tokio::test]
237    async fn test_object_store_backend_multipart_upload() {
238        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
239        let backend = ObjectStoreBuilder::new(store.clone())
240            .build()
241            .expect("build should succeed");
242
243        let path = "test_file.txt";
244        let content =
245            b"Hello, multipart upload! This is a test content for multipart upload functionality.";
246        let content_len = content.len();
247
248        // Test multipart upload with multiple chunks
249        let (_, mut writer) = backend
250            .write(path, OpWrite::default())
251            .await
252            .expect("write should succeed");
253
254        // Write content in chunks to simulate multipart upload
255        let chunk_size = 20;
256        for chunk in content.chunks(chunk_size) {
257            writer
258                .write(Buffer::from(chunk))
259                .await
260                .expect("write chunk should succeed");
261        }
262
263        writer.close().await.expect("close should succeed");
264
265        // Verify the uploaded file
266        let stat_result = backend
267            .stat(path, OpStat::default())
268            .await
269            .expect("stat should succeed");
270
271        assert_eq!(
272            stat_result.into_metadata().content_length(),
273            content_len as u64
274        );
275
276        // Read back and verify content
277        let (_, mut reader) = backend
278            .read(path, OpRead::default())
279            .await
280            .expect("read should succeed");
281
282        let buf = reader.read().await.expect("read should succeed");
283        assert_eq!(buf.to_vec(), content);
284    }
285
286    #[tokio::test]
287    async fn test_object_store_backend_list() {
288        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
289        let backend = ObjectStoreBuilder::new(store.clone())
290            .build()
291            .expect("build should succeed");
292
293        // Create multiple files
294        let files = vec![
295            ("dir1/file1.txt", b"content1"),
296            ("dir1/file2.txt", b"content2"),
297            ("dir2/file3.txt", b"content3"),
298        ];
299
300        for (path, content) in &files {
301            let (_, mut writer) = backend
302                .write(path, OpWrite::default())
303                .await
304                .expect("write should succeed");
305            writer
306                .write(Buffer::from(&content[..]))
307                .await
308                .expect("write content should succeed");
309            writer.close().await.expect("close should succeed");
310        }
311
312        // List directory
313        let (_, mut lister) = backend
314            .list("dir1/", OpList::default())
315            .await
316            .expect("list should succeed");
317
318        let mut entries = Vec::new();
319        while let Some(entry) = lister.next().await.expect("next should succeed") {
320            entries.push(entry);
321        }
322
323        assert_eq!(entries.len(), 2);
324        assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
325        assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
326    }
327
328    #[tokio::test]
329    async fn test_object_store_backend_delete() {
330        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
331        let backend = ObjectStoreBuilder::new(store)
332            .build()
333            .expect("build should succeed");
334
335        let path = "test_delete.txt";
336        let content = b"To be deleted";
337
338        // Write file
339        let (_, mut writer) = backend
340            .write(path, OpWrite::default())
341            .await
342            .expect("write should succeed");
343        writer
344            .write(Buffer::from(&content[..]))
345            .await
346            .expect("write content should succeed");
347        writer.close().await.expect("close should succeed");
348
349        // Verify file exists
350        backend
351            .stat(path, OpStat::default())
352            .await
353            .expect("file should exist");
354
355        // Delete file
356        let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
357        deleter
358            .delete(path, OpDelete::default())
359            .await
360            .expect("delete should succeed");
361        deleter.close().await.expect("close should succeed");
362
363        // Verify file is deleted
364        let result = backend.stat(path, OpStat::default()).await;
365        assert!(result.is_err());
366    }
367
368    #[tokio::test]
369    async fn test_object_store_backend_error_handling() {
370        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
371        let backend = ObjectStoreBuilder::new(store)
372            .build()
373            .expect("build should succeed");
374
375        // Test stat on non-existent file
376        let result = backend.stat("non_existent.txt", OpStat::default()).await;
377        assert!(result.is_err());
378
379        // Test read on non-existent file
380        let result = backend.read("non_existent.txt", OpRead::default()).await;
381        assert!(result.is_err());
382
383        // Test list on non-existent directory
384        let result = backend.list("non_existent_dir/", OpList::default()).await;
385        // This should succeed but return empty results
386        if let Ok((_, mut lister)) = result {
387            let entry = lister.next().await.expect("next should succeed");
388            assert!(entry.is_none());
389        }
390    }
391}