Skip to main content

object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::datetime_to_timestamp;
25use crate::utils::*;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::Attributes;
35use object_store::CopyMode as ObjectStoreCopyMode;
36use object_store::CopyOptions as ObjectStoreCopyOptions;
37use object_store::ListResult;
38use object_store::MultipartUpload;
39use object_store::ObjectMeta;
40use object_store::ObjectStore;
41use object_store::PutMultipartOptions;
42use object_store::PutOptions;
43use object_store::PutPayload;
44use object_store::PutResult;
45use object_store::path::Path;
46use object_store::{GetOptions, UploadPart};
47use object_store::{GetRange, GetResultPayload};
48use object_store::{GetResult, PutMode};
49use opendal::Buffer;
50use opendal::BytesRange;
51use opendal::Writer;
52use opendal::options::CopyOptions;
53use opendal::options::ReaderOptions;
54use opendal::options::StatOptions;
55use opendal::raw::percent_decode_path;
56use opendal::{Operator, OperatorInfo};
57use std::collections::HashMap;
58
/// Concurrency used for multipart writer uploads and for the number of
/// ranges `get_ranges` fetches at once.
const DEFAULT_CONCURRENT: usize = 8;
60
61fn format_object_attributes(meta: &opendal::Metadata) -> Attributes {
62    let mut attributes = Attributes::new();
63    if let Some(user_meta) = meta.user_metadata() {
64        for (key, value) in user_meta {
65            attributes.insert(
66                object_store::Attribute::Metadata(key.clone().into()),
67                value.clone().into(),
68            );
69        }
70    }
71
72    attributes
73}
74
75fn format_reader_options(options: &GetOptions, content_length_hint: Option<u64>) -> ReaderOptions {
76    ReaderOptions {
77        version: options.version.clone(),
78        if_match: options.if_match.clone(),
79        if_none_match: options.if_none_match.clone(),
80        if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
81        if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
82        content_length_hint,
83        ..Default::default()
84    }
85}
86
87fn format_stat_options(options: &GetOptions) -> StatOptions {
88    StatOptions {
89        version: options.version.clone(),
90        if_match: options.if_match.clone(),
91        if_none_match: options.if_none_match.clone(),
92        if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
93        if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
94        ..Default::default()
95    }
96}
97
98fn format_read_range(range: Option<&GetRange>, size: u64) -> Range<u64> {
99    match range {
100        Some(GetRange::Bounded(r)) => {
101            if r.start >= r.end || r.start >= size {
102                0..0
103            } else {
104                r.start..r.end.min(size)
105            }
106        }
107        Some(GetRange::Offset(offset)) => {
108            if *offset < size {
109                *offset..size
110            } else {
111                0..0
112            }
113        }
114        Some(GetRange::Suffix(suffix)) if *suffix < size => (size - *suffix)..size,
115        _ => 0..size,
116    }
117}
118
119fn format_without_stat_error(err: opendal::Error, path: &str) -> object_store::Error {
120    match err.kind() {
121        // Ask get_opts to fall back to the stat path when read-open can't provide
122        // enough metadata to build a valid GetResult, such as ranges beyond EOF.
123        opendal::ErrorKind::Unsupported | opendal::ErrorKind::RangeNotSatisfied => {
124            object_store::Error::NotSupported {
125                source: Box::new(err),
126            }
127        }
128        _ => format_object_store_error(err, path),
129    }
130}
131
132/// OpendalStore implements ObjectStore trait by using opendal.
133///
134/// This allows users to use opendal as an object store without extra cost.
135///
136/// Visit [`opendal::services`] for more information about supported services.
137///
138/// ```no_run
139/// use std::sync::Arc;
140///
141/// use bytes::Bytes;
142/// use object_store::path::Path;
143/// use object_store::ObjectStore;
144/// use object_store::ObjectStoreExt;
145/// use object_store_opendal::OpendalStore;
146/// use opendal::services::S3;
147/// use opendal::{Builder, Operator};
148///
149/// #[tokio::main]
150/// async fn main() {
151///    let builder = S3::default()
152///     .access_key_id("my_access_key")
153///     .secret_access_key("my_secret_key")
154///     .endpoint("my_endpoint")
155///     .region("my_region");
156///
157///     // Create a new operator
158///     let operator = Operator::new(builder).unwrap();
159///
160///     // Create a new object store
161///     let object_store = Arc::new(OpendalStore::new(operator));
162///
163///     let path = Path::from("data/nested/test.txt");
164///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
165///
166///     object_store.put(&path, bytes.clone().into()).await.unwrap();
167///
168///     let content = object_store
169///         .get(&path)
170///         .await
171///         .unwrap()
172///         .bytes()
173///         .await
174///         .unwrap();
175///
176///     assert_eq!(content, bytes);
177/// }
178/// ```
179#[derive(Clone)]
180pub struct OpendalStore {
181    info: Arc<OperatorInfo>,
182    inner: Operator,
183}
184
185impl OpendalStore {
186    /// Create OpendalStore by given Operator.
187    pub fn new(op: Operator) -> Self {
188        Self {
189            info: op.info().into(),
190            inner: op,
191        }
192    }
193
194    /// Get the Operator info.
195    pub fn info(&self) -> &OperatorInfo {
196        self.info.as_ref()
197    }
198
199    async fn get_opts_without_stat(
200        &self,
201        location: &Path,
202        raw_location: &str,
203        options: &GetOptions,
204    ) -> object_store::Result<GetResult> {
205        let reader = self
206            .inner
207            .reader_options(raw_location, format_reader_options(options, None))
208            .into_send()
209            .await
210            .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
211
212        let mut stream = match options.range.as_ref() {
213            Some(GetRange::Bounded(range)) => {
214                reader
215                    .into_bytes_stream(range.start..range.end)
216                    .into_send()
217                    .await
218            }
219            Some(GetRange::Offset(offset)) => reader.into_bytes_stream(*offset..).into_send().await,
220            Some(GetRange::Suffix(suffix)) => {
221                reader
222                    .into_bytes_stream(BytesRange::suffix(*suffix))
223                    .into_send()
224                    .await
225            }
226            None => reader.into_bytes_stream(..).into_send().await,
227        }
228        .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
229
230        let metadata = stream
231            .metadata()
232            .into_send()
233            .await
234            .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
235        let attributes = format_object_attributes(&metadata);
236        let meta = format_object_meta(location.as_ref(), &metadata);
237        let read_range = format_read_range(options.range.as_ref(), meta.size);
238
239        if read_range.start >= read_range.end {
240            return Ok(GetResult {
241                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
242                range: read_range,
243                meta,
244                attributes,
245            });
246        }
247
248        if matches!(
249            options.range.as_ref(),
250            Some(GetRange::Bounded(range)) if range.end > meta.size
251        ) {
252            let reader = self
253                .inner
254                .reader_options(
255                    raw_location,
256                    format_reader_options(options, Some(meta.size)),
257                )
258                .into_send()
259                .await
260                .map_err(|err| format_object_store_error(err, location.as_ref()))?;
261            stream = reader
262                .into_bytes_stream(read_range.start..read_range.end)
263                .into_send()
264                .await
265                .map_err(|err| format_object_store_error(err, location.as_ref()))?;
266        }
267
268        let stream = stream
269            .into_send()
270            .map_err(|err: io::Error| object_store::Error::Generic {
271                store: "IoError",
272                source: Box::new(err),
273            });
274
275        Ok(GetResult {
276            payload: GetResultPayload::Stream(Box::pin(stream)),
277            range: read_range,
278            meta,
279            attributes,
280        })
281    }
282
283    async fn get_opts_with_stat(
284        &self,
285        location: &Path,
286        raw_location: &str,
287        options: &GetOptions,
288    ) -> object_store::Result<GetResult> {
289        let metadata = self
290            .inner
291            .stat_options(raw_location, format_stat_options(options))
292            .into_send()
293            .await
294            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
295        let attributes = format_object_attributes(&metadata);
296        let meta = format_object_meta(location.as_ref(), &metadata);
297
298        if options.head {
299            return Ok(GetResult {
300                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
301                range: 0..0,
302                meta,
303                attributes,
304            });
305        }
306
307        let read_range = format_read_range(options.range.as_ref(), meta.size);
308        if read_range.start >= read_range.end {
309            return Ok(GetResult {
310                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
311                range: read_range,
312                meta,
313                attributes,
314            });
315        }
316
317        let reader = self
318            .inner
319            .reader_options(
320                raw_location,
321                format_reader_options(options, Some(meta.size)),
322            )
323            .into_send()
324            .await
325            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
326        let stream = reader
327            .into_bytes_stream(read_range.start..read_range.end)
328            .into_send()
329            .await
330            .map_err(|err| format_object_store_error(err, location.as_ref()))?
331            .into_send()
332            .map_err(|err: io::Error| object_store::Error::Generic {
333                store: "IoError",
334                source: Box::new(err),
335            });
336
337        Ok(GetResult {
338            payload: GetResultPayload::Stream(Box::pin(stream)),
339            range: read_range,
340            meta,
341            attributes,
342        })
343    }
344
345    /// Copy a file from one location to another
346    async fn copy_request(
347        &self,
348        from: &Path,
349        to: &Path,
350        if_not_exists: bool,
351    ) -> object_store::Result<()> {
352        let mut copy_options = CopyOptions::default();
353        if if_not_exists {
354            copy_options.if_not_exists = true;
355        }
356
357        // Perform the copy operation
358        self.inner
359            .copy_options(
360                &percent_decode_path(from.as_ref()),
361                &percent_decode_path(to.as_ref()),
362                copy_options,
363            )
364            .into_send()
365            .await
366            .map_err(|err| {
367                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
368                    object_store::Error::AlreadyExists {
369                        path: to.to_string(),
370                        source: Box::new(err),
371                    }
372                } else {
373                    format_object_store_error(err, from.as_ref())
374                }
375            })?;
376
377        Ok(())
378    }
379}
380
381impl Debug for OpendalStore {
382    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
383        f.debug_struct("OpendalStore")
384            .field("scheme", &self.info.scheme())
385            .field("name", &self.info.name())
386            .field("root", &self.info.root())
387            .field("capability", &self.info.capability())
388            .finish()
389    }
390}
391
392impl Display for OpendalStore {
393    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
394        let info = self.inner.info();
395        write!(
396            f,
397            "Opendal({}, bucket={}, root={})",
398            info.scheme(),
399            info.name(),
400            info.root()
401        )
402    }
403}
404
405impl From<Operator> for OpendalStore {
406    fn from(value: Operator) -> Self {
407        Self::new(value)
408    }
409}
410
411#[async_trait]
412impl ObjectStore for OpendalStore {
413    async fn put_opts(
414        &self,
415        location: &Path,
416        bytes: PutPayload,
417        opts: PutOptions,
418    ) -> object_store::Result<PutResult> {
419        let decoded_location = percent_decode_path(location.as_ref());
420        let mut future_write = self
421            .inner
422            .write_with(&decoded_location, Buffer::from_iter(bytes));
423        let opts_mode = opts.mode.clone();
424        match opts.mode {
425            PutMode::Overwrite => {}
426            PutMode::Create => {
427                future_write = future_write.if_not_exists(true);
428            }
429            PutMode::Update(update_version) => {
430                let Some(etag) = update_version.e_tag else {
431                    Err(object_store::Error::NotSupported {
432                        source: Box::new(opendal::Error::new(
433                            opendal::ErrorKind::Unsupported,
434                            "etag is required for conditional put",
435                        )),
436                    })?
437                };
438                future_write = future_write.if_match(etag.as_str());
439            }
440        }
441        let rp = future_write.into_send().await.map_err(|err| {
442            match format_object_store_error(err, location.as_ref()) {
443                object_store::Error::Precondition { path, source }
444                    if opts_mode == PutMode::Create =>
445                {
446                    object_store::Error::AlreadyExists { path, source }
447                }
448                e => e,
449            }
450        })?;
451
452        let e_tag = rp.etag().map(|s| s.to_string());
453        let version = rp.version().map(|s| s.to_string());
454
455        Ok(PutResult { e_tag, version })
456    }
457
458    async fn put_multipart_opts(
459        &self,
460        location: &Path,
461        opts: PutMultipartOptions,
462    ) -> object_store::Result<Box<dyn MultipartUpload>> {
463        let mut options = opendal::options::WriteOptions {
464            concurrent: DEFAULT_CONCURRENT,
465            ..Default::default()
466        };
467
468        // Collect user metadata separately to handle multiple entries
469        let mut user_metadata = HashMap::new();
470
471        // Handle attributes if provided
472        for (key, value) in opts.attributes.iter() {
473            match key {
474                object_store::Attribute::CacheControl => {
475                    options.cache_control = Some(value.to_string());
476                }
477                object_store::Attribute::ContentDisposition => {
478                    options.content_disposition = Some(value.to_string());
479                }
480                object_store::Attribute::ContentEncoding => {
481                    options.content_encoding = Some(value.to_string());
482                }
483                object_store::Attribute::ContentLanguage => {
484                    // no support
485                    continue;
486                }
487                object_store::Attribute::ContentType => {
488                    options.content_type = Some(value.to_string());
489                }
490                object_store::Attribute::Metadata(k) => {
491                    user_metadata.insert(k.to_string(), value.to_string());
492                }
493                _ => {}
494            }
495        }
496
497        // Apply user metadata if any entries were collected
498        if !user_metadata.is_empty() {
499            options.user_metadata = Some(user_metadata);
500        }
501
502        let decoded_location = percent_decode_path(location.as_ref());
503        let writer = self
504            .inner
505            .writer_options(&decoded_location, options)
506            .into_send()
507            .await
508            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
509        let upload = OpendalMultipartUpload::new(writer, location.clone());
510
511        Ok(Box::new(upload))
512    }
513
514    async fn get_opts(
515        &self,
516        location: &Path,
517        options: GetOptions,
518    ) -> object_store::Result<GetResult> {
519        let raw_location = percent_decode_path(location.as_ref());
520
521        if options.head {
522            return self
523                .get_opts_with_stat(location, &raw_location, &options)
524                .await;
525        }
526
527        match self
528            .get_opts_without_stat(location, &raw_location, &options)
529            .await
530        {
531            Ok(result) => return Ok(result),
532            Err(object_store::Error::NotSupported { .. }) => {}
533            Err(err) => return Err(err),
534        }
535
536        self.get_opts_with_stat(location, &raw_location, &options)
537            .await
538    }
539
540    async fn get_ranges(
541        &self,
542        location: &Path,
543        ranges: &[Range<u64>],
544    ) -> object_store::Result<Vec<Bytes>> {
545        let raw_location = percent_decode_path(location.as_ref());
546        let reader = self
547            .inner
548            .reader(&raw_location)
549            .into_send()
550            .await
551            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
552
553        let location_ref: Arc<str> = Arc::from(location.as_ref());
554        futures::stream::iter(ranges.iter().cloned())
555            .map(|range| {
556                let reader = reader.clone();
557                let location_ref = location_ref.clone();
558                async move {
559                    reader
560                        .read(range)
561                        .into_send()
562                        .await
563                        .map(|buf| buf.to_bytes())
564                        .map_err(|err| format_object_store_error(err, &location_ref))
565                }
566            })
567            .buffered(DEFAULT_CONCURRENT)
568            .try_collect()
569            .await
570    }
571
572    fn delete_stream(
573        &self,
574        locations: BoxStream<'static, object_store::Result<Path>>,
575    ) -> BoxStream<'static, object_store::Result<Path>> {
576        let this = self.clone();
577        locations
578            .and_then(move |location| {
579                let this = this.clone();
580                async move {
581                    let decoded = percent_decode_path(location.as_ref());
582                    this.inner
583                        .delete(&decoded)
584                        .into_send()
585                        .await
586                        .map_err(|err| format_object_store_error(err, location.as_ref()))?;
587                    Ok(location)
588                }
589                .into_send()
590            })
591            .boxed()
592    }
593
594    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
595        // object_store `Path` always removes trailing slash
596        // need to add it back
597        let path = prefix.map_or("".into(), |x| {
598            format!("{}/", percent_decode_path(x.as_ref()))
599        });
600
601        let this = self.clone();
602        let fut = async move {
603            let stream = this
604                .inner
605                .lister_with(&path)
606                .recursive(true)
607                .await
608                .map_err(|err| format_object_store_error(err, &path))?;
609
610            let stream = stream.then(|res| async {
611                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
612                let meta = entry.metadata();
613
614                Ok(format_object_meta(entry.path(), meta))
615            });
616            Ok::<_, object_store::Error>(stream)
617        };
618
619        fut.into_stream().try_flatten().into_send().boxed()
620    }
621
622    fn list_with_offset(
623        &self,
624        prefix: Option<&Path>,
625        offset: &Path,
626    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
627        let path = prefix.map_or("".into(), |x| {
628            format!("{}/", percent_decode_path(x.as_ref()))
629        });
630        let offset = offset.clone();
631
632        // clone self for 'static lifetime
633        // clone self is cheap
634        let this = self.clone();
635
636        let fut = async move {
637            let list_with_start_after = this.inner.info().capability().list_with_start_after;
638            let mut fut = this.inner.lister_with(&path).recursive(true);
639
640            // Use native start_after support if possible.
641            if list_with_start_after {
642                fut = fut.start_after(offset.as_ref());
643            }
644
645            let lister = fut
646                .await
647                .map_err(|err| format_object_store_error(err, &path))?
648                .then(move |entry| {
649                    let path = path.clone();
650                    let this = this.clone();
651                    async move {
652                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
653                        let (path, metadata) = entry.into_parts();
654
655                        // If it's a dir or last_modified is present, we can use it directly.
656                        if metadata.is_dir() || metadata.last_modified().is_some() {
657                            let object_meta = format_object_meta(&path, &metadata);
658                            return Ok(object_meta);
659                        }
660
661                        let metadata = this
662                            .inner
663                            .stat(&path)
664                            .await
665                            .map_err(|err| format_object_store_error(err, &path))?;
666                        let object_meta = format_object_meta(&path, &metadata);
667                        Ok::<_, object_store::Error>(object_meta)
668                    }
669                })
670                .into_send()
671                .boxed();
672
673            let stream = if list_with_start_after {
674                lister
675            } else {
676                lister
677                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
678                    .into_send()
679                    .boxed()
680            };
681
682            Ok::<_, object_store::Error>(stream)
683        };
684
685        fut.into_stream().into_send().try_flatten().boxed()
686    }
687
688    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
689        let path = prefix.map_or("".into(), |x| {
690            format!("{}/", percent_decode_path(x.as_ref()))
691        });
692        let mut stream = self
693            .inner
694            .lister_with(&path)
695            .into_future()
696            .into_send()
697            .await
698            .map_err(|err| format_object_store_error(err, &path))?
699            .into_send();
700
701        let mut common_prefixes = Vec::new();
702        let mut objects = Vec::new();
703
704        while let Some(res) = stream.next().into_send().await {
705            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
706            let meta = entry.metadata();
707
708            if meta.is_dir() {
709                common_prefixes.push(entry.path().into());
710            } else if meta.last_modified().is_some() {
711                objects.push(format_object_meta(entry.path(), meta));
712            } else {
713                let meta = self
714                    .inner
715                    .stat(entry.path())
716                    .into_send()
717                    .await
718                    .map_err(|err| format_object_store_error(err, entry.path()))?;
719                objects.push(format_object_meta(entry.path(), &meta));
720            }
721        }
722
723        Ok(ListResult {
724            common_prefixes,
725            objects,
726        })
727    }
728
729    async fn copy_opts(
730        &self,
731        from: &Path,
732        to: &Path,
733        options: ObjectStoreCopyOptions,
734    ) -> object_store::Result<()> {
735        let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
736        self.copy_request(from, to, if_not_exists).await
737    }
738}
739
740/// `MultipartUpload`'s impl based on `Writer` in opendal
741///
742/// # Notes
743///
744/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
745/// implementation do. Instead, we just write the part and notify the next task to be written.
746///
747/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
748struct OpendalMultipartUpload {
749    writer: Arc<Mutex<Writer>>,
750    location: Path,
751    next_notify: oneshot::Receiver<()>,
752}
753
754impl OpendalMultipartUpload {
755    fn new(writer: Writer, location: Path) -> Self {
756        // an immediately dropped sender for the first part to write without waiting
757        let (_, rx) = oneshot::channel();
758
759        Self {
760            writer: Arc::new(Mutex::new(writer)),
761            location,
762            next_notify: rx,
763        }
764    }
765}
766
767#[async_trait]
768impl MultipartUpload for OpendalMultipartUpload {
769    fn put_part(&mut self, data: PutPayload) -> UploadPart {
770        let writer = self.writer.clone();
771        let location = self.location.clone();
772
773        // Generate next notify which will be notified after the current part is written.
774        let (tx, rx) = oneshot::channel();
775        // Fetch the notify for current part to wait for it to be written.
776        let last_rx = std::mem::replace(&mut self.next_notify, rx);
777
778        async move {
779            // Wait for the previous part to be written
780            let _ = last_rx.await;
781
782            let mut writer = writer.lock().await;
783            let result = writer
784                .write(Buffer::from_iter(data))
785                .await
786                .map_err(|err| format_object_store_error(err, location.as_ref()));
787
788            // Notify the next part to be written
789            drop(tx);
790
791            result
792        }
793        .into_send()
794        .boxed()
795    }
796
797    async fn complete(&mut self) -> object_store::Result<PutResult> {
798        let mut writer = self.writer.lock().await;
799        let metadata = writer
800            .close()
801            .into_send()
802            .await
803            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
804
805        let e_tag = metadata.etag().map(|s| s.to_string());
806        let version = metadata.version().map(|s| s.to_string());
807
808        Ok(PutResult { e_tag, version })
809    }
810
811    async fn abort(&mut self) -> object_store::Result<()> {
812        let mut writer = self.writer.lock().await;
813        writer
814            .abort()
815            .into_send()
816            .await
817            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
818    }
819}
820
821impl Debug for OpendalMultipartUpload {
822    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
823        f.debug_struct("OpendalMultipartUpload")
824            .field("location", &self.location)
825            .finish()
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use bytes::Bytes;
832    use object_store::path::Path;
833    use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
834    use opendal::services;
835    use rand::prelude::*;
836    use rand::rng;
837    use std::sync::Arc;
838
839    use super::*;
840
841    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
842        let op = Operator::new(services::Memory::default()).unwrap();
843        let object_store = Arc::new(OpendalStore::new(op));
844
845        let path: Path = "data/test.txt".into();
846        let bytes = Bytes::from_static(b"hello, world!");
847        object_store.put(&path, bytes.into()).await.unwrap();
848
849        let path: Path = "data/nested/test.txt".into();
850        let bytes = Bytes::from_static(b"hello, world! I am nested.");
851        object_store.put(&path, bytes.into()).await.unwrap();
852
853        object_store
854    }
855
856    #[tokio::test]
857    async fn test_basic() {
858        let op = Operator::new(services::Memory::default()).unwrap();
859        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
860
861        // Retrieve a specific file
862        let path: Path = "data/test.txt".into();
863
864        let bytes = Bytes::from_static(b"hello, world!");
865        object_store.put(&path, bytes.clone().into()).await.unwrap();
866
867        let meta = object_store.head(&path).await.unwrap();
868
869        assert_eq!(meta.size, 13);
870
871        assert_eq!(
872            object_store
873                .get(&path)
874                .await
875                .unwrap()
876                .bytes()
877                .await
878                .unwrap(),
879            bytes
880        );
881    }
882
883    #[tokio::test]
884    async fn test_put_multipart() {
885        let op = Operator::new(services::Memory::default()).unwrap();
886        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
887
888        let mut rng = rng();
889
890        // Case complete
891        let path: Path = "data/test_complete.txt".into();
892        let upload = object_store.put_multipart(&path).await.unwrap();
893
894        let mut write = WriteMultipart::new(upload);
895
896        let mut all_bytes = vec![];
897        let round = rng.random_range(1..=1024);
898        for _ in 0..round {
899            let size = rng.random_range(1..=1024);
900            let mut bytes = vec![0; size];
901            rng.fill_bytes(&mut bytes);
902
903            all_bytes.extend_from_slice(&bytes);
904            write.put(bytes.into());
905        }
906
907        let _ = write.finish().await.unwrap();
908
909        let meta = object_store.head(&path).await.unwrap();
910
911        assert_eq!(meta.size, all_bytes.len() as u64);
912
913        assert_eq!(
914            object_store
915                .get(&path)
916                .await
917                .unwrap()
918                .bytes()
919                .await
920                .unwrap(),
921            Bytes::from(all_bytes)
922        );
923
924        // Case abort
925        let path: Path = "data/test_abort.txt".into();
926        let mut upload = object_store.put_multipart(&path).await.unwrap();
927        upload.put_part(vec![1; 1024].into()).await.unwrap();
928        upload.abort().await.unwrap();
929
930        let res = object_store.head(&path).await;
931        let err = res.unwrap_err();
932
933        assert!(matches!(err, object_store::Error::NotFound { .. }))
934    }
935
936    #[tokio::test]
937    async fn test_list() {
938        let object_store = create_test_object_store().await;
939        let path: Path = "data/".into();
940        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
941        assert_eq!(results.len(), 2);
942        let mut locations = results
943            .iter()
944            .map(|x| x.as_ref().unwrap().location.as_ref())
945            .collect::<Vec<_>>();
946
947        let expected_files = vec![
948            (
949                "data/nested/test.txt",
950                Bytes::from_static(b"hello, world! I am nested."),
951            ),
952            ("data/test.txt", Bytes::from_static(b"hello, world!")),
953        ];
954
955        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
956
957        locations.sort();
958        assert_eq!(locations, expected_locations);
959
960        for (location, bytes) in expected_files {
961            let path: Path = location.into();
962            assert_eq!(
963                object_store
964                    .get(&path)
965                    .await
966                    .unwrap()
967                    .bytes()
968                    .await
969                    .unwrap(),
970                bytes
971            );
972        }
973    }
974
975    #[tokio::test]
976    async fn test_list_with_delimiter() {
977        let object_store = create_test_object_store().await;
978        let path: Path = "data/".into();
979        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
980        assert_eq!(result.objects.len(), 1);
981        assert_eq!(result.common_prefixes.len(), 1);
982        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
983        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
984    }
985
986    #[tokio::test]
987    async fn test_list_with_offset() {
988        let object_store = create_test_object_store().await;
989        let path: Path = "data/".into();
990        let offset: Path = "data/nested/test.txt".into();
991        let result = object_store
992            .list_with_offset(Some(&path), &offset)
993            .collect::<Vec<_>>()
994            .await;
995        assert_eq!(result.len(), 1);
996        assert_eq!(
997            result[0].as_ref().unwrap().location.as_ref(),
998            "data/test.txt"
999        );
1000    }
1001
1002    /// Custom layer that counts stat operations for testing
1003    mod stat_counter {
1004        use super::*;
1005        use std::sync::atomic::{AtomicUsize, Ordering};
1006
1007        #[derive(Debug, Clone)]
1008        pub struct StatCounterLayer {
1009            count: Arc<AtomicUsize>,
1010        }
1011
1012        impl StatCounterLayer {
1013            pub fn new(count: Arc<AtomicUsize>) -> Self {
1014                Self { count }
1015            }
1016        }
1017
1018        impl opendal::raw::Layer for StatCounterLayer {
1019            fn apply_service(&self, srv: opendal::raw::Servicer) -> opendal::raw::Servicer {
1020                Arc::new(StatCounterService {
1021                    srv,
1022                    count: self.count.clone(),
1023                })
1024            }
1025        }
1026
1027        #[derive(Debug)]
1028        pub struct StatCounterService {
1029            srv: opendal::raw::Servicer,
1030            count: Arc<AtomicUsize>,
1031        }
1032
1033        impl opendal::raw::Service for StatCounterService {
1034            type Reader = opendal::raw::oio::Reader;
1035            type Writer = opendal::raw::oio::Writer;
1036            type Lister = opendal::raw::oio::Lister;
1037            type Deleter = opendal::raw::oio::Deleter;
1038            type Copier = opendal::raw::oio::Copier;
1039
1040            fn info(&self) -> opendal::raw::ServiceInfo {
1041                self.srv.info()
1042            }
1043
1044            fn capability(&self) -> opendal::Capability {
1045                self.srv.capability()
1046            }
1047
1048            async fn create_dir(
1049                &self,
1050                ctx: &opendal::OperationContext,
1051                path: &str,
1052                args: opendal::raw::OpCreateDir,
1053            ) -> opendal::Result<opendal::raw::RpCreateDir> {
1054                self.srv.create_dir(ctx, path, args).await
1055            }
1056
1057            async fn stat(
1058                &self,
1059                ctx: &opendal::OperationContext,
1060                path: &str,
1061                args: opendal::raw::OpStat,
1062            ) -> opendal::Result<opendal::raw::RpStat> {
1063                self.count.fetch_add(1, Ordering::SeqCst);
1064                self.srv.stat(ctx, path, args).await
1065            }
1066
1067            fn read(
1068                &self,
1069                ctx: &opendal::OperationContext,
1070                path: &str,
1071                args: opendal::raw::OpRead,
1072            ) -> opendal::Result<Self::Reader> {
1073                self.srv.read(ctx, path, args)
1074            }
1075
1076            fn write(
1077                &self,
1078                ctx: &opendal::OperationContext,
1079                path: &str,
1080                args: opendal::raw::OpWrite,
1081            ) -> opendal::Result<Self::Writer> {
1082                self.srv.write(ctx, path, args)
1083            }
1084
1085            fn delete(&self, ctx: &opendal::OperationContext) -> opendal::Result<Self::Deleter> {
1086                self.srv.delete(ctx)
1087            }
1088
1089            fn list(
1090                &self,
1091                ctx: &opendal::OperationContext,
1092                path: &str,
1093                args: opendal::raw::OpList,
1094            ) -> opendal::Result<Self::Lister> {
1095                self.srv.list(ctx, path, args)
1096            }
1097
1098            fn copy(
1099                &self,
1100                ctx: &opendal::OperationContext,
1101                from: &str,
1102                to: &str,
1103                args: opendal::raw::OpCopy,
1104                opts: opendal::raw::OpCopier,
1105            ) -> opendal::Result<Self::Copier> {
1106                self.srv.copy(ctx, from, to, args, opts)
1107            }
1108
1109            async fn rename(
1110                &self,
1111                ctx: &opendal::OperationContext,
1112                from: &str,
1113                to: &str,
1114                args: opendal::raw::OpRename,
1115            ) -> opendal::Result<opendal::raw::RpRename> {
1116                self.srv.rename(ctx, from, to, args).await
1117            }
1118
1119            async fn presign(
1120                &self,
1121                ctx: &opendal::OperationContext,
1122                path: &str,
1123                args: opendal::raw::OpPresign,
1124            ) -> opendal::Result<opendal::raw::RpPresign> {
1125                self.srv.presign(ctx, path, args).await
1126            }
1127        }
1128    }
1129
1130    #[tokio::test]
1131    async fn test_get_range_no_stat() {
1132        use std::sync::atomic::{AtomicUsize, Ordering};
1133
1134        // Create a stat counter and operator with tracking layer
1135        let stat_count = Arc::new(AtomicUsize::new(0));
1136        let op = Operator::new(opendal::services::Memory::default())
1137            .unwrap()
1138            .layer(stat_counter::StatCounterLayer::new(stat_count.clone()));
1139        let store = OpendalStore::new(op);
1140
1141        // Create a test file
1142        let location = "test_get_range.txt".into();
1143        let value = Bytes::from_static(b"Hello, world!");
1144        store.put(&location, value.clone().into()).await.unwrap();
1145
1146        // Reset counter after put
1147        stat_count.store(0, Ordering::SeqCst);
1148
1149        // Test 1: get_ranges should NOT call stat()
1150        #[allow(clippy::single_range_in_vec_init)]
1151        let ranges = [0u64..5];
1152        let ret = store.get_ranges(&location, &ranges).await.unwrap();
1153        assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1154        assert_eq!(
1155            stat_count.load(Ordering::SeqCst),
1156            0,
1157            "get_ranges should not call stat()"
1158        );
1159
1160        // Reset counter
1161        stat_count.store(0, Ordering::SeqCst);
1162
1163        // Test 2: get_opts should NOT call stat() for bounded range reads
1164        let opts = object_store::GetOptions {
1165            range: Some(object_store::GetRange::Bounded(0..5)),
1166            ..Default::default()
1167        };
1168        let ret = store.get_opts(&location, opts).await.unwrap();
1169        assert_eq!(ret.meta.size, value.len() as u64);
1170        assert_eq!(ret.range, 0..5);
1171        let data = ret.bytes().await.unwrap();
1172        assert_eq!(Bytes::from_static(b"Hello"), data);
1173        assert_eq!(
1174            stat_count.load(Ordering::SeqCst),
1175            0,
1176            "get_opts should not call stat() for bounded range reads"
1177        );
1178
1179        // Reset counter
1180        stat_count.store(0, Ordering::SeqCst);
1181
1182        // Test 3: get_opts should NOT call stat() for offset range reads
1183        let opts = object_store::GetOptions {
1184            range: Some(object_store::GetRange::Offset(7)),
1185            ..Default::default()
1186        };
1187        let ret = store.get_opts(&location, opts).await.unwrap();
1188        assert_eq!(ret.meta.size, value.len() as u64);
1189        assert_eq!(ret.range, 7..value.len() as u64);
1190        let data = ret.bytes().await.unwrap();
1191        assert_eq!(Bytes::from_static(b"world!"), data);
1192        assert_eq!(
1193            stat_count.load(Ordering::SeqCst),
1194            0,
1195            "get_opts should not call stat() for offset range reads"
1196        );
1197
1198        // Reset counter
1199        stat_count.store(0, Ordering::SeqCst);
1200
1201        // Test 4: get_opts should NOT call stat() for full reads
1202        let ret = store
1203            .get_opts(&location, object_store::GetOptions::default())
1204            .await
1205            .unwrap();
1206        assert_eq!(ret.meta.size, value.len() as u64);
1207        assert_eq!(ret.range, 0..value.len() as u64);
1208        let data = ret.bytes().await.unwrap();
1209        assert_eq!(value, data);
1210        assert_eq!(
1211            stat_count.load(Ordering::SeqCst),
1212            0,
1213            "get_opts should not call stat() for full reads"
1214        );
1215
1216        // Reset counter
1217        stat_count.store(0, Ordering::SeqCst);
1218
1219        // Test 5: get_opts should NOT call stat() for suffix range reads
1220        let opts = object_store::GetOptions {
1221            range: Some(object_store::GetRange::Suffix(6)),
1222            ..Default::default()
1223        };
1224        let ret = store.get_opts(&location, opts).await.unwrap();
1225        assert_eq!(ret.meta.size, value.len() as u64);
1226        assert_eq!(ret.range, 7..value.len() as u64);
1227        let data = ret.bytes().await.unwrap();
1228        assert_eq!(Bytes::from_static(b"world!"), data);
1229        assert_eq!(
1230            stat_count.load(Ordering::SeqCst),
1231            0,
1232            "get_opts should not call stat() for suffix range reads"
1233        );
1234
1235        // Cleanup
1236        store.delete(&location).await.unwrap();
1237    }
1238}