1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::utils::*;
25use crate::{datetime_to_timestamp, timestamp_to_datetime};
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::CopyMode as ObjectStoreCopyMode;
35use object_store::CopyOptions as ObjectStoreCopyOptions;
36use object_store::ListResult;
37use object_store::MultipartUpload;
38use object_store::ObjectMeta;
39use object_store::ObjectStore;
40use object_store::PutMultipartOptions;
41use object_store::PutOptions;
42use object_store::PutPayload;
43use object_store::PutResult;
44use object_store::path::Path;
45use object_store::{GetOptions, UploadPart};
46use object_store::{GetRange, GetResultPayload};
47use object_store::{GetResult, PutMode};
48use opendal::Buffer;
49use opendal::Writer;
50use opendal::options::CopyOptions;
51use opendal::raw::percent_decode_path;
52use opendal::{Operator, OperatorInfo};
53use std::collections::HashMap;
54
55const DEFAULT_CONCURRENT: usize = 8;
56
57#[derive(Clone)]
105pub struct OpendalStore {
106 info: Arc<OperatorInfo>,
107 inner: Operator,
108}
109
110impl OpendalStore {
111 pub fn new(op: Operator) -> Self {
113 Self {
114 info: op.info().into(),
115 inner: op,
116 }
117 }
118
119 pub fn info(&self) -> &OperatorInfo {
121 self.info.as_ref()
122 }
123
124 async fn copy_request(
126 &self,
127 from: &Path,
128 to: &Path,
129 if_not_exists: bool,
130 ) -> object_store::Result<()> {
131 let mut copy_options = CopyOptions::default();
132 if if_not_exists {
133 copy_options.if_not_exists = true;
134 }
135
136 self.inner
138 .copy_options(
139 &percent_decode_path(from.as_ref()),
140 &percent_decode_path(to.as_ref()),
141 copy_options,
142 )
143 .into_send()
144 .await
145 .map_err(|err| {
146 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
147 object_store::Error::AlreadyExists {
148 path: to.to_string(),
149 source: Box::new(err),
150 }
151 } else {
152 format_object_store_error(err, from.as_ref())
153 }
154 })?;
155
156 Ok(())
157 }
158}
159
160impl Debug for OpendalStore {
161 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
162 f.debug_struct("OpendalStore")
163 .field("scheme", &self.info.scheme())
164 .field("name", &self.info.name())
165 .field("root", &self.info.root())
166 .field("capability", &self.info.full_capability())
167 .finish()
168 }
169}
170
171impl Display for OpendalStore {
172 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
173 let info = self.inner.info();
174 write!(
175 f,
176 "Opendal({}, bucket={}, root={})",
177 info.scheme(),
178 info.name(),
179 info.root()
180 )
181 }
182}
183
184impl From<Operator> for OpendalStore {
185 fn from(value: Operator) -> Self {
186 Self::new(value)
187 }
188}
189
190#[async_trait]
191impl ObjectStore for OpendalStore {
192 async fn put_opts(
193 &self,
194 location: &Path,
195 bytes: PutPayload,
196 opts: PutOptions,
197 ) -> object_store::Result<PutResult> {
198 let decoded_location = percent_decode_path(location.as_ref());
199 let mut future_write = self
200 .inner
201 .write_with(&decoded_location, Buffer::from_iter(bytes));
202 let opts_mode = opts.mode.clone();
203 match opts.mode {
204 PutMode::Overwrite => {}
205 PutMode::Create => {
206 future_write = future_write.if_not_exists(true);
207 }
208 PutMode::Update(update_version) => {
209 let Some(etag) = update_version.e_tag else {
210 Err(object_store::Error::NotSupported {
211 source: Box::new(opendal::Error::new(
212 opendal::ErrorKind::Unsupported,
213 "etag is required for conditional put",
214 )),
215 })?
216 };
217 future_write = future_write.if_match(etag.as_str());
218 }
219 }
220 let rp = future_write.into_send().await.map_err(|err| {
221 match format_object_store_error(err, location.as_ref()) {
222 object_store::Error::Precondition { path, source }
223 if opts_mode == PutMode::Create =>
224 {
225 object_store::Error::AlreadyExists { path, source }
226 }
227 e => e,
228 }
229 })?;
230
231 let e_tag = rp.etag().map(|s| s.to_string());
232 let version = rp.version().map(|s| s.to_string());
233
234 Ok(PutResult { e_tag, version })
235 }
236
237 async fn put_multipart_opts(
238 &self,
239 location: &Path,
240 opts: PutMultipartOptions,
241 ) -> object_store::Result<Box<dyn MultipartUpload>> {
242 let mut options = opendal::options::WriteOptions {
243 concurrent: DEFAULT_CONCURRENT,
244 ..Default::default()
245 };
246
247 let mut user_metadata = HashMap::new();
249
250 for (key, value) in opts.attributes.iter() {
252 match key {
253 object_store::Attribute::CacheControl => {
254 options.cache_control = Some(value.to_string());
255 }
256 object_store::Attribute::ContentDisposition => {
257 options.content_disposition = Some(value.to_string());
258 }
259 object_store::Attribute::ContentEncoding => {
260 options.content_encoding = Some(value.to_string());
261 }
262 object_store::Attribute::ContentLanguage => {
263 continue;
265 }
266 object_store::Attribute::ContentType => {
267 options.content_type = Some(value.to_string());
268 }
269 object_store::Attribute::Metadata(k) => {
270 user_metadata.insert(k.to_string(), value.to_string());
271 }
272 _ => {}
273 }
274 }
275
276 if !user_metadata.is_empty() {
278 options.user_metadata = Some(user_metadata);
279 }
280
281 let decoded_location = percent_decode_path(location.as_ref());
282 let writer = self
283 .inner
284 .writer_options(&decoded_location, options)
285 .into_send()
286 .await
287 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
288 let upload = OpendalMultipartUpload::new(writer, location.clone());
289
290 Ok(Box::new(upload))
291 }
292
293 async fn get_opts(
294 &self,
295 location: &Path,
296 options: GetOptions,
297 ) -> object_store::Result<GetResult> {
298 let raw_location = percent_decode_path(location.as_ref());
299 let meta = {
300 let mut s = self.inner.stat_with(&raw_location);
301 if let Some(version) = &options.version {
302 s = s.version(version.as_str())
303 }
304 if let Some(if_match) = &options.if_match {
305 s = s.if_match(if_match.as_str());
306 }
307 if let Some(if_none_match) = &options.if_none_match {
308 s = s.if_none_match(if_none_match.as_str());
309 }
310 if let Some(if_modified_since) =
311 options.if_modified_since.and_then(datetime_to_timestamp)
312 {
313 s = s.if_modified_since(if_modified_since);
314 }
315 if let Some(if_unmodified_since) =
316 options.if_unmodified_since.and_then(datetime_to_timestamp)
317 {
318 s = s.if_unmodified_since(if_unmodified_since);
319 }
320 s.into_send()
321 .await
322 .map_err(|err| format_object_store_error(err, location.as_ref()))?
323 };
324
325 let mut attributes = object_store::Attributes::new();
327 if let Some(user_meta) = meta.user_metadata() {
328 for (key, value) in user_meta {
329 attributes.insert(
330 object_store::Attribute::Metadata(key.clone().into()),
331 value.clone().into(),
332 );
333 }
334 }
335
336 let meta = ObjectMeta {
337 location: location.clone(),
338 last_modified: meta
339 .last_modified()
340 .and_then(timestamp_to_datetime)
341 .unwrap_or_default(),
342 size: meta.content_length(),
343 e_tag: meta.etag().map(|x| x.to_string()),
344 version: meta.version().map(|x| x.to_string()),
345 };
346
347 if options.head {
348 return Ok(GetResult {
349 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
350 range: 0..0,
351 meta,
352 attributes,
353 });
354 }
355
356 let reader = {
357 let mut r = self.inner.reader_with(raw_location.as_ref());
358 if let Some(version) = options.version {
359 r = r.version(version.as_str());
360 }
361 if let Some(if_match) = options.if_match {
362 r = r.if_match(if_match.as_str());
363 }
364 if let Some(if_none_match) = options.if_none_match {
365 r = r.if_none_match(if_none_match.as_str());
366 }
367 if let Some(if_modified_since) =
368 options.if_modified_since.and_then(datetime_to_timestamp)
369 {
370 r = r.if_modified_since(if_modified_since);
371 }
372 if let Some(if_unmodified_since) =
373 options.if_unmodified_since.and_then(datetime_to_timestamp)
374 {
375 r = r.if_unmodified_since(if_unmodified_since);
376 }
377 r.into_send()
378 .await
379 .map_err(|err| format_object_store_error(err, location.as_ref()))?
380 };
381
382 let read_range = match options.range {
383 Some(GetRange::Bounded(r)) => {
384 if r.start >= r.end || r.start >= meta.size {
385 0..0
386 } else {
387 let end = r.end.min(meta.size);
388 r.start..end
389 }
390 }
391 Some(GetRange::Offset(r)) => {
392 if r < meta.size {
393 r..meta.size
394 } else {
395 0..0
396 }
397 }
398 Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
399 _ => 0..meta.size,
400 };
401
402 let stream = reader
403 .into_bytes_stream(read_range.start..read_range.end)
404 .into_send()
405 .await
406 .map_err(|err| format_object_store_error(err, location.as_ref()))?
407 .into_send()
408 .map_err(|err: io::Error| object_store::Error::Generic {
409 store: "IoError",
410 source: Box::new(err),
411 });
412
413 Ok(GetResult {
414 payload: GetResultPayload::Stream(Box::pin(stream)),
415 range: read_range.start..read_range.end,
416 meta,
417 attributes,
418 })
419 }
420
421 async fn get_ranges(
422 &self,
423 location: &Path,
424 ranges: &[Range<u64>],
425 ) -> object_store::Result<Vec<Bytes>> {
426 let raw_location = percent_decode_path(location.as_ref());
427 let reader = self
428 .inner
429 .reader(&raw_location)
430 .into_send()
431 .await
432 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
433
434 let location_ref: Arc<str> = Arc::from(location.as_ref());
435 futures::stream::iter(ranges.iter().cloned())
436 .map(|range| {
437 let reader = reader.clone();
438 let location_ref = location_ref.clone();
439 async move {
440 reader
441 .read(range)
442 .into_send()
443 .await
444 .map(|buf| buf.to_bytes())
445 .map_err(|err| format_object_store_error(err, &location_ref))
446 }
447 })
448 .buffered(DEFAULT_CONCURRENT)
449 .try_collect()
450 .await
451 }
452
453 fn delete_stream(
454 &self,
455 locations: BoxStream<'static, object_store::Result<Path>>,
456 ) -> BoxStream<'static, object_store::Result<Path>> {
457 let this = self.clone();
458 locations
459 .and_then(move |location| {
460 let this = this.clone();
461 async move {
462 let decoded = percent_decode_path(location.as_ref());
463 this.inner
464 .delete(&decoded)
465 .into_send()
466 .await
467 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
468 Ok(location)
469 }
470 .into_send()
471 })
472 .boxed()
473 }
474
475 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
476 let path = prefix.map_or("".into(), |x| {
479 format!("{}/", percent_decode_path(x.as_ref()))
480 });
481
482 let this = self.clone();
483 let fut = async move {
484 let stream = this
485 .inner
486 .lister_with(&path)
487 .recursive(true)
488 .await
489 .map_err(|err| format_object_store_error(err, &path))?;
490
491 let stream = stream.then(|res| async {
492 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
493 let meta = entry.metadata();
494
495 Ok(format_object_meta(entry.path(), meta))
496 });
497 Ok::<_, object_store::Error>(stream)
498 };
499
500 fut.into_stream().try_flatten().into_send().boxed()
501 }
502
503 fn list_with_offset(
504 &self,
505 prefix: Option<&Path>,
506 offset: &Path,
507 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
508 let path = prefix.map_or("".into(), |x| {
509 format!("{}/", percent_decode_path(x.as_ref()))
510 });
511 let offset = offset.clone();
512
513 let this = self.clone();
516
517 let fut = async move {
518 let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
519 let mut fut = this.inner.lister_with(&path).recursive(true);
520
521 if list_with_start_after {
523 fut = fut.start_after(offset.as_ref());
524 }
525
526 let lister = fut
527 .await
528 .map_err(|err| format_object_store_error(err, &path))?
529 .then(move |entry| {
530 let path = path.clone();
531 let this = this.clone();
532 async move {
533 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
534 let (path, metadata) = entry.into_parts();
535
536 if metadata.is_dir() || metadata.last_modified().is_some() {
538 let object_meta = format_object_meta(&path, &metadata);
539 return Ok(object_meta);
540 }
541
542 let metadata = this
543 .inner
544 .stat(&path)
545 .await
546 .map_err(|err| format_object_store_error(err, &path))?;
547 let object_meta = format_object_meta(&path, &metadata);
548 Ok::<_, object_store::Error>(object_meta)
549 }
550 })
551 .into_send()
552 .boxed();
553
554 let stream = if list_with_start_after {
555 lister
556 } else {
557 lister
558 .try_filter(move |entry| futures::future::ready(entry.location > offset))
559 .into_send()
560 .boxed()
561 };
562
563 Ok::<_, object_store::Error>(stream)
564 };
565
566 fut.into_stream().into_send().try_flatten().boxed()
567 }
568
569 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
570 let path = prefix.map_or("".into(), |x| {
571 format!("{}/", percent_decode_path(x.as_ref()))
572 });
573 let mut stream = self
574 .inner
575 .lister_with(&path)
576 .into_future()
577 .into_send()
578 .await
579 .map_err(|err| format_object_store_error(err, &path))?
580 .into_send();
581
582 let mut common_prefixes = Vec::new();
583 let mut objects = Vec::new();
584
585 while let Some(res) = stream.next().into_send().await {
586 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
587 let meta = entry.metadata();
588
589 if meta.is_dir() {
590 common_prefixes.push(entry.path().into());
591 } else if meta.last_modified().is_some() {
592 objects.push(format_object_meta(entry.path(), meta));
593 } else {
594 let meta = self
595 .inner
596 .stat(entry.path())
597 .into_send()
598 .await
599 .map_err(|err| format_object_store_error(err, entry.path()))?;
600 objects.push(format_object_meta(entry.path(), &meta));
601 }
602 }
603
604 Ok(ListResult {
605 common_prefixes,
606 objects,
607 })
608 }
609
610 async fn copy_opts(
611 &self,
612 from: &Path,
613 to: &Path,
614 options: ObjectStoreCopyOptions,
615 ) -> object_store::Result<()> {
616 let if_not_exists = matches!(options.mode, ObjectStoreCopyMode::Create);
617 self.copy_request(from, to, if_not_exists).await
618 }
619}
620
621struct OpendalMultipartUpload {
630 writer: Arc<Mutex<Writer>>,
631 location: Path,
632 next_notify: oneshot::Receiver<()>,
633}
634
635impl OpendalMultipartUpload {
636 fn new(writer: Writer, location: Path) -> Self {
637 let (_, rx) = oneshot::channel();
639
640 Self {
641 writer: Arc::new(Mutex::new(writer)),
642 location,
643 next_notify: rx,
644 }
645 }
646}
647
648#[async_trait]
649impl MultipartUpload for OpendalMultipartUpload {
650 fn put_part(&mut self, data: PutPayload) -> UploadPart {
651 let writer = self.writer.clone();
652 let location = self.location.clone();
653
654 let (tx, rx) = oneshot::channel();
656 let last_rx = std::mem::replace(&mut self.next_notify, rx);
658
659 async move {
660 let _ = last_rx.await;
662
663 let mut writer = writer.lock().await;
664 let result = writer
665 .write(Buffer::from_iter(data))
666 .await
667 .map_err(|err| format_object_store_error(err, location.as_ref()));
668
669 drop(tx);
671
672 result
673 }
674 .into_send()
675 .boxed()
676 }
677
678 async fn complete(&mut self) -> object_store::Result<PutResult> {
679 let mut writer = self.writer.lock().await;
680 let metadata = writer
681 .close()
682 .into_send()
683 .await
684 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
685
686 let e_tag = metadata.etag().map(|s| s.to_string());
687 let version = metadata.version().map(|s| s.to_string());
688
689 Ok(PutResult { e_tag, version })
690 }
691
692 async fn abort(&mut self) -> object_store::Result<()> {
693 let mut writer = self.writer.lock().await;
694 writer
695 .abort()
696 .into_send()
697 .await
698 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
699 }
700}
701
702impl Debug for OpendalMultipartUpload {
703 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
704 f.debug_struct("OpendalMultipartUpload")
705 .field("location", &self.location)
706 .finish()
707 }
708}
709
710#[cfg(test)]
711mod tests {
712 use bytes::Bytes;
713 use object_store::path::Path;
714 use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
715 use opendal::services;
716 use rand::prelude::*;
717 use rand::rng;
718 use std::sync::Arc;
719
720 use super::*;
721
722 async fn create_test_object_store() -> Arc<dyn ObjectStore> {
723 let op = Operator::new(services::Memory::default()).unwrap().finish();
724 let object_store = Arc::new(OpendalStore::new(op));
725
726 let path: Path = "data/test.txt".into();
727 let bytes = Bytes::from_static(b"hello, world!");
728 object_store.put(&path, bytes.into()).await.unwrap();
729
730 let path: Path = "data/nested/test.txt".into();
731 let bytes = Bytes::from_static(b"hello, world! I am nested.");
732 object_store.put(&path, bytes.into()).await.unwrap();
733
734 object_store
735 }
736
737 #[tokio::test]
738 async fn test_basic() {
739 let op = Operator::new(services::Memory::default()).unwrap().finish();
740 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
741
742 let path: Path = "data/test.txt".into();
744
745 let bytes = Bytes::from_static(b"hello, world!");
746 object_store.put(&path, bytes.clone().into()).await.unwrap();
747
748 let meta = object_store.head(&path).await.unwrap();
749
750 assert_eq!(meta.size, 13);
751
752 assert_eq!(
753 object_store
754 .get(&path)
755 .await
756 .unwrap()
757 .bytes()
758 .await
759 .unwrap(),
760 bytes
761 );
762 }
763
764 #[tokio::test]
765 async fn test_put_multipart() {
766 let op = Operator::new(services::Memory::default()).unwrap().finish();
767 let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));
768
769 let mut rng = rng();
770
771 let path: Path = "data/test_complete.txt".into();
773 let upload = object_store.put_multipart(&path).await.unwrap();
774
775 let mut write = WriteMultipart::new(upload);
776
777 let mut all_bytes = vec![];
778 let round = rng.random_range(1..=1024);
779 for _ in 0..round {
780 let size = rng.random_range(1..=1024);
781 let mut bytes = vec![0; size];
782 rng.fill_bytes(&mut bytes);
783
784 all_bytes.extend_from_slice(&bytes);
785 write.put(bytes.into());
786 }
787
788 let _ = write.finish().await.unwrap();
789
790 let meta = object_store.head(&path).await.unwrap();
791
792 assert_eq!(meta.size, all_bytes.len() as u64);
793
794 assert_eq!(
795 object_store
796 .get(&path)
797 .await
798 .unwrap()
799 .bytes()
800 .await
801 .unwrap(),
802 Bytes::from(all_bytes)
803 );
804
805 let path: Path = "data/test_abort.txt".into();
807 let mut upload = object_store.put_multipart(&path).await.unwrap();
808 upload.put_part(vec![1; 1024].into()).await.unwrap();
809 upload.abort().await.unwrap();
810
811 let res = object_store.head(&path).await;
812 let err = res.unwrap_err();
813
814 assert!(matches!(err, object_store::Error::NotFound { .. }))
815 }
816
817 #[tokio::test]
818 async fn test_list() {
819 let object_store = create_test_object_store().await;
820 let path: Path = "data/".into();
821 let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
822 assert_eq!(results.len(), 2);
823 let mut locations = results
824 .iter()
825 .map(|x| x.as_ref().unwrap().location.as_ref())
826 .collect::<Vec<_>>();
827
828 let expected_files = vec![
829 (
830 "data/nested/test.txt",
831 Bytes::from_static(b"hello, world! I am nested."),
832 ),
833 ("data/test.txt", Bytes::from_static(b"hello, world!")),
834 ];
835
836 let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();
837
838 locations.sort();
839 assert_eq!(locations, expected_locations);
840
841 for (location, bytes) in expected_files {
842 let path: Path = location.into();
843 assert_eq!(
844 object_store
845 .get(&path)
846 .await
847 .unwrap()
848 .bytes()
849 .await
850 .unwrap(),
851 bytes
852 );
853 }
854 }
855
856 #[tokio::test]
857 async fn test_list_with_delimiter() {
858 let object_store = create_test_object_store().await;
859 let path: Path = "data/".into();
860 let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
861 assert_eq!(result.objects.len(), 1);
862 assert_eq!(result.common_prefixes.len(), 1);
863 assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
864 assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
865 }
866
867 #[tokio::test]
868 async fn test_list_with_offset() {
869 let object_store = create_test_object_store().await;
870 let path: Path = "data/".into();
871 let offset: Path = "data/nested/test.txt".into();
872 let result = object_store
873 .list_with_offset(Some(&path), &offset)
874 .collect::<Vec<_>>()
875 .await;
876 assert_eq!(result.len(), 1);
877 assert_eq!(
878 result[0].as_ref().unwrap().location.as_ref(),
879 "data/test.txt"
880 );
881 }
882
883 mod stat_counter {
885 use super::*;
886 use std::sync::atomic::{AtomicUsize, Ordering};
887
888 #[derive(Debug, Clone)]
889 pub struct StatCounterLayer {
890 count: Arc<AtomicUsize>,
891 }
892
893 impl StatCounterLayer {
894 pub fn new(count: Arc<AtomicUsize>) -> Self {
895 Self { count }
896 }
897 }
898
899 impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
900 type LayeredAccess = StatCounterAccessor<A>;
901
902 fn layer(&self, inner: A) -> Self::LayeredAccess {
903 StatCounterAccessor {
904 inner,
905 count: self.count.clone(),
906 }
907 }
908 }
909
910 #[derive(Debug, Clone)]
911 pub struct StatCounterAccessor<A> {
912 inner: A,
913 count: Arc<AtomicUsize>,
914 }
915
916 impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
917 type Inner = A;
918 type Reader = A::Reader;
919 type Writer = A::Writer;
920 type Lister = A::Lister;
921 type Deleter = A::Deleter;
922
923 fn inner(&self) -> &Self::Inner {
924 &self.inner
925 }
926
927 async fn stat(
928 &self,
929 path: &str,
930 args: opendal::raw::OpStat,
931 ) -> opendal::Result<opendal::raw::RpStat> {
932 self.count.fetch_add(1, Ordering::SeqCst);
933 self.inner.stat(path, args).await
934 }
935
936 async fn read(
937 &self,
938 path: &str,
939 args: opendal::raw::OpRead,
940 ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
941 self.inner.read(path, args).await
942 }
943
944 async fn write(
945 &self,
946 path: &str,
947 args: opendal::raw::OpWrite,
948 ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
949 self.inner.write(path, args).await
950 }
951
952 async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
953 self.inner.delete().await
954 }
955
956 async fn list(
957 &self,
958 path: &str,
959 args: opendal::raw::OpList,
960 ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
961 self.inner.list(path, args).await
962 }
963
964 async fn copy(
965 &self,
966 from: &str,
967 to: &str,
968 args: opendal::raw::OpCopy,
969 ) -> opendal::Result<opendal::raw::RpCopy> {
970 self.inner.copy(from, to, args).await
971 }
972
973 async fn rename(
974 &self,
975 from: &str,
976 to: &str,
977 args: opendal::raw::OpRename,
978 ) -> opendal::Result<opendal::raw::RpRename> {
979 self.inner.rename(from, to, args).await
980 }
981 }
982 }
983
984 #[tokio::test]
985 async fn test_get_range_no_stat() {
986 use std::sync::atomic::{AtomicUsize, Ordering};
987
988 let stat_count = Arc::new(AtomicUsize::new(0));
990 let op = Operator::new(opendal::services::Memory::default())
991 .unwrap()
992 .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
993 .finish();
994 let store = OpendalStore::new(op);
995
996 let location = "test_get_range.txt".into();
998 let value = Bytes::from_static(b"Hello, world!");
999 store.put(&location, value.clone().into()).await.unwrap();
1000
1001 stat_count.store(0, Ordering::SeqCst);
1003
1004 #[allow(clippy::single_range_in_vec_init)]
1006 let ranges = [0u64..5];
1007 let ret = store.get_ranges(&location, &ranges).await.unwrap();
1008 assert_eq!(Bytes::from_static(b"Hello"), ret[0]);
1009 assert_eq!(
1010 stat_count.load(Ordering::SeqCst),
1011 0,
1012 "get_ranges should not call stat()"
1013 );
1014
1015 stat_count.store(0, Ordering::SeqCst);
1017
1018 let opts = object_store::GetOptions {
1020 range: Some(object_store::GetRange::Bounded(0..5)),
1021 ..Default::default()
1022 };
1023 let ret = store.get_opts(&location, opts).await.unwrap();
1024 let data = ret.bytes().await.unwrap();
1025 assert_eq!(Bytes::from_static(b"Hello"), data);
1026 assert!(
1027 stat_count.load(Ordering::SeqCst) > 0,
1028 "get_opts should call stat() to get metadata"
1029 );
1030
1031 store.delete(&location).await.unwrap();
1033 }
1034}