object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use async_trait::async_trait;
25use futures::stream::BoxStream;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use object_store::path::Path;
30use object_store::ListResult;
31use object_store::MultipartUpload;
32use object_store::ObjectMeta;
33use object_store::ObjectStore;
34use object_store::PutMultipartOpts;
35use object_store::PutOptions;
36use object_store::PutPayload;
37use object_store::PutResult;
38use object_store::{GetOptions, UploadPart};
39use object_store::{GetRange, GetResultPayload};
40use object_store::{GetResult, PutMode};
41use opendal::raw::percent_decode_path;
42use opendal::Buffer;
43use opendal::Writer;
44use opendal::{Operator, OperatorInfo};
45use tokio::sync::{Mutex, Notify};
46
47/// OpendalStore implements ObjectStore trait by using opendal.
48///
49/// This allows users to use opendal as an object store without extra cost.
50///
51/// Visit [`opendal::services`] for more information about supported services.
52///
53/// ```no_run
54/// use std::sync::Arc;
55///
56/// use bytes::Bytes;
57/// use object_store::path::Path;
58/// use object_store::ObjectStore;
59/// use object_store_opendal::OpendalStore;
60/// use opendal::services::S3;
61/// use opendal::{Builder, Operator};
62///
63/// #[tokio::main]
64/// async fn main() {
65///    let builder = S3::default()
66///     .access_key_id("my_access_key")
67///     .secret_access_key("my_secret_key")
68///     .endpoint("my_endpoint")
69///     .region("my_region");
70///
71///     // Create a new operator
72///     let operator = Operator::new(builder).unwrap().finish();
73///
74///     // Create a new object store
75///     let object_store = Arc::new(OpendalStore::new(operator));
76///
77///     let path = Path::from("data/nested/test.txt");
78///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
79///
80///     object_store.put(&path, bytes.clone().into()).await.unwrap();
81///
82///     let content = object_store
83///         .get(&path)
84///         .await
85///         .unwrap()
86///         .bytes()
87///         .await
88///         .unwrap();
89///
90///     assert_eq!(content, bytes);
91/// }
92/// ```
93#[derive(Clone)]
94pub struct OpendalStore {
95    info: Arc<OperatorInfo>,
96    inner: Operator,
97}
98
99impl OpendalStore {
100    /// Create OpendalStore by given Operator.
101    pub fn new(op: Operator) -> Self {
102        Self {
103            info: op.info().into(),
104            inner: op,
105        }
106    }
107
108    /// Get the Operator info.
109    pub fn info(&self) -> &OperatorInfo {
110        self.info.as_ref()
111    }
112}
113
114impl Debug for OpendalStore {
115    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("OpendalStore")
117            .field("scheme", &self.info.scheme())
118            .field("name", &self.info.name())
119            .field("root", &self.info.root())
120            .field("capability", &self.info.full_capability())
121            .finish()
122    }
123}
124
125impl Display for OpendalStore {
126    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127        let info = self.inner.info();
128        write!(
129            f,
130            "Opendal({}, bucket={}, root={})",
131            info.scheme(),
132            info.name(),
133            info.root()
134        )
135    }
136}
137
138impl From<Operator> for OpendalStore {
139    fn from(value: Operator) -> Self {
140        Self::new(value)
141    }
142}
143
144#[async_trait]
145impl ObjectStore for OpendalStore {
146    async fn put_opts(
147        &self,
148        location: &Path,
149        bytes: PutPayload,
150        opts: PutOptions,
151    ) -> object_store::Result<PutResult> {
152        let mut future_write = self.inner.write_with(
153            &percent_decode_path(location.as_ref()),
154            Buffer::from_iter(bytes.into_iter()),
155        );
156        let opts_mode = opts.mode.clone();
157        match opts.mode {
158            PutMode::Overwrite => {}
159            PutMode::Create => {
160                future_write = future_write.if_not_exists(true);
161            }
162            PutMode::Update(update_version) => {
163                let Some(etag) = update_version.e_tag else {
164                    Err(object_store::Error::NotSupported {
165                        source: Box::new(opendal::Error::new(
166                            opendal::ErrorKind::Unsupported,
167                            "etag is required for conditional put",
168                        )),
169                    })?
170                };
171                future_write = future_write.if_match(etag.as_str());
172            }
173        }
174        future_write.into_send().await.map_err(|err| {
175            match format_object_store_error(err, location.as_ref()) {
176                object_store::Error::Precondition { path, source }
177                    if opts_mode == PutMode::Create =>
178                {
179                    object_store::Error::AlreadyExists { path, source }
180                }
181                e => e,
182            }
183        })?;
184
185        Ok(PutResult {
186            e_tag: None,
187            version: None,
188        })
189    }
190
191    async fn put_multipart(
192        &self,
193        location: &Path,
194    ) -> object_store::Result<Box<dyn MultipartUpload>> {
195        let writer = self
196            .inner
197            .writer_with(&percent_decode_path(location.as_ref()))
198            .concurrent(8)
199            .into_send()
200            .await
201            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
202        let upload = OpendalMultipartUpload::new(writer, location.clone());
203
204        Ok(Box::new(upload))
205    }
206
207    async fn put_multipart_opts(
208        &self,
209        _location: &Path,
210        _opts: PutMultipartOpts,
211    ) -> object_store::Result<Box<dyn MultipartUpload>> {
212        Err(object_store::Error::NotSupported {
213            source: Box::new(opendal::Error::new(
214                opendal::ErrorKind::Unsupported,
215                "put_multipart_opts is not implemented so far",
216            )),
217        })
218    }
219
220    async fn get_opts(
221        &self,
222        location: &Path,
223        options: GetOptions,
224    ) -> object_store::Result<GetResult> {
225        let raw_location = percent_decode_path(location.as_ref());
226        let meta = {
227            let mut s = self.inner.stat_with(&raw_location);
228            if let Some(version) = &options.version {
229                s = s.version(version.as_str())
230            }
231            if let Some(if_match) = &options.if_match {
232                s = s.if_match(if_match.as_str());
233            }
234            if let Some(if_none_match) = &options.if_none_match {
235                s = s.if_none_match(if_none_match.as_str());
236            }
237            if let Some(if_modified_since) = options.if_modified_since {
238                s = s.if_modified_since(if_modified_since);
239            }
240            if let Some(if_unmodified_since) = options.if_unmodified_since {
241                s = s.if_unmodified_since(if_unmodified_since);
242            }
243            s.into_send()
244                .await
245                .map_err(|err| format_object_store_error(err, location.as_ref()))?
246        };
247
248        let meta = ObjectMeta {
249            location: location.clone(),
250            last_modified: meta.last_modified().unwrap_or_default(),
251            size: meta.content_length(),
252            e_tag: meta.etag().map(|x| x.to_string()),
253            version: meta.version().map(|x| x.to_string()),
254        };
255
256        if options.head {
257            return Ok(GetResult {
258                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
259                range: 0..0,
260                meta,
261                attributes: Default::default(),
262            });
263        }
264
265        let reader = {
266            let mut r = self.inner.reader_with(raw_location.as_ref());
267            if let Some(version) = options.version {
268                r = r.version(version.as_str());
269            }
270            if let Some(if_match) = options.if_match {
271                r = r.if_match(if_match.as_str());
272            }
273            if let Some(if_none_match) = options.if_none_match {
274                r = r.if_none_match(if_none_match.as_str());
275            }
276            if let Some(if_modified_since) = options.if_modified_since {
277                r = r.if_modified_since(if_modified_since);
278            }
279            if let Some(if_unmodified_since) = options.if_unmodified_since {
280                r = r.if_unmodified_since(if_unmodified_since);
281            }
282            r.into_send()
283                .await
284                .map_err(|err| format_object_store_error(err, location.as_ref()))?
285        };
286
287        let read_range = match options.range {
288            Some(GetRange::Bounded(r)) => {
289                if r.start >= r.end || r.start >= meta.size {
290                    0..0
291                } else {
292                    let end = r.end.min(meta.size);
293                    r.start..end
294                }
295            }
296            Some(GetRange::Offset(r)) => {
297                if r < meta.size {
298                    r..meta.size
299                } else {
300                    0..0
301                }
302            }
303            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
304            _ => 0..meta.size,
305        };
306
307        let stream = reader
308            .into_bytes_stream(read_range.start..read_range.end)
309            .into_send()
310            .await
311            .map_err(|err| format_object_store_error(err, location.as_ref()))?
312            .into_send()
313            .map_err(|err: io::Error| object_store::Error::Generic {
314                store: "IoError",
315                source: Box::new(err),
316            });
317
318        Ok(GetResult {
319            payload: GetResultPayload::Stream(Box::pin(stream)),
320            range: read_range.start..read_range.end,
321            meta,
322            attributes: Default::default(),
323        })
324    }
325
326    async fn delete(&self, location: &Path) -> object_store::Result<()> {
327        self.inner
328            .delete(&percent_decode_path(location.as_ref()))
329            .into_send()
330            .await
331            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
332
333        Ok(())
334    }
335
336    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
337        // object_store `Path` always removes trailing slash
338        // need to add it back
339        let path = prefix.map_or("".into(), |x| {
340            format!("{}/", percent_decode_path(x.as_ref()))
341        });
342
343        let lister_fut = self.inner.lister_with(&path).recursive(true);
344        let fut = async move {
345            let stream = lister_fut
346                .await
347                .map_err(|err| format_object_store_error(err, &path))?;
348
349            let stream = stream.then(|res| async {
350                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
351                let meta = entry.metadata();
352
353                Ok(format_object_meta(entry.path(), meta))
354            });
355            Ok::<_, object_store::Error>(stream)
356        };
357
358        fut.into_stream().try_flatten().into_send().boxed()
359    }
360
361    fn list_with_offset(
362        &self,
363        prefix: Option<&Path>,
364        offset: &Path,
365    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
366        let path = prefix.map_or("".into(), |x| {
367            format!("{}/", percent_decode_path(x.as_ref()))
368        });
369        let offset = offset.clone();
370
371        // clone self for 'static lifetime
372        // clone self is cheap
373        let this = self.clone();
374
375        let fut = async move {
376            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
377            let mut fut = this.inner.lister_with(&path).recursive(true);
378
379            // Use native start_after support if possible.
380            if list_with_start_after {
381                fut = fut.start_after(offset.as_ref());
382            }
383
384            let lister = fut
385                .await
386                .map_err(|err| format_object_store_error(err, &path))?
387                .then(move |entry| {
388                    let path = path.clone();
389                    let this = this.clone();
390                    async move {
391                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
392                        let (path, metadata) = entry.into_parts();
393
394                        // If it's a dir or last_modified is present, we can use it directly.
395                        if metadata.is_dir() || metadata.last_modified().is_some() {
396                            let object_meta = format_object_meta(&path, &metadata);
397                            return Ok(object_meta);
398                        }
399
400                        let metadata = this
401                            .inner
402                            .stat(&path)
403                            .await
404                            .map_err(|err| format_object_store_error(err, &path))?;
405                        let object_meta = format_object_meta(&path, &metadata);
406                        Ok::<_, object_store::Error>(object_meta)
407                    }
408                })
409                .into_send()
410                .boxed();
411
412            let stream = if list_with_start_after {
413                lister
414            } else {
415                lister
416                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
417                    .into_send()
418                    .boxed()
419            };
420
421            Ok::<_, object_store::Error>(stream)
422        };
423
424        fut.into_stream().into_send().try_flatten().boxed()
425    }
426
427    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
428        let path = prefix.map_or("".into(), |x| {
429            format!("{}/", percent_decode_path(x.as_ref()))
430        });
431        let mut stream = self
432            .inner
433            .lister_with(&path)
434            .into_future()
435            .into_send()
436            .await
437            .map_err(|err| format_object_store_error(err, &path))?
438            .into_send();
439
440        let mut common_prefixes = Vec::new();
441        let mut objects = Vec::new();
442
443        while let Some(res) = stream.next().into_send().await {
444            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
445            let meta = entry.metadata();
446
447            if meta.is_dir() {
448                common_prefixes.push(entry.path().into());
449            } else if meta.last_modified().is_some() {
450                objects.push(format_object_meta(entry.path(), meta));
451            } else {
452                let meta = self
453                    .inner
454                    .stat(entry.path())
455                    .into_send()
456                    .await
457                    .map_err(|err| format_object_store_error(err, entry.path()))?;
458                objects.push(format_object_meta(entry.path(), &meta));
459            }
460        }
461
462        Ok(ListResult {
463            common_prefixes,
464            objects,
465        })
466    }
467
468    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
469        Err(object_store::Error::NotSupported {
470            source: Box::new(opendal::Error::new(
471                opendal::ErrorKind::Unsupported,
472                "copy is not implemented so far",
473            )),
474        })
475    }
476
477    async fn rename(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
478        Err(object_store::Error::NotSupported {
479            source: Box::new(opendal::Error::new(
480                opendal::ErrorKind::Unsupported,
481                "rename is not implemented so far",
482            )),
483        })
484    }
485
486    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
487        Err(object_store::Error::NotSupported {
488            source: Box::new(opendal::Error::new(
489                opendal::ErrorKind::Unsupported,
490                "copy_if_not_exists is not implemented so far",
491            )),
492        })
493    }
494}
495
496/// `MultipartUpload`'s impl based on `Writer` in opendal
497///
498/// # Notes
499///
500/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
501/// implementation do. Instead, we just write the part and notify the next task to be written.
502///
503/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
504struct OpendalMultipartUpload {
505    writer: Arc<Mutex<Writer>>,
506    location: Path,
507    next_notify: Option<Arc<Notify>>,
508}
509
510impl OpendalMultipartUpload {
511    fn new(writer: Writer, location: Path) -> Self {
512        Self {
513            writer: Arc::new(Mutex::new(writer)),
514            location,
515            next_notify: None,
516        }
517    }
518}
519
520#[async_trait]
521impl MultipartUpload for OpendalMultipartUpload {
522    fn put_part(&mut self, data: PutPayload) -> UploadPart {
523        let writer = self.writer.clone();
524        let location = self.location.clone();
525
526        // Generate next notify which will be notified after the current part is written.
527        let next_notify = Arc::new(Notify::new());
528        // Fetch the notify for current part to wait for it to be written.
529        let current_notify = self.next_notify.replace(next_notify.clone());
530
531        async move {
532            // current_notify == None means that it's the first part, we don't need to wait.
533            if let Some(notify) = current_notify {
534                // Wait for the previous part to be written
535                notify.notified().await;
536            }
537
538            let mut writer = writer.lock().await;
539            let result = writer
540                .write(Buffer::from_iter(data.into_iter()))
541                .await
542                .map_err(|err| format_object_store_error(err, location.as_ref()));
543
544            // Notify the next part to be written
545            next_notify.notify_one();
546
547            result
548        }
549        .into_send()
550        .boxed()
551    }
552
553    async fn complete(&mut self) -> object_store::Result<PutResult> {
554        let mut writer = self.writer.lock().await;
555        writer
556            .close()
557            .into_send()
558            .await
559            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
560
561        Ok(PutResult {
562            e_tag: None,
563            version: None,
564        })
565    }
566
567    async fn abort(&mut self) -> object_store::Result<()> {
568        let mut writer = self.writer.lock().await;
569        writer
570            .abort()
571            .into_send()
572            .await
573            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
574    }
575}
576
577impl Debug for OpendalMultipartUpload {
578    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
579        f.debug_struct("OpendalMultipartUpload")
580            .field("location", &self.location)
581            .finish()
582    }
583}
584
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Builds an empty in-memory `OpendalStore` behind the `ObjectStore` trait object.
    fn memory_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        Arc::new(OpendalStore::new(op))
    }

    /// Reads the whole object at `path`, panicking on any error.
    async fn read_all(store: &dyn ObjectStore, path: &Path) -> Bytes {
        store.get(path).await.unwrap().bytes().await.unwrap()
    }

    /// In-memory store seeded with `data/test.txt` and `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let store = memory_store();

        let seed: [(&str, &'static [u8]); 2] = [
            ("data/test.txt", b"hello, world!"),
            ("data/nested/test.txt", b"hello, world! I am nested."),
        ];
        for &(location, content) in seed.iter() {
            let path = Path::from(location);
            store
                .put(&path, Bytes::from_static(content).into())
                .await
                .unwrap();
        }

        store
    }

    #[tokio::test]
    async fn test_basic() {
        let store = memory_store();

        // Retrieve a specific file
        let path = Path::from("data/test.txt");
        let content = Bytes::from_static(b"hello, world!");
        store.put(&path, content.clone().into()).await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, 13);

        assert_eq!(read_all(&*store, &path).await, content);
    }

    #[tokio::test]
    async fn test_put_multipart() {
        let store = memory_store();
        let mut rng = thread_rng();

        // Case complete: stream a random number of random-sized parts, then finish.
        let path = Path::from("data/test_complete.txt");
        let mut write = WriteMultipart::new(store.put_multipart(&path).await.unwrap());

        let mut expected = Vec::new();
        let rounds = rng.gen_range(1..=1024);
        for _ in 0..rounds {
            let len = rng.gen_range(1..=1024);
            let mut chunk = vec![0; len];
            rng.fill_bytes(&mut chunk);

            expected.extend_from_slice(&chunk);
            write.put(chunk.into());
        }
        write.finish().await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, expected.len() as u64);
        assert_eq!(read_all(&*store, &path).await, Bytes::from(expected));

        // Case abort: after aborting, the object must not exist.
        let path = Path::from("data/test_abort.txt");
        let mut upload = store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = store.head(&path).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    #[tokio::test]
    async fn test_list() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");

        let results = store.list(Some(&prefix)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);

        let mut locations: Vec<&str> = results
            .iter()
            .map(|res| res.as_ref().unwrap().location.as_ref())
            .collect();
        locations.sort();

        let expected = [
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];
        let expected_locations: Vec<&str> = expected.iter().map(|(loc, _)| *loc).collect();
        assert_eq!(locations, expected_locations);

        for (location, content) in expected.iter() {
            let path = Path::from(*location);
            assert_eq!(&read_all(&*store, &path).await, content);
        }
    }

    #[tokio::test]
    async fn test_list_with_delimiter() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");

        let listing = store.list_with_delimiter(Some(&prefix)).await.unwrap();

        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    #[tokio::test]
    async fn test_list_with_offset() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let offset = Path::from("data/nested/test.txt");

        let entries = store
            .list_with_offset(Some(&prefix), &offset)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(entries.len(), 1);
        assert_eq!(
            entries[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}