1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use crate::{datetime_to_timestamp, timestamp_to_datetime};
25use async_trait::async_trait;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use futures::stream::BoxStream;
30use object_store::ListResult;
31use object_store::MultipartUpload;
32use object_store::ObjectMeta;
33use object_store::ObjectStore;
34use object_store::PutMultipartOptions;
35use object_store::PutOptions;
36use object_store::PutPayload;
37use object_store::PutResult;
38use object_store::path::Path;
39use object_store::{GetOptions, UploadPart};
40use object_store::{GetRange, GetResultPayload};
41use object_store::{GetResult, PutMode};
42use opendal::Buffer;
43use opendal::Writer;
44use opendal::options::CopyOptions;
45use opendal::raw::percent_decode_path;
46use opendal::{Operator, OperatorInfo};
47use std::collections::HashMap;
48use tokio::sync::{Mutex, Notify};
49
50#[derive(Clone)]
97pub struct OpendalStore {
98 info: Arc<OperatorInfo>,
99 inner: Operator,
100}
101
102impl OpendalStore {
103 pub fn new(op: Operator) -> Self {
105 Self {
106 info: op.info().into(),
107 inner: op,
108 }
109 }
110
111 pub fn info(&self) -> &OperatorInfo {
113 self.info.as_ref()
114 }
115
116 async fn copy_request(
118 &self,
119 from: &Path,
120 to: &Path,
121 if_not_exists: bool,
122 ) -> object_store::Result<()> {
123 let mut copy_options = CopyOptions::default();
124 if if_not_exists {
125 copy_options.if_not_exists = true;
126 }
127
128 self.inner
130 .copy_options(
131 &percent_decode_path(from.as_ref()),
132 &percent_decode_path(to.as_ref()),
133 copy_options,
134 )
135 .into_send()
136 .await
137 .map_err(|err| {
138 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
139 object_store::Error::AlreadyExists {
140 path: to.to_string(),
141 source: Box::new(err),
142 }
143 } else {
144 format_object_store_error(err, from.as_ref())
145 }
146 })?;
147
148 Ok(())
149 }
150}
151
152impl Debug for OpendalStore {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("OpendalStore")
155 .field("scheme", &self.info.scheme())
156 .field("name", &self.info.name())
157 .field("root", &self.info.root())
158 .field("capability", &self.info.full_capability())
159 .finish()
160 }
161}
162
163impl Display for OpendalStore {
164 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165 let info = self.inner.info();
166 write!(
167 f,
168 "Opendal({}, bucket={}, root={})",
169 info.scheme(),
170 info.name(),
171 info.root()
172 )
173 }
174}
175
176impl From<Operator> for OpendalStore {
177 fn from(value: Operator) -> Self {
178 Self::new(value)
179 }
180}
181
182#[async_trait]
183impl ObjectStore for OpendalStore {
184 async fn put_opts(
185 &self,
186 location: &Path,
187 bytes: PutPayload,
188 opts: PutOptions,
189 ) -> object_store::Result<PutResult> {
190 let decoded_location = percent_decode_path(location.as_ref());
191 let mut future_write = self
192 .inner
193 .write_with(&decoded_location, Buffer::from_iter(bytes));
194 let opts_mode = opts.mode.clone();
195 match opts.mode {
196 PutMode::Overwrite => {}
197 PutMode::Create => {
198 future_write = future_write.if_not_exists(true);
199 }
200 PutMode::Update(update_version) => {
201 let Some(etag) = update_version.e_tag else {
202 Err(object_store::Error::NotSupported {
203 source: Box::new(opendal::Error::new(
204 opendal::ErrorKind::Unsupported,
205 "etag is required for conditional put",
206 )),
207 })?
208 };
209 future_write = future_write.if_match(etag.as_str());
210 }
211 }
212 let rp = future_write.into_send().await.map_err(|err| {
213 match format_object_store_error(err, location.as_ref()) {
214 object_store::Error::Precondition { path, source }
215 if opts_mode == PutMode::Create =>
216 {
217 object_store::Error::AlreadyExists { path, source }
218 }
219 e => e,
220 }
221 })?;
222
223 let e_tag = rp.etag().map(|s| s.to_string());
224 let version = rp.version().map(|s| s.to_string());
225
226 Ok(PutResult { e_tag, version })
227 }
228
229 async fn put_multipart(
230 &self,
231 location: &Path,
232 ) -> object_store::Result<Box<dyn MultipartUpload>> {
233 let decoded_location = percent_decode_path(location.as_ref());
234 let writer = self
235 .inner
236 .writer_with(&decoded_location)
237 .concurrent(8)
238 .into_send()
239 .await
240 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
241 let upload = OpendalMultipartUpload::new(writer, location.clone());
242
243 Ok(Box::new(upload))
244 }
245
246 async fn put_multipart_opts(
247 &self,
248 location: &Path,
249 opts: PutMultipartOptions,
250 ) -> object_store::Result<Box<dyn MultipartUpload>> {
251 const DEFAULT_CONCURRENT: usize = 8;
252
253 let mut options = opendal::options::WriteOptions {
254 concurrent: DEFAULT_CONCURRENT,
255 ..Default::default()
256 };
257
258 let mut user_metadata = HashMap::new();
260
261 for (key, value) in opts.attributes.iter() {
263 match key {
264 object_store::Attribute::CacheControl => {
265 options.cache_control = Some(value.to_string());
266 }
267 object_store::Attribute::ContentDisposition => {
268 options.content_disposition = Some(value.to_string());
269 }
270 object_store::Attribute::ContentEncoding => {
271 options.content_encoding = Some(value.to_string());
272 }
273 object_store::Attribute::ContentLanguage => {
274 continue;
276 }
277 object_store::Attribute::ContentType => {
278 options.content_type = Some(value.to_string());
279 }
280 object_store::Attribute::Metadata(k) => {
281 user_metadata.insert(k.to_string(), value.to_string());
282 }
283 _ => {}
284 }
285 }
286
287 if !user_metadata.is_empty() {
289 options.user_metadata = Some(user_metadata);
290 }
291
292 let decoded_location = percent_decode_path(location.as_ref());
293 let writer = self
294 .inner
295 .writer_options(&decoded_location, options)
296 .into_send()
297 .await
298 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
299 let upload = OpendalMultipartUpload::new(writer, location.clone());
300
301 Ok(Box::new(upload))
302 }
303
304 async fn get_opts(
305 &self,
306 location: &Path,
307 options: GetOptions,
308 ) -> object_store::Result<GetResult> {
309 let raw_location = percent_decode_path(location.as_ref());
310 let meta = {
311 let mut s = self.inner.stat_with(&raw_location);
312 if let Some(version) = &options.version {
313 s = s.version(version.as_str())
314 }
315 if let Some(if_match) = &options.if_match {
316 s = s.if_match(if_match.as_str());
317 }
318 if let Some(if_none_match) = &options.if_none_match {
319 s = s.if_none_match(if_none_match.as_str());
320 }
321 if let Some(if_modified_since) =
322 options.if_modified_since.and_then(datetime_to_timestamp)
323 {
324 s = s.if_modified_since(if_modified_since);
325 }
326 if let Some(if_unmodified_since) =
327 options.if_unmodified_since.and_then(datetime_to_timestamp)
328 {
329 s = s.if_unmodified_since(if_unmodified_since);
330 }
331 s.into_send()
332 .await
333 .map_err(|err| format_object_store_error(err, location.as_ref()))?
334 };
335
336 let mut attributes = object_store::Attributes::new();
338 if let Some(user_meta) = meta.user_metadata() {
339 for (key, value) in user_meta {
340 attributes.insert(
341 object_store::Attribute::Metadata(key.clone().into()),
342 value.clone().into(),
343 );
344 }
345 }
346
347 let meta = ObjectMeta {
348 location: location.clone(),
349 last_modified: meta
350 .last_modified()
351 .and_then(timestamp_to_datetime)
352 .unwrap_or_default(),
353 size: meta.content_length(),
354 e_tag: meta.etag().map(|x| x.to_string()),
355 version: meta.version().map(|x| x.to_string()),
356 };
357
358 if options.head {
359 return Ok(GetResult {
360 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
361 range: 0..0,
362 meta,
363 attributes,
364 });
365 }
366
367 let reader = {
368 let mut r = self.inner.reader_with(raw_location.as_ref());
369 if let Some(version) = options.version {
370 r = r.version(version.as_str());
371 }
372 if let Some(if_match) = options.if_match {
373 r = r.if_match(if_match.as_str());
374 }
375 if let Some(if_none_match) = options.if_none_match {
376 r = r.if_none_match(if_none_match.as_str());
377 }
378 if let Some(if_modified_since) =
379 options.if_modified_since.and_then(datetime_to_timestamp)
380 {
381 r = r.if_modified_since(if_modified_since);
382 }
383 if let Some(if_unmodified_since) =
384 options.if_unmodified_since.and_then(datetime_to_timestamp)
385 {
386 r = r.if_unmodified_since(if_unmodified_since);
387 }
388 r.into_send()
389 .await
390 .map_err(|err| format_object_store_error(err, location.as_ref()))?
391 };
392
393 let read_range = match options.range {
394 Some(GetRange::Bounded(r)) => {
395 if r.start >= r.end || r.start >= meta.size {
396 0..0
397 } else {
398 let end = r.end.min(meta.size);
399 r.start..end
400 }
401 }
402 Some(GetRange::Offset(r)) => {
403 if r < meta.size {
404 r..meta.size
405 } else {
406 0..0
407 }
408 }
409 Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
410 _ => 0..meta.size,
411 };
412
413 let stream = reader
414 .into_bytes_stream(read_range.start..read_range.end)
415 .into_send()
416 .await
417 .map_err(|err| format_object_store_error(err, location.as_ref()))?
418 .into_send()
419 .map_err(|err: io::Error| object_store::Error::Generic {
420 store: "IoError",
421 source: Box::new(err),
422 });
423
424 Ok(GetResult {
425 payload: GetResultPayload::Stream(Box::pin(stream)),
426 range: read_range.start..read_range.end,
427 meta,
428 attributes,
429 })
430 }
431
432 async fn delete(&self, location: &Path) -> object_store::Result<()> {
433 let decoded_location = percent_decode_path(location.as_ref());
434 self.inner
435 .delete(&decoded_location)
436 .into_send()
437 .await
438 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
439
440 Ok(())
441 }
442
443 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
444 let path = prefix.map_or("".into(), |x| {
447 format!("{}/", percent_decode_path(x.as_ref()))
448 });
449
450 let this = self.clone();
451 let fut = async move {
452 let stream = this
453 .inner
454 .lister_with(&path)
455 .recursive(true)
456 .await
457 .map_err(|err| format_object_store_error(err, &path))?;
458
459 let stream = stream.then(|res| async {
460 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
461 let meta = entry.metadata();
462
463 Ok(format_object_meta(entry.path(), meta))
464 });
465 Ok::<_, object_store::Error>(stream)
466 };
467
468 fut.into_stream().try_flatten().into_send().boxed()
469 }
470
471 fn list_with_offset(
472 &self,
473 prefix: Option<&Path>,
474 offset: &Path,
475 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
476 let path = prefix.map_or("".into(), |x| {
477 format!("{}/", percent_decode_path(x.as_ref()))
478 });
479 let offset = offset.clone();
480
481 let this = self.clone();
484
485 let fut = async move {
486 let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
487 let mut fut = this.inner.lister_with(&path).recursive(true);
488
489 if list_with_start_after {
491 fut = fut.start_after(offset.as_ref());
492 }
493
494 let lister = fut
495 .await
496 .map_err(|err| format_object_store_error(err, &path))?
497 .then(move |entry| {
498 let path = path.clone();
499 let this = this.clone();
500 async move {
501 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
502 let (path, metadata) = entry.into_parts();
503
504 if metadata.is_dir() || metadata.last_modified().is_some() {
506 let object_meta = format_object_meta(&path, &metadata);
507 return Ok(object_meta);
508 }
509
510 let metadata = this
511 .inner
512 .stat(&path)
513 .await
514 .map_err(|err| format_object_store_error(err, &path))?;
515 let object_meta = format_object_meta(&path, &metadata);
516 Ok::<_, object_store::Error>(object_meta)
517 }
518 })
519 .into_send()
520 .boxed();
521
522 let stream = if list_with_start_after {
523 lister
524 } else {
525 lister
526 .try_filter(move |entry| futures::future::ready(entry.location > offset))
527 .into_send()
528 .boxed()
529 };
530
531 Ok::<_, object_store::Error>(stream)
532 };
533
534 fut.into_stream().into_send().try_flatten().boxed()
535 }
536
537 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
538 let path = prefix.map_or("".into(), |x| {
539 format!("{}/", percent_decode_path(x.as_ref()))
540 });
541 let mut stream = self
542 .inner
543 .lister_with(&path)
544 .into_future()
545 .into_send()
546 .await
547 .map_err(|err| format_object_store_error(err, &path))?
548 .into_send();
549
550 let mut common_prefixes = Vec::new();
551 let mut objects = Vec::new();
552
553 while let Some(res) = stream.next().into_send().await {
554 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
555 let meta = entry.metadata();
556
557 if meta.is_dir() {
558 common_prefixes.push(entry.path().into());
559 } else if meta.last_modified().is_some() {
560 objects.push(format_object_meta(entry.path(), meta));
561 } else {
562 let meta = self
563 .inner
564 .stat(entry.path())
565 .into_send()
566 .await
567 .map_err(|err| format_object_store_error(err, entry.path()))?;
568 objects.push(format_object_meta(entry.path(), &meta));
569 }
570 }
571
572 Ok(ListResult {
573 common_prefixes,
574 objects,
575 })
576 }
577
578 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
579 self.copy_request(from, to, false).await
580 }
581
582 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
583 self.copy_request(from, to, true).await
584 }
585
586 async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
587 self.inner
588 .rename(
589 &percent_decode_path(from.as_ref()),
590 &percent_decode_path(to.as_ref()),
591 )
592 .into_send()
593 .await
594 .map_err(|err| format_object_store_error(err, from.as_ref()))?;
595
596 Ok(())
597 }
598}
599
600struct OpendalMultipartUpload {
609 writer: Arc<Mutex<Writer>>,
610 location: Path,
611 next_notify: Option<Arc<Notify>>,
612}
613
614impl OpendalMultipartUpload {
615 fn new(writer: Writer, location: Path) -> Self {
616 Self {
617 writer: Arc::new(Mutex::new(writer)),
618 location,
619 next_notify: None,
620 }
621 }
622}
623
624#[async_trait]
625impl MultipartUpload for OpendalMultipartUpload {
626 fn put_part(&mut self, data: PutPayload) -> UploadPart {
627 let writer = self.writer.clone();
628 let location = self.location.clone();
629
630 let next_notify = Arc::new(Notify::new());
632 let current_notify = self.next_notify.replace(next_notify.clone());
634
635 async move {
636 if let Some(notify) = current_notify {
638 notify.notified().await;
640 }
641
642 let mut writer = writer.lock().await;
643 let result = writer
644 .write(Buffer::from_iter(data.into_iter()))
645 .await
646 .map_err(|err| format_object_store_error(err, location.as_ref()));
647
648 next_notify.notify_one();
650
651 result
652 }
653 .into_send()
654 .boxed()
655 }
656
657 async fn complete(&mut self) -> object_store::Result<PutResult> {
658 let mut writer = self.writer.lock().await;
659 let metadata = writer
660 .close()
661 .into_send()
662 .await
663 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
664
665 let e_tag = metadata.etag().map(|s| s.to_string());
666 let version = metadata.version().map(|s| s.to_string());
667
668 Ok(PutResult { e_tag, version })
669 }
670
671 async fn abort(&mut self) -> object_store::Result<()> {
672 let mut writer = self.writer.lock().await;
673 writer
674 .abort()
675 .into_send()
676 .await
677 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
678 }
679}
680
681impl Debug for OpendalMultipartUpload {
682 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
683 f.debug_struct("OpendalMultipartUpload")
684 .field("location", &self.location)
685 .finish()
686 }
687}
688
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Builds an `ObjectStore` over a fresh, empty in-memory opendal operator.
    fn memory_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        Arc::new(OpendalStore::new(op))
    }

    /// Fetches `path` and returns its full contents, panicking on any error.
    async fn read_all(store: &Arc<dyn ObjectStore>, path: &Path) -> Bytes {
        store.get(path).await.unwrap().bytes().await.unwrap()
    }

    /// Returns a store pre-populated with `data/test.txt` and `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let store = memory_store();

        let fixtures: [(&str, &'static [u8]); 2] = [
            ("data/test.txt", b"hello, world!"),
            ("data/nested/test.txt", b"hello, world! I am nested."),
        ];
        for (location, contents) in fixtures {
            let path = Path::from(location);
            store
                .put(&path, Bytes::from_static(contents).into())
                .await
                .unwrap();
        }

        store
    }

    #[tokio::test]
    async fn test_basic() {
        let store = memory_store();
        let path = Path::from("data/test.txt");
        let payload = Bytes::from_static(b"hello, world!");

        store.put(&path, payload.clone().into()).await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, 13);
        assert_eq!(read_all(&store, &path).await, payload);
    }

    #[tokio::test]
    async fn test_put_multipart() {
        let store = memory_store();
        let mut rng = thread_rng();

        // A completed upload must read back as the concatenation of its parts.
        let path = Path::from("data/test_complete.txt");
        let upload = store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new(upload);

        let mut expected = Vec::new();
        let rounds = rng.gen_range(1..=1024);
        for _ in 0..rounds {
            let size = rng.gen_range(1..=1024);
            let mut chunk = vec![0; size];
            rng.fill_bytes(&mut chunk);

            expected.extend_from_slice(&chunk);
            write.put(chunk.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = store.head(&path).await.unwrap();
        assert_eq!(meta.size, expected.len() as u64);
        assert_eq!(read_all(&store, &path).await, Bytes::from(expected));

        // An aborted upload must leave nothing behind.
        let path = Path::from("data/test_abort.txt");
        let mut upload = store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = store.head(&path).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    #[tokio::test]
    async fn test_list() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let listed = store.list(Some(&prefix)).collect::<Vec<_>>().await;
        assert_eq!(listed.len(), 2);

        let mut locations: Vec<&str> = listed
            .iter()
            .map(|entry| entry.as_ref().unwrap().location.as_ref())
            .collect();
        locations.sort();

        let expected: [(&str, &'static [u8]); 2] = [
            ("data/nested/test.txt", b"hello, world! I am nested."),
            ("data/test.txt", b"hello, world!"),
        ];
        let expected_locations: Vec<&str> = expected.iter().map(|(loc, _)| *loc).collect();
        assert_eq!(locations, expected_locations);

        for (location, contents) in expected {
            let path = Path::from(location);
            assert_eq!(read_all(&store, &path).await, Bytes::from_static(contents));
        }
    }

    #[tokio::test]
    async fn test_list_with_delimiter() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let listing = store.list_with_delimiter(Some(&prefix)).await.unwrap();

        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    #[tokio::test]
    async fn test_list_with_offset() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let offset = Path::from("data/nested/test.txt");

        let listed = store
            .list_with_offset(Some(&prefix), &offset)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(listed.len(), 1);
        assert_eq!(
            listed[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}