object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use async_trait::async_trait;
25use futures::stream::BoxStream;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use object_store::path::Path;
30use object_store::ListResult;
31use object_store::MultipartUpload;
32use object_store::ObjectMeta;
33use object_store::ObjectStore;
34use object_store::PutMultipartOpts;
35use object_store::PutOptions;
36use object_store::PutPayload;
37use object_store::PutResult;
38use object_store::{GetOptions, UploadPart};
39use object_store::{GetRange, GetResultPayload};
40use object_store::{GetResult, PutMode};
41use opendal::Buffer;
42use opendal::Writer;
43use opendal::{Operator, OperatorInfo};
44use tokio::sync::{Mutex, Notify};
45
46/// OpendalStore implements ObjectStore trait by using opendal.
47///
48/// This allows users to use opendal as an object store without extra cost.
49///
50/// Visit [`opendal::services`] for more information about supported services.
51///
52/// ```no_run
53/// use std::sync::Arc;
54///
55/// use bytes::Bytes;
56/// use object_store::path::Path;
57/// use object_store::ObjectStore;
58/// use object_store_opendal::OpendalStore;
59/// use opendal::services::S3;
60/// use opendal::{Builder, Operator};
61///
62/// #[tokio::main]
63/// async fn main() {
64///    let builder = S3::default()
65///     .access_key_id("my_access_key")
66///     .secret_access_key("my_secret_key")
67///     .endpoint("my_endpoint")
68///     .region("my_region");
69///
70///     // Create a new operator
71///     let operator = Operator::new(builder).unwrap().finish();
72///
73///     // Create a new object store
74///     let object_store = Arc::new(OpendalStore::new(operator));
75///
76///     let path = Path::from("data/nested/test.txt");
77///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
78///
79///     object_store.put(&path, bytes.clone().into()).await.unwrap();
80///
81///     let content = object_store
82///         .get(&path)
83///         .await
84///         .unwrap()
85///         .bytes()
86///         .await
87///         .unwrap();
88///
89///     assert_eq!(content, bytes);
90/// }
91/// ```
92#[derive(Clone)]
93pub struct OpendalStore {
94    info: Arc<OperatorInfo>,
95    inner: Operator,
96}
97
98impl OpendalStore {
99    /// Create OpendalStore by given Operator.
100    pub fn new(op: Operator) -> Self {
101        Self {
102            info: op.info().into(),
103            inner: op,
104        }
105    }
106
107    /// Get the Operator info.
108    pub fn info(&self) -> &OperatorInfo {
109        self.info.as_ref()
110    }
111}
112
113impl Debug for OpendalStore {
114    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115        f.debug_struct("OpendalStore")
116            .field("scheme", &self.info.scheme())
117            .field("name", &self.info.name())
118            .field("root", &self.info.root())
119            .field("capability", &self.info.full_capability())
120            .finish()
121    }
122}
123
124impl Display for OpendalStore {
125    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126        let info = self.inner.info();
127        write!(
128            f,
129            "Opendal({}, bucket={}, root={})",
130            info.scheme(),
131            info.name(),
132            info.root()
133        )
134    }
135}
136
137impl From<Operator> for OpendalStore {
138    fn from(value: Operator) -> Self {
139        Self::new(value)
140    }
141}
142
143#[async_trait]
144impl ObjectStore for OpendalStore {
145    async fn put_opts(
146        &self,
147        location: &Path,
148        bytes: PutPayload,
149        opts: PutOptions,
150    ) -> object_store::Result<PutResult> {
151        let mut future_write = self
152            .inner
153            .write_with(location.as_ref(), Buffer::from_iter(bytes.into_iter()));
154        let opts_mode = opts.mode.clone();
155        match opts.mode {
156            PutMode::Overwrite => {}
157            PutMode::Create => {
158                future_write = future_write.if_not_exists(true);
159            }
160            PutMode::Update(update_version) => {
161                let Some(etag) = update_version.e_tag else {
162                    Err(object_store::Error::NotSupported {
163                        source: Box::new(opendal::Error::new(
164                            opendal::ErrorKind::Unsupported,
165                            "etag is required for conditional put",
166                        )),
167                    })?
168                };
169                future_write = future_write.if_match(etag.as_str());
170            }
171        }
172        future_write.into_send().await.map_err(|err| {
173            match format_object_store_error(err, location.as_ref()) {
174                object_store::Error::Precondition { path, source }
175                    if opts_mode == PutMode::Create =>
176                {
177                    object_store::Error::AlreadyExists { path, source }
178                }
179                e => e,
180            }
181        })?;
182
183        Ok(PutResult {
184            e_tag: None,
185            version: None,
186        })
187    }
188
189    async fn put_multipart(
190        &self,
191        location: &Path,
192    ) -> object_store::Result<Box<dyn MultipartUpload>> {
193        let writer = self
194            .inner
195            .writer_with(location.as_ref())
196            .concurrent(8)
197            .into_send()
198            .await
199            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
200        let upload = OpendalMultipartUpload::new(writer, location.clone());
201
202        Ok(Box::new(upload))
203    }
204
205    async fn put_multipart_opts(
206        &self,
207        _location: &Path,
208        _opts: PutMultipartOpts,
209    ) -> object_store::Result<Box<dyn MultipartUpload>> {
210        Err(object_store::Error::NotSupported {
211            source: Box::new(opendal::Error::new(
212                opendal::ErrorKind::Unsupported,
213                "put_multipart_opts is not implemented so far",
214            )),
215        })
216    }
217
218    async fn get_opts(
219        &self,
220        location: &Path,
221        options: GetOptions,
222    ) -> object_store::Result<GetResult> {
223        let meta = {
224            let mut s = self.inner.stat_with(location.as_ref());
225            if let Some(version) = &options.version {
226                s = s.version(version.as_str())
227            }
228            if let Some(if_match) = &options.if_match {
229                s = s.if_match(if_match.as_str());
230            }
231            if let Some(if_none_match) = &options.if_none_match {
232                s = s.if_none_match(if_none_match.as_str());
233            }
234            if let Some(if_modified_since) = options.if_modified_since {
235                s = s.if_modified_since(if_modified_since);
236            }
237            if let Some(if_unmodified_since) = options.if_unmodified_since {
238                s = s.if_unmodified_since(if_unmodified_since);
239            }
240            s.into_send()
241                .await
242                .map_err(|err| format_object_store_error(err, location.as_ref()))?
243        };
244
245        let meta = ObjectMeta {
246            location: location.clone(),
247            last_modified: meta.last_modified().unwrap_or_default(),
248            size: meta.content_length(),
249            e_tag: meta.etag().map(|x| x.to_string()),
250            version: meta.version().map(|x| x.to_string()),
251        };
252
253        if options.head {
254            return Ok(GetResult {
255                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
256                range: 0..0,
257                meta,
258                attributes: Default::default(),
259            });
260        }
261
262        let reader = {
263            let mut r = self.inner.reader_with(location.as_ref());
264            if let Some(version) = options.version {
265                r = r.version(version.as_str());
266            }
267            if let Some(if_match) = options.if_match {
268                r = r.if_match(if_match.as_str());
269            }
270            if let Some(if_none_match) = options.if_none_match {
271                r = r.if_none_match(if_none_match.as_str());
272            }
273            if let Some(if_modified_since) = options.if_modified_since {
274                r = r.if_modified_since(if_modified_since);
275            }
276            if let Some(if_unmodified_since) = options.if_unmodified_since {
277                r = r.if_unmodified_since(if_unmodified_since);
278            }
279            r.into_send()
280                .await
281                .map_err(|err| format_object_store_error(err, location.as_ref()))?
282        };
283
284        let read_range = match options.range {
285            Some(GetRange::Bounded(r)) => {
286                if r.start >= r.end || r.start >= meta.size {
287                    0..0
288                } else {
289                    let end = r.end.min(meta.size);
290                    r.start..end
291                }
292            }
293            Some(GetRange::Offset(r)) => {
294                if r < meta.size {
295                    r..meta.size
296                } else {
297                    0..0
298                }
299            }
300            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
301            _ => 0..meta.size,
302        };
303
304        let stream = reader
305            .into_bytes_stream(read_range.start..read_range.end)
306            .into_send()
307            .await
308            .map_err(|err| format_object_store_error(err, location.as_ref()))?
309            .into_send()
310            .map_err(|err: io::Error| object_store::Error::Generic {
311                store: "IoError",
312                source: Box::new(err),
313            });
314
315        Ok(GetResult {
316            payload: GetResultPayload::Stream(Box::pin(stream)),
317            range: read_range.start..read_range.end,
318            meta,
319            attributes: Default::default(),
320        })
321    }
322
323    async fn delete(&self, location: &Path) -> object_store::Result<()> {
324        self.inner
325            .delete(location.as_ref())
326            .into_send()
327            .await
328            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
329
330        Ok(())
331    }
332
333    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
334        // object_store `Path` always removes trailing slash
335        // need to add it back
336        let path = prefix.map_or("".into(), |x| format!("{}/", x));
337
338        let lister_fut = self.inner.lister_with(&path).recursive(true);
339        let fut = async move {
340            let stream = lister_fut
341                .await
342                .map_err(|err| format_object_store_error(err, &path))?;
343
344            let stream = stream.then(|res| async {
345                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
346                let meta = entry.metadata();
347
348                Ok(format_object_meta(entry.path(), meta))
349            });
350            Ok::<_, object_store::Error>(stream)
351        };
352
353        fut.into_stream().try_flatten().into_send().boxed()
354    }
355
356    fn list_with_offset(
357        &self,
358        prefix: Option<&Path>,
359        offset: &Path,
360    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
361        let path = prefix.map_or("".into(), |x| format!("{}/", x));
362        let offset = offset.clone();
363
364        // clone self for 'static lifetime
365        // clone self is cheap
366        let this = self.clone();
367
368        let fut = async move {
369            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
370            let mut fut = this.inner.lister_with(&path).recursive(true);
371
372            // Use native start_after support if possible.
373            if list_with_start_after {
374                fut = fut.start_after(offset.as_ref());
375            }
376
377            let lister = fut
378                .await
379                .map_err(|err| format_object_store_error(err, &path))?
380                .then(move |entry| {
381                    let path = path.clone();
382                    let this = this.clone();
383                    async move {
384                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
385                        let (path, metadata) = entry.into_parts();
386
387                        // If it's a dir or last_modified is present, we can use it directly.
388                        if metadata.is_dir() || metadata.last_modified().is_some() {
389                            let object_meta = format_object_meta(&path, &metadata);
390                            return Ok(object_meta);
391                        }
392
393                        let metadata = this
394                            .inner
395                            .stat(&path)
396                            .await
397                            .map_err(|err| format_object_store_error(err, &path))?;
398                        let object_meta = format_object_meta(&path, &metadata);
399                        Ok::<_, object_store::Error>(object_meta)
400                    }
401                })
402                .into_send()
403                .boxed();
404
405            let stream = if list_with_start_after {
406                lister
407            } else {
408                lister
409                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
410                    .into_send()
411                    .boxed()
412            };
413
414            Ok::<_, object_store::Error>(stream)
415        };
416
417        fut.into_stream().into_send().try_flatten().boxed()
418    }
419
420    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
421        let path = prefix.map_or("".into(), |x| format!("{}/", x));
422        let mut stream = self
423            .inner
424            .lister_with(&path)
425            .into_future()
426            .into_send()
427            .await
428            .map_err(|err| format_object_store_error(err, &path))?
429            .into_send();
430
431        let mut common_prefixes = Vec::new();
432        let mut objects = Vec::new();
433
434        while let Some(res) = stream.next().into_send().await {
435            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
436            let meta = entry.metadata();
437
438            if meta.is_dir() {
439                common_prefixes.push(entry.path().into());
440            } else if meta.last_modified().is_some() {
441                objects.push(format_object_meta(entry.path(), meta));
442            } else {
443                let meta = self
444                    .inner
445                    .stat(entry.path())
446                    .into_send()
447                    .await
448                    .map_err(|err| format_object_store_error(err, entry.path()))?;
449                objects.push(format_object_meta(entry.path(), &meta));
450            }
451        }
452
453        Ok(ListResult {
454            common_prefixes,
455            objects,
456        })
457    }
458
459    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
460        Err(object_store::Error::NotSupported {
461            source: Box::new(opendal::Error::new(
462                opendal::ErrorKind::Unsupported,
463                "copy is not implemented so far",
464            )),
465        })
466    }
467
468    async fn rename(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
469        Err(object_store::Error::NotSupported {
470            source: Box::new(opendal::Error::new(
471                opendal::ErrorKind::Unsupported,
472                "rename is not implemented so far",
473            )),
474        })
475    }
476
477    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
478        Err(object_store::Error::NotSupported {
479            source: Box::new(opendal::Error::new(
480                opendal::ErrorKind::Unsupported,
481                "copy_if_not_exists is not implemented so far",
482            )),
483        })
484    }
485}
486
487/// `MultipartUpload`'s impl based on `Writer` in opendal
488///
489/// # Notes
490///
491/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
492/// implementation do. Instead, we just write the part and notify the next task to be written.
493///
494/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
495struct OpendalMultipartUpload {
496    writer: Arc<Mutex<Writer>>,
497    location: Path,
498    next_notify: Option<Arc<Notify>>,
499}
500
501impl OpendalMultipartUpload {
502    fn new(writer: Writer, location: Path) -> Self {
503        Self {
504            writer: Arc::new(Mutex::new(writer)),
505            location,
506            next_notify: None,
507        }
508    }
509}
510
511#[async_trait]
512impl MultipartUpload for OpendalMultipartUpload {
513    fn put_part(&mut self, data: PutPayload) -> UploadPart {
514        let writer = self.writer.clone();
515        let location = self.location.clone();
516
517        // Generate next notify which will be notified after the current part is written.
518        let next_notify = Arc::new(Notify::new());
519        // Fetch the notify for current part to wait for it to be written.
520        let current_notify = self.next_notify.replace(next_notify.clone());
521
522        async move {
523            // current_notify == None means that it's the first part, we don't need to wait.
524            if let Some(notify) = current_notify {
525                // Wait for the previous part to be written
526                notify.notified().await;
527            }
528
529            let mut writer = writer.lock().await;
530            let result = writer
531                .write(Buffer::from_iter(data.into_iter()))
532                .await
533                .map_err(|err| format_object_store_error(err, location.as_ref()));
534
535            // Notify the next part to be written
536            next_notify.notify_one();
537
538            result
539        }
540        .into_send()
541        .boxed()
542    }
543
544    async fn complete(&mut self) -> object_store::Result<PutResult> {
545        let mut writer = self.writer.lock().await;
546        writer
547            .close()
548            .into_send()
549            .await
550            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
551
552        Ok(PutResult {
553            e_tag: None,
554            version: None,
555        })
556    }
557
558    async fn abort(&mut self) -> object_store::Result<()> {
559        let mut writer = self.writer.lock().await;
560        writer
561            .abort()
562            .into_send()
563            .await
564            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
565    }
566}
567
568impl Debug for OpendalMultipartUpload {
569    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
570        f.debug_struct("OpendalMultipartUpload")
571            .field("location", &self.location)
572            .finish()
573    }
574}
575
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Build an empty store on top of opendal's in-memory service.
    fn memory_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        Arc::new(OpendalStore::new(op))
    }

    /// Read the whole object at `path`, panicking on any error.
    async fn read_all(store: &dyn ObjectStore, path: &Path) -> Bytes {
        store.get(path).await.unwrap().bytes().await.unwrap()
    }

    /// Store pre-populated with `data/test.txt` and `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let store = memory_store();

        let fixtures: [(&str, &'static [u8]); 2] = [
            ("data/test.txt", b"hello, world!"),
            ("data/nested/test.txt", b"hello, world! I am nested."),
        ];
        for (location, content) in fixtures {
            let path: Path = location.into();
            store
                .put(&path, Bytes::from_static(content).into())
                .await
                .unwrap();
        }

        store
    }

    #[tokio::test]
    async fn test_basic() {
        let store = memory_store();

        // Retrieve a specific file
        let path: Path = "data/test.txt".into();
        let expected = Bytes::from_static(b"hello, world!");
        store.put(&path, expected.clone().into()).await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, 13);

        assert_eq!(read_all(store.as_ref(), &path).await, expected);
    }

    #[tokio::test]
    async fn test_put_multipart() {
        let store = memory_store();
        let mut rng = thread_rng();

        // Case complete
        let path: Path = "data/test_complete.txt".into();
        let upload = store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new(upload);

        let mut all_bytes = vec![];
        let round = rng.gen_range(1..=1024);
        for _ in 0..round {
            let size = rng.gen_range(1..=1024);
            let mut chunk = vec![0; size];
            rng.fill_bytes(&mut chunk);

            all_bytes.extend_from_slice(&chunk);
            write.put(chunk.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, all_bytes.len() as u64);

        assert_eq!(read_all(store.as_ref(), &path).await, Bytes::from(all_bytes));

        // Case abort
        let path: Path = "data/test_abort.txt".into();
        let mut upload = store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = store.head(&path).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    #[tokio::test]
    async fn test_list() {
        let store = create_test_object_store().await;
        let prefix: Path = "data/".into();
        let results = store.list(Some(&prefix)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);

        let mut locations: Vec<&str> = Vec::with_capacity(results.len());
        for item in &results {
            locations.push(item.as_ref().unwrap().location.as_ref());
        }

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];
        let expected_locations: Vec<&str> = expected_files.iter().map(|(loc, _)| *loc).collect();

        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, content) in expected_files {
            let path: Path = location.into();
            assert_eq!(read_all(store.as_ref(), &path).await, content);
        }
    }

    #[tokio::test]
    async fn test_list_with_delimiter() {
        let store = create_test_object_store().await;
        let prefix: Path = "data/".into();
        let listing = store.list_with_delimiter(Some(&prefix)).await.unwrap();

        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    #[tokio::test]
    async fn test_list_with_offset() {
        let store = create_test_object_store().await;
        let prefix: Path = "data/".into();
        let offset: Path = "data/nested/test.txt".into();
        let listed = store
            .list_with_offset(Some(&prefix), &offset)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(listed.len(), 1);
        assert_eq!(
            listed[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}