object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::utils::*;
25use crate::{datetime_to_timestamp, timestamp_to_datetime};
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::ListResult;
35use object_store::MultipartUpload;
36use object_store::ObjectMeta;
37use object_store::ObjectStore;
38use object_store::PutMultipartOptions;
39use object_store::PutOptions;
40use object_store::PutPayload;
41use object_store::PutResult;
42use object_store::path::Path;
43use object_store::{GetOptions, UploadPart};
44use object_store::{GetRange, GetResultPayload};
45use object_store::{GetResult, PutMode};
46use opendal::Buffer;
47use opendal::Writer;
48use opendal::options::CopyOptions;
49use opendal::raw::percent_decode_path;
50use opendal::{Operator, OperatorInfo};
51use std::collections::HashMap;
52
53/// OpendalStore implements ObjectStore trait by using opendal.
54///
55/// This allows users to use opendal as an object store without extra cost.
56///
57/// Visit [`opendal::services`] for more information about supported services.
58///
59/// ```no_run
60/// use std::sync::Arc;
61///
62/// use bytes::Bytes;
63/// use object_store::path::Path;
64/// use object_store::ObjectStore;
65/// use object_store_opendal::OpendalStore;
66/// use opendal::services::S3;
67/// use opendal::{Builder, Operator};
68///
69/// #[tokio::main]
70/// async fn main() {
71///    let builder = S3::default()
72///     .access_key_id("my_access_key")
73///     .secret_access_key("my_secret_key")
74///     .endpoint("my_endpoint")
75///     .region("my_region");
76///
77///     // Create a new operator
78///     let operator = Operator::new(builder).unwrap().finish();
79///
80///     // Create a new object store
81///     let object_store = Arc::new(OpendalStore::new(operator));
82///
83///     let path = Path::from("data/nested/test.txt");
84///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
85///
86///     object_store.put(&path, bytes.clone().into()).await.unwrap();
87///
88///     let content = object_store
89///         .get(&path)
90///         .await
91///         .unwrap()
92///         .bytes()
93///         .await
94///         .unwrap();
95///
96///     assert_eq!(content, bytes);
97/// }
98/// ```
#[derive(Clone)]
pub struct OpendalStore {
    /// Operator info captured once when the store is constructed.
    info: Arc<OperatorInfo>,
    /// The OpenDAL operator that store operations are issued through.
    inner: Operator,
}
104
105impl OpendalStore {
106    /// Create OpendalStore by given Operator.
107    pub fn new(op: Operator) -> Self {
108        Self {
109            info: op.info().into(),
110            inner: op,
111        }
112    }
113
114    /// Get the Operator info.
115    pub fn info(&self) -> &OperatorInfo {
116        self.info.as_ref()
117    }
118
119    /// Copy a file from one location to another
120    async fn copy_request(
121        &self,
122        from: &Path,
123        to: &Path,
124        if_not_exists: bool,
125    ) -> object_store::Result<()> {
126        let mut copy_options = CopyOptions::default();
127        if if_not_exists {
128            copy_options.if_not_exists = true;
129        }
130
131        // Perform the copy operation
132        self.inner
133            .copy_options(
134                &percent_decode_path(from.as_ref()),
135                &percent_decode_path(to.as_ref()),
136                copy_options,
137            )
138            .into_send()
139            .await
140            .map_err(|err| {
141                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
142                    object_store::Error::AlreadyExists {
143                        path: to.to_string(),
144                        source: Box::new(err),
145                    }
146                } else {
147                    format_object_store_error(err, from.as_ref())
148                }
149            })?;
150
151        Ok(())
152    }
153}
154
155impl Debug for OpendalStore {
156    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
157        f.debug_struct("OpendalStore")
158            .field("scheme", &self.info.scheme())
159            .field("name", &self.info.name())
160            .field("root", &self.info.root())
161            .field("capability", &self.info.full_capability())
162            .finish()
163    }
164}
165
166impl Display for OpendalStore {
167    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
168        let info = self.inner.info();
169        write!(
170            f,
171            "Opendal({}, bucket={}, root={})",
172            info.scheme(),
173            info.name(),
174            info.root()
175        )
176    }
177}
178
179impl From<Operator> for OpendalStore {
180    fn from(value: Operator) -> Self {
181        Self::new(value)
182    }
183}
184
185#[async_trait]
186impl ObjectStore for OpendalStore {
187    async fn put_opts(
188        &self,
189        location: &Path,
190        bytes: PutPayload,
191        opts: PutOptions,
192    ) -> object_store::Result<PutResult> {
193        let decoded_location = percent_decode_path(location.as_ref());
194        let mut future_write = self
195            .inner
196            .write_with(&decoded_location, Buffer::from_iter(bytes));
197        let opts_mode = opts.mode.clone();
198        match opts.mode {
199            PutMode::Overwrite => {}
200            PutMode::Create => {
201                future_write = future_write.if_not_exists(true);
202            }
203            PutMode::Update(update_version) => {
204                let Some(etag) = update_version.e_tag else {
205                    Err(object_store::Error::NotSupported {
206                        source: Box::new(opendal::Error::new(
207                            opendal::ErrorKind::Unsupported,
208                            "etag is required for conditional put",
209                        )),
210                    })?
211                };
212                future_write = future_write.if_match(etag.as_str());
213            }
214        }
215        let rp = future_write.into_send().await.map_err(|err| {
216            match format_object_store_error(err, location.as_ref()) {
217                object_store::Error::Precondition { path, source }
218                    if opts_mode == PutMode::Create =>
219                {
220                    object_store::Error::AlreadyExists { path, source }
221                }
222                e => e,
223            }
224        })?;
225
226        let e_tag = rp.etag().map(|s| s.to_string());
227        let version = rp.version().map(|s| s.to_string());
228
229        Ok(PutResult { e_tag, version })
230    }
231
232    async fn put_multipart(
233        &self,
234        location: &Path,
235    ) -> object_store::Result<Box<dyn MultipartUpload>> {
236        let decoded_location = percent_decode_path(location.as_ref());
237        let writer = self
238            .inner
239            .writer_with(&decoded_location)
240            .concurrent(8)
241            .into_send()
242            .await
243            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
244        let upload = OpendalMultipartUpload::new(writer, location.clone());
245
246        Ok(Box::new(upload))
247    }
248
249    async fn put_multipart_opts(
250        &self,
251        location: &Path,
252        opts: PutMultipartOptions,
253    ) -> object_store::Result<Box<dyn MultipartUpload>> {
254        const DEFAULT_CONCURRENT: usize = 8;
255
256        let mut options = opendal::options::WriteOptions {
257            concurrent: DEFAULT_CONCURRENT,
258            ..Default::default()
259        };
260
261        // Collect user metadata separately to handle multiple entries
262        let mut user_metadata = HashMap::new();
263
264        // Handle attributes if provided
265        for (key, value) in opts.attributes.iter() {
266            match key {
267                object_store::Attribute::CacheControl => {
268                    options.cache_control = Some(value.to_string());
269                }
270                object_store::Attribute::ContentDisposition => {
271                    options.content_disposition = Some(value.to_string());
272                }
273                object_store::Attribute::ContentEncoding => {
274                    options.content_encoding = Some(value.to_string());
275                }
276                object_store::Attribute::ContentLanguage => {
277                    // no support
278                    continue;
279                }
280                object_store::Attribute::ContentType => {
281                    options.content_type = Some(value.to_string());
282                }
283                object_store::Attribute::Metadata(k) => {
284                    user_metadata.insert(k.to_string(), value.to_string());
285                }
286                _ => {}
287            }
288        }
289
290        // Apply user metadata if any entries were collected
291        if !user_metadata.is_empty() {
292            options.user_metadata = Some(user_metadata);
293        }
294
295        let decoded_location = percent_decode_path(location.as_ref());
296        let writer = self
297            .inner
298            .writer_options(&decoded_location, options)
299            .into_send()
300            .await
301            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
302        let upload = OpendalMultipartUpload::new(writer, location.clone());
303
304        Ok(Box::new(upload))
305    }
306
    /// Fetch the object at `location` according to `options`.
    ///
    /// The object is first stat-ed (with the version and conditional options
    /// forwarded) to build the returned [`ObjectMeta`] and attributes. For a
    /// `head` request an empty stream with range `0..0` is returned. Otherwise a
    /// reader is opened with the same options and the requested range, resolved
    /// against the object's size, is returned as a byte stream.
    ///
    /// # Errors
    ///
    /// OpenDAL failures are converted with `format_object_store_error`; I/O
    /// errors raised while streaming are wrapped in
    /// [`object_store::Error::Generic`].
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        let raw_location = percent_decode_path(location.as_ref());
        // Stat first: the metadata supplies the size needed to resolve the range below.
        let meta = {
            let mut s = self.inner.stat_with(&raw_location);
            if let Some(version) = &options.version {
                s = s.version(version.as_str())
            }
            if let Some(if_match) = &options.if_match {
                s = s.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = &options.if_none_match {
                s = s.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) =
                options.if_modified_since.and_then(datetime_to_timestamp)
            {
                s = s.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) =
                options.if_unmodified_since.and_then(datetime_to_timestamp)
            {
                s = s.if_unmodified_since(if_unmodified_since);
            }
            s.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Convert user defined metadata from OpenDAL to object_store attributes
        let mut attributes = object_store::Attributes::new();
        if let Some(user_meta) = meta.user_metadata() {
            for (key, value) in user_meta {
                attributes.insert(
                    object_store::Attribute::Metadata(key.clone().into()),
                    value.clone().into(),
                );
            }
        }

        // A missing or unconvertible last-modified time falls back to the default timestamp.
        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: meta
                .last_modified()
                .and_then(timestamp_to_datetime)
                .unwrap_or_default(),
            size: meta.content_length(),
            e_tag: meta.etag().map(|x| x.to_string()),
            version: meta.version().map(|x| x.to_string()),
        };

        // Head requests only need the metadata; no reader is opened.
        if options.head {
            return Ok(GetResult {
                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
                range: 0..0,
                meta,
                attributes,
            });
        }

        // Open the reader with the same version and conditional options used for the stat.
        let reader = {
            let mut r = self.inner.reader_with(raw_location.as_ref());
            if let Some(version) = options.version {
                r = r.version(version.as_str());
            }
            if let Some(if_match) = options.if_match {
                r = r.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = options.if_none_match {
                r = r.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) =
                options.if_modified_since.and_then(datetime_to_timestamp)
            {
                r = r.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) =
                options.if_unmodified_since.and_then(datetime_to_timestamp)
            {
                r = r.if_unmodified_since(if_unmodified_since);
            }
            r.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Resolve the requested range against the object size:
        // - bounded ranges are clamped to the size; empty or out-of-range ones become `0..0`
        // - offsets past the end become `0..0`
        // - a suffix at least as long as the object, or no range at all, reads the whole object
        let read_range = match options.range {
            Some(GetRange::Bounded(r)) => {
                if r.start >= r.end || r.start >= meta.size {
                    0..0
                } else {
                    let end = r.end.min(meta.size);
                    r.start..end
                }
            }
            Some(GetRange::Offset(r)) => {
                if r < meta.size {
                    r..meta.size
                } else {
                    0..0
                }
            }
            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
            _ => 0..meta.size,
        };

        let stream = reader
            .into_bytes_stream(read_range.start..read_range.end)
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?
            .into_send()
            .map_err(|err: io::Error| object_store::Error::Generic {
                store: "IoError",
                source: Box::new(err),
            });

        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(stream)),
            range: read_range.start..read_range.end,
            meta,
            attributes,
        })
    }
434
435    /// Return the bytes that are stored at the specified location
436    /// in the given byte range.
437    ///
438    /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
439    async fn get_range(&self, location: &Path, range: Range<u64>) -> object_store::Result<Bytes> {
440        // For bounded ranges, we can read directly without calling stat()
441        // This avoids the unnecessary metadata fetch in get_opts
442        let raw_location = percent_decode_path(location.as_ref());
443        let reader = self
444            .inner
445            .reader_with(&raw_location)
446            .into_send()
447            .await
448            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
449
450        reader
451            .read(range.start..range.end)
452            .into_send()
453            .await
454            .map(|buf| buf.to_bytes())
455            .map_err(|err| format_object_store_error(err, location.as_ref()))
456    }
457
458    async fn delete(&self, location: &Path) -> object_store::Result<()> {
459        let decoded_location = percent_decode_path(location.as_ref());
460        self.inner
461            .delete(&decoded_location)
462            .into_send()
463            .await
464            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
465
466        Ok(())
467    }
468
469    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
470        // object_store `Path` always removes trailing slash
471        // need to add it back
472        let path = prefix.map_or("".into(), |x| {
473            format!("{}/", percent_decode_path(x.as_ref()))
474        });
475
476        let this = self.clone();
477        let fut = async move {
478            let stream = this
479                .inner
480                .lister_with(&path)
481                .recursive(true)
482                .await
483                .map_err(|err| format_object_store_error(err, &path))?;
484
485            let stream = stream.then(|res| async {
486                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
487                let meta = entry.metadata();
488
489                Ok(format_object_meta(entry.path(), meta))
490            });
491            Ok::<_, object_store::Error>(stream)
492        };
493
494        fut.into_stream().try_flatten().into_send().boxed()
495    }
496
497    fn list_with_offset(
498        &self,
499        prefix: Option<&Path>,
500        offset: &Path,
501    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
502        let path = prefix.map_or("".into(), |x| {
503            format!("{}/", percent_decode_path(x.as_ref()))
504        });
505        let offset = offset.clone();
506
507        // clone self for 'static lifetime
508        // clone self is cheap
509        let this = self.clone();
510
511        let fut = async move {
512            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
513            let mut fut = this.inner.lister_with(&path).recursive(true);
514
515            // Use native start_after support if possible.
516            if list_with_start_after {
517                fut = fut.start_after(offset.as_ref());
518            }
519
520            let lister = fut
521                .await
522                .map_err(|err| format_object_store_error(err, &path))?
523                .then(move |entry| {
524                    let path = path.clone();
525                    let this = this.clone();
526                    async move {
527                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
528                        let (path, metadata) = entry.into_parts();
529
530                        // If it's a dir or last_modified is present, we can use it directly.
531                        if metadata.is_dir() || metadata.last_modified().is_some() {
532                            let object_meta = format_object_meta(&path, &metadata);
533                            return Ok(object_meta);
534                        }
535
536                        let metadata = this
537                            .inner
538                            .stat(&path)
539                            .await
540                            .map_err(|err| format_object_store_error(err, &path))?;
541                        let object_meta = format_object_meta(&path, &metadata);
542                        Ok::<_, object_store::Error>(object_meta)
543                    }
544                })
545                .into_send()
546                .boxed();
547
548            let stream = if list_with_start_after {
549                lister
550            } else {
551                lister
552                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
553                    .into_send()
554                    .boxed()
555            };
556
557            Ok::<_, object_store::Error>(stream)
558        };
559
560        fut.into_stream().into_send().try_flatten().boxed()
561    }
562
563    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
564        let path = prefix.map_or("".into(), |x| {
565            format!("{}/", percent_decode_path(x.as_ref()))
566        });
567        let mut stream = self
568            .inner
569            .lister_with(&path)
570            .into_future()
571            .into_send()
572            .await
573            .map_err(|err| format_object_store_error(err, &path))?
574            .into_send();
575
576        let mut common_prefixes = Vec::new();
577        let mut objects = Vec::new();
578
579        while let Some(res) = stream.next().into_send().await {
580            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
581            let meta = entry.metadata();
582
583            if meta.is_dir() {
584                common_prefixes.push(entry.path().into());
585            } else if meta.last_modified().is_some() {
586                objects.push(format_object_meta(entry.path(), meta));
587            } else {
588                let meta = self
589                    .inner
590                    .stat(entry.path())
591                    .into_send()
592                    .await
593                    .map_err(|err| format_object_store_error(err, entry.path()))?;
594                objects.push(format_object_meta(entry.path(), &meta));
595            }
596        }
597
598        Ok(ListResult {
599            common_prefixes,
600            objects,
601        })
602    }
603
604    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
605        self.copy_request(from, to, false).await
606    }
607
608    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
609        self.inner
610            .rename(
611                &percent_decode_path(from.as_ref()),
612                &percent_decode_path(to.as_ref()),
613            )
614            .into_send()
615            .await
616            .map_err(|err| format_object_store_error(err, from.as_ref()))?;
617
618        Ok(())
619    }
620
621    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
622        self.copy_request(from, to, true).await
623    }
624}
625
/// `MultipartUpload`'s impl based on `Writer` in opendal
///
/// # Notes
///
/// The OpenDAL writer handles concurrency internally, so we don't generate real `UploadPart`s
/// like existing implementations do. Instead, we just write the part and notify the next task
/// to be written.
///
/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
struct OpendalMultipartUpload {
    /// Writer shared with the futures returned by `put_part`.
    writer: Arc<Mutex<Writer>>,
    /// Destination of the upload; used as the path when converting errors.
    location: Path,
    /// Receiver the next part awaits before it starts writing.
    next_notify: oneshot::Receiver<()>,
}
639
640impl OpendalMultipartUpload {
641    fn new(writer: Writer, location: Path) -> Self {
642        // an immediately dropped sender for the first part to write without waiting
643        let (_, rx) = oneshot::channel();
644
645        Self {
646            writer: Arc::new(Mutex::new(writer)),
647            location,
648            next_notify: rx,
649        }
650    }
651}
652
653#[async_trait]
654impl MultipartUpload for OpendalMultipartUpload {
655    fn put_part(&mut self, data: PutPayload) -> UploadPart {
656        let writer = self.writer.clone();
657        let location = self.location.clone();
658
659        // Generate next notify which will be notified after the current part is written.
660        let (tx, rx) = oneshot::channel();
661        // Fetch the notify for current part to wait for it to be written.
662        let last_rx = std::mem::replace(&mut self.next_notify, rx);
663
664        async move {
665            // Wait for the previous part to be written
666            let _ = last_rx.await;
667
668            let mut writer = writer.lock().await;
669            let result = writer
670                .write(Buffer::from_iter(data.into_iter()))
671                .await
672                .map_err(|err| format_object_store_error(err, location.as_ref()));
673
674            // Notify the next part to be written
675            drop(tx);
676
677            result
678        }
679        .into_send()
680        .boxed()
681    }
682
683    async fn complete(&mut self) -> object_store::Result<PutResult> {
684        let mut writer = self.writer.lock().await;
685        let metadata = writer
686            .close()
687            .into_send()
688            .await
689            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
690
691        let e_tag = metadata.etag().map(|s| s.to_string());
692        let version = metadata.version().map(|s| s.to_string());
693
694        Ok(PutResult { e_tag, version })
695    }
696
697    async fn abort(&mut self) -> object_store::Result<()> {
698        let mut writer = self.writer.lock().await;
699        writer
700            .abort()
701            .into_send()
702            .await
703            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
704    }
705}
706
707impl Debug for OpendalMultipartUpload {
708    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
709        f.debug_struct("OpendalMultipartUpload")
710            .field("location", &self.location)
711            .finish()
712    }
713}
714
715#[cfg(test)]
716mod tests {
717    use bytes::Bytes;
718    use object_store::path::Path;
719    use object_store::{ObjectStore, WriteMultipart};
720    use opendal::services;
721    use rand::prelude::*;
722    use std::sync::Arc;
723
724    use super::*;
725
726    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
727        let op = Operator::new(services::Memory::default()).unwrap().finish();
728        let object_store = Arc::new(OpendalStore::new(op));
729
730        let path: Path = "data/test.txt".into();
731        let bytes = Bytes::from_static(b"hello, world!");
732        object_store.put(&path, bytes.into()).await.unwrap();
733
734        let path: Path = "data/nested/test.txt".into();
735        let bytes = Bytes::from_static(b"hello, world! I am nested.");
736        object_store.put(&path, bytes.into()).await.unwrap();
737
738        object_store
739    }
740
741    #[tokio::test]
742    async fn test_basic() {
743        let op = Operator::new(services::Memory::default()).unwrap().finish();
744        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
745
746        // Retrieve a specific file
747        let path: Path = "data/test.txt".into();
748
749        let bytes = Bytes::from_static(b"hello, world!");
750        object_store.put(&path, bytes.clone().into()).await.unwrap();
751
752        let meta = object_store.head(&path).await.unwrap();
753
754        assert_eq!(meta.size, 13);
755
756        assert_eq!(
757            object_store
758                .get(&path)
759                .await
760                .unwrap()
761                .bytes()
762                .await
763                .unwrap(),
764            bytes
765        );
766    }
767
768    #[tokio::test]
769    async fn test_put_multipart() {
770        let op = Operator::new(services::Memory::default()).unwrap().finish();
771        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
772
773        let mut rng = thread_rng();
774
775        // Case complete
776        let path: Path = "data/test_complete.txt".into();
777        let upload = object_store.put_multipart(&path).await.unwrap();
778
779        let mut write = WriteMultipart::new(upload);
780
781        let mut all_bytes = vec![];
782        let round = rng.gen_range(1..=1024);
783        for _ in 0..round {
784            let size = rng.gen_range(1..=1024);
785            let mut bytes = vec![0; size];
786            rng.fill_bytes(&mut bytes);
787
788            all_bytes.extend_from_slice(&bytes);
789            write.put(bytes.into());
790        }
791
792        let _ = write.finish().await.unwrap();
793
794        let meta = object_store.head(&path).await.unwrap();
795
796        assert_eq!(meta.size, all_bytes.len() as u64);
797
798        assert_eq!(
799            object_store
800                .get(&path)
801                .await
802                .unwrap()
803                .bytes()
804                .await
805                .unwrap(),
806            Bytes::from(all_bytes)
807        );
808
809        // Case abort
810        let path: Path = "data/test_abort.txt".into();
811        let mut upload = object_store.put_multipart(&path).await.unwrap();
812        upload.put_part(vec![1; 1024].into()).await.unwrap();
813        upload.abort().await.unwrap();
814
815        let res = object_store.head(&path).await;
816        let err = res.unwrap_err();
817
818        assert!(matches!(err, object_store::Error::NotFound { .. }))
819    }
820
821    #[tokio::test]
822    async fn test_list() {
823        let object_store = create_test_object_store().await;
824        let path: Path = "data/".into();
825        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
826        assert_eq!(results.len(), 2);
827        let mut locations = results
828            .iter()
829            .map(|x| x.as_ref().unwrap().location.as_ref())
830            .collect::<Vec<_>>();
831
832        let expected_files = vec![
833            (
834                "data/nested/test.txt",
835                Bytes::from_static(b"hello, world! I am nested."),
836            ),
837            ("data/test.txt", Bytes::from_static(b"hello, world!")),
838        ];
839
840        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
841
842        locations.sort();
843        assert_eq!(locations, expected_locations);
844
845        for (location, bytes) in expected_files {
846            let path: Path = location.into();
847            assert_eq!(
848                object_store
849                    .get(&path)
850                    .await
851                    .unwrap()
852                    .bytes()
853                    .await
854                    .unwrap(),
855                bytes
856            );
857        }
858    }
859
860    #[tokio::test]
861    async fn test_list_with_delimiter() {
862        let object_store = create_test_object_store().await;
863        let path: Path = "data/".into();
864        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
865        assert_eq!(result.objects.len(), 1);
866        assert_eq!(result.common_prefixes.len(), 1);
867        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
868        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
869    }
870
871    #[tokio::test]
872    async fn test_list_with_offset() {
873        let object_store = create_test_object_store().await;
874        let path: Path = "data/".into();
875        let offset: Path = "data/nested/test.txt".into();
876        let result = object_store
877            .list_with_offset(Some(&path), &offset)
878            .collect::<Vec<_>>()
879            .await;
880        assert_eq!(result.len(), 1);
881        assert_eq!(
882            result[0].as_ref().unwrap().location.as_ref(),
883            "data/test.txt"
884        );
885    }
886
887    /// Custom layer that counts stat operations for testing
888    mod stat_counter {
889        use super::*;
890        use std::sync::atomic::{AtomicUsize, Ordering};
891
892        #[derive(Debug, Clone)]
893        pub struct StatCounterLayer {
894            count: Arc<AtomicUsize>,
895        }
896
897        impl StatCounterLayer {
898            pub fn new(count: Arc<AtomicUsize>) -> Self {
899                Self { count }
900            }
901        }
902
903        impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
904            type LayeredAccess = StatCounterAccessor<A>;
905
906            fn layer(&self, inner: A) -> Self::LayeredAccess {
907                StatCounterAccessor {
908                    inner,
909                    count: self.count.clone(),
910                }
911            }
912        }
913
914        #[derive(Debug, Clone)]
915        pub struct StatCounterAccessor<A> {
916            inner: A,
917            count: Arc<AtomicUsize>,
918        }
919
920        impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
921            type Inner = A;
922            type Reader = A::Reader;
923            type Writer = A::Writer;
924            type Lister = A::Lister;
925            type Deleter = A::Deleter;
926
927            fn inner(&self) -> &Self::Inner {
928                &self.inner
929            }
930
931            async fn stat(
932                &self,
933                path: &str,
934                args: opendal::raw::OpStat,
935            ) -> opendal::Result<opendal::raw::RpStat> {
936                self.count.fetch_add(1, Ordering::SeqCst);
937                self.inner.stat(path, args).await
938            }
939
940            async fn read(
941                &self,
942                path: &str,
943                args: opendal::raw::OpRead,
944            ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
945                self.inner.read(path, args).await
946            }
947
948            async fn write(
949                &self,
950                path: &str,
951                args: opendal::raw::OpWrite,
952            ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
953                self.inner.write(path, args).await
954            }
955
956            async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
957                self.inner.delete().await
958            }
959
960            async fn list(
961                &self,
962                path: &str,
963                args: opendal::raw::OpList,
964            ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
965                self.inner.list(path, args).await
966            }
967
968            async fn copy(
969                &self,
970                from: &str,
971                to: &str,
972                args: opendal::raw::OpCopy,
973            ) -> opendal::Result<opendal::raw::RpCopy> {
974                self.inner.copy(from, to, args).await
975            }
976
977            async fn rename(
978                &self,
979                from: &str,
980                to: &str,
981                args: opendal::raw::OpRename,
982            ) -> opendal::Result<opendal::raw::RpRename> {
983                self.inner.rename(from, to, args).await
984            }
985        }
986    }
987
988    #[tokio::test]
989    async fn test_get_range_no_stat() {
990        use std::sync::atomic::{AtomicUsize, Ordering};
991
992        // Create a stat counter and operator with tracking layer
993        let stat_count = Arc::new(AtomicUsize::new(0));
994        let op = Operator::new(opendal::services::Memory::default())
995            .unwrap()
996            .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
997            .finish();
998        let store = OpendalStore::new(op);
999
1000        // Create a test file
1001        let location = "test_get_range.txt".into();
1002        let value = Bytes::from_static(b"Hello, world!");
1003        store.put(&location, value.clone().into()).await.unwrap();
1004
1005        // Reset counter after put
1006        stat_count.store(0, Ordering::SeqCst);
1007
1008        // Test 1: get_range should NOT call stat()
1009        let ret = store.get_range(&location, 0..5).await.unwrap();
1010        assert_eq!(Bytes::from_static(b"Hello"), ret);
1011        assert_eq!(
1012            stat_count.load(Ordering::SeqCst),
1013            0,
1014            "get_range should not call stat()"
1015        );
1016
1017        // Reset counter
1018        stat_count.store(0, Ordering::SeqCst);
1019
1020        // Test 2: get_opts SHOULD call stat() to get metadata
1021        let opts = object_store::GetOptions {
1022            range: Some(object_store::GetRange::Bounded(0..5)),
1023            ..Default::default()
1024        };
1025        let ret = store.get_opts(&location, opts).await.unwrap();
1026        let data = ret.bytes().await.unwrap();
1027        assert_eq!(Bytes::from_static(b"Hello"), data);
1028        assert!(
1029            stat_count.load(Ordering::SeqCst) > 0,
1030            "get_opts should call stat() to get metadata"
1031        );
1032
1033        // Cleanup
1034        store.delete(&location).await.unwrap();
1035    }
1036}