1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::ops::Range;
22use std::sync::Arc;
23
24use crate::utils::*;
25use crate::{datetime_to_timestamp, timestamp_to_datetime};
26use async_trait::async_trait;
27use bytes::Bytes;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::TryStreamExt;
31use futures::stream::BoxStream;
32use mea::mutex::Mutex;
33use mea::oneshot;
34use object_store::ListResult;
35use object_store::MultipartUpload;
36use object_store::ObjectMeta;
37use object_store::ObjectStore;
38use object_store::PutMultipartOptions;
39use object_store::PutOptions;
40use object_store::PutPayload;
41use object_store::PutResult;
42use object_store::path::Path;
43use object_store::{GetOptions, UploadPart};
44use object_store::{GetRange, GetResultPayload};
45use object_store::{GetResult, PutMode};
46use opendal::Buffer;
47use opendal::Writer;
48use opendal::options::CopyOptions;
49use opendal::raw::percent_decode_path;
50use opendal::{Operator, OperatorInfo};
51use std::collections::HashMap;
52
53#[derive(Clone)]
100pub struct OpendalStore {
101 info: Arc<OperatorInfo>,
102 inner: Operator,
103}
104
105impl OpendalStore {
106 pub fn new(op: Operator) -> Self {
108 Self {
109 info: op.info().into(),
110 inner: op,
111 }
112 }
113
114 pub fn info(&self) -> &OperatorInfo {
116 self.info.as_ref()
117 }
118
119 async fn copy_request(
121 &self,
122 from: &Path,
123 to: &Path,
124 if_not_exists: bool,
125 ) -> object_store::Result<()> {
126 let mut copy_options = CopyOptions::default();
127 if if_not_exists {
128 copy_options.if_not_exists = true;
129 }
130
131 self.inner
133 .copy_options(
134 &percent_decode_path(from.as_ref()),
135 &percent_decode_path(to.as_ref()),
136 copy_options,
137 )
138 .into_send()
139 .await
140 .map_err(|err| {
141 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
142 object_store::Error::AlreadyExists {
143 path: to.to_string(),
144 source: Box::new(err),
145 }
146 } else {
147 format_object_store_error(err, from.as_ref())
148 }
149 })?;
150
151 Ok(())
152 }
153}
154
155impl Debug for OpendalStore {
156 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
157 f.debug_struct("OpendalStore")
158 .field("scheme", &self.info.scheme())
159 .field("name", &self.info.name())
160 .field("root", &self.info.root())
161 .field("capability", &self.info.full_capability())
162 .finish()
163 }
164}
165
166impl Display for OpendalStore {
167 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
168 let info = self.inner.info();
169 write!(
170 f,
171 "Opendal({}, bucket={}, root={})",
172 info.scheme(),
173 info.name(),
174 info.root()
175 )
176 }
177}
178
179impl From<Operator> for OpendalStore {
180 fn from(value: Operator) -> Self {
181 Self::new(value)
182 }
183}
184
185#[async_trait]
186impl ObjectStore for OpendalStore {
187 async fn put_opts(
188 &self,
189 location: &Path,
190 bytes: PutPayload,
191 opts: PutOptions,
192 ) -> object_store::Result<PutResult> {
193 let decoded_location = percent_decode_path(location.as_ref());
194 let mut future_write = self
195 .inner
196 .write_with(&decoded_location, Buffer::from_iter(bytes));
197 let opts_mode = opts.mode.clone();
198 match opts.mode {
199 PutMode::Overwrite => {}
200 PutMode::Create => {
201 future_write = future_write.if_not_exists(true);
202 }
203 PutMode::Update(update_version) => {
204 let Some(etag) = update_version.e_tag else {
205 Err(object_store::Error::NotSupported {
206 source: Box::new(opendal::Error::new(
207 opendal::ErrorKind::Unsupported,
208 "etag is required for conditional put",
209 )),
210 })?
211 };
212 future_write = future_write.if_match(etag.as_str());
213 }
214 }
215 let rp = future_write.into_send().await.map_err(|err| {
216 match format_object_store_error(err, location.as_ref()) {
217 object_store::Error::Precondition { path, source }
218 if opts_mode == PutMode::Create =>
219 {
220 object_store::Error::AlreadyExists { path, source }
221 }
222 e => e,
223 }
224 })?;
225
226 let e_tag = rp.etag().map(|s| s.to_string());
227 let version = rp.version().map(|s| s.to_string());
228
229 Ok(PutResult { e_tag, version })
230 }
231
232 async fn put_multipart(
233 &self,
234 location: &Path,
235 ) -> object_store::Result<Box<dyn MultipartUpload>> {
236 let decoded_location = percent_decode_path(location.as_ref());
237 let writer = self
238 .inner
239 .writer_with(&decoded_location)
240 .concurrent(8)
241 .into_send()
242 .await
243 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
244 let upload = OpendalMultipartUpload::new(writer, location.clone());
245
246 Ok(Box::new(upload))
247 }
248
249 async fn put_multipart_opts(
250 &self,
251 location: &Path,
252 opts: PutMultipartOptions,
253 ) -> object_store::Result<Box<dyn MultipartUpload>> {
254 const DEFAULT_CONCURRENT: usize = 8;
255
256 let mut options = opendal::options::WriteOptions {
257 concurrent: DEFAULT_CONCURRENT,
258 ..Default::default()
259 };
260
261 let mut user_metadata = HashMap::new();
263
264 for (key, value) in opts.attributes.iter() {
266 match key {
267 object_store::Attribute::CacheControl => {
268 options.cache_control = Some(value.to_string());
269 }
270 object_store::Attribute::ContentDisposition => {
271 options.content_disposition = Some(value.to_string());
272 }
273 object_store::Attribute::ContentEncoding => {
274 options.content_encoding = Some(value.to_string());
275 }
276 object_store::Attribute::ContentLanguage => {
277 continue;
279 }
280 object_store::Attribute::ContentType => {
281 options.content_type = Some(value.to_string());
282 }
283 object_store::Attribute::Metadata(k) => {
284 user_metadata.insert(k.to_string(), value.to_string());
285 }
286 _ => {}
287 }
288 }
289
290 if !user_metadata.is_empty() {
292 options.user_metadata = Some(user_metadata);
293 }
294
295 let decoded_location = percent_decode_path(location.as_ref());
296 let writer = self
297 .inner
298 .writer_options(&decoded_location, options)
299 .into_send()
300 .await
301 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
302 let upload = OpendalMultipartUpload::new(writer, location.clone());
303
304 Ok(Box::new(upload))
305 }
306
307 async fn get_opts(
308 &self,
309 location: &Path,
310 options: GetOptions,
311 ) -> object_store::Result<GetResult> {
312 let raw_location = percent_decode_path(location.as_ref());
313 let meta = {
314 let mut s = self.inner.stat_with(&raw_location);
315 if let Some(version) = &options.version {
316 s = s.version(version.as_str())
317 }
318 if let Some(if_match) = &options.if_match {
319 s = s.if_match(if_match.as_str());
320 }
321 if let Some(if_none_match) = &options.if_none_match {
322 s = s.if_none_match(if_none_match.as_str());
323 }
324 if let Some(if_modified_since) =
325 options.if_modified_since.and_then(datetime_to_timestamp)
326 {
327 s = s.if_modified_since(if_modified_since);
328 }
329 if let Some(if_unmodified_since) =
330 options.if_unmodified_since.and_then(datetime_to_timestamp)
331 {
332 s = s.if_unmodified_since(if_unmodified_since);
333 }
334 s.into_send()
335 .await
336 .map_err(|err| format_object_store_error(err, location.as_ref()))?
337 };
338
339 let mut attributes = object_store::Attributes::new();
341 if let Some(user_meta) = meta.user_metadata() {
342 for (key, value) in user_meta {
343 attributes.insert(
344 object_store::Attribute::Metadata(key.clone().into()),
345 value.clone().into(),
346 );
347 }
348 }
349
350 let meta = ObjectMeta {
351 location: location.clone(),
352 last_modified: meta
353 .last_modified()
354 .and_then(timestamp_to_datetime)
355 .unwrap_or_default(),
356 size: meta.content_length(),
357 e_tag: meta.etag().map(|x| x.to_string()),
358 version: meta.version().map(|x| x.to_string()),
359 };
360
361 if options.head {
362 return Ok(GetResult {
363 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
364 range: 0..0,
365 meta,
366 attributes,
367 });
368 }
369
370 let reader = {
371 let mut r = self.inner.reader_with(raw_location.as_ref());
372 if let Some(version) = options.version {
373 r = r.version(version.as_str());
374 }
375 if let Some(if_match) = options.if_match {
376 r = r.if_match(if_match.as_str());
377 }
378 if let Some(if_none_match) = options.if_none_match {
379 r = r.if_none_match(if_none_match.as_str());
380 }
381 if let Some(if_modified_since) =
382 options.if_modified_since.and_then(datetime_to_timestamp)
383 {
384 r = r.if_modified_since(if_modified_since);
385 }
386 if let Some(if_unmodified_since) =
387 options.if_unmodified_since.and_then(datetime_to_timestamp)
388 {
389 r = r.if_unmodified_since(if_unmodified_since);
390 }
391 r.into_send()
392 .await
393 .map_err(|err| format_object_store_error(err, location.as_ref()))?
394 };
395
396 let read_range = match options.range {
397 Some(GetRange::Bounded(r)) => {
398 if r.start >= r.end || r.start >= meta.size {
399 0..0
400 } else {
401 let end = r.end.min(meta.size);
402 r.start..end
403 }
404 }
405 Some(GetRange::Offset(r)) => {
406 if r < meta.size {
407 r..meta.size
408 } else {
409 0..0
410 }
411 }
412 Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
413 _ => 0..meta.size,
414 };
415
416 let stream = reader
417 .into_bytes_stream(read_range.start..read_range.end)
418 .into_send()
419 .await
420 .map_err(|err| format_object_store_error(err, location.as_ref()))?
421 .into_send()
422 .map_err(|err: io::Error| object_store::Error::Generic {
423 store: "IoError",
424 source: Box::new(err),
425 });
426
427 Ok(GetResult {
428 payload: GetResultPayload::Stream(Box::pin(stream)),
429 range: read_range.start..read_range.end,
430 meta,
431 attributes,
432 })
433 }
434
435 async fn get_range(&self, location: &Path, range: Range<u64>) -> object_store::Result<Bytes> {
440 let raw_location = percent_decode_path(location.as_ref());
443 let reader = self
444 .inner
445 .reader_with(&raw_location)
446 .into_send()
447 .await
448 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
449
450 reader
451 .read(range.start..range.end)
452 .into_send()
453 .await
454 .map(|buf| buf.to_bytes())
455 .map_err(|err| format_object_store_error(err, location.as_ref()))
456 }
457
458 async fn delete(&self, location: &Path) -> object_store::Result<()> {
459 let decoded_location = percent_decode_path(location.as_ref());
460 self.inner
461 .delete(&decoded_location)
462 .into_send()
463 .await
464 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
465
466 Ok(())
467 }
468
469 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
470 let path = prefix.map_or("".into(), |x| {
473 format!("{}/", percent_decode_path(x.as_ref()))
474 });
475
476 let this = self.clone();
477 let fut = async move {
478 let stream = this
479 .inner
480 .lister_with(&path)
481 .recursive(true)
482 .await
483 .map_err(|err| format_object_store_error(err, &path))?;
484
485 let stream = stream.then(|res| async {
486 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
487 let meta = entry.metadata();
488
489 Ok(format_object_meta(entry.path(), meta))
490 });
491 Ok::<_, object_store::Error>(stream)
492 };
493
494 fut.into_stream().try_flatten().into_send().boxed()
495 }
496
497 fn list_with_offset(
498 &self,
499 prefix: Option<&Path>,
500 offset: &Path,
501 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
502 let path = prefix.map_or("".into(), |x| {
503 format!("{}/", percent_decode_path(x.as_ref()))
504 });
505 let offset = offset.clone();
506
507 let this = self.clone();
510
511 let fut = async move {
512 let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
513 let mut fut = this.inner.lister_with(&path).recursive(true);
514
515 if list_with_start_after {
517 fut = fut.start_after(offset.as_ref());
518 }
519
520 let lister = fut
521 .await
522 .map_err(|err| format_object_store_error(err, &path))?
523 .then(move |entry| {
524 let path = path.clone();
525 let this = this.clone();
526 async move {
527 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
528 let (path, metadata) = entry.into_parts();
529
530 if metadata.is_dir() || metadata.last_modified().is_some() {
532 let object_meta = format_object_meta(&path, &metadata);
533 return Ok(object_meta);
534 }
535
536 let metadata = this
537 .inner
538 .stat(&path)
539 .await
540 .map_err(|err| format_object_store_error(err, &path))?;
541 let object_meta = format_object_meta(&path, &metadata);
542 Ok::<_, object_store::Error>(object_meta)
543 }
544 })
545 .into_send()
546 .boxed();
547
548 let stream = if list_with_start_after {
549 lister
550 } else {
551 lister
552 .try_filter(move |entry| futures::future::ready(entry.location > offset))
553 .into_send()
554 .boxed()
555 };
556
557 Ok::<_, object_store::Error>(stream)
558 };
559
560 fut.into_stream().into_send().try_flatten().boxed()
561 }
562
563 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
564 let path = prefix.map_or("".into(), |x| {
565 format!("{}/", percent_decode_path(x.as_ref()))
566 });
567 let mut stream = self
568 .inner
569 .lister_with(&path)
570 .into_future()
571 .into_send()
572 .await
573 .map_err(|err| format_object_store_error(err, &path))?
574 .into_send();
575
576 let mut common_prefixes = Vec::new();
577 let mut objects = Vec::new();
578
579 while let Some(res) = stream.next().into_send().await {
580 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
581 let meta = entry.metadata();
582
583 if meta.is_dir() {
584 common_prefixes.push(entry.path().into());
585 } else if meta.last_modified().is_some() {
586 objects.push(format_object_meta(entry.path(), meta));
587 } else {
588 let meta = self
589 .inner
590 .stat(entry.path())
591 .into_send()
592 .await
593 .map_err(|err| format_object_store_error(err, entry.path()))?;
594 objects.push(format_object_meta(entry.path(), &meta));
595 }
596 }
597
598 Ok(ListResult {
599 common_prefixes,
600 objects,
601 })
602 }
603
604 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
605 self.copy_request(from, to, false).await
606 }
607
608 async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
609 self.inner
610 .rename(
611 &percent_decode_path(from.as_ref()),
612 &percent_decode_path(to.as_ref()),
613 )
614 .into_send()
615 .await
616 .map_err(|err| format_object_store_error(err, from.as_ref()))?;
617
618 Ok(())
619 }
620
621 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
622 self.copy_request(from, to, true).await
623 }
624}
625
626struct OpendalMultipartUpload {
635 writer: Arc<Mutex<Writer>>,
636 location: Path,
637 next_notify: oneshot::Receiver<()>,
638}
639
640impl OpendalMultipartUpload {
641 fn new(writer: Writer, location: Path) -> Self {
642 let (_, rx) = oneshot::channel();
644
645 Self {
646 writer: Arc::new(Mutex::new(writer)),
647 location,
648 next_notify: rx,
649 }
650 }
651}
652
653#[async_trait]
654impl MultipartUpload for OpendalMultipartUpload {
655 fn put_part(&mut self, data: PutPayload) -> UploadPart {
656 let writer = self.writer.clone();
657 let location = self.location.clone();
658
659 let (tx, rx) = oneshot::channel();
661 let last_rx = std::mem::replace(&mut self.next_notify, rx);
663
664 async move {
665 let _ = last_rx.await;
667
668 let mut writer = writer.lock().await;
669 let result = writer
670 .write(Buffer::from_iter(data.into_iter()))
671 .await
672 .map_err(|err| format_object_store_error(err, location.as_ref()));
673
674 drop(tx);
676
677 result
678 }
679 .into_send()
680 .boxed()
681 }
682
683 async fn complete(&mut self) -> object_store::Result<PutResult> {
684 let mut writer = self.writer.lock().await;
685 let metadata = writer
686 .close()
687 .into_send()
688 .await
689 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
690
691 let e_tag = metadata.etag().map(|s| s.to_string());
692 let version = metadata.version().map(|s| s.to_string());
693
694 Ok(PutResult { e_tag, version })
695 }
696
697 async fn abort(&mut self) -> object_store::Result<()> {
698 let mut writer = self.writer.lock().await;
699 writer
700 .abort()
701 .into_send()
702 .await
703 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
704 }
705}
706
707impl Debug for OpendalMultipartUpload {
708 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
709 f.debug_struct("OpendalMultipartUpload")
710 .field("location", &self.location)
711 .finish()
712 }
713}
714
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Builds an in-memory store pre-populated with `data/test.txt` and
    /// `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store = Arc::new(OpendalStore::new(op));

        let fixtures: [(&str, &'static [u8]); 2] = [
            ("data/test.txt", b"hello, world!"),
            ("data/nested/test.txt", b"hello, world! I am nested."),
        ];
        for (location, content) in fixtures {
            let path: Path = location.into();
            let bytes = Bytes::from_static(content);
            object_store.put(&path, bytes.into()).await.unwrap();
        }

        object_store
    }

    /// A put followed by head and get returns the size and bytes written.
    #[tokio::test]
    async fn test_basic() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();
        let payload = Bytes::from_static(b"hello, world!");
        object_store
            .put(&path, payload.clone().into())
            .await
            .unwrap();

        let meta = object_store.head(&path).await.unwrap();
        assert_eq!(meta.size, 13);

        let fetched = object_store
            .get(&path)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(fetched, payload);
    }

    /// A completed multipart upload is readable in full; an aborted one
    /// leaves no object behind.
    #[tokio::test]
    async fn test_put_multipart() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let mut rng = thread_rng();

        // Completed upload: a random number of randomly sized chunks.
        let path: Path = "data/test_complete.txt".into();
        let upload = object_store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new(upload);

        let mut expected = vec![];
        let rounds = rng.gen_range(1..=1024);
        for _ in 0..rounds {
            let size = rng.gen_range(1..=1024);
            let mut chunk = vec![0; size];
            rng.fill_bytes(&mut chunk);

            expected.extend_from_slice(&chunk);
            write.put(chunk.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = object_store.head(&path).await.unwrap();
        assert_eq!(meta.size, expected.len() as u64);

        let fetched = object_store
            .get(&path)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(fetched, Bytes::from(expected));

        // Aborted upload: the object must not exist afterwards.
        let path: Path = "data/test_abort.txt".into();
        let mut upload = object_store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = object_store.head(&path).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    /// Recursive listing returns both fixture objects, each readable.
    #[tokio::test]
    async fn test_list() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();

        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);

        let mut locations: Vec<&str> = results
            .iter()
            .map(|x| x.as_ref().unwrap().location.as_ref())
            .collect();

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];
        let expected_locations: Vec<&str> = expected_files
            .iter()
            .map(|(location, _)| *location)
            .collect();

        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, bytes) in expected_files {
            let path: Path = location.into();
            let fetched = object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap();
            assert_eq!(fetched, bytes);
        }
    }

    /// Delimited listing separates the direct object from the nested prefix.
    #[tokio::test]
    async fn test_list_with_delimiter() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();

        let listing = object_store.list_with_delimiter(Some(&path)).await.unwrap();

        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    /// Listing with an offset only yields entries after that offset.
    #[tokio::test]
    async fn test_list_with_offset() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let offset: Path = "data/nested/test.txt".into();

        let entries = object_store
            .list_with_offset(Some(&path), &offset)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(entries.len(), 1);
        let first = entries[0].as_ref().unwrap();
        assert_eq!(first.location.as_ref(), "data/test.txt");
    }

    /// A test-only OpenDAL layer that counts `stat` calls.
    mod stat_counter {
        use super::*;
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Layer that wraps an accessor with [`StatCounterAccessor`].
        #[derive(Debug, Clone)]
        pub struct StatCounterLayer {
            count: Arc<AtomicUsize>,
        }

        impl StatCounterLayer {
            /// Creates a layer that increments `count` on every `stat`.
            pub fn new(count: Arc<AtomicUsize>) -> Self {
                Self { count }
            }
        }

        impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
            type LayeredAccess = StatCounterAccessor<A>;

            fn layer(&self, inner: A) -> Self::LayeredAccess {
                let count = self.count.clone();
                StatCounterAccessor { inner, count }
            }
        }

        /// Accessor that bumps the shared counter in `stat` and forwards
        /// every operation to the wrapped accessor.
        #[derive(Debug, Clone)]
        pub struct StatCounterAccessor<A> {
            inner: A,
            count: Arc<AtomicUsize>,
        }

        impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
            type Inner = A;
            type Reader = A::Reader;
            type Writer = A::Writer;
            type Lister = A::Lister;
            type Deleter = A::Deleter;

            fn inner(&self) -> &Self::Inner {
                &self.inner
            }

            /// Counts the call, then forwards it.
            async fn stat(
                &self,
                path: &str,
                args: opendal::raw::OpStat,
            ) -> opendal::Result<opendal::raw::RpStat> {
                self.count.fetch_add(1, Ordering::SeqCst);
                self.inner.stat(path, args).await
            }

            async fn read(
                &self,
                path: &str,
                args: opendal::raw::OpRead,
            ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
                self.inner.read(path, args).await
            }

            async fn write(
                &self,
                path: &str,
                args: opendal::raw::OpWrite,
            ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
                self.inner.write(path, args).await
            }

            async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
                self.inner.delete().await
            }

            async fn list(
                &self,
                path: &str,
                args: opendal::raw::OpList,
            ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
                self.inner.list(path, args).await
            }

            async fn copy(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpCopy,
            ) -> opendal::Result<opendal::raw::RpCopy> {
                self.inner.copy(from, to, args).await
            }

            async fn rename(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpRename,
            ) -> opendal::Result<opendal::raw::RpRename> {
                self.inner.rename(from, to, args).await
            }
        }
    }

    /// `get_range` must not stat the object, whereas `get_opts` does.
    #[tokio::test]
    async fn test_get_range_no_stat() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let stat_count = Arc::new(AtomicUsize::new(0));
        let layer = stat_counter::StatCounterLayer::new(stat_count.clone());
        let op = Operator::new(opendal::services::Memory::default())
            .unwrap()
            .layer(layer)
            .finish();
        let store = OpendalStore::new(op);

        let location: Path = "test_get_range.txt".into();
        let value = Bytes::from_static(b"Hello, world!");
        store.put(&location, value.clone().into()).await.unwrap();

        // Only count the stats triggered by the call under test.
        stat_count.store(0, Ordering::SeqCst);

        let head = store.get_range(&location, 0..5).await.unwrap();
        assert_eq!(Bytes::from_static(b"Hello"), head);
        assert_eq!(
            stat_count.load(Ordering::SeqCst),
            0,
            "get_range should not call stat()"
        );

        stat_count.store(0, Ordering::SeqCst);

        let opts = object_store::GetOptions {
            range: Some(object_store::GetRange::Bounded(0..5)),
            ..Default::default()
        };
        let result = store.get_opts(&location, opts).await.unwrap();
        let data = result.bytes().await.unwrap();
        assert_eq!(Bytes::from_static(b"Hello"), data);
        assert!(
            stat_count.load(Ordering::SeqCst) > 0,
            "get_opts should call stat() to get metadata"
        );

        store.delete(&location).await.unwrap();
    }
}