1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::datetime_to_timestamp;
25use crate::utils::*;
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::Attributes;
35use object_store::CopyMode as ObjectStoreCopyMode;
36use object_store::CopyOptions as ObjectStoreCopyOptions;
37use object_store::ListResult;
38use object_store::MultipartUpload;
39use object_store::ObjectMeta;
40use object_store::ObjectStore;
41use object_store::PutMultipartOptions;
42use object_store::PutOptions;
43use object_store::PutPayload;
44use object_store::PutResult;
45use object_store::path::Path;
46use object_store::{GetOptions, UploadPart};
47use object_store::{GetRange, GetResultPayload};
48use object_store::{GetResult, PutMode};
49use opendal::Buffer;
50use opendal::Writer;
51use opendal::options::CopyOptions;
52use opendal::options::ReaderOptions;
53use opendal::options::StatOptions;
54use opendal::raw::percent_decode_path;
55use opendal::{Operator, OperatorInfo};
56use std::collections::HashMap;
57
58const DEFAULT_CONCURRENT: usize = 8;
59
60fn format_object_attributes(meta: &opendal::Metadata) -> Attributes {
61 let mut attributes = Attributes::new();
62 if let Some(user_meta) = meta.user_metadata() {
63 for (key, value) in user_meta {
64 attributes.insert(
65 object_store::Attribute::Metadata(key.clone().into()),
66 value.clone().into(),
67 );
68 }
69 }
70
71 attributes
72}
73
74fn format_reader_options(options: &GetOptions, content_length_hint: Option<u64>) -> ReaderOptions {
75 ReaderOptions {
76 version: options.version.clone(),
77 if_match: options.if_match.clone(),
78 if_none_match: options.if_none_match.clone(),
79 if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
80 if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
81 content_length_hint,
82 ..Default::default()
83 }
84}
85
86fn format_stat_options(options: &GetOptions) -> StatOptions {
87 StatOptions {
88 version: options.version.clone(),
89 if_match: options.if_match.clone(),
90 if_none_match: options.if_none_match.clone(),
91 if_modified_since: options.if_modified_since.and_then(datetime_to_timestamp),
92 if_unmodified_since: options.if_unmodified_since.and_then(datetime_to_timestamp),
93 ..Default::default()
94 }
95}
96
97fn format_read_range(range: Option<&GetRange>, size: u64) -> Range<u64> {
98 match range {
99 Some(GetRange::Bounded(r)) => {
100 if r.start >= r.end || r.start >= size {
101 0..0
102 } else {
103 r.start..r.end.min(size)
104 }
105 }
106 Some(GetRange::Offset(offset)) => {
107 if *offset < size {
108 *offset..size
109 } else {
110 0..0
111 }
112 }
113 Some(GetRange::Suffix(suffix)) if *suffix < size => (size - *suffix)..size,
114 _ => 0..size,
115 }
116}
117
118fn format_without_stat_error(err: opendal::Error, path: &str) -> object_store::Error {
119 match err.kind() {
120 opendal::ErrorKind::Unsupported | opendal::ErrorKind::RangeNotSatisfied => {
123 object_store::Error::NotSupported {
124 source: Box::new(err),
125 }
126 }
127 _ => format_object_store_error(err, path),
128 }
129}
130
131#[derive(Clone)]
179pub struct OpendalStore {
180 info: Arc<OperatorInfo>,
181 inner: Operator,
182}
183
184impl OpendalStore {
185 pub fn new(op: Operator) -> Self {
187 Self {
188 info: op.info().into(),
189 inner: op,
190 }
191 }
192
193 pub fn info(&self) -> &OperatorInfo {
195 self.info.as_ref()
196 }
197
198 async fn get_opts_without_stat(
199 &self,
200 location: &Path,
201 raw_location: &str,
202 options: &GetOptions,
203 ) -> object_store::Result<GetResult> {
204 let reader = self
205 .inner
206 .reader_options(raw_location, format_reader_options(options, None))
207 .into_send()
208 .await
209 .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
210
211 let mut stream = match options.range.as_ref() {
212 Some(GetRange::Bounded(range)) => {
213 reader
214 .into_bytes_stream(range.start..range.end)
215 .into_send()
216 .await
217 }
218 Some(GetRange::Offset(offset)) => reader.into_bytes_stream(*offset..).into_send().await,
219 Some(GetRange::Suffix(_)) => unreachable!("suffix range needs object metadata"),
220 None => reader.into_bytes_stream(..).into_send().await,
221 }
222 .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
223
224 let metadata = stream
225 .metadata()
226 .into_send()
227 .await
228 .map_err(|err| format_without_stat_error(err, location.as_ref()))?;
229 let attributes = format_object_attributes(&metadata);
230 let meta = format_object_meta(location.as_ref(), &metadata);
231 let read_range = format_read_range(options.range.as_ref(), meta.size);
232
233 if read_range.start >= read_range.end {
234 return Ok(GetResult {
235 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
236 range: read_range,
237 meta,
238 attributes,
239 });
240 }
241
242 if matches!(
243 options.range.as_ref(),
244 Some(GetRange::Bounded(range)) if range.end > meta.size
245 ) {
246 let reader = self
247 .inner
248 .reader_options(
249 raw_location,
250 format_reader_options(options, Some(meta.size)),
251 )
252 .into_send()
253 .await
254 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
255 stream = reader
256 .into_bytes_stream(read_range.start..read_range.end)
257 .into_send()
258 .await
259 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
260 }
261
262 let stream = stream
263 .into_send()
264 .map_err(|err: io::Error| object_store::Error::Generic {
265 store: "IoError",
266 source: Box::new(err),
267 });
268
269 Ok(GetResult {
270 payload: GetResultPayload::Stream(Box::pin(stream)),
271 range: read_range,
272 meta,
273 attributes,
274 })
275 }
276
277 async fn get_opts_with_stat(
278 &self,
279 location: &Path,
280 raw_location: &str,
281 options: &GetOptions,
282 ) -> object_store::Result<GetResult> {
283 let metadata = self
284 .inner
285 .stat_options(raw_location, format_stat_options(options))
286 .into_send()
287 .await
288 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
289 let attributes = format_object_attributes(&metadata);
290 let meta = format_object_meta(location.as_ref(), &metadata);
291
292 if options.head {
293 return Ok(GetResult {
294 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
295 range: 0..0,
296 meta,
297 attributes,
298 });
299 }
300
301 let read_range = format_read_range(options.range.as_ref(), meta.size);
302 if read_range.start >= read_range.end {
303 return Ok(GetResult {
304 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
305 range: read_range,
306 meta,
307 attributes,
308 });
309 }
310
311 let reader = self
312 .inner
313 .reader_options(
314 raw_location,
315 format_reader_options(options, Some(meta.size)),
316 )
317 .into_send()
318 .await
319 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
320 let stream = reader
321 .into_bytes_stream(read_range.start..read_range.end)
322 .into_send()
323 .await
324 .map_err(|err| format_object_store_error(err, location.as_ref()))?
325 .into_send()
326 .map_err(|err: io::Error| object_store::Error::Generic {
327 store: "IoError",
328 source: Box::new(err),
329 });
330
331 Ok(GetResult {
332 payload: GetResultPayload::Stream(Box::pin(stream)),
333 range: read_range,
334 meta,
335 attributes,
336 })
337 }
338
339 async fn copy_request(
341 &self,
342 from: &Path,
343 to: &Path,
344 if_not_exists: bool,
345 ) -> object_store::Result<()> {
346 let mut copy_options = CopyOptions::default();
347 if if_not_exists {
348 copy_options.if_not_exists = true;
349 }
350
351 self.inner
353 .copy_options(
354 &percent_decode_path(from.as_ref()),
355 &percent_decode_path(to.as_ref()),
356 copy_options,
357 )
358 .into_send()
359 .await
360 .map_err(|err| {
361 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
362 object_store::Error::AlreadyExists {
363 path: to.to_string(),
364 source: Box::new(err),
365 }
366 } else {
367 format_object_store_error(err, from.as_ref())
368 }
369 })?;
370
371 Ok(())
372 }
373}
374
375impl Debug for OpendalStore {
376 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
377 f.debug_struct("OpendalStore")
378 .field("scheme", &self.info.scheme())
379 .field("name", &self.info.name())
380 .field("root", &self.info.root())
381 .field("capability", &self.info.full_capability())
382 .finish()
383 }
384}
385
386impl Display for OpendalStore {
387 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
388 let info = self.inner.info();
389 write!(
390 f,
391 "Opendal({}, bucket={}, root={})",
392 info.scheme(),
393 info.name(),
394 info.root()
395 )
396 }
397}
398
399impl From<Operator> for OpendalStore {
400 fn from(value: Operator) -> Self {
401 Self::new(value)
402 }
403}
404
405#[async_trait]
406impl ObjectStore for OpendalStore {
407 async fn put_opts(
408 &self,
409 location: &Path,
410 bytes: PutPayload,
411 opts: PutOptions,
412 ) -> object_store::Result<PutResult> {
413 let decoded_location = percent_decode_path(location.as_ref());
414 let mut future_write = self
415 .inner
416 .write_with(&decoded_location, Buffer::from_iter(bytes));
417 let opts_mode = opts.mode.clone();
418 match opts.mode {
419 PutMode::Overwrite => {}
420 PutMode::Create => {
421 future_write = future_write.if_not_exists(true);
422 }
423 PutMode::Update(update_version) => {
424 let Some(etag) = update_version.e_tag else {
425 Err(object_store::Error::NotSupported {
426 source: Box::new(opendal::Error::new(
427 opendal::ErrorKind::Unsupported,
428 "etag is required for conditional put",
429 )),
430 })?
431 };
432 future_write = future_write.if_match(etag.as_str());
433 }
434 }
435 let rp = future_write.into_send().await.map_err(|err| {
436 match format_object_store_error(err, location.as_ref()) {
437 object_store::Error::Precondition { path, source }
438 if opts_mode == PutMode::Create =>
439 {
440 object_store::Error::AlreadyExists { path, source }
441 }
442 e => e,
443 }
444 })?;
445
446 let e_tag = rp.etag().map(|s| s.to_string());
447 let version = rp.version().map(|s| s.to_string());
448
449 Ok(PutResult { e_tag, version })
450 }
451
452 async fn put_multipart_opts(
453 &self,
454 location: &Path,
455 opts: PutMultipartOptions,
456 ) -> object_store::Result<Box<dyn MultipartUpload>> {
457 let mut options = opendal::options::WriteOptions {
458 concurrent: DEFAULT_CONCURRENT,
459 ..Default::default()
460 };
461
462 let mut user_metadata = HashMap::new();
464
465 for (key, value) in opts.attributes.iter() {
467 match key {
468 object_store::Attribute::CacheControl => {
469 options.cache_control = Some(value.to_string());
470 }
471 object_store::Attribute::ContentDisposition => {
472 options.content_disposition = Some(value.to_string());
473 }
474 object_store::Attribute::ContentEncoding => {
475 options.content_encoding = Some(value.to_string());
476 }
477 object_store::Attribute::ContentLanguage => {
478 continue;
480 }
481 object_store::Attribute::ContentType => {
482 options.content_type = Some(value.to_string());
483 }
484 object_store::Attribute::Metadata(k) => {
485 user_metadata.insert(k.to_string(), value.to_string());
486 }
487 _ => {}
488 }
489 }
490
491 if !user_metadata.is_empty() {
493 options.user_metadata = Some(user_metadata);
494 }
495
496 let decoded_location = percent_decode_path(location.as_ref());
497 let writer = self
498 .inner
499 .writer_options(&decoded_location, options)
500 .into_send()
501 .await
502 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
503 let upload = OpendalMultipartUpload::new(writer, location.clone());
504
505 Ok(Box::new(upload))
506 }
507
508 async fn get_opts(
509 &self,
510 location: &Path,
511 options: GetOptions,
512 ) -> object_store::Result<GetResult> {
513 let raw_location = percent_decode_path(location.as_ref());
514
515 if options.head {
516 return self
517 .get_opts_with_stat(location, &raw_location, &options)
518 .await;
519 }
520
521 if !matches!(options.range.as_ref(), Some(GetRange::Suffix(_))) {
522 match self
523 .get_opts_without_stat(location, &raw_location, &options)
524 .await
525 {
526 Ok(result) => return Ok(result),
527 Err(object_store::Error::NotSupported { .. }) => {}
528 Err(err) => return Err(err),
529 }
530 }
531
532 self.get_opts_with_stat(location, &raw_location, &options)
533 .await
534 }
535
536 async fn get_ranges(
537 &self,
538 location: &Path,
539 ranges: &[Range<u64>],
540 ) -> object_store::Result<Vec<Bytes>> {
541 let raw_location = percent_decode_path(location.as_ref());
542 let reader = self
543 .inner
544 .reader(&raw_location)
545 .into_send()
546 .await
547 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
548
549 let location_ref: Arc<str> = Arc::from(location.as_ref());
550 futures::stream::iter(ranges.iter().cloned())
551 .map(|range| {
552 let reader = reader.clone();
553 let location_ref = location_ref.clone();
554 async move {
555 reader
556 .read(range)
557 .into_send()
558 .await
559 .map(|buf| buf.to_bytes())
560 .map_err(|err| format_object_store_error(err, &location_ref))
561 }
562 })
563 .buffered(DEFAULT_CONCURRENT)
564 .try_collect()
565 .await
566 }
567
568 fn delete_stream(
569 &self,
570 locations: BoxStream<'static, object_store::Result<Path>>,
571 ) -> BoxStream<'static, object_store::Result<Path>> {
572 let this = self.clone();
573 locations
574 .and_then(move |location| {
575 let this = this.clone();
576 async move {
577 let decoded = percent_decode_path(location.as_ref());
578 this.inner
579 .delete(&decoded)
580 .into_send()
581 .await
582 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
583 Ok(location)
584 }
585 .into_send()
586 })
587 .boxed()
588 }
589
590 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
591 let path = prefix.map_or("".into(), |x| {
594 format!("{}/", percent_decode_path(x.as_ref()))
595 });
596
597 let this = self.clone();
598 let fut = async move {
599 let stream = this
600 .inner
601 .lister_with(&path)
602 .recursive(true)
603 .await
604 .map_err(|err| format_object_store_error(err, &path))?;
605
606 let stream = stream.then(|res| async {
607 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
608 let meta = entry.metadata();
609
610 Ok(format_object_meta(entry.path(), meta))
611 });
612 Ok::<_, object_store::Error>(stream)
613 };
614
615 fut.into_stream().try_flatten().into_send().boxed()
616 }
617
618 fn list_with_offset(
619 &self,
620 prefix: Option<&Path>,
621 offset: &Path,
622 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
623 let path = prefix.map_or("".into(), |x| {
624 format!("{}/", percent_decode_path(x.as_ref()))
625 });
626 let offset = offset.clone();
627
628 let this = self.clone();
631
632 let fut = async move {
633 let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
634 let mut fut = this.inner.lister_with(&path).recursive(true);
635
636 if list_with_start_after {
638 fut = fut.start_after(offset.as_ref());
639 }
640
641 let lister = fut
642 .await
643 .map_err(|err| format_object_store_error(err, &path))?
644 .then(move |entry| {
645 let path = path.clone();
646 let this = this.clone();
647 async move {
648 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
649 let (path, metadata) = entry.into_parts();
650
651 if metadata.is_dir() || metadata.last_modified().is_some() {
653 let object_meta = format_object_meta(&path, &metadata);
654 return Ok(object_meta);
655 }
656
657 let metadata = this
658 .inner
659 .stat(&path)
660 .await
661 .map_err(|err| format_object_store_error(err, &path))?;
662 let object_meta = format_object_meta(&path, &metadata);
663 Ok::<_, object_store::Error>(object_meta)
664 }
665 })
666 .into_send()
667 .boxed();
668
669 let stream = if list_with_start_after {
670 lister
671 } else {
672 lister
673 .try_filter(move |entry| futures::future::ready(entry.location > offset))
674 .into_send()
675 .boxed()
676 };
677
678 Ok::<_, object_store::Error>(stream)
679 };
680
681 fut.into_stream().into_send().try_flatten().boxed()
682 }
683
684 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
685 let path = prefix.map_or("".into(), |x| {
686 format!("{}/", percent_decode_path(x.as_ref()))
687 });
688 let mut stream = self
689 .inner
690 .lister_with(&path)
691 .into_future()
692 .into_send()
693 .await
694 .map_err(|err| format_object_store_error(err, &path))?
695 .into_send();
696
697 let mut common_prefixes = Vec::new();
698 let mut objects = Vec::new();
699
700 while let Some(res) = stream.next().into_send().await {
701 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
702 let meta = entry.metadata();
703
704 if meta.is_dir() {
705 common_prefixes.push(entry.path().into());
706 } else if meta.last_modified().is_some() {
707 objects.push(format_object_meta(entry.path(), meta));
708 } else {
709 let meta = self
710 .inner
711 .stat(entry.path())
712 .into_send()
713 .await
714 .map_err(|err| format_object_store_error(err, entry.path()))?;
715 objects.push(format_object_meta(entry.path(), &meta));
716 }
717 }
718
719 Ok(ListResult {
720 common_prefixes,
721 objects,
722 })
723 }
724
725 async fn copy_opts(
726 &self,
727 from: &Path,
728 to: &Path,
729 options: ObjectStoreCopyOptions,
730 ) -> object_store::Result<()> {
731 let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
732 self.copy_request(from, to, if_not_exists).await
733 }
734}
735
736struct OpendalMultipartUpload {
745 writer: Arc<Mutex<Writer>>,
746 location: Path,
747 next_notify: oneshot::Receiver<()>,
748}
749
750impl OpendalMultipartUpload {
751 fn new(writer: Writer, location: Path) -> Self {
752 let (_, rx) = oneshot::channel();
754
755 Self {
756 writer: Arc::new(Mutex::new(writer)),
757 location,
758 next_notify: rx,
759 }
760 }
761}
762
763#[async_trait]
764impl MultipartUpload for OpendalMultipartUpload {
765 fn put_part(&mut self, data: PutPayload) -> UploadPart {
766 let writer = self.writer.clone();
767 let location = self.location.clone();
768
769 let (tx, rx) = oneshot::channel();
771 let last_rx = std::mem::replace(&mut self.next_notify, rx);
773
774 async move {
775 let _ = last_rx.await;
777
778 let mut writer = writer.lock().await;
779 let result = writer
780 .write(Buffer::from_iter(data))
781 .await
782 .map_err(|err| format_object_store_error(err, location.as_ref()));
783
784 drop(tx);
786
787 result
788 }
789 .into_send()
790 .boxed()
791 }
792
793 async fn complete(&mut self) -> object_store::Result<PutResult> {
794 let mut writer = self.writer.lock().await;
795 let metadata = writer
796 .close()
797 .into_send()
798 .await
799 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
800
801 let e_tag = metadata.etag().map(|s| s.to_string());
802 let version = metadata.version().map(|s| s.to_string());
803
804 Ok(PutResult { e_tag, version })
805 }
806
807 async fn abort(&mut self) -> object_store::Result<()> {
808 let mut writer = self.writer.lock().await;
809 writer
810 .abort()
811 .into_send()
812 .await
813 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
814 }
815}
816
817impl Debug for OpendalMultipartUpload {
818 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
819 f.debug_struct("OpendalMultipartUpload")
820 .field("location", &self.location)
821 .finish()
822 }
823}
824
825#[cfg(test)]
826mod tests {
827 use bytes::Bytes;
828 use object_store::path::Path;
829 use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
830 use opendal::services;
831 use rand::prelude::*;
832 use rand::rng;
833 use std::sync::Arc;
834
835 use super::*;
836
837 async fn create_test_object_store() -> Arc<dyn ObjectStore> {
838 let op = Operator::new(services::Memory::default()).unwrap().finish();
839 let object_store = Arc::new(OpendalStore::new(op));
840
841 let path: Path = "data/test.txt".into();
842 let bytes = Bytes::from_static(b"hello, world!");
843 object_store.put(&path, bytes.into()).await.unwrap();
844
845 let path: Path = "data/nested/test.txt".into();
846 let bytes = Bytes::from_static(b"hello, world! I am nested.");
847 object_store.put(&path, bytes.into()).await.unwrap();
848
849 object_store
850 }
851
852 #[tokio::test]
853 async fn test_basic() {
854 let op = Operator::new(services::Memory::default()).unwrap().finish();
855 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
856
857 let path: Path = "data/test.txt".into();
859
860 let bytes = Bytes::from_static(b"hello, world!");
861 object_store.put(&path, bytes.clone().into()).await.unwrap();
862
863 let meta = object_store.head(&path).await.unwrap();
864
865 assert_eq!(meta.size, 13);
866
867 assert_eq!(
868 object_store
869 .get(&path)
870 .await
871 .unwrap()
872 .bytes()
873 .await
874 .unwrap(),
875 bytes
876 );
877 }
878
879 #[tokio::test]
880 async fn test_put_multipart() {
881 let op = Operator::new(services::Memory::default()).unwrap().finish();
882 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
883
884 let mut rng = rng();
885
886 let path: Path = "data/test_complete.txt".into();
888 let upload = object_store.put_multipart(&path).await.unwrap();
889
890 let mut write = WriteMultipart::new(upload);
891
892 let mut all_bytes = vec![];
893 let round = rng.random_range(1..=1024);
894 for _ in 0..round {
895 let size = rng.random_range(1..=1024);
896 let mut bytes = vec![0; size];
897 rng.fill_bytes(&mut bytes);
898
899 all_bytes.extend_from_slice(&bytes);
900 write.put(bytes.into());
901 }
902
903 let _ = write.finish().await.unwrap();
904
905 let meta = object_store.head(&path).await.unwrap();
906
907 assert_eq!(meta.size, all_bytes.len() as u64);
908
909 assert_eq!(
910 object_store
911 .get(&path)
912 .await
913 .unwrap()
914 .bytes()
915 .await
916 .unwrap(),
917 Bytes::from(all_bytes)
918 );
919
920 let path: Path = "data/test_abort.txt".into();
922 let mut upload = object_store.put_multipart(&path).await.unwrap();
923 upload.put_part(vec![1; 1024].into()).await.unwrap();
924 upload.abort().await.unwrap();
925
926 let res = object_store.head(&path).await;
927 let err = res.unwrap_err();
928
929 assert!(matches!(err, object_store::Error::NotFound { .. }))
930 }
931
932 #[tokio::test]
933 async fn test_list() {
934 let object_store = create_test_object_store().await;
935 let path: Path = "data/".into();
936 let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
937 assert_eq!(results.len(), 2);
938 let mut locations = results
939 .iter()
940 .map(|x| x.as_ref().unwrap().location.as_ref())
941 .collect::<Vec<_>>();
942
943 let expected_files = vec![
944 (
945 "data/nested/test.txt",
946 Bytes::from_static(b"hello, world! I am nested."),
947 ),
948 ("data/test.txt", Bytes::from_static(b"hello, world!")),
949 ];
950
951 let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
952
953 locations.sort();
954 assert_eq!(locations, expected_locations);
955
956 for (location, bytes) in expected_files {
957 let path: Path = location.into();
958 assert_eq!(
959 object_store
960 .get(&path)
961 .await
962 .unwrap()
963 .bytes()
964 .await
965 .unwrap(),
966 bytes
967 );
968 }
969 }
970
971 #[tokio::test]
972 async fn test_list_with_delimiter() {
973 let object_store = create_test_object_store().await;
974 let path: Path = "data/".into();
975 let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
976 assert_eq!(result.objects.len(), 1);
977 assert_eq!(result.common_prefixes.len(), 1);
978 assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
979 assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
980 }
981
982 #[tokio::test]
983 async fn test_list_with_offset() {
984 let object_store = create_test_object_store().await;
985 let path: Path = "data/".into();
986 let offset: Path = "data/nested/test.txt".into();
987 let result = object_store
988 .list_with_offset(Some(&path), &offset)
989 .collect::<Vec<_>>()
990 .await;
991 assert_eq!(result.len(), 1);
992 assert_eq!(
993 result[0].as_ref().unwrap().location.as_ref(),
994 "data/test.txt"
995 );
996 }
997
998 mod stat_counter {
1000 use super::*;
1001 use std::sync::atomic::{AtomicUsize, Ordering};
1002
1003 #[derive(Debug, Clone)]
1004 pub struct StatCounterLayer {
1005 count: Arc<AtomicUsize>,
1006 }
1007
1008 impl StatCounterLayer {
1009 pub fn new(count: Arc<AtomicUsize>) -> Self {
1010 Self { count }
1011 }
1012 }
1013
1014 impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
1015 type LayeredAccess = StatCounterAccessor<A>;
1016
1017 fn layer(&self, inner: A) -> Self::LayeredAccess {
1018 StatCounterAccessor {
1019 inner,
1020 count: self.count.clone(),
1021 }
1022 }
1023 }
1024
1025 #[derive(Debug, Clone)]
1026 pub struct StatCounterAccessor<A> {
1027 inner: A,
1028 count: Arc<AtomicUsize>,
1029 }
1030
1031 impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
1032 type Inner = A;
1033 type Reader = A::Reader;
1034 type Writer = A::Writer;
1035 type Lister = A::Lister;
1036 type Deleter = A::Deleter;
1037 type Copier = A::Copier;
1038
1039 fn inner(&self) -> &Self::Inner {
1040 &self.inner
1041 }
1042
1043 async fn stat(
1044 &self,
1045 path: &str,
1046 args: opendal::raw::OpStat,
1047 ) -> opendal::Result<opendal::raw::RpStat> {
1048 self.count.fetch_add(1, Ordering::SeqCst);
1049 self.inner.stat(path, args).await
1050 }
1051
1052 async fn read(
1053 &self,
1054 path: &str,
1055 args: opendal::raw::OpRead,
1056 ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
1057 self.inner.read(path, args).await
1058 }
1059
1060 async fn write(
1061 &self,
1062 path: &str,
1063 args: opendal::raw::OpWrite,
1064 ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
1065 self.inner.write(path, args).await
1066 }
1067
1068 async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
1069 self.inner.delete().await
1070 }
1071
1072 async fn list(
1073 &self,
1074 path: &str,
1075 args: opendal::raw::OpList,
1076 ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
1077 self.inner.list(path, args).await
1078 }
1079
1080 async fn copy(
1081 &self,
1082 from: &str,
1083 to: &str,
1084 args: opendal::raw::OpCopy,
1085 opts: opendal::raw::OpCopier,
1086 ) -> opendal::Result<(opendal::raw::RpCopy, Self::Copier)> {
1087 self.inner.copy(from, to, args, opts).await
1088 }
1089
1090 async fn rename(
1091 &self,
1092 from: &str,
1093 to: &str,
1094 args: opendal::raw::OpRename,
1095 ) -> opendal::Result<opendal::raw::RpRename> {
1096 self.inner.rename(from, to, args).await
1097 }
1098 }
1099 }
1100
1101 #[tokio::test]
1102 async fn test_get_range_no_stat() {
1103 use std::sync::atomic::{AtomicUsize, Ordering};
1104
1105 let stat_count = Arc::new(AtomicUsize::new(0));
1107 let op = Operator::new(opendal::services::Memory::default())
1108 .unwrap()
1109 .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
1110 .finish();
1111 let store = OpendalStore::new(op);
1112
1113 let location = "test_get_range.txt".into();
1115 let value = Bytes::from_static(b"Hello, world!");
1116 store.put(&location, value.clone().into()).await.unwrap();
1117
1118 stat_count.store(0, Ordering::SeqCst);
1120
1121 #[allow(clippy::single_range_in_vec_init)]
1123 let ranges = [0u64..5];
1124 let ret = store.get_ranges(&location, &ranges).await.unwrap();
1125 assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1126 assert_eq!(
1127 stat_count.load(Ordering::SeqCst),
1128 0,
1129 "get_ranges should not call stat()"
1130 );
1131
1132 stat_count.store(0, Ordering::SeqCst);
1134
1135 let opts = object_store::GetOptions {
1137 range: Some(object_store::GetRange::Bounded(0..5)),
1138 ..Default::default()
1139 };
1140 let ret = store.get_opts(&location, opts).await.unwrap();
1141 assert_eq!(ret.meta.size, value.len() as u64);
1142 assert_eq!(ret.range, 0..5);
1143 let data = ret.bytes().await.unwrap();
1144 assert_eq!(Bytes::from_static(b"Hello"), data);
1145 assert_eq!(
1146 stat_count.load(Ordering::SeqCst),
1147 0,
1148 "get_opts should not call stat() for bounded range reads"
1149 );
1150
1151 stat_count.store(0, Ordering::SeqCst);
1153
1154 let opts = object_store::GetOptions {
1156 range: Some(object_store::GetRange::Offset(7)),
1157 ..Default::default()
1158 };
1159 let ret = store.get_opts(&location, opts).await.unwrap();
1160 assert_eq!(ret.meta.size, value.len() as u64);
1161 assert_eq!(ret.range, 7..value.len() as u64);
1162 let data = ret.bytes().await.unwrap();
1163 assert_eq!(Bytes::from_static(b"world!"), data);
1164 assert_eq!(
1165 stat_count.load(Ordering::SeqCst),
1166 0,
1167 "get_opts should not call stat() for offset range reads"
1168 );
1169
1170 stat_count.store(0, Ordering::SeqCst);
1172
1173 let ret = store
1175 .get_opts(&location, object_store::GetOptions::default())
1176 .await
1177 .unwrap();
1178 assert_eq!(ret.meta.size, value.len() as u64);
1179 assert_eq!(ret.range, 0..value.len() as u64);
1180 let data = ret.bytes().await.unwrap();
1181 assert_eq!(value, data);
1182 assert_eq!(
1183 stat_count.load(Ordering::SeqCst),
1184 0,
1185 "get_opts should not call stat() for full reads"
1186 );
1187
1188 stat_count.store(0, Ordering::SeqCst);
1190
1191 let opts = object_store::GetOptions {
1193 range: Some(object_store::GetRange::Suffix(6)),
1194 ..Default::default()
1195 };
1196 let ret = store.get_opts(&location, opts).await.unwrap();
1197 assert_eq!(ret.meta.size, value.len() as u64);
1198 assert_eq!(ret.range, 7..value.len() as u64);
1199 let data = ret.bytes().await.unwrap();
1200 assert_eq!(Bytes::from_static(b"world!"), data);
1201 assert!(
1202 stat_count.load(Ordering::SeqCst) > 0,
1203 "get_opts should call stat() for suffix range reads"
1204 );
1205
1206 store.delete(&location).await.unwrap();
1208 }
1209}