object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use crate::{datetime_to_timestamp, timestamp_to_datetime};
25use async_trait::async_trait;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use futures::stream::BoxStream;
30use mea::mutex::Mutex;
31use mea::oneshot;
32use object_store::ListResult;
33use object_store::MultipartUpload;
34use object_store::ObjectMeta;
35use object_store::ObjectStore;
36use object_store::PutMultipartOptions;
37use object_store::PutOptions;
38use object_store::PutPayload;
39use object_store::PutResult;
40use object_store::path::Path;
41use object_store::{GetOptions, UploadPart};
42use object_store::{GetRange, GetResultPayload};
43use object_store::{GetResult, PutMode};
44use opendal::Buffer;
45use opendal::Writer;
46use opendal::options::CopyOptions;
47use opendal::raw::percent_decode_path;
48use opendal::{Operator, OperatorInfo};
49use std::collections::HashMap;
50
51/// OpendalStore implements ObjectStore trait by using opendal.
52///
53/// This allows users to use opendal as an object store without extra cost.
54///
55/// Visit [`opendal::services`] for more information about supported services.
56///
57/// ```no_run
58/// use std::sync::Arc;
59///
60/// use bytes::Bytes;
61/// use object_store::path::Path;
62/// use object_store::ObjectStore;
63/// use object_store_opendal::OpendalStore;
64/// use opendal::services::S3;
65/// use opendal::{Builder, Operator};
66///
67/// #[tokio::main]
68/// async fn main() {
69///    let builder = S3::default()
70///     .access_key_id("my_access_key")
71///     .secret_access_key("my_secret_key")
72///     .endpoint("my_endpoint")
73///     .region("my_region");
74///
75///     // Create a new operator
76///     let operator = Operator::new(builder).unwrap().finish();
77///
78///     // Create a new object store
79///     let object_store = Arc::new(OpendalStore::new(operator));
80///
81///     let path = Path::from("data/nested/test.txt");
82///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
83///
84///     object_store.put(&path, bytes.clone().into()).await.unwrap();
85///
86///     let content = object_store
87///         .get(&path)
88///         .await
89///         .unwrap()
90///         .bytes()
91///         .await
92///         .unwrap();
93///
94///     assert_eq!(content, bytes);
95/// }
96/// ```
97#[derive(Clone)]
98pub struct OpendalStore {
99    info: Arc<OperatorInfo>,
100    inner: Operator,
101}
102
103impl OpendalStore {
104    /// Create OpendalStore by given Operator.
105    pub fn new(op: Operator) -> Self {
106        Self {
107            info: op.info().into(),
108            inner: op,
109        }
110    }
111
112    /// Get the Operator info.
113    pub fn info(&self) -> &OperatorInfo {
114        self.info.as_ref()
115    }
116
117    /// Copy a file from one location to another
118    async fn copy_request(
119        &self,
120        from: &Path,
121        to: &Path,
122        if_not_exists: bool,
123    ) -> object_store::Result<()> {
124        let mut copy_options = CopyOptions::default();
125        if if_not_exists {
126            copy_options.if_not_exists = true;
127        }
128
129        // Perform the copy operation
130        self.inner
131            .copy_options(
132                &percent_decode_path(from.as_ref()),
133                &percent_decode_path(to.as_ref()),
134                copy_options,
135            )
136            .into_send()
137            .await
138            .map_err(|err| {
139                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
140                    object_store::Error::AlreadyExists {
141                        path: to.to_string(),
142                        source: Box::new(err),
143                    }
144                } else {
145                    format_object_store_error(err, from.as_ref())
146                }
147            })?;
148
149        Ok(())
150    }
151}
152
153impl Debug for OpendalStore {
154    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155        f.debug_struct("OpendalStore")
156            .field("scheme", &self.info.scheme())
157            .field("name", &self.info.name())
158            .field("root", &self.info.root())
159            .field("capability", &self.info.full_capability())
160            .finish()
161    }
162}
163
164impl Display for OpendalStore {
165    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
166        let info = self.inner.info();
167        write!(
168            f,
169            "Opendal({}, bucket={}, root={})",
170            info.scheme(),
171            info.name(),
172            info.root()
173        )
174    }
175}
176
177impl From<Operator> for OpendalStore {
178    fn from(value: Operator) -> Self {
179        Self::new(value)
180    }
181}
182
183#[async_trait]
184impl ObjectStore for OpendalStore {
185    async fn put_opts(
186        &self,
187        location: &Path,
188        bytes: PutPayload,
189        opts: PutOptions,
190    ) -> object_store::Result<PutResult> {
191        let decoded_location = percent_decode_path(location.as_ref());
192        let mut future_write = self
193            .inner
194            .write_with(&decoded_location, Buffer::from_iter(bytes));
195        let opts_mode = opts.mode.clone();
196        match opts.mode {
197            PutMode::Overwrite => {}
198            PutMode::Create => {
199                future_write = future_write.if_not_exists(true);
200            }
201            PutMode::Update(update_version) => {
202                let Some(etag) = update_version.e_tag else {
203                    Err(object_store::Error::NotSupported {
204                        source: Box::new(opendal::Error::new(
205                            opendal::ErrorKind::Unsupported,
206                            "etag is required for conditional put",
207                        )),
208                    })?
209                };
210                future_write = future_write.if_match(etag.as_str());
211            }
212        }
213        let rp = future_write.into_send().await.map_err(|err| {
214            match format_object_store_error(err, location.as_ref()) {
215                object_store::Error::Precondition { path, source }
216                    if opts_mode == PutMode::Create =>
217                {
218                    object_store::Error::AlreadyExists { path, source }
219                }
220                e => e,
221            }
222        })?;
223
224        let e_tag = rp.etag().map(|s| s.to_string());
225        let version = rp.version().map(|s| s.to_string());
226
227        Ok(PutResult { e_tag, version })
228    }
229
230    async fn put_multipart(
231        &self,
232        location: &Path,
233    ) -> object_store::Result<Box<dyn MultipartUpload>> {
234        let decoded_location = percent_decode_path(location.as_ref());
235        let writer = self
236            .inner
237            .writer_with(&decoded_location)
238            .concurrent(8)
239            .into_send()
240            .await
241            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
242        let upload = OpendalMultipartUpload::new(writer, location.clone());
243
244        Ok(Box::new(upload))
245    }
246
247    async fn put_multipart_opts(
248        &self,
249        location: &Path,
250        opts: PutMultipartOptions,
251    ) -> object_store::Result<Box<dyn MultipartUpload>> {
252        const DEFAULT_CONCURRENT: usize = 8;
253
254        let mut options = opendal::options::WriteOptions {
255            concurrent: DEFAULT_CONCURRENT,
256            ..Default::default()
257        };
258
259        // Collect user metadata separately to handle multiple entries
260        let mut user_metadata = HashMap::new();
261
262        // Handle attributes if provided
263        for (key, value) in opts.attributes.iter() {
264            match key {
265                object_store::Attribute::CacheControl => {
266                    options.cache_control = Some(value.to_string());
267                }
268                object_store::Attribute::ContentDisposition => {
269                    options.content_disposition = Some(value.to_string());
270                }
271                object_store::Attribute::ContentEncoding => {
272                    options.content_encoding = Some(value.to_string());
273                }
274                object_store::Attribute::ContentLanguage => {
275                    // no support
276                    continue;
277                }
278                object_store::Attribute::ContentType => {
279                    options.content_type = Some(value.to_string());
280                }
281                object_store::Attribute::Metadata(k) => {
282                    user_metadata.insert(k.to_string(), value.to_string());
283                }
284                _ => {}
285            }
286        }
287
288        // Apply user metadata if any entries were collected
289        if !user_metadata.is_empty() {
290            options.user_metadata = Some(user_metadata);
291        }
292
293        let decoded_location = percent_decode_path(location.as_ref());
294        let writer = self
295            .inner
296            .writer_options(&decoded_location, options)
297            .into_send()
298            .await
299            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
300        let upload = OpendalMultipartUpload::new(writer, location.clone());
301
302        Ok(Box::new(upload))
303    }
304
305    async fn get_opts(
306        &self,
307        location: &Path,
308        options: GetOptions,
309    ) -> object_store::Result<GetResult> {
310        let raw_location = percent_decode_path(location.as_ref());
311        let meta = {
312            let mut s = self.inner.stat_with(&raw_location);
313            if let Some(version) = &options.version {
314                s = s.version(version.as_str())
315            }
316            if let Some(if_match) = &options.if_match {
317                s = s.if_match(if_match.as_str());
318            }
319            if let Some(if_none_match) = &options.if_none_match {
320                s = s.if_none_match(if_none_match.as_str());
321            }
322            if let Some(if_modified_since) =
323                options.if_modified_since.and_then(datetime_to_timestamp)
324            {
325                s = s.if_modified_since(if_modified_since);
326            }
327            if let Some(if_unmodified_since) =
328                options.if_unmodified_since.and_then(datetime_to_timestamp)
329            {
330                s = s.if_unmodified_since(if_unmodified_since);
331            }
332            s.into_send()
333                .await
334                .map_err(|err| format_object_store_error(err, location.as_ref()))?
335        };
336
337        // Convert user defined metadata from OpenDAL to object_store attributes
338        let mut attributes = object_store::Attributes::new();
339        if let Some(user_meta) = meta.user_metadata() {
340            for (key, value) in user_meta {
341                attributes.insert(
342                    object_store::Attribute::Metadata(key.clone().into()),
343                    value.clone().into(),
344                );
345            }
346        }
347
348        let meta = ObjectMeta {
349            location: location.clone(),
350            last_modified: meta
351                .last_modified()
352                .and_then(timestamp_to_datetime)
353                .unwrap_or_default(),
354            size: meta.content_length(),
355            e_tag: meta.etag().map(|x| x.to_string()),
356            version: meta.version().map(|x| x.to_string()),
357        };
358
359        if options.head {
360            return Ok(GetResult {
361                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
362                range: 0..0,
363                meta,
364                attributes,
365            });
366        }
367
368        let reader = {
369            let mut r = self.inner.reader_with(raw_location.as_ref());
370            if let Some(version) = options.version {
371                r = r.version(version.as_str());
372            }
373            if let Some(if_match) = options.if_match {
374                r = r.if_match(if_match.as_str());
375            }
376            if let Some(if_none_match) = options.if_none_match {
377                r = r.if_none_match(if_none_match.as_str());
378            }
379            if let Some(if_modified_since) =
380                options.if_modified_since.and_then(datetime_to_timestamp)
381            {
382                r = r.if_modified_since(if_modified_since);
383            }
384            if let Some(if_unmodified_since) =
385                options.if_unmodified_since.and_then(datetime_to_timestamp)
386            {
387                r = r.if_unmodified_since(if_unmodified_since);
388            }
389            r.into_send()
390                .await
391                .map_err(|err| format_object_store_error(err, location.as_ref()))?
392        };
393
394        let read_range = match options.range {
395            Some(GetRange::Bounded(r)) => {
396                if r.start >= r.end || r.start >= meta.size {
397                    0..0
398                } else {
399                    let end = r.end.min(meta.size);
400                    r.start..end
401                }
402            }
403            Some(GetRange::Offset(r)) => {
404                if r < meta.size {
405                    r..meta.size
406                } else {
407                    0..0
408                }
409            }
410            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
411            _ => 0..meta.size,
412        };
413
414        let stream = reader
415            .into_bytes_stream(read_range.start..read_range.end)
416            .into_send()
417            .await
418            .map_err(|err| format_object_store_error(err, location.as_ref()))?
419            .into_send()
420            .map_err(|err: io::Error| object_store::Error::Generic {
421                store: "IoError",
422                source: Box::new(err),
423            });
424
425        Ok(GetResult {
426            payload: GetResultPayload::Stream(Box::pin(stream)),
427            range: read_range.start..read_range.end,
428            meta,
429            attributes,
430        })
431    }
432
433    async fn delete(&self, location: &Path) -> object_store::Result<()> {
434        let decoded_location = percent_decode_path(location.as_ref());
435        self.inner
436            .delete(&decoded_location)
437            .into_send()
438            .await
439            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
440
441        Ok(())
442    }
443
444    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
445        // object_store `Path` always removes trailing slash
446        // need to add it back
447        let path = prefix.map_or("".into(), |x| {
448            format!("{}/", percent_decode_path(x.as_ref()))
449        });
450
451        let this = self.clone();
452        let fut = async move {
453            let stream = this
454                .inner
455                .lister_with(&path)
456                .recursive(true)
457                .await
458                .map_err(|err| format_object_store_error(err, &path))?;
459
460            let stream = stream.then(|res| async {
461                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
462                let meta = entry.metadata();
463
464                Ok(format_object_meta(entry.path(), meta))
465            });
466            Ok::<_, object_store::Error>(stream)
467        };
468
469        fut.into_stream().try_flatten().into_send().boxed()
470    }
471
472    fn list_with_offset(
473        &self,
474        prefix: Option<&Path>,
475        offset: &Path,
476    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
477        let path = prefix.map_or("".into(), |x| {
478            format!("{}/", percent_decode_path(x.as_ref()))
479        });
480        let offset = offset.clone();
481
482        // clone self for 'static lifetime
483        // clone self is cheap
484        let this = self.clone();
485
486        let fut = async move {
487            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
488            let mut fut = this.inner.lister_with(&path).recursive(true);
489
490            // Use native start_after support if possible.
491            if list_with_start_after {
492                fut = fut.start_after(offset.as_ref());
493            }
494
495            let lister = fut
496                .await
497                .map_err(|err| format_object_store_error(err, &path))?
498                .then(move |entry| {
499                    let path = path.clone();
500                    let this = this.clone();
501                    async move {
502                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
503                        let (path, metadata) = entry.into_parts();
504
505                        // If it's a dir or last_modified is present, we can use it directly.
506                        if metadata.is_dir() || metadata.last_modified().is_some() {
507                            let object_meta = format_object_meta(&path, &metadata);
508                            return Ok(object_meta);
509                        }
510
511                        let metadata = this
512                            .inner
513                            .stat(&path)
514                            .await
515                            .map_err(|err| format_object_store_error(err, &path))?;
516                        let object_meta = format_object_meta(&path, &metadata);
517                        Ok::<_, object_store::Error>(object_meta)
518                    }
519                })
520                .into_send()
521                .boxed();
522
523            let stream = if list_with_start_after {
524                lister
525            } else {
526                lister
527                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
528                    .into_send()
529                    .boxed()
530            };
531
532            Ok::<_, object_store::Error>(stream)
533        };
534
535        fut.into_stream().into_send().try_flatten().boxed()
536    }
537
538    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
539        let path = prefix.map_or("".into(), |x| {
540            format!("{}/", percent_decode_path(x.as_ref()))
541        });
542        let mut stream = self
543            .inner
544            .lister_with(&path)
545            .into_future()
546            .into_send()
547            .await
548            .map_err(|err| format_object_store_error(err, &path))?
549            .into_send();
550
551        let mut common_prefixes = Vec::new();
552        let mut objects = Vec::new();
553
554        while let Some(res) = stream.next().into_send().await {
555            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
556            let meta = entry.metadata();
557
558            if meta.is_dir() {
559                common_prefixes.push(entry.path().into());
560            } else if meta.last_modified().is_some() {
561                objects.push(format_object_meta(entry.path(), meta));
562            } else {
563                let meta = self
564                    .inner
565                    .stat(entry.path())
566                    .into_send()
567                    .await
568                    .map_err(|err| format_object_store_error(err, entry.path()))?;
569                objects.push(format_object_meta(entry.path(), &meta));
570            }
571        }
572
573        Ok(ListResult {
574            common_prefixes,
575            objects,
576        })
577    }
578
579    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
580        self.copy_request(from, to, false).await
581    }
582
583    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
584        self.copy_request(from, to, true).await
585    }
586
587    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
588        self.inner
589            .rename(
590                &percent_decode_path(from.as_ref()),
591                &percent_decode_path(to.as_ref()),
592            )
593            .into_send()
594            .await
595            .map_err(|err| format_object_store_error(err, from.as_ref()))?;
596
597        Ok(())
598    }
599}
600
601/// `MultipartUpload`'s impl based on `Writer` in opendal
602///
603/// # Notes
604///
605/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
606/// implementation do. Instead, we just write the part and notify the next task to be written.
607///
608/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
609struct OpendalMultipartUpload {
610    writer: Arc<Mutex<Writer>>,
611    location: Path,
612    next_notify: oneshot::Receiver<()>,
613}
614
615impl OpendalMultipartUpload {
616    fn new(writer: Writer, location: Path) -> Self {
617        // an immediately dropped sender for the first part to write without waiting
618        let (_, rx) = oneshot::channel();
619
620        Self {
621            writer: Arc::new(Mutex::new(writer)),
622            location,
623            next_notify: rx,
624        }
625    }
626}
627
628#[async_trait]
629impl MultipartUpload for OpendalMultipartUpload {
630    fn put_part(&mut self, data: PutPayload) -> UploadPart {
631        let writer = self.writer.clone();
632        let location = self.location.clone();
633
634        // Generate next notify which will be notified after the current part is written.
635        let (tx, rx) = oneshot::channel();
636        // Fetch the notify for current part to wait for it to be written.
637        let last_rx = std::mem::replace(&mut self.next_notify, rx);
638
639        async move {
640            // Wait for the previous part to be written
641            let _ = last_rx.await;
642
643            let mut writer = writer.lock().await;
644            let result = writer
645                .write(Buffer::from_iter(data.into_iter()))
646                .await
647                .map_err(|err| format_object_store_error(err, location.as_ref()));
648
649            // Notify the next part to be written
650            drop(tx);
651
652            result
653        }
654        .into_send()
655        .boxed()
656    }
657
658    async fn complete(&mut self) -> object_store::Result<PutResult> {
659        let mut writer = self.writer.lock().await;
660        let metadata = writer
661            .close()
662            .into_send()
663            .await
664            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
665
666        let e_tag = metadata.etag().map(|s| s.to_string());
667        let version = metadata.version().map(|s| s.to_string());
668
669        Ok(PutResult { e_tag, version })
670    }
671
672    async fn abort(&mut self) -> object_store::Result<()> {
673        let mut writer = self.writer.lock().await;
674        writer
675            .abort()
676            .into_send()
677            .await
678            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
679    }
680}
681
682impl Debug for OpendalMultipartUpload {
683    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
684        f.debug_struct("OpendalMultipartUpload")
685            .field("location", &self.location)
686            .finish()
687    }
688}
689
690#[cfg(test)]
691mod tests {
692    use bytes::Bytes;
693    use object_store::path::Path;
694    use object_store::{ObjectStore, WriteMultipart};
695    use opendal::services;
696    use rand::prelude::*;
697    use std::sync::Arc;
698
699    use super::*;
700
701    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
702        let op = Operator::new(services::Memory::default()).unwrap().finish();
703        let object_store = Arc::new(OpendalStore::new(op));
704
705        let path: Path = "data/test.txt".into();
706        let bytes = Bytes::from_static(b"hello, world!");
707        object_store.put(&path, bytes.into()).await.unwrap();
708
709        let path: Path = "data/nested/test.txt".into();
710        let bytes = Bytes::from_static(b"hello, world! I am nested.");
711        object_store.put(&path, bytes.into()).await.unwrap();
712
713        object_store
714    }
715
716    #[tokio::test]
717    async fn test_basic() {
718        let op = Operator::new(services::Memory::default()).unwrap().finish();
719        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
720
721        // Retrieve a specific file
722        let path: Path = "data/test.txt".into();
723
724        let bytes = Bytes::from_static(b"hello, world!");
725        object_store.put(&path, bytes.clone().into()).await.unwrap();
726
727        let meta = object_store.head(&path).await.unwrap();
728
729        assert_eq!(meta.size, 13);
730
731        assert_eq!(
732            object_store
733                .get(&path)
734                .await
735                .unwrap()
736                .bytes()
737                .await
738                .unwrap(),
739            bytes
740        );
741    }
742
743    #[tokio::test]
744    async fn test_put_multipart() {
745        let op = Operator::new(services::Memory::default()).unwrap().finish();
746        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
747
748        let mut rng = thread_rng();
749
750        // Case complete
751        let path: Path = "data/test_complete.txt".into();
752        let upload = object_store.put_multipart(&path).await.unwrap();
753
754        let mut write = WriteMultipart::new(upload);
755
756        let mut all_bytes = vec![];
757        let round = rng.gen_range(1..=1024);
758        for _ in 0..round {
759            let size = rng.gen_range(1..=1024);
760            let mut bytes = vec![0; size];
761            rng.fill_bytes(&mut bytes);
762
763            all_bytes.extend_from_slice(&bytes);
764            write.put(bytes.into());
765        }
766
767        let _ = write.finish().await.unwrap();
768
769        let meta = object_store.head(&path).await.unwrap();
770
771        assert_eq!(meta.size, all_bytes.len() as u64);
772
773        assert_eq!(
774            object_store
775                .get(&path)
776                .await
777                .unwrap()
778                .bytes()
779                .await
780                .unwrap(),
781            Bytes::from(all_bytes)
782        );
783
784        // Case abort
785        let path: Path = "data/test_abort.txt".into();
786        let mut upload = object_store.put_multipart(&path).await.unwrap();
787        upload.put_part(vec![1; 1024].into()).await.unwrap();
788        upload.abort().await.unwrap();
789
790        let res = object_store.head(&path).await;
791        let err = res.unwrap_err();
792
793        assert!(matches!(err, object_store::Error::NotFound { .. }))
794    }
795
796    #[tokio::test]
797    async fn test_list() {
798        let object_store = create_test_object_store().await;
799        let path: Path = "data/".into();
800        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
801        assert_eq!(results.len(), 2);
802        let mut locations = results
803            .iter()
804            .map(|x| x.as_ref().unwrap().location.as_ref())
805            .collect::<Vec<_>>();
806
807        let expected_files = vec![
808            (
809                "data/nested/test.txt",
810                Bytes::from_static(b"hello, world! I am nested."),
811            ),
812            ("data/test.txt", Bytes::from_static(b"hello, world!")),
813        ];
814
815        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
816
817        locations.sort();
818        assert_eq!(locations, expected_locations);
819
820        for (location, bytes) in expected_files {
821            let path: Path = location.into();
822            assert_eq!(
823                object_store
824                    .get(&path)
825                    .await
826                    .unwrap()
827                    .bytes()
828                    .await
829                    .unwrap(),
830                bytes
831            );
832        }
833    }
834
835    #[tokio::test]
836    async fn test_list_with_delimiter() {
837        let object_store = create_test_object_store().await;
838        let path: Path = "data/".into();
839        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
840        assert_eq!(result.objects.len(), 1);
841        assert_eq!(result.common_prefixes.len(), 1);
842        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
843        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
844    }
845
846    #[tokio::test]
847    async fn test_list_with_offset() {
848        let object_store = create_test_object_store().await;
849        let path: Path = "data/".into();
850        let offset: Path = "data/nested/test.txt".into();
851        let result = object_store
852            .list_with_offset(Some(&path), &offset)
853            .collect::<Vec<_>>()
854            .await;
855        assert_eq!(result.len(), 1);
856        assert_eq!(
857            result[0].as_ref().unwrap().location.as_ref(),
858            "data/test.txt"
859        );
860    }
861}