object_store_opendal/service/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use object_store::ObjectStore;
22use object_store::path::Path as ObjectStorePath;
23use opendal::Error;
24use opendal::ErrorKind;
25use opendal::raw::oio::BatchDeleter;
26use opendal::raw::oio::MultipartWriter;
27use opendal::raw::*;
28use opendal::*;
29
30mod core;
31mod deleter;
32mod error;
33mod lister;
34mod reader;
35mod writer;
36
37use deleter::ObjectStoreDeleter;
38use error::parse_error;
39use lister::ObjectStoreLister;
40use reader::ObjectStoreReader;
41use writer::ObjectStoreWriter;
42
43use crate::service::core::format_metadata as parse_metadata;
44use crate::service::core::parse_op_stat;
45
46pub const OBJECT_STORE_SCHEME: &str = "object_store";
47
48/// ObjectStore backend builder
49#[derive(Default)]
50pub struct ObjectStoreBuilder {
51    store: Option<Arc<dyn ObjectStore + 'static>>,
52}
53
54impl Debug for ObjectStoreBuilder {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        let mut d = f.debug_struct("ObjectStoreBuilder");
57        d.finish_non_exhaustive()
58    }
59}
60
61impl ObjectStoreBuilder {
62    /// Set the object store instance
63    pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
64        Self { store: Some(store) }
65    }
66}
67
68impl Builder for ObjectStoreBuilder {
69    type Config = ();
70
71    fn build(self) -> Result<impl Access> {
72        let store = self.store.ok_or_else(|| {
73            Error::new(ErrorKind::ConfigInvalid, "object store is required")
74                .with_context("service", OBJECT_STORE_SCHEME)
75        })?;
76
77        Ok(ObjectStoreService { store })
78    }
79}
80
81/// ObjectStore backend
82pub struct ObjectStoreService {
83    store: Arc<dyn ObjectStore + 'static>,
84}
85
86impl Debug for ObjectStoreService {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        let mut d = f.debug_struct("ObjectStoreBackend");
89        d.finish_non_exhaustive()
90    }
91}
92
93impl Access for ObjectStoreService {
94    type Reader = ObjectStoreReader;
95    type Writer = MultipartWriter<ObjectStoreWriter>;
96    type Lister = ObjectStoreLister;
97    type Deleter = BatchDeleter<ObjectStoreDeleter>;
98
99    fn info(&self) -> Arc<AccessorInfo> {
100        let info = AccessorInfo::default();
101        info.set_scheme(OBJECT_STORE_SCHEME)
102            .set_root("/")
103            .set_name("object_store")
104            .set_native_capability(Capability {
105                stat: true,
106                stat_with_if_match: true,
107                stat_with_if_unmodified_since: true,
108                read: true,
109                write: true,
110                delete: true,
111                list: true,
112                list_with_limit: true,
113                list_with_start_after: true,
114                delete_with_version: false,
115                ..Default::default()
116            });
117        Arc::new(info)
118    }
119
120    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
121        let path = ObjectStorePath::from(path);
122        let opts = parse_op_stat(&args)?;
123        let result = self
124            .store
125            .get_opts(&path, opts)
126            .await
127            .map_err(parse_error)?;
128        let metadata = parse_metadata(&result.meta);
129        Ok(RpStat::new(metadata))
130    }
131
132    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
133        let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
134        Ok((reader.rp(), reader))
135    }
136
137    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
138        let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
139        Ok((
140            RpWrite::default(),
141            MultipartWriter::new(self.info(), writer, 10),
142        ))
143    }
144
145    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
146        let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
147        Ok((RpDelete::default(), deleter))
148    }
149
150    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
151        let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
152        Ok((RpList::default(), lister))
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use object_store::memory::InMemory;
160    use opendal::Buffer;
161    use opendal::raw::oio::{Delete, List, Read, Write};
162
163    #[tokio::test]
164    async fn test_object_store_backend_builder() {
165        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
166        let builder = ObjectStoreBuilder::new(store);
167
168        let backend = builder.build().expect("build should succeed");
169        assert!(backend.info().scheme() == OBJECT_STORE_SCHEME);
170    }
171
172    #[tokio::test]
173    async fn test_object_store_backend_info() {
174        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
175        let backend = ObjectStoreBuilder::new(store)
176            .build()
177            .expect("build should succeed");
178
179        let info = backend.info();
180        assert_eq!(info.scheme(), "object_store");
181        assert_eq!(info.name(), "object_store".into());
182        assert_eq!(info.root(), "/".into());
183
184        let cap = info.native_capability();
185        assert!(cap.stat);
186        assert!(cap.read);
187        assert!(cap.write);
188        assert!(cap.delete);
189        assert!(cap.list);
190    }
191
192    #[tokio::test]
193    async fn test_object_store_backend_basic_operations() {
194        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
195        let backend = ObjectStoreBuilder::new(store.clone())
196            .build()
197            .expect("build should succeed");
198
199        let path = "test_file.txt";
200        let content = b"Hello, world!";
201
202        // Test write
203        let (_, mut writer) = backend
204            .write(path, OpWrite::default())
205            .await
206            .expect("write should succeed");
207
208        writer
209            .write(Buffer::from(&content[..]))
210            .await
211            .expect("write content should succeed");
212        writer.close().await.expect("close should succeed");
213
214        // Test stat
215        let stat_result = backend
216            .stat(path, OpStat::default())
217            .await
218            .expect("stat should succeed");
219
220        assert_eq!(
221            stat_result.into_metadata().content_length(),
222            content.len() as u64
223        );
224
225        // Test read
226        let (_, mut reader) = backend
227            .read(path, OpRead::default())
228            .await
229            .expect("read should succeed");
230
231        let buf = reader.read().await.expect("read should succeed");
232        assert_eq!(buf.to_vec(), content);
233    }
234
235    #[tokio::test]
236    async fn test_object_store_backend_multipart_upload() {
237        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
238        let backend = ObjectStoreBuilder::new(store.clone())
239            .build()
240            .expect("build should succeed");
241
242        let path = "test_file.txt";
243        let content =
244            b"Hello, multipart upload! This is a test content for multipart upload functionality.";
245        let content_len = content.len();
246
247        // Test multipart upload with multiple chunks
248        let (_, mut writer) = backend
249            .write(path, OpWrite::default())
250            .await
251            .expect("write should succeed");
252
253        // Write content in chunks to simulate multipart upload
254        let chunk_size = 20;
255        for chunk in content.chunks(chunk_size) {
256            writer
257                .write(Buffer::from(chunk))
258                .await
259                .expect("write chunk should succeed");
260        }
261
262        writer.close().await.expect("close should succeed");
263
264        // Verify the uploaded file
265        let stat_result = backend
266            .stat(path, OpStat::default())
267            .await
268            .expect("stat should succeed");
269
270        assert_eq!(
271            stat_result.into_metadata().content_length(),
272            content_len as u64
273        );
274
275        // Read back and verify content
276        let (_, mut reader) = backend
277            .read(path, OpRead::default())
278            .await
279            .expect("read should succeed");
280
281        let buf = reader.read().await.expect("read should succeed");
282        assert_eq!(buf.to_vec(), content);
283    }
284
285    #[tokio::test]
286    async fn test_object_store_backend_list() {
287        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
288        let backend = ObjectStoreBuilder::new(store.clone())
289            .build()
290            .expect("build should succeed");
291
292        // Create multiple files
293        let files = vec![
294            ("dir1/file1.txt", b"content1"),
295            ("dir1/file2.txt", b"content2"),
296            ("dir2/file3.txt", b"content3"),
297        ];
298
299        for (path, content) in &files {
300            let (_, mut writer) = backend
301                .write(path, OpWrite::default())
302                .await
303                .expect("write should succeed");
304            writer
305                .write(Buffer::from(&content[..]))
306                .await
307                .expect("write content should succeed");
308            writer.close().await.expect("close should succeed");
309        }
310
311        // List directory
312        let (_, mut lister) = backend
313            .list("dir1/", OpList::default())
314            .await
315            .expect("list should succeed");
316
317        let mut entries = Vec::new();
318        while let Some(entry) = lister.next().await.expect("next should succeed") {
319            entries.push(entry);
320        }
321
322        assert_eq!(entries.len(), 2);
323        assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
324        assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
325    }
326
327    #[tokio::test]
328    async fn test_object_store_backend_delete() {
329        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
330        let backend = ObjectStoreBuilder::new(store)
331            .build()
332            .expect("build should succeed");
333
334        let path = "test_delete.txt";
335        let content = b"To be deleted";
336
337        // Write file
338        let (_, mut writer) = backend
339            .write(path, OpWrite::default())
340            .await
341            .expect("write should succeed");
342        writer
343            .write(Buffer::from(&content[..]))
344            .await
345            .expect("write content should succeed");
346        writer.close().await.expect("close should succeed");
347
348        // Verify file exists
349        backend
350            .stat(path, OpStat::default())
351            .await
352            .expect("file should exist");
353
354        // Delete file
355        let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
356        deleter
357            .delete(path, OpDelete::default())
358            .await
359            .expect("delete should succeed");
360        deleter.close().await.expect("close should succeed");
361
362        // Verify file is deleted
363        let result = backend.stat(path, OpStat::default()).await;
364        assert!(result.is_err());
365    }
366
367    #[tokio::test]
368    async fn test_object_store_backend_error_handling() {
369        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
370        let backend = ObjectStoreBuilder::new(store)
371            .build()
372            .expect("build should succeed");
373
374        // Test stat on non-existent file
375        let result = backend.stat("non_existent.txt", OpStat::default()).await;
376        assert!(result.is_err());
377
378        // Test read on non-existent file
379        let result = backend.read("non_existent.txt", OpRead::default()).await;
380        assert!(result.is_err());
381
382        // Test list on non-existent directory
383        let result = backend.list("non_existent_dir/", OpList::default()).await;
384        // This should succeed but return empty results
385        if let Ok((_, mut lister)) = result {
386            let entry = lister.next().await.expect("next should succeed");
387            assert!(entry.is_none());
388        }
389    }
390}