1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use crate::{datetime_to_timestamp, timestamp_to_datetime};
25use async_trait::async_trait;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use futures::stream::BoxStream;
30use mea::mutex::Mutex;
31use mea::oneshot;
32use object_store::ListResult;
33use object_store::MultipartUpload;
34use object_store::ObjectMeta;
35use object_store::ObjectStore;
36use object_store::PutMultipartOptions;
37use object_store::PutOptions;
38use object_store::PutPayload;
39use object_store::PutResult;
40use object_store::path::Path;
41use object_store::{GetOptions, UploadPart};
42use object_store::{GetRange, GetResultPayload};
43use object_store::{GetResult, PutMode};
44use opendal::Buffer;
45use opendal::Writer;
46use opendal::options::CopyOptions;
47use opendal::raw::percent_decode_path;
48use opendal::{Operator, OperatorInfo};
49use std::collections::HashMap;
50
51#[derive(Clone)]
98pub struct OpendalStore {
99 info: Arc<OperatorInfo>,
100 inner: Operator,
101}
102
103impl OpendalStore {
104 pub fn new(op: Operator) -> Self {
106 Self {
107 info: op.info().into(),
108 inner: op,
109 }
110 }
111
112 pub fn info(&self) -> &OperatorInfo {
114 self.info.as_ref()
115 }
116
117 async fn copy_request(
119 &self,
120 from: &Path,
121 to: &Path,
122 if_not_exists: bool,
123 ) -> object_store::Result<()> {
124 let mut copy_options = CopyOptions::default();
125 if if_not_exists {
126 copy_options.if_not_exists = true;
127 }
128
129 self.inner
131 .copy_options(
132 &percent_decode_path(from.as_ref()),
133 &percent_decode_path(to.as_ref()),
134 copy_options,
135 )
136 .into_send()
137 .await
138 .map_err(|err| {
139 if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
140 object_store::Error::AlreadyExists {
141 path: to.to_string(),
142 source: Box::new(err),
143 }
144 } else {
145 format_object_store_error(err, from.as_ref())
146 }
147 })?;
148
149 Ok(())
150 }
151}
152
153impl Debug for OpendalStore {
154 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155 f.debug_struct("OpendalStore")
156 .field("scheme", &self.info.scheme())
157 .field("name", &self.info.name())
158 .field("root", &self.info.root())
159 .field("capability", &self.info.full_capability())
160 .finish()
161 }
162}
163
164impl Display for OpendalStore {
165 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
166 let info = self.inner.info();
167 write!(
168 f,
169 "Opendal({}, bucket={}, root={})",
170 info.scheme(),
171 info.name(),
172 info.root()
173 )
174 }
175}
176
177impl From<Operator> for OpendalStore {
178 fn from(value: Operator) -> Self {
179 Self::new(value)
180 }
181}
182
183#[async_trait]
184impl ObjectStore for OpendalStore {
185 async fn put_opts(
186 &self,
187 location: &Path,
188 bytes: PutPayload,
189 opts: PutOptions,
190 ) -> object_store::Result<PutResult> {
191 let decoded_location = percent_decode_path(location.as_ref());
192 let mut future_write = self
193 .inner
194 .write_with(&decoded_location, Buffer::from_iter(bytes));
195 let opts_mode = opts.mode.clone();
196 match opts.mode {
197 PutMode::Overwrite => {}
198 PutMode::Create => {
199 future_write = future_write.if_not_exists(true);
200 }
201 PutMode::Update(update_version) => {
202 let Some(etag) = update_version.e_tag else {
203 Err(object_store::Error::NotSupported {
204 source: Box::new(opendal::Error::new(
205 opendal::ErrorKind::Unsupported,
206 "etag is required for conditional put",
207 )),
208 })?
209 };
210 future_write = future_write.if_match(etag.as_str());
211 }
212 }
213 let rp = future_write.into_send().await.map_err(|err| {
214 match format_object_store_error(err, location.as_ref()) {
215 object_store::Error::Precondition { path, source }
216 if opts_mode == PutMode::Create =>
217 {
218 object_store::Error::AlreadyExists { path, source }
219 }
220 e => e,
221 }
222 })?;
223
224 let e_tag = rp.etag().map(|s| s.to_string());
225 let version = rp.version().map(|s| s.to_string());
226
227 Ok(PutResult { e_tag, version })
228 }
229
230 async fn put_multipart(
231 &self,
232 location: &Path,
233 ) -> object_store::Result<Box<dyn MultipartUpload>> {
234 let decoded_location = percent_decode_path(location.as_ref());
235 let writer = self
236 .inner
237 .writer_with(&decoded_location)
238 .concurrent(8)
239 .into_send()
240 .await
241 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
242 let upload = OpendalMultipartUpload::new(writer, location.clone());
243
244 Ok(Box::new(upload))
245 }
246
247 async fn put_multipart_opts(
248 &self,
249 location: &Path,
250 opts: PutMultipartOptions,
251 ) -> object_store::Result<Box<dyn MultipartUpload>> {
252 const DEFAULT_CONCURRENT: usize = 8;
253
254 let mut options = opendal::options::WriteOptions {
255 concurrent: DEFAULT_CONCURRENT,
256 ..Default::default()
257 };
258
259 let mut user_metadata = HashMap::new();
261
262 for (key, value) in opts.attributes.iter() {
264 match key {
265 object_store::Attribute::CacheControl => {
266 options.cache_control = Some(value.to_string());
267 }
268 object_store::Attribute::ContentDisposition => {
269 options.content_disposition = Some(value.to_string());
270 }
271 object_store::Attribute::ContentEncoding => {
272 options.content_encoding = Some(value.to_string());
273 }
274 object_store::Attribute::ContentLanguage => {
275 continue;
277 }
278 object_store::Attribute::ContentType => {
279 options.content_type = Some(value.to_string());
280 }
281 object_store::Attribute::Metadata(k) => {
282 user_metadata.insert(k.to_string(), value.to_string());
283 }
284 _ => {}
285 }
286 }
287
288 if !user_metadata.is_empty() {
290 options.user_metadata = Some(user_metadata);
291 }
292
293 let decoded_location = percent_decode_path(location.as_ref());
294 let writer = self
295 .inner
296 .writer_options(&decoded_location, options)
297 .into_send()
298 .await
299 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
300 let upload = OpendalMultipartUpload::new(writer, location.clone());
301
302 Ok(Box::new(upload))
303 }
304
305 async fn get_opts(
306 &self,
307 location: &Path,
308 options: GetOptions,
309 ) -> object_store::Result<GetResult> {
310 let raw_location = percent_decode_path(location.as_ref());
311 let meta = {
312 let mut s = self.inner.stat_with(&raw_location);
313 if let Some(version) = &options.version {
314 s = s.version(version.as_str())
315 }
316 if let Some(if_match) = &options.if_match {
317 s = s.if_match(if_match.as_str());
318 }
319 if let Some(if_none_match) = &options.if_none_match {
320 s = s.if_none_match(if_none_match.as_str());
321 }
322 if let Some(if_modified_since) =
323 options.if_modified_since.and_then(datetime_to_timestamp)
324 {
325 s = s.if_modified_since(if_modified_since);
326 }
327 if let Some(if_unmodified_since) =
328 options.if_unmodified_since.and_then(datetime_to_timestamp)
329 {
330 s = s.if_unmodified_since(if_unmodified_since);
331 }
332 s.into_send()
333 .await
334 .map_err(|err| format_object_store_error(err, location.as_ref()))?
335 };
336
337 let mut attributes = object_store::Attributes::new();
339 if let Some(user_meta) = meta.user_metadata() {
340 for (key, value) in user_meta {
341 attributes.insert(
342 object_store::Attribute::Metadata(key.clone().into()),
343 value.clone().into(),
344 );
345 }
346 }
347
348 let meta = ObjectMeta {
349 location: location.clone(),
350 last_modified: meta
351 .last_modified()
352 .and_then(timestamp_to_datetime)
353 .unwrap_or_default(),
354 size: meta.content_length(),
355 e_tag: meta.etag().map(|x| x.to_string()),
356 version: meta.version().map(|x| x.to_string()),
357 };
358
359 if options.head {
360 return Ok(GetResult {
361 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
362 range: 0..0,
363 meta,
364 attributes,
365 });
366 }
367
368 let reader = {
369 let mut r = self.inner.reader_with(raw_location.as_ref());
370 if let Some(version) = options.version {
371 r = r.version(version.as_str());
372 }
373 if let Some(if_match) = options.if_match {
374 r = r.if_match(if_match.as_str());
375 }
376 if let Some(if_none_match) = options.if_none_match {
377 r = r.if_none_match(if_none_match.as_str());
378 }
379 if let Some(if_modified_since) =
380 options.if_modified_since.and_then(datetime_to_timestamp)
381 {
382 r = r.if_modified_since(if_modified_since);
383 }
384 if let Some(if_unmodified_since) =
385 options.if_unmodified_since.and_then(datetime_to_timestamp)
386 {
387 r = r.if_unmodified_since(if_unmodified_since);
388 }
389 r.into_send()
390 .await
391 .map_err(|err| format_object_store_error(err, location.as_ref()))?
392 };
393
394 let read_range = match options.range {
395 Some(GetRange::Bounded(r)) => {
396 if r.start >= r.end || r.start >= meta.size {
397 0..0
398 } else {
399 let end = r.end.min(meta.size);
400 r.start..end
401 }
402 }
403 Some(GetRange::Offset(r)) => {
404 if r < meta.size {
405 r..meta.size
406 } else {
407 0..0
408 }
409 }
410 Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
411 _ => 0..meta.size,
412 };
413
414 let stream = reader
415 .into_bytes_stream(read_range.start..read_range.end)
416 .into_send()
417 .await
418 .map_err(|err| format_object_store_error(err, location.as_ref()))?
419 .into_send()
420 .map_err(|err: io::Error| object_store::Error::Generic {
421 store: "IoError",
422 source: Box::new(err),
423 });
424
425 Ok(GetResult {
426 payload: GetResultPayload::Stream(Box::pin(stream)),
427 range: read_range.start..read_range.end,
428 meta,
429 attributes,
430 })
431 }
432
433 async fn delete(&self, location: &Path) -> object_store::Result<()> {
434 let decoded_location = percent_decode_path(location.as_ref());
435 self.inner
436 .delete(&decoded_location)
437 .into_send()
438 .await
439 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
440
441 Ok(())
442 }
443
444 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
445 let path = prefix.map_or("".into(), |x| {
448 format!("{}/", percent_decode_path(x.as_ref()))
449 });
450
451 let this = self.clone();
452 let fut = async move {
453 let stream = this
454 .inner
455 .lister_with(&path)
456 .recursive(true)
457 .await
458 .map_err(|err| format_object_store_error(err, &path))?;
459
460 let stream = stream.then(|res| async {
461 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
462 let meta = entry.metadata();
463
464 Ok(format_object_meta(entry.path(), meta))
465 });
466 Ok::<_, object_store::Error>(stream)
467 };
468
469 fut.into_stream().try_flatten().into_send().boxed()
470 }
471
472 fn list_with_offset(
473 &self,
474 prefix: Option<&Path>,
475 offset: &Path,
476 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
477 let path = prefix.map_or("".into(), |x| {
478 format!("{}/", percent_decode_path(x.as_ref()))
479 });
480 let offset = offset.clone();
481
482 let this = self.clone();
485
486 let fut = async move {
487 let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
488 let mut fut = this.inner.lister_with(&path).recursive(true);
489
490 if list_with_start_after {
492 fut = fut.start_after(offset.as_ref());
493 }
494
495 let lister = fut
496 .await
497 .map_err(|err| format_object_store_error(err, &path))?
498 .then(move |entry| {
499 let path = path.clone();
500 let this = this.clone();
501 async move {
502 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
503 let (path, metadata) = entry.into_parts();
504
505 if metadata.is_dir() || metadata.last_modified().is_some() {
507 let object_meta = format_object_meta(&path, &metadata);
508 return Ok(object_meta);
509 }
510
511 let metadata = this
512 .inner
513 .stat(&path)
514 .await
515 .map_err(|err| format_object_store_error(err, &path))?;
516 let object_meta = format_object_meta(&path, &metadata);
517 Ok::<_, object_store::Error>(object_meta)
518 }
519 })
520 .into_send()
521 .boxed();
522
523 let stream = if list_with_start_after {
524 lister
525 } else {
526 lister
527 .try_filter(move |entry| futures::future::ready(entry.location > offset))
528 .into_send()
529 .boxed()
530 };
531
532 Ok::<_, object_store::Error>(stream)
533 };
534
535 fut.into_stream().into_send().try_flatten().boxed()
536 }
537
538 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
539 let path = prefix.map_or("".into(), |x| {
540 format!("{}/", percent_decode_path(x.as_ref()))
541 });
542 let mut stream = self
543 .inner
544 .lister_with(&path)
545 .into_future()
546 .into_send()
547 .await
548 .map_err(|err| format_object_store_error(err, &path))?
549 .into_send();
550
551 let mut common_prefixes = Vec::new();
552 let mut objects = Vec::new();
553
554 while let Some(res) = stream.next().into_send().await {
555 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
556 let meta = entry.metadata();
557
558 if meta.is_dir() {
559 common_prefixes.push(entry.path().into());
560 } else if meta.last_modified().is_some() {
561 objects.push(format_object_meta(entry.path(), meta));
562 } else {
563 let meta = self
564 .inner
565 .stat(entry.path())
566 .into_send()
567 .await
568 .map_err(|err| format_object_store_error(err, entry.path()))?;
569 objects.push(format_object_meta(entry.path(), &meta));
570 }
571 }
572
573 Ok(ListResult {
574 common_prefixes,
575 objects,
576 })
577 }
578
579 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
580 self.copy_request(from, to, false).await
581 }
582
583 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
584 self.copy_request(from, to, true).await
585 }
586
587 async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
588 self.inner
589 .rename(
590 &percent_decode_path(from.as_ref()),
591 &percent_decode_path(to.as_ref()),
592 )
593 .into_send()
594 .await
595 .map_err(|err| format_object_store_error(err, from.as_ref()))?;
596
597 Ok(())
598 }
599}
600
601struct OpendalMultipartUpload {
610 writer: Arc<Mutex<Writer>>,
611 location: Path,
612 next_notify: oneshot::Receiver<()>,
613}
614
615impl OpendalMultipartUpload {
616 fn new(writer: Writer, location: Path) -> Self {
617 let (_, rx) = oneshot::channel();
619
620 Self {
621 writer: Arc::new(Mutex::new(writer)),
622 location,
623 next_notify: rx,
624 }
625 }
626}
627
628#[async_trait]
629impl MultipartUpload for OpendalMultipartUpload {
630 fn put_part(&mut self, data: PutPayload) -> UploadPart {
631 let writer = self.writer.clone();
632 let location = self.location.clone();
633
634 let (tx, rx) = oneshot::channel();
636 let last_rx = std::mem::replace(&mut self.next_notify, rx);
638
639 async move {
640 let _ = last_rx.await;
642
643 let mut writer = writer.lock().await;
644 let result = writer
645 .write(Buffer::from_iter(data.into_iter()))
646 .await
647 .map_err(|err| format_object_store_error(err, location.as_ref()));
648
649 drop(tx);
651
652 result
653 }
654 .into_send()
655 .boxed()
656 }
657
658 async fn complete(&mut self) -> object_store::Result<PutResult> {
659 let mut writer = self.writer.lock().await;
660 let metadata = writer
661 .close()
662 .into_send()
663 .await
664 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
665
666 let e_tag = metadata.etag().map(|s| s.to_string());
667 let version = metadata.version().map(|s| s.to_string());
668
669 Ok(PutResult { e_tag, version })
670 }
671
672 async fn abort(&mut self) -> object_store::Result<()> {
673 let mut writer = self.writer.lock().await;
674 writer
675 .abort()
676 .into_send()
677 .await
678 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
679 }
680}
681
682impl Debug for OpendalMultipartUpload {
683 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
684 f.debug_struct("OpendalMultipartUpload")
685 .field("location", &self.location)
686 .finish()
687 }
688}
689
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Creates a memory-backed store containing `data/test.txt` and
    /// `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let store = Arc::new(OpendalStore::new(op));

        let fixtures = [
            ("data/test.txt", &b"hello, world!"[..]),
            ("data/nested/test.txt", &b"hello, world! I am nested."[..]),
        ];
        for (location, content) in fixtures {
            let path: Path = location.into();
            store
                .put(&path, Bytes::from_static(content).into())
                .await
                .unwrap();
        }

        store
    }

    /// Reads the whole object at `path`, panicking on any error.
    async fn read_all(store: &dyn ObjectStore, path: &Path) -> Bytes {
        store.get(path).await.unwrap().bytes().await.unwrap()
    }

    #[tokio::test]
    async fn test_basic() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();
        let payload = Bytes::from_static(b"hello, world!");

        // Write, then check both the reported size and the content.
        object_store
            .put(&path, payload.clone().into())
            .await
            .unwrap();

        let head = object_store.head(&path).await.unwrap();
        assert_eq!(head.size, 13);

        assert_eq!(read_all(&*object_store, &path).await, payload);
    }

    #[tokio::test]
    async fn test_put_multipart() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let mut rng = thread_rng();

        // Completed upload: a random number of randomly sized chunks.
        let path: Path = "data/test_complete.txt".into();
        let upload = object_store.put_multipart(&path).await.unwrap();
        let mut write = WriteMultipart::new(upload);

        let mut expected = vec![];
        let round = rng.gen_range(1..=1024);
        for _ in 0..round {
            let size = rng.gen_range(1..=1024);
            let mut chunk = vec![0; size];
            rng.fill_bytes(&mut chunk);

            expected.extend_from_slice(&chunk);
            write.put(chunk.into());
        }

        let _ = write.finish().await.unwrap();

        let head = object_store.head(&path).await.unwrap();
        assert_eq!(head.size, expected.len() as u64);

        assert_eq!(
            read_all(&*object_store, &path).await,
            Bytes::from(expected)
        );

        // Aborted upload: nothing must be visible afterwards.
        let path: Path = "data/test_abort.txt".into();
        let mut upload = object_store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = object_store.head(&path).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    #[tokio::test]
    async fn test_list() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();

        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);

        let mut locations = results
            .iter()
            .map(|item| item.as_ref().unwrap().location.as_ref())
            .collect::<Vec<_>>();

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];
        let expected_locations = expected_files
            .iter()
            .map(|(location, _)| *location)
            .collect::<Vec<&str>>();

        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, content) in expected_files {
            let path: Path = location.into();
            assert_eq!(read_all(&*object_store, &path).await, content);
        }
    }

    #[tokio::test]
    async fn test_list_with_delimiter() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();

        let listing = object_store.list_with_delimiter(Some(&path)).await.unwrap();

        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    #[tokio::test]
    async fn test_list_with_offset() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let offset: Path = "data/nested/test.txt".into();

        let listed = object_store
            .list_with_offset(Some(&path), &offset)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(listed.len(), 1);
        assert_eq!(
            listed[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}