1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use async_trait::async_trait;
25use futures::stream::BoxStream;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use object_store::path::Path;
30use object_store::ListResult;
31use object_store::MultipartUpload;
32use object_store::ObjectMeta;
33use object_store::ObjectStore;
34use object_store::PutMultipartOptions;
35use object_store::PutOptions;
36use object_store::PutPayload;
37use object_store::PutResult;
38use object_store::{GetOptions, UploadPart};
39use object_store::{GetRange, GetResultPayload};
40use object_store::{GetResult, PutMode};
41use opendal::options::CopyOptions;
42use opendal::raw::percent_decode_path;
43use opendal::Buffer;
44use opendal::Writer;
45use opendal::{Operator, OperatorInfo};
46use std::collections::HashMap;
47use tokio::sync::{Mutex, Notify};
48
49#[derive(Clone)]
96pub struct OpendalStore {
97 info: Arc<OperatorInfo>,
98 inner: Operator,
99}
100
101impl OpendalStore {
102 pub fn new(op: Operator) -> Self {
104 Self {
105 info: op.info().into(),
106 inner: op,
107 }
108 }
109
110 pub fn info(&self) -> &OperatorInfo {
112 self.info.as_ref()
113 }
114
115 async fn copy_request(
117 &self,
118 from: &Path,
119 to: &Path,
120 if_not_exists: bool,
121 ) -> object_store::Result<()> {
122 let mut copy_options = CopyOptions::default();
123 if if_not_exists {
124 copy_options.if_not_exists = true;
125 }
126
127 self.inner
129 .copy_options(
130 &percent_decode_path(from.as_ref()),
131 &percent_decode_path(to.as_ref()),
132 copy_options,
133 )
134 .into_send()
135 .await
136 .map_err(|err| {
137 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
138 object_store::Error::AlreadyExists {
139 path: to.to_string(),
140 source: Box::new(err),
141 }
142 } else {
143 format_object_store_error(err, from.as_ref())
144 }
145 })?;
146
147 Ok(())
148 }
149}
150
151impl Debug for OpendalStore {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("OpendalStore")
154 .field("scheme", &self.info.scheme())
155 .field("name", &self.info.name())
156 .field("root", &self.info.root())
157 .field("capability", &self.info.full_capability())
158 .finish()
159 }
160}
161
162impl Display for OpendalStore {
163 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164 let info = self.inner.info();
165 write!(
166 f,
167 "Opendal({}, bucket={}, root={})",
168 info.scheme(),
169 info.name(),
170 info.root()
171 )
172 }
173}
174
175impl From<Operator> for OpendalStore {
176 fn from(value: Operator) -> Self {
177 Self::new(value)
178 }
179}
180
181#[async_trait]
182impl ObjectStore for OpendalStore {
183 async fn put_opts(
184 &self,
185 location: &Path,
186 bytes: PutPayload,
187 opts: PutOptions,
188 ) -> object_store::Result<PutResult> {
189 let mut future_write = self.inner.write_with(
190 &percent_decode_path(location.as_ref()),
191 Buffer::from_iter(bytes),
192 );
193 let opts_mode = opts.mode.clone();
194 match opts.mode {
195 PutMode::Overwrite => {}
196 PutMode::Create => {
197 future_write = future_write.if_not_exists(true);
198 }
199 PutMode::Update(update_version) => {
200 let Some(etag) = update_version.e_tag else {
201 Err(object_store::Error::NotSupported {
202 source: Box::new(opendal::Error::new(
203 opendal::ErrorKind::Unsupported,
204 "etag is required for conditional put",
205 )),
206 })?
207 };
208 future_write = future_write.if_match(etag.as_str());
209 }
210 }
211 let rp = future_write.into_send().await.map_err(|err| {
212 match format_object_store_error(err, location.as_ref()) {
213 object_store::Error::Precondition { path, source }
214 if opts_mode == PutMode::Create =>
215 {
216 object_store::Error::AlreadyExists { path, source }
217 }
218 e => e,
219 }
220 })?;
221
222 let e_tag = rp.etag().map(|s| s.to_string());
223 let version = rp.version().map(|s| s.to_string());
224
225 Ok(PutResult { e_tag, version })
226 }
227
228 async fn put_multipart(
229 &self,
230 location: &Path,
231 ) -> object_store::Result<Box<dyn MultipartUpload>> {
232 let writer = self
233 .inner
234 .writer_with(&percent_decode_path(location.as_ref()))
235 .concurrent(8)
236 .into_send()
237 .await
238 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
239 let upload = OpendalMultipartUpload::new(writer, location.clone());
240
241 Ok(Box::new(upload))
242 }
243
244 async fn put_multipart_opts(
245 &self,
246 location: &Path,
247 opts: PutMultipartOptions,
248 ) -> object_store::Result<Box<dyn MultipartUpload>> {
249 const DEFAULT_CONCURRENT: usize = 8;
250
251 let mut options = opendal::options::WriteOptions {
252 concurrent: DEFAULT_CONCURRENT,
253 ..Default::default()
254 };
255
256 let mut user_metadata = HashMap::new();
258
259 for (key, value) in opts.attributes.iter() {
261 match key {
262 object_store::Attribute::CacheControl => {
263 options.cache_control = Some(value.to_string());
264 }
265 object_store::Attribute::ContentDisposition => {
266 options.content_disposition = Some(value.to_string());
267 }
268 object_store::Attribute::ContentEncoding => {
269 options.content_encoding = Some(value.to_string());
270 }
271 object_store::Attribute::ContentLanguage => {
272 continue;
274 }
275 object_store::Attribute::ContentType => {
276 options.content_type = Some(value.to_string());
277 }
278 object_store::Attribute::Metadata(k) => {
279 user_metadata.insert(k.to_string(), value.to_string());
280 }
281 _ => {}
282 }
283 }
284
285 if !user_metadata.is_empty() {
287 options.user_metadata = Some(user_metadata);
288 }
289
290 let writer = self
291 .inner
292 .writer_options(&percent_decode_path(location.as_ref()), options)
293 .into_send()
294 .await
295 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
296 let upload = OpendalMultipartUpload::new(writer, location.clone());
297
298 Ok(Box::new(upload))
299 }
300
301 async fn get_opts(
302 &self,
303 location: &Path,
304 options: GetOptions,
305 ) -> object_store::Result<GetResult> {
306 let raw_location = percent_decode_path(location.as_ref());
307 let meta = {
308 let mut s = self.inner.stat_with(&raw_location);
309 if let Some(version) = &options.version {
310 s = s.version(version.as_str())
311 }
312 if let Some(if_match) = &options.if_match {
313 s = s.if_match(if_match.as_str());
314 }
315 if let Some(if_none_match) = &options.if_none_match {
316 s = s.if_none_match(if_none_match.as_str());
317 }
318 if let Some(if_modified_since) = options.if_modified_since {
319 s = s.if_modified_since(if_modified_since);
320 }
321 if let Some(if_unmodified_since) = options.if_unmodified_since {
322 s = s.if_unmodified_since(if_unmodified_since);
323 }
324 s.into_send()
325 .await
326 .map_err(|err| format_object_store_error(err, location.as_ref()))?
327 };
328
329 let mut attributes = object_store::Attributes::new();
331 if let Some(user_meta) = meta.user_metadata() {
332 for (key, value) in user_meta {
333 attributes.insert(
334 object_store::Attribute::Metadata(key.clone().into()),
335 value.clone().into(),
336 );
337 }
338 }
339
340 let meta = ObjectMeta {
341 location: location.clone(),
342 last_modified: meta.last_modified().unwrap_or_default(),
343 size: meta.content_length(),
344 e_tag: meta.etag().map(|x| x.to_string()),
345 version: meta.version().map(|x| x.to_string()),
346 };
347
348 if options.head {
349 return Ok(GetResult {
350 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
351 range: 0..0,
352 meta,
353 attributes,
354 });
355 }
356
357 let reader = {
358 let mut r = self.inner.reader_with(raw_location.as_ref());
359 if let Some(version) = options.version {
360 r = r.version(version.as_str());
361 }
362 if let Some(if_match) = options.if_match {
363 r = r.if_match(if_match.as_str());
364 }
365 if let Some(if_none_match) = options.if_none_match {
366 r = r.if_none_match(if_none_match.as_str());
367 }
368 if let Some(if_modified_since) = options.if_modified_since {
369 r = r.if_modified_since(if_modified_since);
370 }
371 if let Some(if_unmodified_since) = options.if_unmodified_since {
372 r = r.if_unmodified_since(if_unmodified_since);
373 }
374 r.into_send()
375 .await
376 .map_err(|err| format_object_store_error(err, location.as_ref()))?
377 };
378
379 let read_range = match options.range {
380 Some(GetRange::Bounded(r)) => {
381 if r.start >= r.end || r.start >= meta.size {
382 0..0
383 } else {
384 let end = r.end.min(meta.size);
385 r.start..end
386 }
387 }
388 Some(GetRange::Offset(r)) => {
389 if r < meta.size {
390 r..meta.size
391 } else {
392 0..0
393 }
394 }
395 Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
396 _ => 0..meta.size,
397 };
398
399 let stream = reader
400 .into_bytes_stream(read_range.start..read_range.end)
401 .into_send()
402 .await
403 .map_err(|err| format_object_store_error(err, location.as_ref()))?
404 .into_send()
405 .map_err(|err: io::Error| object_store::Error::Generic {
406 store: "IoError",
407 source: Box::new(err),
408 });
409
410 Ok(GetResult {
411 payload: GetResultPayload::Stream(Box::pin(stream)),
412 range: read_range.start..read_range.end,
413 meta,
414 attributes,
415 })
416 }
417
418 async fn delete(&self, location: &Path) -> object_store::Result<()> {
419 self.inner
420 .delete(&percent_decode_path(location.as_ref()))
421 .into_send()
422 .await
423 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
424
425 Ok(())
426 }
427
428 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
429 let path = prefix.map_or("".into(), |x| {
432 format!("{}/", percent_decode_path(x.as_ref()))
433 });
434
435 let lister_fut = self.inner.lister_with(&path).recursive(true);
436 let fut = async move {
437 let stream = lister_fut
438 .await
439 .map_err(|err| format_object_store_error(err, &path))?;
440
441 let stream = stream.then(|res| async {
442 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
443 let meta = entry.metadata();
444
445 Ok(format_object_meta(entry.path(), meta))
446 });
447 Ok::<_, object_store::Error>(stream)
448 };
449
450 fut.into_stream().try_flatten().into_send().boxed()
451 }
452
453 fn list_with_offset(
454 &self,
455 prefix: Option<&Path>,
456 offset: &Path,
457 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
458 let path = prefix.map_or("".into(), |x| {
459 format!("{}/", percent_decode_path(x.as_ref()))
460 });
461 let offset = offset.clone();
462
463 let this = self.clone();
466
467 let fut = async move {
468 let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
469 let mut fut = this.inner.lister_with(&path).recursive(true);
470
471 if list_with_start_after {
473 fut = fut.start_after(offset.as_ref());
474 }
475
476 let lister = fut
477 .await
478 .map_err(|err| format_object_store_error(err, &path))?
479 .then(move |entry| {
480 let path = path.clone();
481 let this = this.clone();
482 async move {
483 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
484 let (path, metadata) = entry.into_parts();
485
486 if metadata.is_dir() || metadata.last_modified().is_some() {
488 let object_meta = format_object_meta(&path, &metadata);
489 return Ok(object_meta);
490 }
491
492 let metadata = this
493 .inner
494 .stat(&path)
495 .await
496 .map_err(|err| format_object_store_error(err, &path))?;
497 let object_meta = format_object_meta(&path, &metadata);
498 Ok::<_, object_store::Error>(object_meta)
499 }
500 })
501 .into_send()
502 .boxed();
503
504 let stream = if list_with_start_after {
505 lister
506 } else {
507 lister
508 .try_filter(move |entry| futures::future::ready(entry.location > offset))
509 .into_send()
510 .boxed()
511 };
512
513 Ok::<_, object_store::Error>(stream)
514 };
515
516 fut.into_stream().into_send().try_flatten().boxed()
517 }
518
519 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
520 let path = prefix.map_or("".into(), |x| {
521 format!("{}/", percent_decode_path(x.as_ref()))
522 });
523 let mut stream = self
524 .inner
525 .lister_with(&path)
526 .into_future()
527 .into_send()
528 .await
529 .map_err(|err| format_object_store_error(err, &path))?
530 .into_send();
531
532 let mut common_prefixes = Vec::new();
533 let mut objects = Vec::new();
534
535 while let Some(res) = stream.next().into_send().await {
536 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
537 let meta = entry.metadata();
538
539 if meta.is_dir() {
540 common_prefixes.push(entry.path().into());
541 } else if meta.last_modified().is_some() {
542 objects.push(format_object_meta(entry.path(), meta));
543 } else {
544 let meta = self
545 .inner
546 .stat(entry.path())
547 .into_send()
548 .await
549 .map_err(|err| format_object_store_error(err, entry.path()))?;
550 objects.push(format_object_meta(entry.path(), &meta));
551 }
552 }
553
554 Ok(ListResult {
555 common_prefixes,
556 objects,
557 })
558 }
559
560 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
561 self.copy_request(from, to, false).await
562 }
563
564 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
565 self.copy_request(from, to, true).await
566 }
567
568 async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
569 self.inner
570 .rename(
571 &percent_decode_path(from.as_ref()),
572 &percent_decode_path(to.as_ref()),
573 )
574 .into_send()
575 .await
576 .map_err(|err| format_object_store_error(err, from.as_ref()))?;
577
578 Ok(())
579 }
580}
581
582struct OpendalMultipartUpload {
591 writer: Arc<Mutex<Writer>>,
592 location: Path,
593 next_notify: Option<Arc<Notify>>,
594}
595
596impl OpendalMultipartUpload {
597 fn new(writer: Writer, location: Path) -> Self {
598 Self {
599 writer: Arc::new(Mutex::new(writer)),
600 location,
601 next_notify: None,
602 }
603 }
604}
605
606#[async_trait]
607impl MultipartUpload for OpendalMultipartUpload {
608 fn put_part(&mut self, data: PutPayload) -> UploadPart {
609 let writer = self.writer.clone();
610 let location = self.location.clone();
611
612 let next_notify = Arc::new(Notify::new());
614 let current_notify = self.next_notify.replace(next_notify.clone());
616
617 async move {
618 if let Some(notify) = current_notify {
620 notify.notified().await;
622 }
623
624 let mut writer = writer.lock().await;
625 let result = writer
626 .write(Buffer::from_iter(data.into_iter()))
627 .await
628 .map_err(|err| format_object_store_error(err, location.as_ref()));
629
630 next_notify.notify_one();
632
633 result
634 }
635 .into_send()
636 .boxed()
637 }
638
639 async fn complete(&mut self) -> object_store::Result<PutResult> {
640 let mut writer = self.writer.lock().await;
641 let metadata = writer
642 .close()
643 .into_send()
644 .await
645 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
646
647 let e_tag = metadata.etag().map(|s| s.to_string());
648 let version = metadata.version().map(|s| s.to_string());
649
650 Ok(PutResult { e_tag, version })
651 }
652
653 async fn abort(&mut self) -> object_store::Result<()> {
654 let mut writer = self.writer.lock().await;
655 writer
656 .abort()
657 .into_send()
658 .await
659 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
660 }
661}
662
663impl Debug for OpendalMultipartUpload {
664 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
665 f.debug_struct("OpendalMultipartUpload")
666 .field("location", &self.location)
667 .finish()
668 }
669}
670
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Empty store backed by OpenDAL's in-memory service.
    fn memory_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        Arc::new(OpendalStore::new(op))
    }

    /// In-memory store pre-populated with one top-level and one nested file
    /// under `data/`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let store = memory_store();

        let top_level: Path = "data/test.txt".into();
        store
            .put(&top_level, Bytes::from_static(b"hello, world!").into())
            .await
            .unwrap();

        let nested: Path = "data/nested/test.txt".into();
        store
            .put(
                &nested,
                Bytes::from_static(b"hello, world! I am nested.").into(),
            )
            .await
            .unwrap();

        store
    }

    /// A put followed by head and get returns the written size and bytes.
    #[tokio::test]
    async fn test_basic() {
        let store = memory_store();

        let path: Path = "data/test.txt".into();

        let payload = Bytes::from_static(b"hello, world!");
        store.put(&path, payload.clone().into()).await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, 13);

        let fetched = store.get(&path).await.unwrap().bytes().await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// A completed multipart upload is readable in full; an aborted one
    /// leaves no object behind.
    #[tokio::test]
    async fn test_put_multipart() {
        let store = memory_store();

        let mut rng = thread_rng();

        // Completed upload: random number of randomly sized chunks.
        let path: Path = "data/test_complete.txt".into();
        let upload = store.put_multipart(&path).await.unwrap();

        let mut write = WriteMultipart::new(upload);

        let mut expected = vec![];
        let rounds = rng.gen_range(1..=1024);
        for _ in 0..rounds {
            let size = rng.gen_range(1..=1024);
            let mut chunk = vec![0; size];
            rng.fill_bytes(&mut chunk);

            expected.extend_from_slice(&chunk);
            write.put(chunk.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, expected.len() as u64);

        let fetched = store.get(&path).await.unwrap().bytes().await.unwrap();
        assert_eq!(fetched, Bytes::from(expected));

        // Aborted upload: the object must not exist afterwards.
        let path: Path = "data/test_abort.txt".into();
        let mut upload = store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = store.head(&path).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    /// Recursive listing returns both files, and each one is readable.
    #[tokio::test]
    async fn test_list() {
        let store = create_test_object_store().await;
        let prefix: Path = "data/".into();
        let listed = store.list(Some(&prefix)).collect::<Vec<_>>().await;
        assert_eq!(listed.len(), 2);
        let mut locations = listed
            .iter()
            .map(|item| item.as_ref().unwrap().location.as_ref())
            .collect::<Vec<_>>();

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];

        let expected_locations = expected_files
            .iter()
            .map(|(location, _)| *location)
            .collect::<Vec<&str>>();

        // Listing order is not asserted, so sort before comparing.
        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, contents) in expected_files {
            let path: Path = location.into();
            let fetched = store.get(&path).await.unwrap().bytes().await.unwrap();
            assert_eq!(fetched, contents);
        }
    }

    /// Delimited listing separates the direct file from the nested directory.
    #[tokio::test]
    async fn test_list_with_delimiter() {
        let store = create_test_object_store().await;
        let prefix: Path = "data/".into();
        let listing = store.list_with_delimiter(Some(&prefix)).await.unwrap();
        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    /// Listing with an offset yields only entries after the offset.
    #[tokio::test]
    async fn test_list_with_offset() {
        let store = create_test_object_store().await;
        let prefix: Path = "data/".into();
        let offset: Path = "data/nested/test.txt".into();
        let listed = store
            .list_with_offset(Some(&prefix), &offset)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(listed.len(), 1);
        assert_eq!(
            listed[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}