object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use async_trait::async_trait;
25use futures::stream::BoxStream;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use object_store::path::Path;
30use object_store::ListResult;
31use object_store::MultipartUpload;
32use object_store::ObjectMeta;
33use object_store::ObjectStore;
34use object_store::PutMultipartOptions;
35use object_store::PutOptions;
36use object_store::PutPayload;
37use object_store::PutResult;
38use object_store::{GetOptions, UploadPart};
39use object_store::{GetRange, GetResultPayload};
40use object_store::{GetResult, PutMode};
41use opendal::options::CopyOptions;
42use opendal::raw::percent_decode_path;
43use opendal::Buffer;
44use opendal::Writer;
45use opendal::{Operator, OperatorInfo};
46use std::collections::HashMap;
47use tokio::sync::{Mutex, Notify};
48
49/// OpendalStore implements ObjectStore trait by using opendal.
50///
51/// This allows users to use opendal as an object store without extra cost.
52///
53/// Visit [`opendal::services`] for more information about supported services.
54///
55/// ```no_run
56/// use std::sync::Arc;
57///
58/// use bytes::Bytes;
59/// use object_store::path::Path;
60/// use object_store::ObjectStore;
61/// use object_store_opendal::OpendalStore;
62/// use opendal::services::S3;
63/// use opendal::{Builder, Operator};
64///
65/// #[tokio::main]
66/// async fn main() {
67///    let builder = S3::default()
68///     .access_key_id("my_access_key")
69///     .secret_access_key("my_secret_key")
70///     .endpoint("my_endpoint")
71///     .region("my_region");
72///
73///     // Create a new operator
74///     let operator = Operator::new(builder).unwrap().finish();
75///
76///     // Create a new object store
77///     let object_store = Arc::new(OpendalStore::new(operator));
78///
79///     let path = Path::from("data/nested/test.txt");
80///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
81///
82///     object_store.put(&path, bytes.clone().into()).await.unwrap();
83///
84///     let content = object_store
85///         .get(&path)
86///         .await
87///         .unwrap()
88///         .bytes()
89///         .await
90///         .unwrap();
91///
92///     assert_eq!(content, bytes);
93/// }
94/// ```
#[derive(Clone)]
pub struct OpendalStore {
    /// Operator info captured once in [`OpendalStore::new`]; shared between clones via `Arc`.
    info: Arc<OperatorInfo>,
    /// The OpenDAL operator that every object store request is forwarded to.
    inner: Operator,
}
100
101impl OpendalStore {
102    /// Create OpendalStore by given Operator.
103    pub fn new(op: Operator) -> Self {
104        Self {
105            info: op.info().into(),
106            inner: op,
107        }
108    }
109
110    /// Get the Operator info.
111    pub fn info(&self) -> &OperatorInfo {
112        self.info.as_ref()
113    }
114
115    /// Copy a file from one location to another
116    async fn copy_request(
117        &self,
118        from: &Path,
119        to: &Path,
120        if_not_exists: bool,
121    ) -> object_store::Result<()> {
122        let mut copy_options = CopyOptions::default();
123        if if_not_exists {
124            copy_options.if_not_exists = true;
125        }
126
127        // Perform the copy operation
128        self.inner
129            .copy_options(
130                &percent_decode_path(from.as_ref()),
131                &percent_decode_path(to.as_ref()),
132                copy_options,
133            )
134            .into_send()
135            .await
136            .map_err(|err| {
137                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
138                    object_store::Error::AlreadyExists {
139                        path: to.to_string(),
140                        source: Box::new(err),
141                    }
142                } else {
143                    format_object_store_error(err, from.as_ref())
144                }
145            })?;
146
147        Ok(())
148    }
149}
150
151impl Debug for OpendalStore {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("OpendalStore")
154            .field("scheme", &self.info.scheme())
155            .field("name", &self.info.name())
156            .field("root", &self.info.root())
157            .field("capability", &self.info.full_capability())
158            .finish()
159    }
160}
161
162impl Display for OpendalStore {
163    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164        let info = self.inner.info();
165        write!(
166            f,
167            "Opendal({}, bucket={}, root={})",
168            info.scheme(),
169            info.name(),
170            info.root()
171        )
172    }
173}
174
175impl From<Operator> for OpendalStore {
176    fn from(value: Operator) -> Self {
177        Self::new(value)
178    }
179}
180
181#[async_trait]
182impl ObjectStore for OpendalStore {
183    async fn put_opts(
184        &self,
185        location: &Path,
186        bytes: PutPayload,
187        opts: PutOptions,
188    ) -> object_store::Result<PutResult> {
189        let mut future_write = self.inner.write_with(
190            &percent_decode_path(location.as_ref()),
191            Buffer::from_iter(bytes),
192        );
193        let opts_mode = opts.mode.clone();
194        match opts.mode {
195            PutMode::Overwrite => {}
196            PutMode::Create => {
197                future_write = future_write.if_not_exists(true);
198            }
199            PutMode::Update(update_version) => {
200                let Some(etag) = update_version.e_tag else {
201                    Err(object_store::Error::NotSupported {
202                        source: Box::new(opendal::Error::new(
203                            opendal::ErrorKind::Unsupported,
204                            "etag is required for conditional put",
205                        )),
206                    })?
207                };
208                future_write = future_write.if_match(etag.as_str());
209            }
210        }
211        let rp = future_write.into_send().await.map_err(|err| {
212            match format_object_store_error(err, location.as_ref()) {
213                object_store::Error::Precondition { path, source }
214                    if opts_mode == PutMode::Create =>
215                {
216                    object_store::Error::AlreadyExists { path, source }
217                }
218                e => e,
219            }
220        })?;
221
222        let e_tag = rp.etag().map(|s| s.to_string());
223        let version = rp.version().map(|s| s.to_string());
224
225        Ok(PutResult { e_tag, version })
226    }
227
228    async fn put_multipart(
229        &self,
230        location: &Path,
231    ) -> object_store::Result<Box<dyn MultipartUpload>> {
232        let writer = self
233            .inner
234            .writer_with(&percent_decode_path(location.as_ref()))
235            .concurrent(8)
236            .into_send()
237            .await
238            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
239        let upload = OpendalMultipartUpload::new(writer, location.clone());
240
241        Ok(Box::new(upload))
242    }
243
244    async fn put_multipart_opts(
245        &self,
246        location: &Path,
247        opts: PutMultipartOptions,
248    ) -> object_store::Result<Box<dyn MultipartUpload>> {
249        const DEFAULT_CONCURRENT: usize = 8;
250
251        let mut options = opendal::options::WriteOptions {
252            concurrent: DEFAULT_CONCURRENT,
253            ..Default::default()
254        };
255
256        // Collect user metadata separately to handle multiple entries
257        let mut user_metadata = HashMap::new();
258
259        // Handle attributes if provided
260        for (key, value) in opts.attributes.iter() {
261            match key {
262                object_store::Attribute::CacheControl => {
263                    options.cache_control = Some(value.to_string());
264                }
265                object_store::Attribute::ContentDisposition => {
266                    options.content_disposition = Some(value.to_string());
267                }
268                object_store::Attribute::ContentEncoding => {
269                    options.content_encoding = Some(value.to_string());
270                }
271                object_store::Attribute::ContentLanguage => {
272                    // no support
273                    continue;
274                }
275                object_store::Attribute::ContentType => {
276                    options.content_type = Some(value.to_string());
277                }
278                object_store::Attribute::Metadata(k) => {
279                    user_metadata.insert(k.to_string(), value.to_string());
280                }
281                _ => {}
282            }
283        }
284
285        // Apply user metadata if any entries were collected
286        if !user_metadata.is_empty() {
287            options.user_metadata = Some(user_metadata);
288        }
289
290        let writer = self
291            .inner
292            .writer_options(&percent_decode_path(location.as_ref()), options)
293            .into_send()
294            .await
295            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
296        let upload = OpendalMultipartUpload::new(writer, location.clone());
297
298        Ok(Box::new(upload))
299    }
300
    /// Fetch the object at `location` according to `options`.
    ///
    /// The object is first stat-ed (with the version and conditional options
    /// applied) to build the returned `ObjectMeta` and attributes. For a `head`
    /// request an empty stream is returned. Otherwise a reader is opened with the
    /// same version/conditional options and the resolved byte range is streamed.
    ///
    /// # Errors
    ///
    /// Errors from the stat, from opening the reader and from creating the byte
    /// stream are converted with `format_object_store_error`; I/O errors raised
    /// while streaming are reported as `object_store::Error::Generic`.
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        let raw_location = percent_decode_path(location.as_ref());
        // Stat first: the result supplies the metadata and the size used to resolve the range.
        let meta = {
            let mut s = self.inner.stat_with(&raw_location);
            if let Some(version) = &options.version {
                s = s.version(version.as_str())
            }
            if let Some(if_match) = &options.if_match {
                s = s.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = &options.if_none_match {
                s = s.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) = options.if_modified_since {
                s = s.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) = options.if_unmodified_since {
                s = s.if_unmodified_since(if_unmodified_since);
            }
            s.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Convert user defined metadata from OpenDAL to object_store attributes
        let mut attributes = object_store::Attributes::new();
        if let Some(user_meta) = meta.user_metadata() {
            for (key, value) in user_meta {
                attributes.insert(
                    object_store::Attribute::Metadata(key.clone().into()),
                    value.clone().into(),
                );
            }
        }

        // A missing last-modified time falls back to the type's default value.
        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: meta.last_modified().unwrap_or_default(),
            size: meta.content_length(),
            e_tag: meta.etag().map(|x| x.to_string()),
            version: meta.version().map(|x| x.to_string()),
        };

        // Head requests only need the metadata: return an empty payload and range.
        if options.head {
            return Ok(GetResult {
                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
                range: 0..0,
                meta,
                attributes,
            });
        }

        // Open the reader with the same version and conditional options used for the stat.
        let reader = {
            let mut r = self.inner.reader_with(raw_location.as_ref());
            if let Some(version) = options.version {
                r = r.version(version.as_str());
            }
            if let Some(if_match) = options.if_match {
                r = r.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = options.if_none_match {
                r = r.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) = options.if_modified_since {
                r = r.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) = options.if_unmodified_since {
                r = r.if_unmodified_since(if_unmodified_since);
            }
            r.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Resolve the requested range against the object size:
        // - bounded: empty or out-of-bounds starts collapse to `0..0`, the end is clamped;
        // - offset: reads to the end, or `0..0` when the offset is at/after the end;
        // - suffix shorter than the object: reads the trailing bytes;
        // - anything else (no range, or a suffix not shorter than the object): whole object.
        let read_range = match options.range {
            Some(GetRange::Bounded(r)) => {
                if r.start >= r.end || r.start >= meta.size {
                    0..0
                } else {
                    let end = r.end.min(meta.size);
                    r.start..end
                }
            }
            Some(GetRange::Offset(r)) => {
                if r < meta.size {
                    r..meta.size
                } else {
                    0..0
                }
            }
            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
            _ => 0..meta.size,
        };

        let stream = reader
            .into_bytes_stream(read_range.start..read_range.end)
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?
            .into_send()
            .map_err(|err: io::Error| object_store::Error::Generic {
                store: "IoError",
                source: Box::new(err),
            });

        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(stream)),
            range: read_range.start..read_range.end,
            meta,
            attributes,
        })
    }
417
418    async fn delete(&self, location: &Path) -> object_store::Result<()> {
419        self.inner
420            .delete(&percent_decode_path(location.as_ref()))
421            .into_send()
422            .await
423            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
424
425        Ok(())
426    }
427
428    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
429        // object_store `Path` always removes trailing slash
430        // need to add it back
431        let path = prefix.map_or("".into(), |x| {
432            format!("{}/", percent_decode_path(x.as_ref()))
433        });
434
435        let lister_fut = self.inner.lister_with(&path).recursive(true);
436        let fut = async move {
437            let stream = lister_fut
438                .await
439                .map_err(|err| format_object_store_error(err, &path))?;
440
441            let stream = stream.then(|res| async {
442                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
443                let meta = entry.metadata();
444
445                Ok(format_object_meta(entry.path(), meta))
446            });
447            Ok::<_, object_store::Error>(stream)
448        };
449
450        fut.into_stream().try_flatten().into_send().boxed()
451    }
452
453    fn list_with_offset(
454        &self,
455        prefix: Option<&Path>,
456        offset: &Path,
457    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
458        let path = prefix.map_or("".into(), |x| {
459            format!("{}/", percent_decode_path(x.as_ref()))
460        });
461        let offset = offset.clone();
462
463        // clone self for 'static lifetime
464        // clone self is cheap
465        let this = self.clone();
466
467        let fut = async move {
468            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
469            let mut fut = this.inner.lister_with(&path).recursive(true);
470
471            // Use native start_after support if possible.
472            if list_with_start_after {
473                fut = fut.start_after(offset.as_ref());
474            }
475
476            let lister = fut
477                .await
478                .map_err(|err| format_object_store_error(err, &path))?
479                .then(move |entry| {
480                    let path = path.clone();
481                    let this = this.clone();
482                    async move {
483                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
484                        let (path, metadata) = entry.into_parts();
485
486                        // If it's a dir or last_modified is present, we can use it directly.
487                        if metadata.is_dir() || metadata.last_modified().is_some() {
488                            let object_meta = format_object_meta(&path, &metadata);
489                            return Ok(object_meta);
490                        }
491
492                        let metadata = this
493                            .inner
494                            .stat(&path)
495                            .await
496                            .map_err(|err| format_object_store_error(err, &path))?;
497                        let object_meta = format_object_meta(&path, &metadata);
498                        Ok::<_, object_store::Error>(object_meta)
499                    }
500                })
501                .into_send()
502                .boxed();
503
504            let stream = if list_with_start_after {
505                lister
506            } else {
507                lister
508                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
509                    .into_send()
510                    .boxed()
511            };
512
513            Ok::<_, object_store::Error>(stream)
514        };
515
516        fut.into_stream().into_send().try_flatten().boxed()
517    }
518
519    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
520        let path = prefix.map_or("".into(), |x| {
521            format!("{}/", percent_decode_path(x.as_ref()))
522        });
523        let mut stream = self
524            .inner
525            .lister_with(&path)
526            .into_future()
527            .into_send()
528            .await
529            .map_err(|err| format_object_store_error(err, &path))?
530            .into_send();
531
532        let mut common_prefixes = Vec::new();
533        let mut objects = Vec::new();
534
535        while let Some(res) = stream.next().into_send().await {
536            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
537            let meta = entry.metadata();
538
539            if meta.is_dir() {
540                common_prefixes.push(entry.path().into());
541            } else if meta.last_modified().is_some() {
542                objects.push(format_object_meta(entry.path(), meta));
543            } else {
544                let meta = self
545                    .inner
546                    .stat(entry.path())
547                    .into_send()
548                    .await
549                    .map_err(|err| format_object_store_error(err, entry.path()))?;
550                objects.push(format_object_meta(entry.path(), &meta));
551            }
552        }
553
554        Ok(ListResult {
555            common_prefixes,
556            objects,
557        })
558    }
559
560    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
561        self.copy_request(from, to, false).await
562    }
563
564    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
565        self.copy_request(from, to, true).await
566    }
567
568    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
569        self.inner
570            .rename(
571                &percent_decode_path(from.as_ref()),
572                &percent_decode_path(to.as_ref()),
573            )
574            .into_send()
575            .await
576            .map_err(|err| format_object_store_error(err, from.as_ref()))?;
577
578        Ok(())
579    }
580}
581
/// A `MultipartUpload` implementation backed by an OpenDAL `Writer`.
///
/// # Notes
///
/// The OpenDAL writer handles concurrency internally, so we don't generate real upload parts
/// the way existing implementations do. Instead, each part is written through the shared
/// writer and then notifies the next part that it may be written.
///
/// The lock here doesn't really take part in the write process; it exists for the notify
/// mechanism.
struct OpendalMultipartUpload {
    /// The shared writer; cloned into every part future.
    writer: Arc<Mutex<Writer>>,
    /// Destination of the upload, used when converting errors.
    location: Path,
    /// Notifier that the most recently created part signals once it has been written;
    /// `None` until the first part is created.
    next_notify: Option<Arc<Notify>>,
}
595
596impl OpendalMultipartUpload {
597    fn new(writer: Writer, location: Path) -> Self {
598        Self {
599            writer: Arc::new(Mutex::new(writer)),
600            location,
601            next_notify: None,
602        }
603    }
604}
605
606#[async_trait]
607impl MultipartUpload for OpendalMultipartUpload {
608    fn put_part(&mut self, data: PutPayload) -> UploadPart {
609        let writer = self.writer.clone();
610        let location = self.location.clone();
611
612        // Generate next notify which will be notified after the current part is written.
613        let next_notify = Arc::new(Notify::new());
614        // Fetch the notify for current part to wait for it to be written.
615        let current_notify = self.next_notify.replace(next_notify.clone());
616
617        async move {
618            // current_notify == None means that it's the first part, we don't need to wait.
619            if let Some(notify) = current_notify {
620                // Wait for the previous part to be written
621                notify.notified().await;
622            }
623
624            let mut writer = writer.lock().await;
625            let result = writer
626                .write(Buffer::from_iter(data.into_iter()))
627                .await
628                .map_err(|err| format_object_store_error(err, location.as_ref()));
629
630            // Notify the next part to be written
631            next_notify.notify_one();
632
633            result
634        }
635        .into_send()
636        .boxed()
637    }
638
639    async fn complete(&mut self) -> object_store::Result<PutResult> {
640        let mut writer = self.writer.lock().await;
641        let metadata = writer
642            .close()
643            .into_send()
644            .await
645            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
646
647        let e_tag = metadata.etag().map(|s| s.to_string());
648        let version = metadata.version().map(|s| s.to_string());
649
650        Ok(PutResult { e_tag, version })
651    }
652
653    async fn abort(&mut self) -> object_store::Result<()> {
654        let mut writer = self.writer.lock().await;
655        writer
656            .abort()
657            .into_send()
658            .await
659            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
660    }
661}
662
663impl Debug for OpendalMultipartUpload {
664    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
665        f.debug_struct("OpendalMultipartUpload")
666            .field("location", &self.location)
667            .finish()
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use bytes::Bytes;
674    use object_store::path::Path;
675    use object_store::{ObjectStore, WriteMultipart};
676    use opendal::services;
677    use rand::prelude::*;
678    use std::sync::Arc;
679
680    use super::*;
681
682    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
683        let op = Operator::new(services::Memory::default()).unwrap().finish();
684        let object_store = Arc::new(OpendalStore::new(op));
685
686        let path: Path = "data/test.txt".into();
687        let bytes = Bytes::from_static(b"hello, world!");
688        object_store.put(&path, bytes.into()).await.unwrap();
689
690        let path: Path = "data/nested/test.txt".into();
691        let bytes = Bytes::from_static(b"hello, world! I am nested.");
692        object_store.put(&path, bytes.into()).await.unwrap();
693
694        object_store
695    }
696
697    #[tokio::test]
698    async fn test_basic() {
699        let op = Operator::new(services::Memory::default()).unwrap().finish();
700        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
701
702        // Retrieve a specific file
703        let path: Path = "data/test.txt".into();
704
705        let bytes = Bytes::from_static(b"hello, world!");
706        object_store.put(&path, bytes.clone().into()).await.unwrap();
707
708        let meta = object_store.head(&path).await.unwrap();
709
710        assert_eq!(meta.size, 13);
711
712        assert_eq!(
713            object_store
714                .get(&path)
715                .await
716                .unwrap()
717                .bytes()
718                .await
719                .unwrap(),
720            bytes
721        );
722    }
723
724    #[tokio::test]
725    async fn test_put_multipart() {
726        let op = Operator::new(services::Memory::default()).unwrap().finish();
727        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
728
729        let mut rng = thread_rng();
730
731        // Case complete
732        let path: Path = "data/test_complete.txt".into();
733        let upload = object_store.put_multipart(&path).await.unwrap();
734
735        let mut write = WriteMultipart::new(upload);
736
737        let mut all_bytes = vec![];
738        let round = rng.gen_range(1..=1024);
739        for _ in 0..round {
740            let size = rng.gen_range(1..=1024);
741            let mut bytes = vec![0; size];
742            rng.fill_bytes(&mut bytes);
743
744            all_bytes.extend_from_slice(&bytes);
745            write.put(bytes.into());
746        }
747
748        let _ = write.finish().await.unwrap();
749
750        let meta = object_store.head(&path).await.unwrap();
751
752        assert_eq!(meta.size, all_bytes.len() as u64);
753
754        assert_eq!(
755            object_store
756                .get(&path)
757                .await
758                .unwrap()
759                .bytes()
760                .await
761                .unwrap(),
762            Bytes::from(all_bytes)
763        );
764
765        // Case abort
766        let path: Path = "data/test_abort.txt".into();
767        let mut upload = object_store.put_multipart(&path).await.unwrap();
768        upload.put_part(vec![1; 1024].into()).await.unwrap();
769        upload.abort().await.unwrap();
770
771        let res = object_store.head(&path).await;
772        let err = res.unwrap_err();
773
774        assert!(matches!(err, object_store::Error::NotFound { .. }))
775    }
776
777    #[tokio::test]
778    async fn test_list() {
779        let object_store = create_test_object_store().await;
780        let path: Path = "data/".into();
781        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
782        assert_eq!(results.len(), 2);
783        let mut locations = results
784            .iter()
785            .map(|x| x.as_ref().unwrap().location.as_ref())
786            .collect::<Vec<_>>();
787
788        let expected_files = vec![
789            (
790                "data/nested/test.txt",
791                Bytes::from_static(b"hello, world! I am nested."),
792            ),
793            ("data/test.txt", Bytes::from_static(b"hello, world!")),
794        ];
795
796        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
797
798        locations.sort();
799        assert_eq!(locations, expected_locations);
800
801        for (location, bytes) in expected_files {
802            let path: Path = location.into();
803            assert_eq!(
804                object_store
805                    .get(&path)
806                    .await
807                    .unwrap()
808                    .bytes()
809                    .await
810                    .unwrap(),
811                bytes
812            );
813        }
814    }
815
816    #[tokio::test]
817    async fn test_list_with_delimiter() {
818        let object_store = create_test_object_store().await;
819        let path: Path = "data/".into();
820        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
821        assert_eq!(result.objects.len(), 1);
822        assert_eq!(result.common_prefixes.len(), 1);
823        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
824        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
825    }
826
827    #[tokio::test]
828    async fn test_list_with_offset() {
829        let object_store = create_test_object_store().await;
830        let path: Path = "data/".into();
831        let offset: Path = "data/nested/test.txt".into();
832        let result = object_store
833            .list_with_offset(Some(&path), &offset)
834            .collect::<Vec<_>>()
835            .await;
836        assert_eq!(result.len(), 1);
837        assert_eq!(
838            result[0].as_ref().unwrap().location.as_ref(),
839            "data/test.txt"
840        );
841    }
842}