use std::fmt::{self, Debug, Display, Formatter};
use std::future::IntoFuture;
use std::io;
use std::sync::Arc;

use crate::utils::*;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use object_store::path::Path;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::{GetOptions, UploadPart};
use object_store::{GetRange, GetResultPayload};
use object_store::{GetResult, PutMode};
use opendal::raw::percent_decode_path;
use opendal::Buffer;
use opendal::Writer;
use opendal::{Operator, OperatorInfo};
use tokio::sync::{Mutex, Notify};

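/// An [`ObjectStore`] implementation backed by an OpenDAL [`Operator`].
///
/// Holds the [`Operator`] together with its cached [`OperatorInfo`].
///
/// # Example
///
/// A minimal sketch (doctest ignored) assuming this crate is consumed as
/// `object_store_opendal` and the in-memory service is enabled:
///
/// ```ignore
/// use object_store_opendal::OpendalStore;
/// use opendal::{services, Operator};
///
/// let op = Operator::new(services::Memory::default()).unwrap().finish();
/// let store = OpendalStore::new(op);
/// ```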
#[derive(Clone)]
pub struct OpendalStore {
    info: Arc<OperatorInfo>,
    inner: Operator,
}

impl OpendalStore {
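    /// Create a new `OpendalStore` from an OpenDAL [`Operator`].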
    pub fn new(op: Operator) -> Self {
        Self {
            info: op.info().into(),
            inner: op,
        }
    }

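    /// Get the [`OperatorInfo`] of the underlying [`Operator`].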
    pub fn info(&self) -> &OperatorInfo {
        self.info.as_ref()
    }
}

impl Debug for OpendalStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpendalStore")
            .field("scheme", &self.info.scheme())
            .field("name", &self.info.name())
            .field("root", &self.info.root())
            .field("capability", &self.info.full_capability())
            .finish()
    }
}

impl Display for OpendalStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let info = self.inner.info();
        write!(
            f,
            "Opendal({}, bucket={}, root={})",
            info.scheme(),
            info.name(),
            info.root()
        )
    }
}

impl From<Operator> for OpendalStore {
    fn from(value: Operator) -> Self {
        Self::new(value)
    }
}

#[async_trait]
impl ObjectStore for OpendalStore {
    async fn put_opts(
        &self,
        location: &Path,
        bytes: PutPayload,
        opts: PutOptions,
    ) -> object_store::Result<PutResult> {
        let mut future_write = self.inner.write_with(
            &percent_decode_path(location.as_ref()),
            Buffer::from_iter(bytes.into_iter()),
        );
        let opts_mode = opts.mode.clone();
        // Map object_store's put modes onto OpenDAL's conditional write options.
        match opts.mode {
            PutMode::Overwrite => {}
            PutMode::Create => {
                future_write = future_write.if_not_exists(true);
            }
            PutMode::Update(update_version) => {
                let Some(etag) = update_version.e_tag else {
                    Err(object_store::Error::NotSupported {
                        source: Box::new(opendal::Error::new(
                            opendal::ErrorKind::Unsupported,
                            "etag is required for conditional put",
                        )),
                    })?
                };
                future_write = future_write.if_match(etag.as_str());
            }
        }
        future_write.into_send().await.map_err(|err| {
            match format_object_store_error(err, location.as_ref()) {
                // `PutMode::Create` expects `AlreadyExists` rather than `Precondition`
                // when the object is already present.
                object_store::Error::Precondition { path, source }
                    if opts_mode == PutMode::Create =>
                {
                    object_store::Error::AlreadyExists { path, source }
                }
                e => e,
            }
        })?;

        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

    async fn put_multipart(
        &self,
        location: &Path,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        // Upload parts with a concurrency of 8 inside the OpenDAL writer.
        let writer = self
            .inner
            .writer_with(&percent_decode_path(location.as_ref()))
            .concurrent(8)
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
        let upload = OpendalMultipartUpload::new(writer, location.clone());

        Ok(Box::new(upload))
    }

    async fn put_multipart_opts(
        &self,
        _location: &Path,
        _opts: PutMultipartOpts,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "put_multipart_opts is not implemented yet",
            )),
        })
    }

    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        let raw_location = percent_decode_path(location.as_ref());
        // Stat first so we can build `ObjectMeta` and clamp the requested range.
        let meta = {
            let mut s = self.inner.stat_with(&raw_location);
            if let Some(version) = &options.version {
                s = s.version(version.as_str());
            }
            if let Some(if_match) = &options.if_match {
                s = s.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = &options.if_none_match {
                s = s.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) = options.if_modified_since {
                s = s.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) = options.if_unmodified_since {
                s = s.if_unmodified_since(if_unmodified_since);
            }
            s.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: meta.last_modified().unwrap_or_default(),
            size: meta.content_length(),
            e_tag: meta.etag().map(|x| x.to_string()),
            version: meta.version().map(|x| x.to_string()),
        };

        // A HEAD request only needs the metadata; return an empty stream.
        if options.head {
            return Ok(GetResult {
                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
                range: 0..0,
                meta,
                attributes: Default::default(),
            });
        }

        let reader = {
            let mut r = self.inner.reader_with(raw_location.as_ref());
            if let Some(version) = options.version {
                r = r.version(version.as_str());
            }
            if let Some(if_match) = options.if_match {
                r = r.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = options.if_none_match {
                r = r.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) = options.if_modified_since {
                r = r.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) = options.if_unmodified_since {
                r = r.if_unmodified_since(if_unmodified_since);
            }
            r.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

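        // Clamp the requested range against the object's size: out-of-bounds
        // bounded or offset ranges collapse to an empty `0..0` range, while a
        // suffix larger than the object falls back to reading the whole object.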
        let read_range = match options.range {
            Some(GetRange::Bounded(r)) => {
                if r.start >= r.end || r.start >= meta.size {
                    0..0
                } else {
                    let end = r.end.min(meta.size);
                    r.start..end
                }
            }
            Some(GetRange::Offset(r)) => {
                if r < meta.size {
                    r..meta.size
                } else {
                    0..0
                }
            }
            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
            _ => 0..meta.size,
        };

        let stream = reader
            .into_bytes_stream(read_range.start..read_range.end)
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?
            .into_send()
            .map_err(|err: io::Error| object_store::Error::Generic {
                store: "IoError",
                source: Box::new(err),
            });

        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(stream)),
            range: read_range.start..read_range.end,
            meta,
            attributes: Default::default(),
        })
    }

    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        self.inner
            .delete(&percent_decode_path(location.as_ref()))
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;

        Ok(())
    }

    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        // `object_store` prefixes don't carry a trailing slash; add one so OpenDAL
        // treats the prefix as a directory.
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });

        let lister_fut = self.inner.lister_with(&path).recursive(true);
        let fut = async move {
            let stream = lister_fut
                .await
                .map_err(|err| format_object_store_error(err, &path))?;

            let stream = stream.then(|res| async {
                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
                let meta = entry.metadata();

                Ok(format_object_meta(entry.path(), meta))
            });
            Ok::<_, object_store::Error>(stream)
        };

        fut.into_stream().try_flatten().into_send().boxed()
    }

    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });
        let offset = offset.clone();

        let this = self.clone();

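        // Prefer the service's native `start_after` support when the capability
        // is present; otherwise list everything and filter client-side below.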
        let fut = async move {
            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
            let mut fut = this.inner.lister_with(&path).recursive(true);

            if list_with_start_after {
                fut = fut.start_after(offset.as_ref());
            }

            let lister = fut
                .await
                .map_err(|err| format_object_store_error(err, &path))?
                .then(move |entry| {
                    let path = path.clone();
                    let this = this.clone();
                    async move {
                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
                        let (path, metadata) = entry.into_parts();

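                        // Some services return incomplete metadata from `list`;
                        // fall back to `stat` when `last_modified` is missing.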
                        if metadata.is_dir() || metadata.last_modified().is_some() {
                            let object_meta = format_object_meta(&path, &metadata);
                            return Ok(object_meta);
                        }

                        let metadata = this
                            .inner
                            .stat(&path)
                            .await
                            .map_err(|err| format_object_store_error(err, &path))?;
                        let object_meta = format_object_meta(&path, &metadata);
                        Ok::<_, object_store::Error>(object_meta)
                    }
                })
                .into_send()
                .boxed();

            let stream = if list_with_start_after {
                lister
            } else {
                lister
                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
                    .into_send()
                    .boxed()
            };

            Ok::<_, object_store::Error>(stream)
        };

        fut.into_stream().into_send().try_flatten().boxed()
    }

    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });
        let mut stream = self
            .inner
            .lister_with(&path)
            .into_future()
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, &path))?
            .into_send();

        let mut common_prefixes = Vec::new();
        let mut objects = Vec::new();

        while let Some(res) = stream.next().into_send().await {
            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
            let meta = entry.metadata();

            if meta.is_dir() {
                common_prefixes.push(entry.path().into());
            } else if meta.last_modified().is_some() {
                objects.push(format_object_meta(entry.path(), meta));
            } else {
                // The listing didn't return full metadata; issue a `stat` to fill it in.
                let meta = self
                    .inner
                    .stat(entry.path())
                    .into_send()
                    .await
                    .map_err(|err| format_object_store_error(err, entry.path()))?;
                objects.push(format_object_meta(entry.path(), &meta));
            }
        }

        Ok(ListResult {
            common_prefixes,
            objects,
        })
    }

    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "copy is not implemented yet",
            )),
        })
    }

    async fn rename(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "rename is not implemented yet",
            )),
        })
    }

    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "copy_if_not_exists is not implemented yet",
            )),
        })
    }
}

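/// A [`MultipartUpload`] implementation built on an OpenDAL [`Writer`].
///
/// The writer is shared behind a `Mutex` because the parts returned from
/// `put_part` may be polled concurrently, while the underlying writer appends
/// data sequentially; `next_notify` keeps the parts in submission order.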
struct OpendalMultipartUpload {
    writer: Arc<Mutex<Writer>>,
    location: Path,
    // Notify handle of the most recently submitted part; the next part waits
    // on it so parts are written in submission order.
    next_notify: Option<Arc<Notify>>,
}

impl OpendalMultipartUpload {
    fn new(writer: Writer, location: Path) -> Self {
        Self {
            writer: Arc::new(Mutex::new(writer)),
            location,
            next_notify: None,
        }
    }
}

#[async_trait]
impl MultipartUpload for OpendalMultipartUpload {
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let writer = self.writer.clone();
        let location = self.location.clone();

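        // Chain a `Notify` from part to part: each part waits for the previous
        // part to finish writing and signals its own notify when done, so data
        // reaches the writer in the order the parts were submitted.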
        let next_notify = Arc::new(Notify::new());
        let current_notify = self.next_notify.replace(next_notify.clone());

        async move {
            // Wait for the previous part (if any) to finish before writing.
            if let Some(notify) = current_notify {
                notify.notified().await;
            }

            let mut writer = writer.lock().await;
            let result = writer
                .write(Buffer::from_iter(data.into_iter()))
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()));

            // Wake the next queued part regardless of whether this write succeeded.
            next_notify.notify_one();

            result
        }
        .into_send()
        .boxed()
    }

    async fn complete(&mut self) -> object_store::Result<PutResult> {
        let mut writer = self.writer.lock().await;
        writer
            .close()
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;

        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

    async fn abort(&mut self) -> object_store::Result<()> {
        let mut writer = self.writer.lock().await;
        writer
            .abort()
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
    }
}

impl Debug for OpendalMultipartUpload {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("OpendalMultipartUpload")
            .field("location", &self.location)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();
        let bytes = Bytes::from_static(b"hello, world!");
        object_store.put(&path, bytes.into()).await.unwrap();

        let path: Path = "data/nested/test.txt".into();
        let bytes = Bytes::from_static(b"hello, world! I am nested.");
        object_store.put(&path, bytes.into()).await.unwrap();

        object_store
    }

    #[tokio::test]
    async fn test_basic() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();

        let bytes = Bytes::from_static(b"hello, world!");
        object_store.put(&path, bytes.clone().into()).await.unwrap();

        let meta = object_store.head(&path).await.unwrap();

        assert_eq!(meta.size, 13);

        assert_eq!(
            object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap(),
            bytes
        );
    }

    #[tokio::test]
    async fn test_put_multipart() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let mut rng = thread_rng();

        let path: Path = "data/test_complete.txt".into();
        let upload = object_store.put_multipart(&path).await.unwrap();

        let mut write = WriteMultipart::new(upload);

        let mut all_bytes = vec![];
        let round = rng.gen_range(1..=1024);
        for _ in 0..round {
            let size = rng.gen_range(1..=1024);
            let mut bytes = vec![0; size];
            rng.fill_bytes(&mut bytes);

            all_bytes.extend_from_slice(&bytes);
            write.put(bytes.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = object_store.head(&path).await.unwrap();

        assert_eq!(meta.size, all_bytes.len() as u64);

        assert_eq!(
            object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap(),
            Bytes::from(all_bytes)
        );

        let path: Path = "data/test_abort.txt".into();
        let mut upload = object_store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let res = object_store.head(&path).await;
        let err = res.unwrap_err();

        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    #[tokio::test]
    async fn test_list() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);
        let mut locations = results
            .iter()
            .map(|x| x.as_ref().unwrap().location.as_ref())
            .collect::<Vec<_>>();

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];

        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();

        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, bytes) in expected_files {
            let path: Path = location.into();
            assert_eq!(
                object_store
                    .get(&path)
                    .await
                    .unwrap()
                    .bytes()
                    .await
                    .unwrap(),
                bytes
            );
        }
    }

    #[tokio::test]
    async fn test_list_with_delimiter() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
        assert_eq!(result.objects.len(), 1);
        assert_eq!(result.common_prefixes.len(), 1);
        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
    }

    #[tokio::test]
    async fn test_list_with_offset() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let offset: Path = "data/nested/test.txt".into();
        let result = object_store
            .list_with_offset(Some(&path), &offset)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(result.len(), 1);
        assert_eq!(
            result[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}