object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::utils::*;
25use crate::{datetime_to_timestamp, timestamp_to_datetime};
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::CopyMode as ObjectStoreCopyMode;
35use object_store::CopyOptions as ObjectStoreCopyOptions;
36use object_store::ListResult;
37use object_store::MultipartUpload;
38use object_store::ObjectMeta;
39use object_store::ObjectStore;
40use object_store::PutMultipartOptions;
41use object_store::PutOptions;
42use object_store::PutPayload;
43use object_store::PutResult;
44use object_store::path::Path;
45use object_store::{GetOptions, UploadPart};
46use object_store::{GetRange, GetResultPayload};
47use object_store::{GetResult, PutMode};
48use opendal::Buffer;
49use opendal::Writer;
50use opendal::options::CopyOptions;
51use opendal::raw::percent_decode_path;
52use opendal::{Operator, OperatorInfo};
53use std::collections::HashMap;
54
/// Number of concurrent requests used both by multipart writers
/// (`WriteOptions::concurrent`) and by the buffered range reads in
/// `get_ranges`.
const DEFAULT_CONCURRENT: usize = 8;
56
57/// OpendalStore implements ObjectStore trait by using opendal.
58///
59/// This allows users to use opendal as an object store without extra cost.
60///
61/// Visit [`opendal::services`] for more information about supported services.
62///
63/// ```no_run
64/// use std::sync::Arc;
65///
66/// use bytes::Bytes;
67/// use object_store::path::Path;
68/// use object_store::ObjectStore;
69/// use object_store::ObjectStoreExt;
70/// use object_store_opendal::OpendalStore;
71/// use opendal::services::S3;
72/// use opendal::{Builder, Operator};
73///
74/// #[tokio::main]
75/// async fn main() {
76///    let builder = S3::default()
77///     .access_key_id("my_access_key")
78///     .secret_access_key("my_secret_key")
79///     .endpoint("my_endpoint")
80///     .region("my_region");
81///
82///     // Create a new operator
83///     let operator = Operator::new(builder).unwrap().finish();
84///
85///     // Create a new object store
86///     let object_store = Arc::new(OpendalStore::new(operator));
87///
88///     let path = Path::from("data/nested/test.txt");
89///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
90///
91///     object_store.put(&path, bytes.clone().into()).await.unwrap();
92///
93///     let content = object_store
94///         .get(&path)
95///         .await
96///         .unwrap()
97///         .bytes()
98///         .await
99///         .unwrap();
100///
101///     assert_eq!(content, bytes);
102/// }
103/// ```
104#[derive(Clone)]
105pub struct OpendalStore {
106    info: Arc<OperatorInfo>,
107    inner: Operator,
108}
109
110impl OpendalStore {
111    /// Create OpendalStore by given Operator.
112    pub fn new(op: Operator) -> Self {
113        Self {
114            info: op.info().into(),
115            inner: op,
116        }
117    }
118
119    /// Get the Operator info.
120    pub fn info(&self) -> &OperatorInfo {
121        self.info.as_ref()
122    }
123
124    /// Copy a file from one location to another
125    async fn copy_request(
126        &self,
127        from: &Path,
128        to: &Path,
129        if_not_exists: bool,
130    ) -> object_store::Result<()> {
131        let mut copy_options = CopyOptions::default();
132        if if_not_exists {
133            copy_options.if_not_exists = true;
134        }
135
136        // Perform the copy operation
137        self.inner
138            .copy_options(
139                &percent_decode_path(from.as_ref()),
140                &percent_decode_path(to.as_ref()),
141                copy_options,
142            )
143            .into_send()
144            .await
145            .map_err(|err| {
146                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
147                    object_store::Error::AlreadyExists {
148                        path: to.to_string(),
149                        source: Box::new(err),
150                    }
151                } else {
152                    format_object_store_error(err, from.as_ref())
153                }
154            })?;
155
156        Ok(())
157    }
158}
159
160impl Debug for OpendalStore {
161    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
162        f.debug_struct("OpendalStore")
163            .field("scheme", &self.info.scheme())
164            .field("name", &self.info.name())
165            .field("root", &self.info.root())
166            .field("capability", &self.info.full_capability())
167            .finish()
168    }
169}
170
171impl Display for OpendalStore {
172    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
173        let info = self.inner.info();
174        write!(
175            f,
176            "Opendal({}, bucket={}, root={})",
177            info.scheme(),
178            info.name(),
179            info.root()
180        )
181    }
182}
183
184impl From<Operator> for OpendalStore {
185    fn from(value: Operator) -> Self {
186        Self::new(value)
187    }
188}
189
190#[async_trait]
191impl ObjectStore for OpendalStore {
192    async fn put_opts(
193        &self,
194        location: &Path,
195        bytes: PutPayload,
196        opts: PutOptions,
197    ) -> object_store::Result<PutResult> {
198        let decoded_location = percent_decode_path(location.as_ref());
199        let mut future_write = self
200            .inner
201            .write_with(&decoded_location, Buffer::from_iter(bytes));
202        let opts_mode = opts.mode.clone();
203        match opts.mode {
204            PutMode::Overwrite => {}
205            PutMode::Create => {
206                future_write = future_write.if_not_exists(true);
207            }
208            PutMode::Update(update_version) => {
209                let Some(etag) = update_version.e_tag else {
210                    Err(object_store::Error::NotSupported {
211                        source: Box::new(opendal::Error::new(
212                            opendal::ErrorKind::Unsupported,
213                            "etag is required for conditional put",
214                        )),
215                    })?
216                };
217                future_write = future_write.if_match(etag.as_str());
218            }
219        }
220        let rp = future_write.into_send().await.map_err(|err| {
221            match format_object_store_error(err, location.as_ref()) {
222                object_store::Error::Precondition { path, source }
223                    if opts_mode == PutMode::Create =>
224                {
225                    object_store::Error::AlreadyExists { path, source }
226                }
227                e => e,
228            }
229        })?;
230
231        let e_tag = rp.etag().map(|s| s.to_string());
232        let version = rp.version().map(|s| s.to_string());
233
234        Ok(PutResult { e_tag, version })
235    }
236
237    async fn put_multipart_opts(
238        &self,
239        location: &Path,
240        opts: PutMultipartOptions,
241    ) -> object_store::Result<Box<dyn MultipartUpload>> {
242        let mut options = opendal::options::WriteOptions {
243            concurrent: DEFAULT_CONCURRENT,
244            ..Default::default()
245        };
246
247        // Collect user metadata separately to handle multiple entries
248        let mut user_metadata = HashMap::new();
249
250        // Handle attributes if provided
251        for (key, value) in opts.attributes.iter() {
252            match key {
253                object_store::Attribute::CacheControl => {
254                    options.cache_control = Some(value.to_string());
255                }
256                object_store::Attribute::ContentDisposition => {
257                    options.content_disposition = Some(value.to_string());
258                }
259                object_store::Attribute::ContentEncoding => {
260                    options.content_encoding = Some(value.to_string());
261                }
262                object_store::Attribute::ContentLanguage => {
263                    // no support
264                    continue;
265                }
266                object_store::Attribute::ContentType => {
267                    options.content_type = Some(value.to_string());
268                }
269                object_store::Attribute::Metadata(k) => {
270                    user_metadata.insert(k.to_string(), value.to_string());
271                }
272                _ => {}
273            }
274        }
275
276        // Apply user metadata if any entries were collected
277        if !user_metadata.is_empty() {
278            options.user_metadata = Some(user_metadata);
279        }
280
281        let decoded_location = percent_decode_path(location.as_ref());
282        let writer = self
283            .inner
284            .writer_options(&decoded_location, options)
285            .into_send()
286            .await
287            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
288        let upload = OpendalMultipartUpload::new(writer, location.clone());
289
290        Ok(Box::new(upload))
291    }
292
293    async fn get_opts(
294        &self,
295        location: &Path,
296        options: GetOptions,
297    ) -> object_store::Result<GetResult> {
298        let raw_location = percent_decode_path(location.as_ref());
299        let meta = {
300            let mut s = self.inner.stat_with(&raw_location);
301            if let Some(version) = &options.version {
302                s = s.version(version.as_str())
303            }
304            if let Some(if_match) = &options.if_match {
305                s = s.if_match(if_match.as_str());
306            }
307            if let Some(if_none_match) = &options.if_none_match {
308                s = s.if_none_match(if_none_match.as_str());
309            }
310            if let Some(if_modified_since) =
311                options.if_modified_since.and_then(datetime_to_timestamp)
312            {
313                s = s.if_modified_since(if_modified_since);
314            }
315            if let Some(if_unmodified_since) =
316                options.if_unmodified_since.and_then(datetime_to_timestamp)
317            {
318                s = s.if_unmodified_since(if_unmodified_since);
319            }
320            s.into_send()
321                .await
322                .map_err(|err| format_object_store_error(err, location.as_ref()))?
323        };
324
325        // Convert user defined metadata from OpenDAL to object_store attributes
326        let mut attributes = object_store::Attributes::new();
327        if let Some(user_meta) = meta.user_metadata() {
328            for (key, value) in user_meta {
329                attributes.insert(
330                    object_store::Attribute::Metadata(key.clone().into()),
331                    value.clone().into(),
332                );
333            }
334        }
335
336        let meta = ObjectMeta {
337            location: location.clone(),
338            last_modified: meta
339                .last_modified()
340                .and_then(timestamp_to_datetime)
341                .unwrap_or_default(),
342            size: meta.content_length(),
343            e_tag: meta.etag().map(|x| x.to_string()),
344            version: meta.version().map(|x| x.to_string()),
345        };
346
347        if options.head {
348            return Ok(GetResult {
349                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
350                range: 0..0,
351                meta,
352                attributes,
353            });
354        }
355
356        let reader = {
357            let mut r = self.inner.reader_with(raw_location.as_ref());
358            if let Some(version) = options.version {
359                r = r.version(version.as_str());
360            }
361            if let Some(if_match) = options.if_match {
362                r = r.if_match(if_match.as_str());
363            }
364            if let Some(if_none_match) = options.if_none_match {
365                r = r.if_none_match(if_none_match.as_str());
366            }
367            if let Some(if_modified_since) =
368                options.if_modified_since.and_then(datetime_to_timestamp)
369            {
370                r = r.if_modified_since(if_modified_since);
371            }
372            if let Some(if_unmodified_since) =
373                options.if_unmodified_since.and_then(datetime_to_timestamp)
374            {
375                r = r.if_unmodified_since(if_unmodified_since);
376            }
377            r.into_send()
378                .await
379                .map_err(|err| format_object_store_error(err, location.as_ref()))?
380        };
381
382        let read_range = match options.range {
383            Some(GetRange::Bounded(r)) => {
384                if r.start >= r.end || r.start >= meta.size {
385                    0..0
386                } else {
387                    let end = r.end.min(meta.size);
388                    r.start..end
389                }
390            }
391            Some(GetRange::Offset(r)) => {
392                if r < meta.size {
393                    r..meta.size
394                } else {
395                    0..0
396                }
397            }
398            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
399            _ => 0..meta.size,
400        };
401
402        let stream = reader
403            .into_bytes_stream(read_range.start..read_range.end)
404            .into_send()
405            .await
406            .map_err(|err| format_object_store_error(err, location.as_ref()))?
407            .into_send()
408            .map_err(|err: io::Error| object_store::Error::Generic {
409                store: "IoError",
410                source: Box::new(err),
411            });
412
413        Ok(GetResult {
414            payload: GetResultPayload::Stream(Box::pin(stream)),
415            range: read_range.start..read_range.end,
416            meta,
417            attributes,
418        })
419    }
420
421    async fn get_ranges(
422        &self,
423        location: &Path,
424        ranges: &[Range<u64>],
425    ) -> object_store::Result<Vec<Bytes>> {
426        let raw_location = percent_decode_path(location.as_ref());
427        let reader = self
428            .inner
429            .reader(&raw_location)
430            .into_send()
431            .await
432            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
433
434        let location_ref: Arc<str> = Arc::from(location.as_ref());
435        futures::stream::iter(ranges.iter().cloned())
436            .map(|range| {
437                let reader = reader.clone();
438                let location_ref = location_ref.clone();
439                async move {
440                    reader
441                        .read(range)
442                        .into_send()
443                        .await
444                        .map(|buf| buf.to_bytes())
445                        .map_err(|err| format_object_store_error(err, &location_ref))
446                }
447            })
448            .buffered(DEFAULT_CONCURRENT)
449            .try_collect()
450            .await
451    }
452
453    fn delete_stream(
454        &self,
455        locations: BoxStream<'static, object_store::Result<Path>>,
456    ) -> BoxStream<'static, object_store::Result<Path>> {
457        let this = self.clone();
458        locations
459            .and_then(move |location| {
460                let this = this.clone();
461                async move {
462                    let decoded = percent_decode_path(location.as_ref());
463                    this.inner
464                        .delete(&decoded)
465                        .into_send()
466                        .await
467                        .map_err(|err| format_object_store_error(err, location.as_ref()))?;
468                    Ok(location)
469                }
470                .into_send()
471            })
472            .boxed()
473    }
474
475    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
476        // object_store `Path` always removes trailing slash
477        // need to add it back
478        let path = prefix.map_or("".into(), |x| {
479            format!("{}/", percent_decode_path(x.as_ref()))
480        });
481
482        let this = self.clone();
483        let fut = async move {
484            let stream = this
485                .inner
486                .lister_with(&path)
487                .recursive(true)
488                .await
489                .map_err(|err| format_object_store_error(err, &path))?;
490
491            let stream = stream.then(|res| async {
492                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
493                let meta = entry.metadata();
494
495                Ok(format_object_meta(entry.path(), meta))
496            });
497            Ok::<_, object_store::Error>(stream)
498        };
499
500        fut.into_stream().try_flatten().into_send().boxed()
501    }
502
503    fn list_with_offset(
504        &self,
505        prefix: Option<&Path>,
506        offset: &Path,
507    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
508        let path = prefix.map_or("".into(), |x| {
509            format!("{}/", percent_decode_path(x.as_ref()))
510        });
511        let offset = offset.clone();
512
513        // clone self for 'static lifetime
514        // clone self is cheap
515        let this = self.clone();
516
517        let fut = async move {
518            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
519            let mut fut = this.inner.lister_with(&path).recursive(true);
520
521            // Use native start_after support if possible.
522            if list_with_start_after {
523                fut = fut.start_after(offset.as_ref());
524            }
525
526            let lister = fut
527                .await
528                .map_err(|err| format_object_store_error(err, &path))?
529                .then(move |entry| {
530                    let path = path.clone();
531                    let this = this.clone();
532                    async move {
533                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
534                        let (path, metadata) = entry.into_parts();
535
536                        // If it's a dir or last_modified is present, we can use it directly.
537                        if metadata.is_dir() || metadata.last_modified().is_some() {
538                            let object_meta = format_object_meta(&path, &metadata);
539                            return Ok(object_meta);
540                        }
541
542                        let metadata = this
543                            .inner
544                            .stat(&path)
545                            .await
546                            .map_err(|err| format_object_store_error(err, &path))?;
547                        let object_meta = format_object_meta(&path, &metadata);
548                        Ok::<_, object_store::Error>(object_meta)
549                    }
550                })
551                .into_send()
552                .boxed();
553
554            let stream = if list_with_start_after {
555                lister
556            } else {
557                lister
558                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
559                    .into_send()
560                    .boxed()
561            };
562
563            Ok::<_, object_store::Error>(stream)
564        };
565
566        fut.into_stream().into_send().try_flatten().boxed()
567    }
568
569    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
570        let path = prefix.map_or("".into(), |x| {
571            format!("{}/", percent_decode_path(x.as_ref()))
572        });
573        let mut stream = self
574            .inner
575            .lister_with(&path)
576            .into_future()
577            .into_send()
578            .await
579            .map_err(|err| format_object_store_error(err, &path))?
580            .into_send();
581
582        let mut common_prefixes = Vec::new();
583        let mut objects = Vec::new();
584
585        while let Some(res) = stream.next().into_send().await {
586            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
587            let meta = entry.metadata();
588
589            if meta.is_dir() {
590                common_prefixes.push(entry.path().into());
591            } else if meta.last_modified().is_some() {
592                objects.push(format_object_meta(entry.path(), meta));
593            } else {
594                let meta = self
595                    .inner
596                    .stat(entry.path())
597                    .into_send()
598                    .await
599                    .map_err(|err| format_object_store_error(err, entry.path()))?;
600                objects.push(format_object_meta(entry.path(), &meta));
601            }
602        }
603
604        Ok(ListResult {
605            common_prefixes,
606            objects,
607        })
608    }
609
610    async fn copy_opts(
611        &self,
612        from: &Path,
613        to: &Path,
614        options: ObjectStoreCopyOptions,
615    ) -> object_store::Result<()> {
616        let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
617        self.copy_request(from, to, if_not_exists).await
618    }
619}
620
621/// `MultipartUpload`'s impl based on `Writer` in opendal
622///
623/// # Notes
624///
625/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
626/// implementation do. Instead, we just write the part and notify the next task to be written.
627///
628/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
629struct OpendalMultipartUpload {
630    writer: Arc<Mutex<Writer>>,
631    location: Path,
632    next_notify: oneshot::Receiver<()>,
633}
634
635impl OpendalMultipartUpload {
636    fn new(writer: Writer, location: Path) -> Self {
637        // an immediately dropped sender for the first part to write without waiting
638        let (_, rx) = oneshot::channel();
639
640        Self {
641            writer: Arc::new(Mutex::new(writer)),
642            location,
643            next_notify: rx,
644        }
645    }
646}
647
648#[async_trait]
649impl MultipartUpload for OpendalMultipartUpload {
650    fn put_part(&mut self, data: PutPayload) -> UploadPart {
651        let writer = self.writer.clone();
652        let location = self.location.clone();
653
654        // Generate next notify which will be notified after the current part is written.
655        let (tx, rx) = oneshot::channel();
656        // Fetch the notify for current part to wait for it to be written.
657        let last_rx = std::mem::replace(&mut self.next_notify, rx);
658
659        async move {
660            // Wait for the previous part to be written
661            let _ = last_rx.await;
662
663            let mut writer = writer.lock().await;
664            let result = writer
665                .write(Buffer::from_iter(data))
666                .await
667                .map_err(|err| format_object_store_error(err, location.as_ref()));
668
669            // Notify the next part to be written
670            drop(tx);
671
672            result
673        }
674        .into_send()
675        .boxed()
676    }
677
678    async fn complete(&mut self) -> object_store::Result<PutResult> {
679        let mut writer = self.writer.lock().await;
680        let metadata = writer
681            .close()
682            .into_send()
683            .await
684            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
685
686        let e_tag = metadata.etag().map(|s| s.to_string());
687        let version = metadata.version().map(|s| s.to_string());
688
689        Ok(PutResult { e_tag, version })
690    }
691
692    async fn abort(&mut self) -> object_store::Result<()> {
693        let mut writer = self.writer.lock().await;
694        writer
695            .abort()
696            .into_send()
697            .await
698            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
699    }
700}
701
702impl Debug for OpendalMultipartUpload {
703    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
704        f.debug_struct("OpendalMultipartUpload")
705            .field("location", &self.location)
706            .finish()
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use bytes::Bytes;
713    use object_store::path::Path;
714    use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
715    use opendal::services;
716    use rand::prelude::*;
717    use rand::rng;
718    use std::sync::Arc;
719
720    use super::*;
721
722    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
723        let op = Operator::new(services::Memory::default()).unwrap().finish();
724        let object_store = Arc::new(OpendalStore::new(op));
725
726        let path: Path = "data/test.txt".into();
727        let bytes = Bytes::from_static(b"hello, world!");
728        object_store.put(&path, bytes.into()).await.unwrap();
729
730        let path: Path = "data/nested/test.txt".into();
731        let bytes = Bytes::from_static(b"hello, world! I am nested.");
732        object_store.put(&path, bytes.into()).await.unwrap();
733
734        object_store
735    }
736
737    #[tokio::test]
738    async fn test_basic() {
739        let op = Operator::new(services::Memory::default()).unwrap().finish();
740        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
741
742        // Retrieve a specific file
743        let path: Path = "data/test.txt".into();
744
745        let bytes = Bytes::from_static(b"hello, world!");
746        object_store.put(&path, bytes.clone().into()).await.unwrap();
747
748        let meta = object_store.head(&path).await.unwrap();
749
750        assert_eq!(meta.size, 13);
751
752        assert_eq!(
753            object_store
754                .get(&path)
755                .await
756                .unwrap()
757                .bytes()
758                .await
759                .unwrap(),
760            bytes
761        );
762    }
763
764    #[tokio::test]
765    async fn test_put_multipart() {
766        let op = Operator::new(services::Memory::default()).unwrap().finish();
767        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
768
769        let mut rng = rng();
770
771        // Case complete
772        let path: Path = "data/test_complete.txt".into();
773        let upload = object_store.put_multipart(&path).await.unwrap();
774
775        let mut write = WriteMultipart::new(upload);
776
777        let mut all_bytes = vec![];
778        let round = rng.random_range(1..=1024);
779        for _ in 0..round {
780            let size = rng.random_range(1..=1024);
781            let mut bytes = vec![0; size];
782            rng.fill_bytes(&mut bytes);
783
784            all_bytes.extend_from_slice(&bytes);
785            write.put(bytes.into());
786        }
787
788        let _ = write.finish().await.unwrap();
789
790        let meta = object_store.head(&path).await.unwrap();
791
792        assert_eq!(meta.size, all_bytes.len() as u64);
793
794        assert_eq!(
795            object_store
796                .get(&path)
797                .await
798                .unwrap()
799                .bytes()
800                .await
801                .unwrap(),
802            Bytes::from(all_bytes)
803        );
804
805        // Case abort
806        let path: Path = "data/test_abort.txt".into();
807        let mut upload = object_store.put_multipart(&path).await.unwrap();
808        upload.put_part(vec![1; 1024].into()).await.unwrap();
809        upload.abort().await.unwrap();
810
811        let res = object_store.head(&path).await;
812        let err = res.unwrap_err();
813
814        assert!(matches!(err, object_store::Error::NotFound { .. }))
815    }
816
817    #[tokio::test]
818    async fn test_list() {
819        let object_store = create_test_object_store().await;
820        let path: Path = "data/".into();
821        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
822        assert_eq!(results.len(), 2);
823        let mut locations = results
824            .iter()
825            .map(|x| x.as_ref().unwrap().location.as_ref())
826            .collect::<Vec<_>>();
827
828        let expected_files = vec![
829            (
830                "data/nested/test.txt",
831                Bytes::from_static(b"hello, world! I am nested."),
832            ),
833            ("data/test.txt", Bytes::from_static(b"hello, world!")),
834        ];
835
836        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
837
838        locations.sort();
839        assert_eq!(locations, expected_locations);
840
841        for (location, bytes) in expected_files {
842            let path: Path = location.into();
843            assert_eq!(
844                object_store
845                    .get(&path)
846                    .await
847                    .unwrap()
848                    .bytes()
849                    .await
850                    .unwrap(),
851                bytes
852            );
853        }
854    }
855
856    #[tokio::test]
857    async fn test_list_with_delimiter() {
858        let object_store = create_test_object_store().await;
859        let path: Path = "data/".into();
860        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
861        assert_eq!(result.objects.len(), 1);
862        assert_eq!(result.common_prefixes.len(), 1);
863        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
864        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
865    }
866
867    #[tokio::test]
868    async fn test_list_with_offset() {
869        let object_store = create_test_object_store().await;
870        let path: Path = "data/".into();
871        let offset: Path = "data/nested/test.txt".into();
872        let result = object_store
873            .list_with_offset(Some(&path), &offset)
874            .collect::<Vec<_>>()
875            .await;
876        assert_eq!(result.len(), 1);
877        assert_eq!(
878            result[0].as_ref().unwrap().location.as_ref(),
879            "data/test.txt"
880        );
881    }
882
    /// Test-only opendal layer and accessor that count how many `stat` calls
    /// reach the wrapped accessor.
    ///
    /// Every other operation is forwarded to the inner accessor unchanged, so
    /// the counter isolates `stat` traffic from reads, writes, lists, etc.
    mod stat_counter {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Layer that wraps an accessor in a [`StatCounterAccessor`].
        ///
        /// The counter is shared through an `Arc`, so the test keeps its own
        /// handle to read (and reset) the count after installing the layer.
        #[derive(Debug, Clone)]
        pub struct StatCounterLayer {
            /// Shared counter incremented on every `stat` call.
            count: Arc<AtomicUsize>,
        }

        impl StatCounterLayer {
            /// Creates a layer that increments `count` on each `stat` call.
            pub fn new(count: Arc<AtomicUsize>) -> Self {
                Self { count }
            }
        }

        impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
            type LayeredAccess = StatCounterAccessor<A>;

            fn layer(&self, inner: A) -> Self::LayeredAccess {
                StatCounterAccessor {
                    inner,
                    // Clone the Arc (not the counter value) so every accessor
                    // built from this layer bumps the same shared count.
                    count: self.count.clone(),
                }
            }
        }

        /// Accessor that increments `count` on `stat` and otherwise delegates
        /// every call to `inner`.
        #[derive(Debug, Clone)]
        pub struct StatCounterAccessor<A> {
            /// The wrapped accessor that performs the real work.
            inner: A,
            /// Counter shared with the owning [`StatCounterLayer`].
            count: Arc<AtomicUsize>,
        }

        impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
            type Inner = A;
            // Reuse the inner accessor's I/O handle types unchanged; only
            // `stat` is intercepted.
            type Reader = A::Reader;
            type Writer = A::Writer;
            type Lister = A::Lister;
            type Deleter = A::Deleter;

            fn inner(&self) -> &Self::Inner {
                &self.inner
            }

            /// Counts the call, then delegates to the inner accessor.
            async fn stat(
                &self,
                path: &str,
                args: opendal::raw::OpStat,
            ) -> opendal::Result<opendal::raw::RpStat> {
                // Increment before delegating so a stat that fails (e.g. the
                // path does not exist) is still counted.
                self.count.fetch_add(1, Ordering::SeqCst);
                self.inner.stat(path, args).await
            }

            // The remaining operations are pure pass-throughs and are
            // deliberately not counted.

            async fn read(
                &self,
                path: &str,
                args: opendal::raw::OpRead,
            ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
                self.inner.read(path, args).await
            }

            async fn write(
                &self,
                path: &str,
                args: opendal::raw::OpWrite,
            ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
                self.inner.write(path, args).await
            }

            async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
                self.inner.delete().await
            }

            async fn list(
                &self,
                path: &str,
                args: opendal::raw::OpList,
            ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
                self.inner.list(path, args).await
            }

            async fn copy(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpCopy,
            ) -> opendal::Result<opendal::raw::RpCopy> {
                self.inner.copy(from, to, args).await
            }

            async fn rename(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpRename,
            ) -> opendal::Result<opendal::raw::RpRename> {
                self.inner.rename(from, to, args).await
            }
        }
    }
983
984    #[tokio::test]
985    async fn test_get_range_no_stat() {
986        use std::sync::atomic::{AtomicUsize, Ordering};
987
988        // Create a stat counter and operator with tracking layer
989        let stat_count = Arc::new(AtomicUsize::new(0));
990        let op = Operator::new(opendal::services::Memory::default())
991            .unwrap()
992            .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
993            .finish();
994        let store = OpendalStore::new(op);
995
996        // Create a test file
997        let location = "test_get_range.txt".into();
998        let value = Bytes::from_static(b"Hello, world!");
999        store.put(&location, value.clone().into()).await.unwrap();
1000
1001        // Reset counter after put
1002        stat_count.store(0, Ordering::SeqCst);
1003
1004        // Test 1: get_ranges should NOT call stat()
1005        #[allow(clippy::single_range_in_vec_init)]
1006        let ranges = [0u64..5];
1007        let ret = store.get_ranges(&location, &ranges).await.unwrap();
1008        assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1009        assert_eq!(
1010            stat_count.load(Ordering::SeqCst),
1011            0,
1012            "get_ranges should not call stat()"
1013        );
1014
1015        // Reset counter
1016        stat_count.store(0, Ordering::SeqCst);
1017
1018        // Test 2: get_opts SHOULD call stat() to get metadata
1019        let opts = object_store::GetOptions {
1020            range: Some(object_store::GetRange::Bounded(0..5)),
1021            ..Default::default()
1022        };
1023        let ret = store.get_opts(&location, opts).await.unwrap();
1024        let data = ret.bytes().await.unwrap();
1025        assert_eq!(Bytes::from_static(b"Hello"), data);
1026        assert!(
1027            stat_count.load(Ordering::SeqCst) > 0,
1028            "get_opts should call stat() to get metadata"
1029        );
1030
1031        // Cleanup
1032        store.delete(&location).await.unwrap();
1033    }
1034}