object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use async_trait::async_trait;
25use futures::stream::BoxStream;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use object_store::path::Path;
30use object_store::ListResult;
31use object_store::MultipartUpload;
32use object_store::ObjectMeta;
33use object_store::ObjectStore;
34use object_store::PutMultipartOpts;
35use object_store::PutOptions;
36use object_store::PutPayload;
37use object_store::PutResult;
38use object_store::{GetOptions, UploadPart};
39use object_store::{GetRange, GetResultPayload};
40use object_store::{GetResult, PutMode};
41use opendal::Buffer;
42use opendal::Writer;
43use opendal::{Operator, OperatorInfo};
44use tokio::sync::{Mutex, Notify};
45
46/// OpendalStore implements ObjectStore trait by using opendal.
47///
48/// This allows users to use opendal as an object store without extra cost.
49///
50/// Visit [`opendal::services`] for more information about supported services.
51///
52/// ```no_run
53/// use std::sync::Arc;
54///
55/// use bytes::Bytes;
56/// use object_store::path::Path;
57/// use object_store::ObjectStore;
58/// use object_store_opendal::OpendalStore;
59/// use opendal::services::S3;
60/// use opendal::{Builder, Operator};
61///
62/// #[tokio::main]
63/// async fn main() {
64///    let builder = S3::default()
65///     .access_key_id("my_access_key")
66///     .secret_access_key("my_secret_key")
67///     .endpoint("my_endpoint")
68///     .region("my_region");
69///
70///     // Create a new operator
71///     let operator = Operator::new(builder).unwrap().finish();
72///
73///     // Create a new object store
74///     let object_store = Arc::new(OpendalStore::new(operator));
75///
76///     let path = Path::from("data/nested/test.txt");
77///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
78///
79///     object_store.put(&path, bytes.clone().into()).await.unwrap();
80///
81///     let content = object_store
82///         .get(&path)
83///         .await
84///         .unwrap()
85///         .bytes()
86///         .await
87///         .unwrap();
88///
89///     assert_eq!(content, bytes);
90/// }
91/// ```
92#[derive(Clone)]
93pub struct OpendalStore {
94    info: Arc<OperatorInfo>,
95    inner: Operator,
96}
97
98impl OpendalStore {
99    /// Create OpendalStore by given Operator.
100    pub fn new(op: Operator) -> Self {
101        Self {
102            info: op.info().into(),
103            inner: op,
104        }
105    }
106
107    /// Get the Operator info.
108    pub fn info(&self) -> &OperatorInfo {
109        self.info.as_ref()
110    }
111}
112
113impl Debug for OpendalStore {
114    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115        f.debug_struct("OpendalStore")
116            .field("scheme", &self.info.scheme())
117            .field("name", &self.info.name())
118            .field("root", &self.info.root())
119            .field("capability", &self.info.full_capability())
120            .finish()
121    }
122}
123
124impl Display for OpendalStore {
125    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126        let info = self.inner.info();
127        write!(
128            f,
129            "Opendal({}, bucket={}, root={})",
130            info.scheme(),
131            info.name(),
132            info.root()
133        )
134    }
135}
136
137impl From<Operator> for OpendalStore {
138    fn from(value: Operator) -> Self {
139        Self::new(value)
140    }
141}
142
143#[async_trait]
144impl ObjectStore for OpendalStore {
145    async fn put_opts(
146        &self,
147        location: &Path,
148        bytes: PutPayload,
149        opts: PutOptions,
150    ) -> object_store::Result<PutResult> {
151        let mut future_write = self
152            .inner
153            .write_with(location.as_ref(), Buffer::from_iter(bytes.into_iter()));
154        let opts_mode = opts.mode.clone();
155        match opts.mode {
156            PutMode::Overwrite => {}
157            PutMode::Create => {
158                future_write = future_write.if_not_exists(true);
159            }
160            PutMode::Update(update_version) => {
161                let Some(etag) = update_version.e_tag else {
162                    Err(object_store::Error::NotSupported {
163                        source: Box::new(opendal::Error::new(
164                            opendal::ErrorKind::Unsupported,
165                            "etag is required for conditional put",
166                        )),
167                    })?
168                };
169                future_write = future_write.if_match(etag.as_str());
170            }
171        }
172        future_write.into_send().await.map_err(|err| {
173            match format_object_store_error(err, location.as_ref()) {
174                object_store::Error::Precondition { path, source }
175                    if opts_mode == PutMode::Create =>
176                {
177                    object_store::Error::AlreadyExists { path, source }
178                }
179                e => e,
180            }
181        })?;
182
183        Ok(PutResult {
184            e_tag: None,
185            version: None,
186        })
187    }
188
189    async fn put_multipart(
190        &self,
191        location: &Path,
192    ) -> object_store::Result<Box<dyn MultipartUpload>> {
193        let writer = self
194            .inner
195            .writer_with(location.as_ref())
196            .concurrent(8)
197            .into_send()
198            .await
199            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
200        let upload = OpendalMultipartUpload::new(writer, location.clone());
201
202        Ok(Box::new(upload))
203    }
204
205    async fn put_multipart_opts(
206        &self,
207        _location: &Path,
208        _opts: PutMultipartOpts,
209    ) -> object_store::Result<Box<dyn MultipartUpload>> {
210        Err(object_store::Error::NotSupported {
211            source: Box::new(opendal::Error::new(
212                opendal::ErrorKind::Unsupported,
213                "put_multipart_opts is not implemented so far",
214            )),
215        })
216    }
217
218    async fn get_opts(
219        &self,
220        location: &Path,
221        options: GetOptions,
222    ) -> object_store::Result<GetResult> {
223        let meta = {
224            let mut s = self.inner.stat_with(location.as_ref());
225            if let Some(version) = &options.version {
226                s = s.version(version.as_str())
227            }
228            if let Some(if_match) = &options.if_match {
229                s = s.if_match(if_match.as_str());
230            }
231            if let Some(if_none_match) = &options.if_none_match {
232                s = s.if_none_match(if_none_match.as_str());
233            }
234            if let Some(if_modified_since) = options.if_modified_since {
235                s = s.if_modified_since(if_modified_since);
236            }
237            if let Some(if_unmodified_since) = options.if_unmodified_since {
238                s = s.if_unmodified_since(if_unmodified_since);
239            }
240            s.into_send()
241                .await
242                .map_err(|err| format_object_store_error(err, location.as_ref()))?
243        };
244
245        let meta = ObjectMeta {
246            location: location.clone(),
247            last_modified: meta.last_modified().unwrap_or_default(),
248            size: meta.content_length() as usize,
249            e_tag: meta.etag().map(|x| x.to_string()),
250            version: meta.version().map(|x| x.to_string()),
251        };
252
253        if options.head {
254            return Ok(GetResult {
255                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
256                range: 0..0,
257                meta,
258                attributes: Default::default(),
259            });
260        }
261
262        let reader = {
263            let mut r = self.inner.reader_with(location.as_ref());
264            if let Some(version) = options.version {
265                r = r.version(version.as_str());
266            }
267            if let Some(if_match) = options.if_match {
268                r = r.if_match(if_match.as_str());
269            }
270            if let Some(if_none_match) = options.if_none_match {
271                r = r.if_none_match(if_none_match.as_str());
272            }
273            if let Some(if_modified_since) = options.if_modified_since {
274                r = r.if_modified_since(if_modified_since);
275            }
276            if let Some(if_unmodified_since) = options.if_unmodified_since {
277                r = r.if_unmodified_since(if_unmodified_since);
278            }
279            r.into_send()
280                .await
281                .map_err(|err| format_object_store_error(err, location.as_ref()))?
282        };
283
284        let read_range = match options.range {
285            Some(GetRange::Bounded(r)) => {
286                if r.start >= r.end || r.start >= meta.size {
287                    0..0
288                } else {
289                    let end = r.end.min(meta.size);
290                    r.start..end
291                }
292            }
293            Some(GetRange::Offset(r)) => {
294                if r < meta.size {
295                    r..meta.size
296                } else {
297                    0..0
298                }
299            }
300            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
301            _ => 0..meta.size,
302        };
303
304        let stream = reader
305            .into_bytes_stream(read_range.start as u64..read_range.end as u64)
306            .into_send()
307            .await
308            .map_err(|err| format_object_store_error(err, location.as_ref()))?
309            .into_send()
310            .map_err(|err: io::Error| object_store::Error::Generic {
311                store: "IoError",
312                source: Box::new(err),
313            });
314
315        Ok(GetResult {
316            payload: GetResultPayload::Stream(Box::pin(stream)),
317            range: read_range.start..read_range.end,
318            meta,
319            attributes: Default::default(),
320        })
321    }
322
323    async fn delete(&self, location: &Path) -> object_store::Result<()> {
324        self.inner
325            .delete(location.as_ref())
326            .into_send()
327            .await
328            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
329
330        Ok(())
331    }
332
333    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
334        // object_store `Path` always removes trailing slash
335        // need to add it back
336        let path = prefix.map_or("".into(), |x| format!("{}/", x));
337
338        let fut = async move {
339            let stream = self
340                .inner
341                .lister_with(&path)
342                .recursive(true)
343                .await
344                .map_err(|err| format_object_store_error(err, &path))?;
345
346            let stream = stream.then(|res| async {
347                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
348                let meta = entry.metadata();
349
350                Ok(format_object_meta(entry.path(), meta))
351            });
352            Ok::<_, object_store::Error>(stream)
353        };
354
355        fut.into_stream().try_flatten().into_send().boxed()
356    }
357
358    fn list_with_offset(
359        &self,
360        prefix: Option<&Path>,
361        offset: &Path,
362    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
363        let path = prefix.map_or("".into(), |x| format!("{}/", x));
364        let offset = offset.clone();
365
366        let fut = async move {
367            let list_with_start_after = self.inner.info().full_capability().list_with_start_after;
368            let mut fut = self.inner.lister_with(&path).recursive(true);
369
370            // Use native start_after support if possible.
371            if list_with_start_after {
372                fut = fut.start_after(offset.as_ref());
373            }
374
375            let lister = fut
376                .await
377                .map_err(|err| format_object_store_error(err, &path))?
378                .then(move |entry| {
379                    let path = path.clone();
380
381                    async move {
382                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
383                        let (path, metadata) = entry.into_parts();
384
385                        // If it's a dir or last_modified is present, we can use it directly.
386                        if metadata.is_dir() || metadata.last_modified().is_some() {
387                            let object_meta = format_object_meta(&path, &metadata);
388                            return Ok(object_meta);
389                        }
390
391                        let metadata = self
392                            .inner
393                            .stat(&path)
394                            .await
395                            .map_err(|err| format_object_store_error(err, &path))?;
396                        let object_meta = format_object_meta(&path, &metadata);
397                        Ok::<_, object_store::Error>(object_meta)
398                    }
399                })
400                .into_send()
401                .boxed();
402
403            let stream = if list_with_start_after {
404                lister
405            } else {
406                lister
407                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
408                    .into_send()
409                    .boxed()
410            };
411
412            Ok::<_, object_store::Error>(stream)
413        };
414
415        fut.into_stream().into_send().try_flatten().boxed()
416    }
417
418    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
419        let path = prefix.map_or("".into(), |x| format!("{}/", x));
420        let mut stream = self
421            .inner
422            .lister_with(&path)
423            .into_future()
424            .into_send()
425            .await
426            .map_err(|err| format_object_store_error(err, &path))?
427            .into_send();
428
429        let mut common_prefixes = Vec::new();
430        let mut objects = Vec::new();
431
432        while let Some(res) = stream.next().into_send().await {
433            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
434            let meta = entry.metadata();
435
436            if meta.is_dir() {
437                common_prefixes.push(entry.path().into());
438            } else if meta.last_modified().is_some() {
439                objects.push(format_object_meta(entry.path(), meta));
440            } else {
441                let meta = self
442                    .inner
443                    .stat(entry.path())
444                    .into_send()
445                    .await
446                    .map_err(|err| format_object_store_error(err, entry.path()))?;
447                objects.push(format_object_meta(entry.path(), &meta));
448            }
449        }
450
451        Ok(ListResult {
452            common_prefixes,
453            objects,
454        })
455    }
456
457    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
458        Err(object_store::Error::NotSupported {
459            source: Box::new(opendal::Error::new(
460                opendal::ErrorKind::Unsupported,
461                "copy is not implemented so far",
462            )),
463        })
464    }
465
466    async fn rename(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
467        Err(object_store::Error::NotSupported {
468            source: Box::new(opendal::Error::new(
469                opendal::ErrorKind::Unsupported,
470                "rename is not implemented so far",
471            )),
472        })
473    }
474
475    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
476        Err(object_store::Error::NotSupported {
477            source: Box::new(opendal::Error::new(
478                opendal::ErrorKind::Unsupported,
479                "copy_if_not_exists is not implemented so far",
480            )),
481        })
482    }
483}
484
485/// `MultipartUpload`'s impl based on `Writer` in opendal
486///
487/// # Notes
488///
489/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
490/// implementation do. Instead, we just write the part and notify the next task to be written.
491///
492/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
493struct OpendalMultipartUpload {
494    writer: Arc<Mutex<Writer>>,
495    location: Path,
496    next_notify: Option<Arc<Notify>>,
497}
498
499impl OpendalMultipartUpload {
500    fn new(writer: Writer, location: Path) -> Self {
501        Self {
502            writer: Arc::new(Mutex::new(writer)),
503            location,
504            next_notify: None,
505        }
506    }
507}
508
509#[async_trait]
510impl MultipartUpload for OpendalMultipartUpload {
511    fn put_part(&mut self, data: PutPayload) -> UploadPart {
512        let writer = self.writer.clone();
513        let location = self.location.clone();
514
515        // Generate next notify which will be notified after the current part is written.
516        let next_notify = Arc::new(Notify::new());
517        // Fetch the notify for current part to wait for it to be written.
518        let current_notify = self.next_notify.replace(next_notify.clone());
519
520        async move {
521            // current_notify == None means that it's the first part, we don't need to wait.
522            if let Some(notify) = current_notify {
523                // Wait for the previous part to be written
524                notify.notified().await;
525            }
526
527            let mut writer = writer.lock().await;
528            let result = writer
529                .write(Buffer::from_iter(data.into_iter()))
530                .await
531                .map_err(|err| format_object_store_error(err, location.as_ref()));
532
533            // Notify the next part to be written
534            next_notify.notify_one();
535
536            result
537        }
538        .into_send()
539        .boxed()
540    }
541
542    async fn complete(&mut self) -> object_store::Result<PutResult> {
543        let mut writer = self.writer.lock().await;
544        writer
545            .close()
546            .into_send()
547            .await
548            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
549
550        Ok(PutResult {
551            e_tag: None,
552            version: None,
553        })
554    }
555
556    async fn abort(&mut self) -> object_store::Result<()> {
557        let mut writer = self.writer.lock().await;
558        writer
559            .abort()
560            .into_send()
561            .await
562            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
563    }
564}
565
566impl Debug for OpendalMultipartUpload {
567    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
568        f.debug_struct("OpendalMultipartUpload")
569            .field("location", &self.location)
570            .finish()
571    }
572}
573
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Create an empty store backed by OpenDAL's in-memory service.
    fn memory_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        Arc::new(OpendalStore::new(op))
    }

    /// Read the full content of `path`, panicking on any error.
    async fn read_all(store: &dyn ObjectStore, path: &Path) -> Bytes {
        store.get(path).await.unwrap().bytes().await.unwrap()
    }

    /// Create an in-memory store seeded with `data/test.txt` and `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let store = memory_store();
        let fixtures = [
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
        ];
        for (location, content) in fixtures.iter() {
            store
                .put(&Path::from(*location), content.clone().into())
                .await
                .unwrap();
        }
        store
    }

    #[tokio::test]
    async fn test_basic() {
        let store = memory_store();
        let path = Path::from("data/test.txt");
        let payload = Bytes::from_static(b"hello, world!");

        store.put(&path, payload.clone().into()).await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, 13);
        assert_eq!(read_all(&*store, &path).await, payload);
    }

    #[tokio::test]
    async fn test_put_multipart() {
        let store = memory_store();
        let mut rng = thread_rng();

        // Completed upload: a random number of random-sized parts, then finish.
        let path = Path::from("data/test_complete.txt");
        let mut multipart = WriteMultipart::new(store.put_multipart(&path).await.unwrap());

        let mut expected = Vec::new();
        for _ in 0..rng.gen_range(1..=1024) {
            let mut chunk = vec![0u8; rng.gen_range(1..=1024)];
            rng.fill_bytes(&mut chunk);
            expected.extend_from_slice(&chunk);
            multipart.put(chunk.into());
        }
        multipart.finish().await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, expected.len());
        assert_eq!(read_all(&*store, &path).await, Bytes::from(expected));

        // Aborted upload: the object must not exist afterwards.
        let path = Path::from("data/test_abort.txt");
        let mut upload = store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = store.head(&path).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    #[tokio::test]
    async fn test_list() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let listed: Vec<_> = store.list(Some(&prefix)).collect().await;
        assert_eq!(listed.len(), 2);

        let mut locations: Vec<&str> = listed
            .iter()
            .map(|res| res.as_ref().unwrap().location.as_ref())
            .collect();
        locations.sort();

        let expected = [
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];
        let expected_locations: Vec<&str> = expected.iter().map(|(loc, _)| *loc).collect();
        assert_eq!(locations, expected_locations);

        for (location, content) in expected.iter() {
            assert_eq!(&read_all(&*store, &Path::from(*location)).await, content);
        }
    }

    #[tokio::test]
    async fn test_list_with_delimiter() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let listing = store.list_with_delimiter(Some(&prefix)).await.unwrap();

        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    #[tokio::test]
    async fn test_list_with_offset() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let offset = Path::from("data/nested/test.txt");
        let listed: Vec<_> = store
            .list_with_offset(Some(&prefix), &offset)
            .collect()
            .await;

        assert_eq!(listed.len(), 1);
        assert_eq!(
            listed[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}