1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::datetime_to_timestamp;
25use crate::utils::*;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::Attributes;
35use object_store::CopyMode as ObjectStoreCopyMode;
36use object_store::CopyOptions as ObjectStoreCopyOptions;
37use object_store::ListResult;
38use object_store::MultipartUpload;
39use object_store::ObjectMeta;
40use object_store::ObjectStore;
41use object_store::PutMultipartOptions;
42use object_store::PutOptions;
43use object_store::PutPayload;
44use object_store::PutResult;
45use object_store::path::Path;
46use object_store::{GetOptions, UploadPart};
47use object_store::{GetRange, GetResultPayload};
48use object_store::{GetResult, PutMode};
49use opendal::Buffer;
50use opendal::BytesRange;
51use opendal::Writer;
52use opendal::options::CopyOptions;
53use opendal::options::ReaderOptions;
54use opendal::options::StatOptions;
55use opendal::raw::percent_decode_path;
56use opendal::{Operator, OperatorInfo};
57use std::collections::HashMap;
58
59const DEFAULT_CONCURRENT: usize = 8;
60
61fn format_object_attributes(meta: &opendal::Metadata) -> Attributes {
62 let mut attributes = Attributes::new();
63 if let Some(user_meta) = meta.user_metadata() {
64 for (key, value) in user_meta {
65 attributes.insert(
66 object_store::Attribute::Metadata(key.clone().into()),
67 value.clone().into(),
68 );
69 }
70 }
71
72 attributes
73}
74
75fn format_reader_options(options: &GetOptions, content_length_hint: Option<u64>) -> ReaderOptions {
76 ReaderOptions {
77 version: options.version.clone(),
78 if_match: options.if_match.clone(),
79 if_none_match: options.if_none_match.clone(),
80 if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
81 if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
82 content_length_hint,
83 ..Default::default()
84 }
85}
86
87fn format_stat_options(options: &GetOptions) -> StatOptions {
88 StatOptions {
89 version: options.version.clone(),
90 if_match: options.if_match.clone(),
91 if_none_match: options.if_none_match.clone(),
92 if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
93 if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
94 ..Default::default()
95 }
96}
97
98fn format_read_range(range: Option<&GetRange>, size: u64) -> Range<u64> {
99 match range {
100 Some(GetRange::Bounded(r)) => {
101 if r.start >= r.end || r.start >= size {
102 0..0
103 } else {
104 r.start..r.end.min(size)
105 }
106 }
107 Some(GetRange::Offset(offset)) => {
108 if *offset < size {
109 *offset..size
110 } else {
111 0..0
112 }
113 }
114 Some(GetRange::Suffix(suffix)) if *suffix < size => (size - *suffix)..size,
115 _ => 0..size,
116 }
117}
118
119fn format_without_stat_error(err: opendal::Error, path: &str) -> object_store::Error {
120 match err.kind() {
121 opendal::ErrorKind::Unsupported | opendal::ErrorKind::RangeNotSatisfied => {
124 object_store::Error::NotSupported {
125 source: Box::new(err),
126 }
127 }
128 _ => format_object_store_error(err, path),
129 }
130}
131
132#[derive(Clone)]
180pub struct OpendalStore {
181 info: Arc<OperatorInfo>,
182 inner: Operator,
183}
184
185impl OpendalStore {
186 pub fn new(op: Operator) -> Self {
188 Self {
189 info: op.info().into(),
190 inner: op,
191 }
192 }
193
194 pub fn info(&self) -> &OperatorInfo {
196 self.info.as_ref()
197 }
198
199 async fn get_opts_without_stat(
200 &self,
201 location: &Path,
202 raw_location: &str,
203 options: &GetOptions,
204 ) -> object_store::Result<GetResult> {
205 let reader = self
206 .inner
207 .reader_options(raw_location, format_reader_options(options, None))
208 .into_send()
209 .await
210 .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
211
212 let mut stream = match options.range.as_ref() {
213 Some(GetRange::Bounded(range)) => {
214 reader
215 .into_bytes_stream(range.start..range.end)
216 .into_send()
217 .await
218 }
219 Some(GetRange::Offset(offset)) => reader.into_bytes_stream(*offset..).into_send().await,
220 Some(GetRange::Suffix(suffix)) => {
221 reader
222 .into_bytes_stream(BytesRange::suffix(*suffix))
223 .into_send()
224 .await
225 }
226 None => reader.into_bytes_stream(..).into_send().await,
227 }
228 .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
229
230 let metadata = stream
231 .metadata()
232 .into_send()
233 .await
234 .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
235 let attributes = format_object_attributes(&metadata);
236 let meta = format_object_meta(location.as_ref(), &metadata);
237 let read_range = format_read_range(options.range.as_ref(), meta.size);
238
239 if read_range.start >= read_range.end {
240 return Ok(GetResult {
241 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
242 range: read_range,
243 meta,
244 attributes,
245 });
246 }
247
248 if matches!(
249 options.range.as_ref(),
250 Some(GetRange::Bounded(range)) if range.end > meta.size
251 ) {
252 let reader = self
253 .inner
254 .reader_options(
255 raw_location,
256 format_reader_options(options, Some(meta.size)),
257 )
258 .into_send()
259 .await
260 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
261 stream = reader
262 .into_bytes_stream(read_range.start..read_range.end)
263 .into_send()
264 .await
265 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
266 }
267
268 let stream = stream
269 .into_send()
270 .map_err(|err: io::Error| object_store::Error::Generic {
271 store: "IoError",
272 source: Box::new(err),
273 });
274
275 Ok(GetResult {
276 payload: GetResultPayload::Stream(Box::pin(stream)),
277 range: read_range,
278 meta,
279 attributes,
280 })
281 }
282
283 async fn get_opts_with_stat(
284 &self,
285 location: &Path,
286 raw_location: &str,
287 options: &GetOptions,
288 ) -> object_store::Result<GetResult> {
289 let metadata = self
290 .inner
291 .stat_options(raw_location, format_stat_options(options))
292 .into_send()
293 .await
294 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
295 let attributes = format_object_attributes(&metadata);
296 let meta = format_object_meta(location.as_ref(), &metadata);
297
298 if options.head {
299 return Ok(GetResult {
300 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
301 range: 0..0,
302 meta,
303 attributes,
304 });
305 }
306
307 let read_range = format_read_range(options.range.as_ref(), meta.size);
308 if read_range.start >= read_range.end {
309 return Ok(GetResult {
310 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
311 range: read_range,
312 meta,
313 attributes,
314 });
315 }
316
317 let reader = self
318 .inner
319 .reader_options(
320 raw_location,
321 format_reader_options(options, Some(meta.size)),
322 )
323 .into_send()
324 .await
325 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
326 let stream = reader
327 .into_bytes_stream(read_range.start..read_range.end)
328 .into_send()
329 .await
330 .map_err(|err| format_object_store_error(err, location.as_ref()))?
331 .into_send()
332 .map_err(|err: io::Error| object_store::Error::Generic {
333 store: "IoError",
334 source: Box::new(err),
335 });
336
337 Ok(GetResult {
338 payload: GetResultPayload::Stream(Box::pin(stream)),
339 range: read_range,
340 meta,
341 attributes,
342 })
343 }
344
345 async fn copy_request(
347 &self,
348 from: &Path,
349 to: &Path,
350 if_not_exists: bool,
351 ) -> object_store::Result<()> {
352 let mut copy_options = CopyOptions::default();
353 if if_not_exists {
354 copy_options.if_not_exists = true;
355 }
356
357 self.inner
359 .copy_options(
360 &percent_decode_path(from.as_ref()),
361 &percent_decode_path(to.as_ref()),
362 copy_options,
363 )
364 .into_send()
365 .await
366 .map_err(|err| {
367 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
368 object_store::Error::AlreadyExists {
369 path: to.to_string(),
370 source: Box::new(err),
371 }
372 } else {
373 format_object_store_error(err, from.as_ref())
374 }
375 })?;
376
377 Ok(())
378 }
379}
380
381impl Debug for OpendalStore {
382 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
383 f.debug_struct("OpendalStore")
384 .field("scheme", &self.info.scheme())
385 .field("name", &self.info.name())
386 .field("root", &self.info.root())
387 .field("capability", &self.info.capability())
388 .finish()
389 }
390}
391
392impl Display for OpendalStore {
393 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
394 let info = self.inner.info();
395 write!(
396 f,
397 "Opendal({}, bucket={}, root={})",
398 info.scheme(),
399 info.name(),
400 info.root()
401 )
402 }
403}
404
405impl From<Operator> for OpendalStore {
406 fn from(value: Operator) -> Self {
407 Self::new(value)
408 }
409}
410
411#[async_trait]
412impl ObjectStore for OpendalStore {
413 async fn put_opts(
414 &self,
415 location: &Path,
416 bytes: PutPayload,
417 opts: PutOptions,
418 ) -> object_store::Result<PutResult> {
419 let decoded_location = percent_decode_path(location.as_ref());
420 let mut future_write = self
421 .inner
422 .write_with(&decoded_location, Buffer::from_iter(bytes));
423 let opts_mode = opts.mode.clone();
424 match opts.mode {
425 PutMode::Overwrite => {}
426 PutMode::Create => {
427 future_write = future_write.if_not_exists(true);
428 }
429 PutMode::Update(update_version) => {
430 let Some(etag) = update_version.e_tag else {
431 Err(object_store::Error::NotSupported {
432 source: Box::new(opendal::Error::new(
433 opendal::ErrorKind::Unsupported,
434 "etag is required for conditional put",
435 )),
436 })?
437 };
438 future_write = future_write.if_match(etag.as_str());
439 }
440 }
441 let rp = future_write.into_send().await.map_err(|err| {
442 match format_object_store_error(err, location.as_ref()) {
443 object_store::Error::Precondition { path, source }
444 if opts_mode == PutMode::Create =>
445 {
446 object_store::Error::AlreadyExists { path, source }
447 }
448 e => e,
449 }
450 })?;
451
452 let e_tag = rp.etag().map(|s| s.to_string());
453 let version = rp.version().map(|s| s.to_string());
454
455 Ok(PutResult { e_tag, version })
456 }
457
458 async fn put_multipart_opts(
459 &self,
460 location: &Path,
461 opts: PutMultipartOptions,
462 ) -> object_store::Result<Box<dyn MultipartUpload>> {
463 let mut options = opendal::options::WriteOptions {
464 concurrent: DEFAULT_CONCURRENT,
465 ..Default::default()
466 };
467
468 let mut user_metadata = HashMap::new();
470
471 for (key, value) in opts.attributes.iter() {
473 match key {
474 object_store::Attribute::CacheControl => {
475 options.cache_control = Some(value.to_string());
476 }
477 object_store::Attribute::ContentDisposition => {
478 options.content_disposition = Some(value.to_string());
479 }
480 object_store::Attribute::ContentEncoding => {
481 options.content_encoding = Some(value.to_string());
482 }
483 object_store::Attribute::ContentLanguage => {
484 continue;
486 }
487 object_store::Attribute::ContentType => {
488 options.content_type = Some(value.to_string());
489 }
490 object_store::Attribute::Metadata(k) => {
491 user_metadata.insert(k.to_string(), value.to_string());
492 }
493 _ => {}
494 }
495 }
496
497 if !user_metadata.is_empty() {
499 options.user_metadata = Some(user_metadata);
500 }
501
502 let decoded_location = percent_decode_path(location.as_ref());
503 let writer = self
504 .inner
505 .writer_options(&decoded_location, options)
506 .into_send()
507 .await
508 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
509 let upload = OpendalMultipartUpload::new(writer, location.clone());
510
511 Ok(Box::new(upload))
512 }
513
514 async fn get_opts(
515 &self,
516 location: &Path,
517 options: GetOptions,
518 ) -> object_store::Result<GetResult> {
519 let raw_location = percent_decode_path(location.as_ref());
520
521 if options.head {
522 return self
523 .get_opts_with_stat(location, &raw_location, &options)
524 .await;
525 }
526
527 match self
528 .get_opts_without_stat(location, &raw_location, &options)
529 .await
530 {
531 Ok(result) => return Ok(result),
532 Err(object_store::Error::NotSupported { .. }) => {}
533 Err(err) => return Err(err),
534 }
535
536 self.get_opts_with_stat(location, &raw_location, &options)
537 .await
538 }
539
540 async fn get_ranges(
541 &self,
542 location: &Path,
543 ranges: &[Range<u64>],
544 ) -> object_store::Result<Vec<Bytes>> {
545 let raw_location = percent_decode_path(location.as_ref());
546 let reader = self
547 .inner
548 .reader(&raw_location)
549 .into_send()
550 .await
551 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
552
553 let location_ref: Arc<str> = Arc::from(location.as_ref());
554 futures::stream::iter(ranges.iter().cloned())
555 .map(|range| {
556 let reader = reader.clone();
557 let location_ref = location_ref.clone();
558 async move {
559 reader
560 .read(range)
561 .into_send()
562 .await
563 .map(|buf| buf.to_bytes())
564 .map_err(|err| format_object_store_error(err, &location_ref))
565 }
566 })
567 .buffered(DEFAULT_CONCURRENT)
568 .try_collect()
569 .await
570 }
571
572 fn delete_stream(
573 &self,
574 locations: BoxStream<'static, object_store::Result<Path>>,
575 ) -> BoxStream<'static, object_store::Result<Path>> {
576 let this = self.clone();
577 locations
578 .and_then(move |location| {
579 let this = this.clone();
580 async move {
581 let decoded = percent_decode_path(location.as_ref());
582 this.inner
583 .delete(&decoded)
584 .into_send()
585 .await
586 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
587 Ok(location)
588 }
589 .into_send()
590 })
591 .boxed()
592 }
593
594 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
595 let path = prefix.map_or("".into(), |x| {
598 format!("{}/", percent_decode_path(x.as_ref()))
599 });
600
601 let this = self.clone();
602 let fut = async move {
603 let stream = this
604 .inner
605 .lister_with(&path)
606 .recursive(true)
607 .await
608 .map_err(|err| format_object_store_error(err, &path))?;
609
610 let stream = stream.then(|res| async {
611 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
612 let meta = entry.metadata();
613
614 Ok(format_object_meta(entry.path(), meta))
615 });
616 Ok::<_, object_store::Error>(stream)
617 };
618
619 fut.into_stream().try_flatten().into_send().boxed()
620 }
621
622 fn list_with_offset(
623 &self,
624 prefix: Option<&Path>,
625 offset: &Path,
626 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
627 let path = prefix.map_or("".into(), |x| {
628 format!("{}/", percent_decode_path(x.as_ref()))
629 });
630 let offset = offset.clone();
631
632 let this = self.clone();
635
636 let fut = async move {
637 let list_with_start_after = this.inner.info().capability().list_with_start_after;
638 let mut fut = this.inner.lister_with(&path).recursive(true);
639
640 if list_with_start_after {
642 fut = fut.start_after(offset.as_ref());
643 }
644
645 let lister = fut
646 .await
647 .map_err(|err| format_object_store_error(err, &path))?
648 .then(move |entry| {
649 let path = path.clone();
650 let this = this.clone();
651 async move {
652 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
653 let (path, metadata) = entry.into_parts();
654
655 if metadata.is_dir() || metadata.last_modified().is_some() {
657 let object_meta = format_object_meta(&path, &metadata);
658 return Ok(object_meta);
659 }
660
661 let metadata = this
662 .inner
663 .stat(&path)
664 .await
665 .map_err(|err| format_object_store_error(err, &path))?;
666 let object_meta = format_object_meta(&path, &metadata);
667 Ok::<_, object_store::Error>(object_meta)
668 }
669 })
670 .into_send()
671 .boxed();
672
673 let stream = if list_with_start_after {
674 lister
675 } else {
676 lister
677 .try_filter(move |entry| futures::future::ready(entry.location > offset))
678 .into_send()
679 .boxed()
680 };
681
682 Ok::<_, object_store::Error>(stream)
683 };
684
685 fut.into_stream().into_send().try_flatten().boxed()
686 }
687
688 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
689 let path = prefix.map_or("".into(), |x| {
690 format!("{}/", percent_decode_path(x.as_ref()))
691 });
692 let mut stream = self
693 .inner
694 .lister_with(&path)
695 .into_future()
696 .into_send()
697 .await
698 .map_err(|err| format_object_store_error(err, &path))?
699 .into_send();
700
701 let mut common_prefixes = Vec::new();
702 let mut objects = Vec::new();
703
704 while let Some(res) = stream.next().into_send().await {
705 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
706 let meta = entry.metadata();
707
708 if meta.is_dir() {
709 common_prefixes.push(entry.path().into());
710 } else if meta.last_modified().is_some() {
711 objects.push(format_object_meta(entry.path(), meta));
712 } else {
713 let meta = self
714 .inner
715 .stat(entry.path())
716 .into_send()
717 .await
718 .map_err(|err| format_object_store_error(err, entry.path()))?;
719 objects.push(format_object_meta(entry.path(), &meta));
720 }
721 }
722
723 Ok(ListResult {
724 common_prefixes,
725 objects,
726 })
727 }
728
729 async fn copy_opts(
730 &self,
731 from: &Path,
732 to: &Path,
733 options: ObjectStoreCopyOptions,
734 ) -> object_store::Result<()> {
735 let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
736 self.copy_request(from, to, if_not_exists).await
737 }
738}
739
740struct OpendalMultipartUpload {
749 writer: Arc<Mutex<Writer>>,
750 location: Path,
751 next_notify: oneshot::Receiver<()>,
752}
753
754impl OpendalMultipartUpload {
755 fn new(writer: Writer, location: Path) -> Self {
756 let (_, rx) = oneshot::channel();
758
759 Self {
760 writer: Arc::new(Mutex::new(writer)),
761 location,
762 next_notify: rx,
763 }
764 }
765}
766
767#[async_trait]
768impl MultipartUpload for OpendalMultipartUpload {
769 fn put_part(&mut self, data: PutPayload) -> UploadPart {
770 let writer = self.writer.clone();
771 let location = self.location.clone();
772
773 let (tx, rx) = oneshot::channel();
775 let last_rx = std::mem::replace(&mut self.next_notify, rx);
777
778 async move {
779 let _ = last_rx.await;
781
782 let mut writer = writer.lock().await;
783 let result = writer
784 .write(Buffer::from_iter(data))
785 .await
786 .map_err(|err| format_object_store_error(err, location.as_ref()));
787
788 drop(tx);
790
791 result
792 }
793 .into_send()
794 .boxed()
795 }
796
797 async fn complete(&mut self) -> object_store::Result<PutResult> {
798 let mut writer = self.writer.lock().await;
799 let metadata = writer
800 .close()
801 .into_send()
802 .await
803 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
804
805 let e_tag = metadata.etag().map(|s| s.to_string());
806 let version = metadata.version().map(|s| s.to_string());
807
808 Ok(PutResult { e_tag, version })
809 }
810
811 async fn abort(&mut self) -> object_store::Result<()> {
812 let mut writer = self.writer.lock().await;
813 writer
814 .abort()
815 .into_send()
816 .await
817 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
818 }
819}
820
821impl Debug for OpendalMultipartUpload {
822 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
823 f.debug_struct("OpendalMultipartUpload")
824 .field("location", &self.location)
825 .finish()
826 }
827}
828
829#[cfg(test)]
830mod tests {
831 use bytes::Bytes;
832 use object_store::path::Path;
833 use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
834 use opendal::services;
835 use rand::prelude::*;
836 use rand::rng;
837 use std::sync::Arc;
838
839 use super::*;
840
841 async fn create_test_object_store() -> Arc<dyn ObjectStore> {
842 let op = Operator::new(services::Memory::default()).unwrap();
843 let object_store = Arc::new(OpendalStore::new(op));
844
845 let path: Path = "data/test.txt".into();
846 let bytes = Bytes::from_static(b"hello, world!");
847 object_store.put(&path, bytes.into()).await.unwrap();
848
849 let path: Path = "data/nested/test.txt".into();
850 let bytes = Bytes::from_static(b"hello, world! I am nested.");
851 object_store.put(&path, bytes.into()).await.unwrap();
852
853 object_store
854 }
855
856 #[tokio::test]
857 async fn test_basic() {
858 let op = Operator::new(services::Memory::default()).unwrap();
859 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
860
861 let path: Path = "data/test.txt".into();
863
864 let bytes = Bytes::from_static(b"hello, world!");
865 object_store.put(&path, bytes.clone().into()).await.unwrap();
866
867 let meta = object_store.head(&path).await.unwrap();
868
869 assert_eq!(meta.size, 13);
870
871 assert_eq!(
872 object_store
873 .get(&path)
874 .await
875 .unwrap()
876 .bytes()
877 .await
878 .unwrap(),
879 bytes
880 );
881 }
882
883 #[tokio::test]
884 async fn test_put_multipart() {
885 let op = Operator::new(services::Memory::default()).unwrap();
886 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
887
888 let mut rng = rng();
889
890 let path: Path = "data/test_complete.txt".into();
892 let upload = object_store.put_multipart(&path).await.unwrap();
893
894 let mut write = WriteMultipart::new(upload);
895
896 let mut all_bytes = vec![];
897 let round = rng.random_range(1..=1024);
898 for _ in 0..round {
899 let size = rng.random_range(1..=1024);
900 let mut bytes = vec![0; size];
901 rng.fill_bytes(&mut bytes);
902
903 all_bytes.extend_from_slice(&bytes);
904 write.put(bytes.into());
905 }
906
907 let _ = write.finish().await.unwrap();
908
909 let meta = object_store.head(&path).await.unwrap();
910
911 assert_eq!(meta.size, all_bytes.len() as u64);
912
913 assert_eq!(
914 object_store
915 .get(&path)
916 .await
917 .unwrap()
918 .bytes()
919 .await
920 .unwrap(),
921 Bytes::from(all_bytes)
922 );
923
924 let path: Path = "data/test_abort.txt".into();
926 let mut upload = object_store.put_multipart(&path).await.unwrap();
927 upload.put_part(vec![1; 1024].into()).await.unwrap();
928 upload.abort().await.unwrap();
929
930 let res = object_store.head(&path).await;
931 let err = res.unwrap_err();
932
933 assert!(matches!(err, object_store::Error::NotFound { .. }))
934 }
935
936 #[tokio::test]
937 async fn test_list() {
938 let object_store = create_test_object_store().await;
939 let path: Path = "data/".into();
940 let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
941 assert_eq!(results.len(), 2);
942 let mut locations = results
943 .iter()
944 .map(|x| x.as_ref().unwrap().location.as_ref())
945 .collect::<Vec<_>>();
946
947 let expected_files = vec![
948 (
949 "data/nested/test.txt",
950 Bytes::from_static(b"hello, world! I am nested."),
951 ),
952 ("data/test.txt", Bytes::from_static(b"hello, world!")),
953 ];
954
955 let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
956
957 locations.sort();
958 assert_eq!(locations, expected_locations);
959
960 for (location, bytes) in expected_files {
961 let path: Path = location.into();
962 assert_eq!(
963 object_store
964 .get(&path)
965 .await
966 .unwrap()
967 .bytes()
968 .await
969 .unwrap(),
970 bytes
971 );
972 }
973 }
974
975 #[tokio::test]
976 async fn test_list_with_delimiter() {
977 let object_store = create_test_object_store().await;
978 let path: Path = "data/".into();
979 let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
980 assert_eq!(result.objects.len(), 1);
981 assert_eq!(result.common_prefixes.len(), 1);
982 assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
983 assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
984 }
985
986 #[tokio::test]
987 async fn test_list_with_offset() {
988 let object_store = create_test_object_store().await;
989 let path: Path = "data/".into();
990 let offset: Path = "data/nested/test.txt".into();
991 let result = object_store
992 .list_with_offset(Some(&path), &offset)
993 .collect::<Vec<_>>()
994 .await;
995 assert_eq!(result.len(), 1);
996 assert_eq!(
997 result[0].as_ref().unwrap().location.as_ref(),
998 "data/test.txt"
999 );
1000 }
1001
1002 mod stat_counter {
1004 use super::*;
1005 use std::sync::atomic::{AtomicUsize, Ordering};
1006
1007 #[derive(Debug, Clone)]
1008 pub struct StatCounterLayer {
1009 count: Arc<AtomicUsize>,
1010 }
1011
1012 impl StatCounterLayer {
1013 pub fn new(count: Arc<AtomicUsize>) -> Self {
1014 Self { count }
1015 }
1016 }
1017
1018 impl opendal::raw::Layer for StatCounterLayer {
1019 fn apply_service(&self, srv: opendal::raw::Servicer) -> opendal::raw::Servicer {
1020 Arc::new(StatCounterService {
1021 srv,
1022 count: self.count.clone(),
1023 })
1024 }
1025 }
1026
1027 #[derive(Debug)]
1028 pub struct StatCounterService {
1029 srv: opendal::raw::Servicer,
1030 count: Arc<AtomicUsize>,
1031 }
1032
1033 impl opendal::raw::Service for StatCounterService {
1034 type Reader = opendal::raw::oio::Reader;
1035 type Writer = opendal::raw::oio::Writer;
1036 type Lister = opendal::raw::oio::Lister;
1037 type Deleter = opendal::raw::oio::Deleter;
1038 type Copier = opendal::raw::oio::Copier;
1039
1040 fn info(&self) -> opendal::raw::ServiceInfo {
1041 self.srv.info()
1042 }
1043
1044 fn capability(&self) -> opendal::Capability {
1045 self.srv.capability()
1046 }
1047
1048 async fn create_dir(
1049 &self,
1050 ctx: &opendal::OperationContext,
1051 path: &str,
1052 args: opendal::raw::OpCreateDir,
1053 ) -> opendal::Result<opendal::raw::RpCreateDir> {
1054 self.srv.create_dir(ctx, path, args).await
1055 }
1056
1057 async fn stat(
1058 &self,
1059 ctx: &opendal::OperationContext,
1060 path: &str,
1061 args: opendal::raw::OpStat,
1062 ) -> opendal::Result<opendal::raw::RpStat> {
1063 self.count.fetch_add(1, Ordering::SeqCst);
1064 self.srv.stat(ctx, path, args).await
1065 }
1066
1067 fn read(
1068 &self,
1069 ctx: &opendal::OperationContext,
1070 path: &str,
1071 args: opendal::raw::OpRead,
1072 ) -> opendal::Result<Self::Reader> {
1073 self.srv.read(ctx, path, args)
1074 }
1075
1076 fn write(
1077 &self,
1078 ctx: &opendal::OperationContext,
1079 path: &str,
1080 args: opendal::raw::OpWrite,
1081 ) -> opendal::Result<Self::Writer> {
1082 self.srv.write(ctx, path, args)
1083 }
1084
1085 fn delete(&self, ctx: &opendal::OperationContext) -> opendal::Result<Self::Deleter> {
1086 self.srv.delete(ctx)
1087 }
1088
1089 fn list(
1090 &self,
1091 ctx: &opendal::OperationContext,
1092 path: &str,
1093 args: opendal::raw::OpList,
1094 ) -> opendal::Result<Self::Lister> {
1095 self.srv.list(ctx, path, args)
1096 }
1097
1098 fn copy(
1099 &self,
1100 ctx: &opendal::OperationContext,
1101 from: &str,
1102 to: &str,
1103 args: opendal::raw::OpCopy,
1104 opts: opendal::raw::OpCopier,
1105 ) -> opendal::Result<Self::Copier> {
1106 self.srv.copy(ctx, from, to, args, opts)
1107 }
1108
1109 async fn rename(
1110 &self,
1111 ctx: &opendal::OperationContext,
1112 from: &str,
1113 to: &str,
1114 args: opendal::raw::OpRename,
1115 ) -> opendal::Result<opendal::raw::RpRename> {
1116 self.srv.rename(ctx, from, to, args).await
1117 }
1118
1119 async fn presign(
1120 &self,
1121 ctx: &opendal::OperationContext,
1122 path: &str,
1123 args: opendal::raw::OpPresign,
1124 ) -> opendal::Result<opendal::raw::RpPresign> {
1125 self.srv.presign(ctx, path, args).await
1126 }
1127 }
1128 }
1129
1130 #[tokio::test]
1131 async fn test_get_range_no_stat() {
1132 use std::sync::atomic::{AtomicUsize, Ordering};
1133
1134 let stat_count = Arc::new(AtomicUsize::new(0));
1136 let op = Operator::new(opendal::services::Memory::default())
1137 .unwrap()
1138 .layer(stat_counter::StatCounterLayer::new(stat_count.clone()));
1139 let store = OpendalStore::new(op);
1140
1141 let location = "test_get_range.txt".into();
1143 let value = Bytes::from_static(b"Hello, world!");
1144 store.put(&location, value.clone().into()).await.unwrap();
1145
1146 stat_count.store(0, Ordering::SeqCst);
1148
1149 #[allow(clippy::single_range_in_vec_init)]
1151 let ranges = [0u64..5];
1152 let ret = store.get_ranges(&location, &ranges).await.unwrap();
1153 assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1154 assert_eq!(
1155 stat_count.load(Ordering::SeqCst),
1156 0,
1157 "get_ranges should not call stat()"
1158 );
1159
1160 stat_count.store(0, Ordering::SeqCst);
1162
1163 let opts = object_store::GetOptions {
1165 range: Some(object_store::GetRange::Bounded(0..5)),
1166 ..Default::default()
1167 };
1168 let ret = store.get_opts(&location, opts).await.unwrap();
1169 assert_eq!(ret.meta.size, value.len() as u64);
1170 assert_eq!(ret.range, 0..5);
1171 let data = ret.bytes().await.unwrap();
1172 assert_eq!(Bytes::from_static(b"Hello"), data);
1173 assert_eq!(
1174 stat_count.load(Ordering::SeqCst),
1175 0,
1176 "get_opts should not call stat() for bounded range reads"
1177 );
1178
1179 stat_count.store(0, Ordering::SeqCst);
1181
1182 let opts = object_store::GetOptions {
1184 range: Some(object_store::GetRange::Offset(7)),
1185 ..Default::default()
1186 };
1187 let ret = store.get_opts(&location, opts).await.unwrap();
1188 assert_eq!(ret.meta.size, value.len() as u64);
1189 assert_eq!(ret.range, 7..value.len() as u64);
1190 let data = ret.bytes().await.unwrap();
1191 assert_eq!(Bytes::from_static(b"world!"), data);
1192 assert_eq!(
1193 stat_count.load(Ordering::SeqCst),
1194 0,
1195 "get_opts should not call stat() for offset range reads"
1196 );
1197
1198 stat_count.store(0, Ordering::SeqCst);
1200
1201 let ret = store
1203 .get_opts(&location, object_store::GetOptions::default())
1204 .await
1205 .unwrap();
1206 assert_eq!(ret.meta.size, value.len() as u64);
1207 assert_eq!(ret.range, 0..value.len() as u64);
1208 let data = ret.bytes().await.unwrap();
1209 assert_eq!(value, data);
1210 assert_eq!(
1211 stat_count.load(Ordering::SeqCst),
1212 0,
1213 "get_opts should not call stat() for full reads"
1214 );
1215
1216 stat_count.store(0, Ordering::SeqCst);
1218
1219 let opts = object_store::GetOptions {
1221 range: Some(object_store::GetRange::Suffix(6)),
1222 ..Default::default()
1223 };
1224 let ret = store.get_opts(&location, opts).await.unwrap();
1225 assert_eq!(ret.meta.size, value.len() as u64);
1226 assert_eq!(ret.range, 7..value.len() as u64);
1227 let data = ret.bytes().await.unwrap();
1228 assert_eq!(Bytes::from_static(b"world!"), data);
1229 assert_eq!(
1230 stat_count.load(Ordering::SeqCst),
1231 0,
1232 "get_opts should not call stat() for suffix range reads"
1233 );
1234
1235 store.delete(&location).await.unwrap();
1237 }
1238}