object_store_opendal/
store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::datetime_to_timestamp;
25use crate::utils::*;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::Attributes;
35use object_store::CopyMode as ObjectStoreCopyMode;
36use object_store::CopyOptions as ObjectStoreCopyOptions;
37use object_store::ListResult;
38use object_store::MultipartUpload;
39use object_store::ObjectMeta;
40use object_store::ObjectStore;
41use object_store::PutMultipartOptions;
42use object_store::PutOptions;
43use object_store::PutPayload;
44use object_store::PutResult;
45use object_store::path::Path;
46use object_store::{GetOptions, UploadPart};
47use object_store::{GetRange, GetResultPayload};
48use object_store::{GetResult, PutMode};
49use opendal::Buffer;
50use opendal::Writer;
51use opendal::options::CopyOptions;
52use opendal::options::ReaderOptions;
53use opendal::options::StatOptions;
54use opendal::raw::percent_decode_path;
55use opendal::{Operator, OperatorInfo};
56use std::collections::HashMap;
57
/// Default parallelism: the writer `concurrent` setting for multipart uploads and the
/// number of in-flight range reads in `get_ranges`.
const DEFAULT_CONCURRENT: usize = 8;
59
60fn format_object_attributes(meta: &opendal::Metadata) -> Attributes {
61    let mut attributes = Attributes::new();
62    if let Some(user_meta) = meta.user_metadata() {
63        for (key, value) in user_meta {
64            attributes.insert(
65                object_store::Attribute::Metadata(key.clone().into()),
66                value.clone().into(),
67            );
68        }
69    }
70
71    attributes
72}
73
74fn format_reader_options(options: &GetOptions, content_length_hint: Option<u64>) -> ReaderOptions {
75    ReaderOptions {
76        version: options.version.clone(),
77        if_match: options.if_match.clone(),
78        if_none_match: options.if_none_match.clone(),
79        if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
80        if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
81        content_length_hint,
82        ..Default::default()
83    }
84}
85
86fn format_stat_options(options: &GetOptions) -> StatOptions {
87    StatOptions {
88        version: options.version.clone(),
89        if_match: options.if_match.clone(),
90        if_none_match: options.if_none_match.clone(),
91        if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
92        if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
93        ..Default::default()
94    }
95}
96
97fn format_read_range(range: Option<&GetRange>, size: u64) -> Range<u64> {
98    match range {
99        Some(GetRange::Bounded(r)) => {
100            if r.start >= r.end || r.start >= size {
101                0..0
102            } else {
103                r.start..r.end.min(size)
104            }
105        }
106        Some(GetRange::Offset(offset)) => {
107            if *offset < size {
108                *offset..size
109            } else {
110                0..0
111            }
112        }
113        Some(GetRange::Suffix(suffix)) if *suffix < size => (size - *suffix)..size,
114        _ => 0..size,
115    }
116}
117
118fn format_without_stat_error(err: opendal::Error, path: &str) -> object_store::Error {
119    match err.kind() {
120        // Ask get_opts to fall back to the stat path when read-open can't provide
121        // enough metadata to build a valid GetResult, such as ranges beyond EOF.
122        opendal::ErrorKind::Unsupported | opendal::ErrorKind::RangeNotSatisfied => {
123            object_store::Error::NotSupported {
124                source: Box::new(err),
125            }
126        }
127        _ => format_object_store_error(err, path),
128    }
129}
130
131/// OpendalStore implements ObjectStore trait by using opendal.
132///
133/// This allows users to use opendal as an object store without extra cost.
134///
135/// Visit [`opendal::services`] for more information about supported services.
136///
137/// ```no_run
138/// use std::sync::Arc;
139///
140/// use bytes::Bytes;
141/// use object_store::path::Path;
142/// use object_store::ObjectStore;
143/// use object_store::ObjectStoreExt;
144/// use object_store_opendal::OpendalStore;
145/// use opendal::services::S3;
146/// use opendal::{Builder, Operator};
147///
148/// #[tokio::main]
149/// async fn main() {
150///    let builder = S3::default()
151///     .access_key_id("my_access_key")
152///     .secret_access_key("my_secret_key")
153///     .endpoint("my_endpoint")
154///     .region("my_region");
155///
156///     // Create a new operator
157///     let operator = Operator::new(builder).unwrap().finish();
158///
159///     // Create a new object store
160///     let object_store = Arc::new(OpendalStore::new(operator));
161///
162///     let path = Path::from("data/nested/test.txt");
163///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
164///
165///     object_store.put(&path, bytes.clone().into()).await.unwrap();
166///
167///     let content = object_store
168///         .get(&path)
169///         .await
170///         .unwrap()
171///         .bytes()
172///         .await
173///         .unwrap();
174///
175///     assert_eq!(content, bytes);
176/// }
177/// ```
178#[derive(Clone)]
179pub struct OpendalStore {
180    info: Arc<OperatorInfo>,
181    inner: Operator,
182}
183
184impl OpendalStore {
185    /// Create OpendalStore by given Operator.
186    pub fn new(op: Operator) -> Self {
187        Self {
188            info: op.info().into(),
189            inner: op,
190        }
191    }
192
193    /// Get the Operator info.
194    pub fn info(&self) -> &OperatorInfo {
195        self.info.as_ref()
196    }
197
198    async fn get_opts_without_stat(
199        &self,
200        location: &Path,
201        raw_location: &str,
202        options: &GetOptions,
203    ) -> object_store::Result<GetResult> {
204        let reader = self
205            .inner
206            .reader_options(raw_location, format_reader_options(options, None))
207            .into_send()
208            .await
209            .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
210
211        let mut stream = match options.range.as_ref() {
212            Some(GetRange::Bounded(range)) => {
213                reader
214                    .into_bytes_stream(range.start..range.end)
215                    .into_send()
216                    .await
217            }
218            Some(GetRange::Offset(offset)) => reader.into_bytes_stream(*offset..).into_send().await,
219            Some(GetRange::Suffix(_)) => unreachable!("suffix range needs object metadata"),
220            None => reader.into_bytes_stream(..).into_send().await,
221        }
222        .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
223
224        let metadata = stream
225            .metadata()
226            .into_send()
227            .await
228            .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
229        let attributes = format_object_attributes(&metadata);
230        let meta = format_object_meta(location.as_ref(), &metadata);
231        let read_range = format_read_range(options.range.as_ref(), meta.size);
232
233        if read_range.start >= read_range.end {
234            return Ok(GetResult {
235                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
236                range: read_range,
237                meta,
238                attributes,
239            });
240        }
241
242        if matches!(
243            options.range.as_ref(),
244            Some(GetRange::Bounded(range)) if range.end > meta.size
245        ) {
246            let reader = self
247                .inner
248                .reader_options(
249                    raw_location,
250                    format_reader_options(options, Some(meta.size)),
251                )
252                .into_send()
253                .await
254                .map_err(|err| format_object_store_error(err, location.as_ref()))?;
255            stream = reader
256                .into_bytes_stream(read_range.start..read_range.end)
257                .into_send()
258                .await
259                .map_err(|err| format_object_store_error(err, location.as_ref()))?;
260        }
261
262        let stream = stream
263            .into_send()
264            .map_err(|err: io::Error| object_store::Error::Generic {
265                store: "IoError",
266                source: Box::new(err),
267            });
268
269        Ok(GetResult {
270            payload: GetResultPayload::Stream(Box::pin(stream)),
271            range: read_range,
272            meta,
273            attributes,
274        })
275    }
276
277    async fn get_opts_with_stat(
278        &self,
279        location: &Path,
280        raw_location: &str,
281        options: &GetOptions,
282    ) -> object_store::Result<GetResult> {
283        let metadata = self
284            .inner
285            .stat_options(raw_location, format_stat_options(options))
286            .into_send()
287            .await
288            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
289        let attributes = format_object_attributes(&metadata);
290        let meta = format_object_meta(location.as_ref(), &metadata);
291
292        if options.head {
293            return Ok(GetResult {
294                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
295                range: 0..0,
296                meta,
297                attributes,
298            });
299        }
300
301        let read_range = format_read_range(options.range.as_ref(), meta.size);
302        if read_range.start >= read_range.end {
303            return Ok(GetResult {
304                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
305                range: read_range,
306                meta,
307                attributes,
308            });
309        }
310
311        let reader = self
312            .inner
313            .reader_options(
314                raw_location,
315                format_reader_options(options, Some(meta.size)),
316            )
317            .into_send()
318            .await
319            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
320        let stream = reader
321            .into_bytes_stream(read_range.start..read_range.end)
322            .into_send()
323            .await
324            .map_err(|err| format_object_store_error(err, location.as_ref()))?
325            .into_send()
326            .map_err(|err: io::Error| object_store::Error::Generic {
327                store: "IoError",
328                source: Box::new(err),
329            });
330
331        Ok(GetResult {
332            payload: GetResultPayload::Stream(Box::pin(stream)),
333            range: read_range,
334            meta,
335            attributes,
336        })
337    }
338
339    /// Copy a file from one location to another
340    async fn copy_request(
341        &self,
342        from: &Path,
343        to: &Path,
344        if_not_exists: bool,
345    ) -> object_store::Result<()> {
346        let mut copy_options = CopyOptions::default();
347        if if_not_exists {
348            copy_options.if_not_exists = true;
349        }
350
351        // Perform the copy operation
352        self.inner
353            .copy_options(
354                &percent_decode_path(from.as_ref()),
355                &percent_decode_path(to.as_ref()),
356                copy_options,
357            )
358            .into_send()
359            .await
360            .map_err(|err| {
361                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
362                    object_store::Error::AlreadyExists {
363                        path: to.to_string(),
364                        source: Box::new(err),
365                    }
366                } else {
367                    format_object_store_error(err, from.as_ref())
368                }
369            })?;
370
371        Ok(())
372    }
373}
374
375impl Debug for OpendalStore {
376    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
377        f.debug_struct("OpendalStore")
378            .field("scheme", &self.info.scheme())
379            .field("name", &self.info.name())
380            .field("root", &self.info.root())
381            .field("capability", &self.info.full_capability())
382            .finish()
383    }
384}
385
386impl Display for OpendalStore {
387    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
388        let info = self.inner.info();
389        write!(
390            f,
391            "Opendal({}, bucket={}, root={})",
392            info.scheme(),
393            info.name(),
394            info.root()
395        )
396    }
397}
398
399impl From<Operator> for OpendalStore {
400    fn from(value: Operator) -> Self {
401        Self::new(value)
402    }
403}
404
405#[async_trait]
406impl ObjectStore for OpendalStore {
407    async fn put_opts(
408        &self,
409        location: &Path,
410        bytes: PutPayload,
411        opts: PutOptions,
412    ) -> object_store::Result<PutResult> {
413        let decoded_location = percent_decode_path(location.as_ref());
414        let mut future_write = self
415            .inner
416            .write_with(&decoded_location, Buffer::from_iter(bytes));
417        let opts_mode = opts.mode.clone();
418        match opts.mode {
419            PutMode::Overwrite => {}
420            PutMode::Create => {
421                future_write = future_write.if_not_exists(true);
422            }
423            PutMode::Update(update_version) => {
424                let Some(etag) = update_version.e_tag else {
425                    Err(object_store::Error::NotSupported {
426                        source: Box::new(opendal::Error::new(
427                            opendal::ErrorKind::Unsupported,
428                            "etag is required for conditional put",
429                        )),
430                    })?
431                };
432                future_write = future_write.if_match(etag.as_str());
433            }
434        }
435        let rp = future_write.into_send().await.map_err(|err| {
436            match format_object_store_error(err, location.as_ref()) {
437                object_store::Error::Precondition { path, source }
438                    if opts_mode == PutMode::Create =>
439                {
440                    object_store::Error::AlreadyExists { path, source }
441                }
442                e => e,
443            }
444        })?;
445
446        let e_tag = rp.etag().map(|s| s.to_string());
447        let version = rp.version().map(|s| s.to_string());
448
449        Ok(PutResult { e_tag, version })
450    }
451
452    async fn put_multipart_opts(
453        &self,
454        location: &Path,
455        opts: PutMultipartOptions,
456    ) -> object_store::Result<Box<dyn MultipartUpload>> {
457        let mut options = opendal::options::WriteOptions {
458            concurrent: DEFAULT_CONCURRENT,
459            ..Default::default()
460        };
461
462        // Collect user metadata separately to handle multiple entries
463        let mut user_metadata = HashMap::new();
464
465        // Handle attributes if provided
466        for (key, value) in opts.attributes.iter() {
467            match key {
468                object_store::Attribute::CacheControl => {
469                    options.cache_control = Some(value.to_string());
470                }
471                object_store::Attribute::ContentDisposition => {
472                    options.content_disposition = Some(value.to_string());
473                }
474                object_store::Attribute::ContentEncoding => {
475                    options.content_encoding = Some(value.to_string());
476                }
477                object_store::Attribute::ContentLanguage => {
478                    // no support
479                    continue;
480                }
481                object_store::Attribute::ContentType => {
482                    options.content_type = Some(value.to_string());
483                }
484                object_store::Attribute::Metadata(k) => {
485                    user_metadata.insert(k.to_string(), value.to_string());
486                }
487                _ => {}
488            }
489        }
490
491        // Apply user metadata if any entries were collected
492        if !user_metadata.is_empty() {
493            options.user_metadata = Some(user_metadata);
494        }
495
496        let decoded_location = percent_decode_path(location.as_ref());
497        let writer = self
498            .inner
499            .writer_options(&decoded_location, options)
500            .into_send()
501            .await
502            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
503        let upload = OpendalMultipartUpload::new(writer, location.clone());
504
505        Ok(Box::new(upload))
506    }
507
508    async fn get_opts(
509        &self,
510        location: &Path,
511        options: GetOptions,
512    ) -> object_store::Result<GetResult> {
513        let raw_location = percent_decode_path(location.as_ref());
514
515        if options.head {
516            return self
517                .get_opts_with_stat(location, &raw_location, &options)
518                .await;
519        }
520
521        if !matches!(options.range.as_ref(), Some(GetRange::Suffix(_))) {
522            match self
523                .get_opts_without_stat(location, &raw_location, &options)
524                .await
525            {
526                Ok(result) => return Ok(result),
527                Err(object_store::Error::NotSupported { .. }) => {}
528                Err(err) => return Err(err),
529            }
530        }
531
532        self.get_opts_with_stat(location, &raw_location, &options)
533            .await
534    }
535
536    async fn get_ranges(
537        &self,
538        location: &Path,
539        ranges: &[Range<u64>],
540    ) -> object_store::Result<Vec<Bytes>> {
541        let raw_location = percent_decode_path(location.as_ref());
542        let reader = self
543            .inner
544            .reader(&raw_location)
545            .into_send()
546            .await
547            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
548
549        let location_ref: Arc<str> = Arc::from(location.as_ref());
550        futures::stream::iter(ranges.iter().cloned())
551            .map(|range| {
552                let reader = reader.clone();
553                let location_ref = location_ref.clone();
554                async move {
555                    reader
556                        .read(range)
557                        .into_send()
558                        .await
559                        .map(|buf| buf.to_bytes())
560                        .map_err(|err| format_object_store_error(err, &location_ref))
561                }
562            })
563            .buffered(DEFAULT_CONCURRENT)
564            .try_collect()
565            .await
566    }
567
568    fn delete_stream(
569        &self,
570        locations: BoxStream<'static, object_store::Result<Path>>,
571    ) -> BoxStream<'static, object_store::Result<Path>> {
572        let this = self.clone();
573        locations
574            .and_then(move |location| {
575                let this = this.clone();
576                async move {
577                    let decoded = percent_decode_path(location.as_ref());
578                    this.inner
579                        .delete(&decoded)
580                        .into_send()
581                        .await
582                        .map_err(|err| format_object_store_error(err, location.as_ref()))?;
583                    Ok(location)
584                }
585                .into_send()
586            })
587            .boxed()
588    }
589
590    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
591        // object_store `Path` always removes trailing slash
592        // need to add it back
593        let path = prefix.map_or("".into(), |x| {
594            format!("{}/", percent_decode_path(x.as_ref()))
595        });
596
597        let this = self.clone();
598        let fut = async move {
599            let stream = this
600                .inner
601                .lister_with(&path)
602                .recursive(true)
603                .await
604                .map_err(|err| format_object_store_error(err, &path))?;
605
606            let stream = stream.then(|res| async {
607                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
608                let meta = entry.metadata();
609
610                Ok(format_object_meta(entry.path(), meta))
611            });
612            Ok::<_, object_store::Error>(stream)
613        };
614
615        fut.into_stream().try_flatten().into_send().boxed()
616    }
617
618    fn list_with_offset(
619        &self,
620        prefix: Option<&Path>,
621        offset: &Path,
622    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
623        let path = prefix.map_or("".into(), |x| {
624            format!("{}/", percent_decode_path(x.as_ref()))
625        });
626        let offset = offset.clone();
627
628        // clone self for 'static lifetime
629        // clone self is cheap
630        let this = self.clone();
631
632        let fut = async move {
633            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
634            let mut fut = this.inner.lister_with(&path).recursive(true);
635
636            // Use native start_after support if possible.
637            if list_with_start_after {
638                fut = fut.start_after(offset.as_ref());
639            }
640
641            let lister = fut
642                .await
643                .map_err(|err| format_object_store_error(err, &path))?
644                .then(move |entry| {
645                    let path = path.clone();
646                    let this = this.clone();
647                    async move {
648                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
649                        let (path, metadata) = entry.into_parts();
650
651                        // If it's a dir or last_modified is present, we can use it directly.
652                        if metadata.is_dir() || metadata.last_modified().is_some() {
653                            let object_meta = format_object_meta(&path, &metadata);
654                            return Ok(object_meta);
655                        }
656
657                        let metadata = this
658                            .inner
659                            .stat(&path)
660                            .await
661                            .map_err(|err| format_object_store_error(err, &path))?;
662                        let object_meta = format_object_meta(&path, &metadata);
663                        Ok::<_, object_store::Error>(object_meta)
664                    }
665                })
666                .into_send()
667                .boxed();
668
669            let stream = if list_with_start_after {
670                lister
671            } else {
672                lister
673                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
674                    .into_send()
675                    .boxed()
676            };
677
678            Ok::<_, object_store::Error>(stream)
679        };
680
681        fut.into_stream().into_send().try_flatten().boxed()
682    }
683
684    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
685        let path = prefix.map_or("".into(), |x| {
686            format!("{}/", percent_decode_path(x.as_ref()))
687        });
688        let mut stream = self
689            .inner
690            .lister_with(&path)
691            .into_future()
692            .into_send()
693            .await
694            .map_err(|err| format_object_store_error(err, &path))?
695            .into_send();
696
697        let mut common_prefixes = Vec::new();
698        let mut objects = Vec::new();
699
700        while let Some(res) = stream.next().into_send().await {
701            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
702            let meta = entry.metadata();
703
704            if meta.is_dir() {
705                common_prefixes.push(entry.path().into());
706            } else if meta.last_modified().is_some() {
707                objects.push(format_object_meta(entry.path(), meta));
708            } else {
709                let meta = self
710                    .inner
711                    .stat(entry.path())
712                    .into_send()
713                    .await
714                    .map_err(|err| format_object_store_error(err, entry.path()))?;
715                objects.push(format_object_meta(entry.path(), &meta));
716            }
717        }
718
719        Ok(ListResult {
720            common_prefixes,
721            objects,
722        })
723    }
724
725    async fn copy_opts(
726        &self,
727        from: &Path,
728        to: &Path,
729        options: ObjectStoreCopyOptions,
730    ) -> object_store::Result<()> {
731        let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
732        self.copy_request(from, to, if_not_exists).await
733    }
734}
735
736/// `MultipartUpload`'s impl based on `Writer` in opendal
737///
738/// # Notes
739///
740/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
741/// implementation do. Instead, we just write the part and notify the next task to be written.
742///
743/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
744struct OpendalMultipartUpload {
745    writer: Arc<Mutex<Writer>>,
746    location: Path,
747    next_notify: oneshot::Receiver<()>,
748}
749
750impl OpendalMultipartUpload {
751    fn new(writer: Writer, location: Path) -> Self {
752        // an immediately dropped sender for the first part to write without waiting
753        let (_, rx) = oneshot::channel();
754
755        Self {
756            writer: Arc::new(Mutex::new(writer)),
757            location,
758            next_notify: rx,
759        }
760    }
761}
762
763#[async_trait]
764impl MultipartUpload for OpendalMultipartUpload {
765    fn put_part(&mut self, data: PutPayload) -> UploadPart {
766        let writer = self.writer.clone();
767        let location = self.location.clone();
768
769        // Generate next notify which will be notified after the current part is written.
770        let (tx, rx) = oneshot::channel();
771        // Fetch the notify for current part to wait for it to be written.
772        let last_rx = std::mem::replace(&mut self.next_notify, rx);
773
774        async move {
775            // Wait for the previous part to be written
776            let _ = last_rx.await;
777
778            let mut writer = writer.lock().await;
779            let result = writer
780                .write(Buffer::from_iter(data))
781                .await
782                .map_err(|err| format_object_store_error(err, location.as_ref()));
783
784            // Notify the next part to be written
785            drop(tx);
786
787            result
788        }
789        .into_send()
790        .boxed()
791    }
792
793    async fn complete(&mut self) -> object_store::Result<PutResult> {
794        let mut writer = self.writer.lock().await;
795        let metadata = writer
796            .close()
797            .into_send()
798            .await
799            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
800
801        let e_tag = metadata.etag().map(|s| s.to_string());
802        let version = metadata.version().map(|s| s.to_string());
803
804        Ok(PutResult { e_tag, version })
805    }
806
807    async fn abort(&mut self) -> object_store::Result<()> {
808        let mut writer = self.writer.lock().await;
809        writer
810            .abort()
811            .into_send()
812            .await
813            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
814    }
815}
816
817impl Debug for OpendalMultipartUpload {
818    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
819        f.debug_struct("OpendalMultipartUpload")
820            .field("location", &self.location)
821            .finish()
822    }
823}
824
825#[cfg(test)]
826mod tests {
827    use bytes::Bytes;
828    use object_store::path::Path;
829    use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
830    use opendal::services;
831    use rand::prelude::*;
832    use rand::rng;
833    use std::sync::Arc;
834
835    use super::*;
836
837    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
838        let op = Operator::new(services::Memory::default()).unwrap().finish();
839        let object_store = Arc::new(OpendalStore::new(op));
840
841        let path: Path = "data/test.txt".into();
842        let bytes = Bytes::from_static(b"hello, world!");
843        object_store.put(&path, bytes.into()).await.unwrap();
844
845        let path: Path = "data/nested/test.txt".into();
846        let bytes = Bytes::from_static(b"hello, world! I am nested.");
847        object_store.put(&path, bytes.into()).await.unwrap();
848
849        object_store
850    }
851
852    #[tokio::test]
853    async fn test_basic() {
854        let op = Operator::new(services::Memory::default()).unwrap().finish();
855        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
856
857        // Retrieve a specific file
858        let path: Path = "data/test.txt".into();
859
860        let bytes = Bytes::from_static(b"hello, world!");
861        object_store.put(&path, bytes.clone().into()).await.unwrap();
862
863        let meta = object_store.head(&path).await.unwrap();
864
865        assert_eq!(meta.size, 13);
866
867        assert_eq!(
868            object_store
869                .get(&path)
870                .await
871                .unwrap()
872                .bytes()
873                .await
874                .unwrap(),
875            bytes
876        );
877    }
878
879    #[tokio::test]
880    async fn test_put_multipart() {
881        let op = Operator::new(services::Memory::default()).unwrap().finish();
882        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
883
884        let mut rng = rng();
885
886        // Case complete
887        let path: Path = "data/test_complete.txt".into();
888        let upload = object_store.put_multipart(&path).await.unwrap();
889
890        let mut write = WriteMultipart::new(upload);
891
892        let mut all_bytes = vec![];
893        let round = rng.random_range(1..=1024);
894        for _ in 0..round {
895            let size = rng.random_range(1..=1024);
896            let mut bytes = vec![0; size];
897            rng.fill_bytes(&mut bytes);
898
899            all_bytes.extend_from_slice(&bytes);
900            write.put(bytes.into());
901        }
902
903        let _ = write.finish().await.unwrap();
904
905        let meta = object_store.head(&path).await.unwrap();
906
907        assert_eq!(meta.size, all_bytes.len() as u64);
908
909        assert_eq!(
910            object_store
911                .get(&path)
912                .await
913                .unwrap()
914                .bytes()
915                .await
916                .unwrap(),
917            Bytes::from(all_bytes)
918        );
919
920        // Case abort
921        let path: Path = "data/test_abort.txt".into();
922        let mut upload = object_store.put_multipart(&path).await.unwrap();
923        upload.put_part(vec![1; 1024].into()).await.unwrap();
924        upload.abort().await.unwrap();
925
926        let res = object_store.head(&path).await;
927        let err = res.unwrap_err();
928
929        assert!(matches!(err, object_store::Error::NotFound { .. }))
930    }
931
932    #[tokio::test]
933    async fn test_list() {
934        let object_store = create_test_object_store().await;
935        let path: Path = "data/".into();
936        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
937        assert_eq!(results.len(), 2);
938        let mut locations = results
939            .iter()
940            .map(|x| x.as_ref().unwrap().location.as_ref())
941            .collect::<Vec<_>>();
942
943        let expected_files = vec![
944            (
945                "data/nested/test.txt",
946                Bytes::from_static(b"hello, world! I am nested."),
947            ),
948            ("data/test.txt", Bytes::from_static(b"hello, world!")),
949        ];
950
951        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
952
953        locations.sort();
954        assert_eq!(locations, expected_locations);
955
956        for (location, bytes) in expected_files {
957            let path: Path = location.into();
958            assert_eq!(
959                object_store
960                    .get(&path)
961                    .await
962                    .unwrap()
963                    .bytes()
964                    .await
965                    .unwrap(),
966                bytes
967            );
968        }
969    }
970
971    #[tokio::test]
972    async fn test_list_with_delimiter() {
973        let object_store = create_test_object_store().await;
974        let path: Path = "data/".into();
975        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
976        assert_eq!(result.objects.len(), 1);
977        assert_eq!(result.common_prefixes.len(), 1);
978        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
979        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
980    }
981
982    #[tokio::test]
983    async fn test_list_with_offset() {
984        let object_store = create_test_object_store().await;
985        let path: Path = "data/".into();
986        let offset: Path = "data/nested/test.txt".into();
987        let result = object_store
988            .list_with_offset(Some(&path), &offset)
989            .collect::<Vec<_>>()
990            .await;
991        assert_eq!(result.len(), 1);
992        assert_eq!(
993            result[0].as_ref().unwrap().location.as_ref(),
994            "data/test.txt"
995        );
996    }
997
    /// Custom layer that counts stat operations for testing.
    ///
    /// `StatCounterLayer` wraps an accessor in a `StatCounterAccessor`. That
    /// accessor increments a shared `AtomicUsize` on every `stat` call and
    /// forwards every other operation to the inner accessor unchanged.
    mod stat_counter {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Layer that produces a `StatCounterAccessor` sharing `count`.
        #[derive(Debug, Clone)]
        pub struct StatCounterLayer {
            // Shared with the caller so the test can read and reset the tally.
            count: Arc<AtomicUsize>,
        }

        impl StatCounterLayer {
            /// Creates a layer that records `stat` calls into `count`.
            pub fn new(count: Arc<AtomicUsize>) -> Self {
                Self { count }
            }
        }

        impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
            type LayeredAccess = StatCounterAccessor<A>;

            // Every accessor built by this layer shares the same counter.
            fn layer(&self, inner: A) -> Self::LayeredAccess {
                StatCounterAccessor {
                    inner,
                    count: self.count.clone(),
                }
            }
        }

        /// Accessor that counts `stat` calls and otherwise delegates to `inner`.
        #[derive(Debug, Clone)]
        pub struct StatCounterAccessor<A> {
            inner: A,
            count: Arc<AtomicUsize>,
        }

        // All associated types are the inner accessor's own; nothing is wrapped.
        impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
            type Inner = A;
            type Reader = A::Reader;
            type Writer = A::Writer;
            type Lister = A::Lister;
            type Deleter = A::Deleter;
            type Copier = A::Copier;

            fn inner(&self) -> &Self::Inner {
                &self.inner
            }

            // The counter is bumped before delegating, so calls that fail in
            // the inner accessor are still counted.
            async fn stat(
                &self,
                path: &str,
                args: opendal::raw::OpStat,
            ) -> opendal::Result<opendal::raw::RpStat> {
                self.count.fetch_add(1, Ordering::SeqCst);
                self.inner.stat(path, args).await
            }

            // Forwarded unchanged (not counted).
            async fn read(
                &self,
                path: &str,
                args: opendal::raw::OpRead,
            ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
                self.inner.read(path, args).await
            }

            // Forwarded unchanged (not counted).
            async fn write(
                &self,
                path: &str,
                args: opendal::raw::OpWrite,
            ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
                self.inner.write(path, args).await
            }

            // Forwarded unchanged (not counted).
            async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
                self.inner.delete().await
            }

            // Forwarded unchanged (not counted).
            async fn list(
                &self,
                path: &str,
                args: opendal::raw::OpList,
            ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
                self.inner.list(path, args).await
            }

            // Forwarded unchanged (not counted).
            async fn copy(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpCopy,
                opts: opendal::raw::OpCopier,
            ) -> opendal::Result<(opendal::raw::RpCopy, Self::Copier)> {
                self.inner.copy(from, to, args, opts).await
            }

            // Forwarded unchanged (not counted).
            async fn rename(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpRename,
            ) -> opendal::Result<opendal::raw::RpRename> {
                self.inner.rename(from, to, args).await
            }
        }
    }
1100
1101    #[tokio::test]
1102    async fn test_get_range_no_stat() {
1103        use std::sync::atomic::{AtomicUsize, Ordering};
1104
1105        // Create a stat counter and operator with tracking layer
1106        let stat_count = Arc::new(AtomicUsize::new(0));
1107        let op = Operator::new(opendal::services::Memory::default())
1108            .unwrap()
1109            .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
1110            .finish();
1111        let store = OpendalStore::new(op);
1112
1113        // Create a test file
1114        let location = "test_get_range.txt".into();
1115        let value = Bytes::from_static(b"Hello, world!");
1116        store.put(&location, value.clone().into()).await.unwrap();
1117
1118        // Reset counter after put
1119        stat_count.store(0, Ordering::SeqCst);
1120
1121        // Test 1: get_ranges should NOT call stat()
1122        #[allow(clippy::single_range_in_vec_init)]
1123        let ranges = [0u64..5];
1124        let ret = store.get_ranges(&location, &ranges).await.unwrap();
1125        assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1126        assert_eq!(
1127            stat_count.load(Ordering::SeqCst),
1128            0,
1129            "get_ranges should not call stat()"
1130        );
1131
1132        // Reset counter
1133        stat_count.store(0, Ordering::SeqCst);
1134
1135        // Test 2: get_opts should NOT call stat() for bounded range reads
1136        let opts = object_store::GetOptions {
1137            range: Some(object_store::GetRange::Bounded(0..5)),
1138            ..Default::default()
1139        };
1140        let ret = store.get_opts(&location, opts).await.unwrap();
1141        assert_eq!(ret.meta.size, value.len() as u64);
1142        assert_eq!(ret.range, 0..5);
1143        let data = ret.bytes().await.unwrap();
1144        assert_eq!(Bytes::from_static(b"Hello"), data);
1145        assert_eq!(
1146            stat_count.load(Ordering::SeqCst),
1147            0,
1148            "get_opts should not call stat() for bounded range reads"
1149        );
1150
1151        // Reset counter
1152        stat_count.store(0, Ordering::SeqCst);
1153
1154        // Test 3: get_opts should NOT call stat() for offset range reads
1155        let opts = object_store::GetOptions {
1156            range: Some(object_store::GetRange::Offset(7)),
1157            ..Default::default()
1158        };
1159        let ret = store.get_opts(&location, opts).await.unwrap();
1160        assert_eq!(ret.meta.size, value.len() as u64);
1161        assert_eq!(ret.range, 7..value.len() as u64);
1162        let data = ret.bytes().await.unwrap();
1163        assert_eq!(Bytes::from_static(b"world!"), data);
1164        assert_eq!(
1165            stat_count.load(Ordering::SeqCst),
1166            0,
1167            "get_opts should not call stat() for offset range reads"
1168        );
1169
1170        // Reset counter
1171        stat_count.store(0, Ordering::SeqCst);
1172
1173        // Test 4: get_opts should NOT call stat() for full reads
1174        let ret = store
1175            .get_opts(&location, object_store::GetOptions::default())
1176            .await
1177            .unwrap();
1178        assert_eq!(ret.meta.size, value.len() as u64);
1179        assert_eq!(ret.range, 0..value.len() as u64);
1180        let data = ret.bytes().await.unwrap();
1181        assert_eq!(value, data);
1182        assert_eq!(
1183            stat_count.load(Ordering::SeqCst),
1184            0,
1185            "get_opts should not call stat() for full reads"
1186        );
1187
1188        // Reset counter
1189        stat_count.store(0, Ordering::SeqCst);
1190
1191        // Test 5: get_opts should still call stat() for suffix range reads
1192        let opts = object_store::GetOptions {
1193            range: Some(object_store::GetRange::Suffix(6)),
1194            ..Default::default()
1195        };
1196        let ret = store.get_opts(&location, opts).await.unwrap();
1197        assert_eq!(ret.meta.size, value.len() as u64);
1198        assert_eq!(ret.range, 7..value.len() as u64);
1199        let data = ret.bytes().await.unwrap();
1200        assert_eq!(Bytes::from_static(b"world!"), data);
1201        assert!(
1202            stat_count.load(Ordering::SeqCst) > 0,
1203            "get_opts should call stat() for suffix range reads"
1204        );
1205
1206        // Cleanup
1207        store.delete(&location).await.unwrap();
1208    }
1209}