1use std::fmt::{self, Debug, Display, Formatter};
19use std::future::IntoFuture;
20use std::io;
21use std::sync::Arc;
22
23use crate::utils::*;
24use async_trait::async_trait;
25use futures::stream::BoxStream;
26use futures::FutureExt;
27use futures::StreamExt;
28use futures::TryStreamExt;
29use object_store::path::Path;
30use object_store::ListResult;
31use object_store::MultipartUpload;
32use object_store::ObjectMeta;
33use object_store::ObjectStore;
34use object_store::PutMultipartOpts;
35use object_store::PutOptions;
36use object_store::PutPayload;
37use object_store::PutResult;
38use object_store::{GetOptions, UploadPart};
39use object_store::{GetRange, GetResultPayload};
40use object_store::{GetResult, PutMode};
41use opendal::Buffer;
42use opendal::Writer;
43use opendal::{Operator, OperatorInfo};
44use tokio::sync::{Mutex, Notify};
45
46#[derive(Clone)]
93pub struct OpendalStore {
94 info: Arc<OperatorInfo>,
95 inner: Operator,
96}
97
98impl OpendalStore {
99 pub fn new(op: Operator) -> Self {
101 Self {
102 info: op.info().into(),
103 inner: op,
104 }
105 }
106
107 pub fn info(&self) -> &OperatorInfo {
109 self.info.as_ref()
110 }
111}
112
113impl Debug for OpendalStore {
114 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115 f.debug_struct("OpendalStore")
116 .field("scheme", &self.info.scheme())
117 .field("name", &self.info.name())
118 .field("root", &self.info.root())
119 .field("capability", &self.info.full_capability())
120 .finish()
121 }
122}
123
124impl Display for OpendalStore {
125 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126 let info = self.inner.info();
127 write!(
128 f,
129 "Opendal({}, bucket={}, root={})",
130 info.scheme(),
131 info.name(),
132 info.root()
133 )
134 }
135}
136
137impl From<Operator> for OpendalStore {
138 fn from(value: Operator) -> Self {
139 Self::new(value)
140 }
141}
142
143#[async_trait]
144impl ObjectStore for OpendalStore {
145 async fn put_opts(
146 &self,
147 location: &Path,
148 bytes: PutPayload,
149 opts: PutOptions,
150 ) -> object_store::Result<PutResult> {
151 let mut future_write = self
152 .inner
153 .write_with(location.as_ref(), Buffer::from_iter(bytes.into_iter()));
154 let opts_mode = opts.mode.clone();
155 match opts.mode {
156 PutMode::Overwrite => {}
157 PutMode::Create => {
158 future_write = future_write.if_not_exists(true);
159 }
160 PutMode::Update(update_version) => {
161 let Some(etag) = update_version.e_tag else {
162 Err(object_store::Error::NotSupported {
163 source: Box::new(opendal::Error::new(
164 opendal::ErrorKind::Unsupported,
165 "etag is required for conditional put",
166 )),
167 })?
168 };
169 future_write = future_write.if_match(etag.as_str());
170 }
171 }
172 future_write.into_send().await.map_err(|err| {
173 match format_object_store_error(err, location.as_ref()) {
174 object_store::Error::Precondition { path, source }
175 if opts_mode == PutMode::Create =>
176 {
177 object_store::Error::AlreadyExists { path, source }
178 }
179 e => e,
180 }
181 })?;
182
183 Ok(PutResult {
184 e_tag: None,
185 version: None,
186 })
187 }
188
189 async fn put_multipart(
190 &self,
191 location: &Path,
192 ) -> object_store::Result<Box<dyn MultipartUpload>> {
193 let writer = self
194 .inner
195 .writer_with(location.as_ref())
196 .concurrent(8)
197 .into_send()
198 .await
199 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
200 let upload = OpendalMultipartUpload::new(writer, location.clone());
201
202 Ok(Box::new(upload))
203 }
204
205 async fn put_multipart_opts(
206 &self,
207 _location: &Path,
208 _opts: PutMultipartOpts,
209 ) -> object_store::Result<Box<dyn MultipartUpload>> {
210 Err(object_store::Error::NotSupported {
211 source: Box::new(opendal::Error::new(
212 opendal::ErrorKind::Unsupported,
213 "put_multipart_opts is not implemented so far",
214 )),
215 })
216 }
217
218 async fn get_opts(
219 &self,
220 location: &Path,
221 options: GetOptions,
222 ) -> object_store::Result<GetResult> {
223 let meta = {
224 let mut s = self.inner.stat_with(location.as_ref());
225 if let Some(version) = &options.version {
226 s = s.version(version.as_str())
227 }
228 if let Some(if_match) = &options.if_match {
229 s = s.if_match(if_match.as_str());
230 }
231 if let Some(if_none_match) = &options.if_none_match {
232 s = s.if_none_match(if_none_match.as_str());
233 }
234 if let Some(if_modified_since) = options.if_modified_since {
235 s = s.if_modified_since(if_modified_since);
236 }
237 if let Some(if_unmodified_since) = options.if_unmodified_since {
238 s = s.if_unmodified_since(if_unmodified_since);
239 }
240 s.into_send()
241 .await
242 .map_err(|err| format_object_store_error(err, location.as_ref()))?
243 };
244
245 let meta = ObjectMeta {
246 location: location.clone(),
247 last_modified: meta.last_modified().unwrap_or_default(),
248 size: meta.content_length() as usize,
249 e_tag: meta.etag().map(|x| x.to_string()),
250 version: meta.version().map(|x| x.to_string()),
251 };
252
253 if options.head {
254 return Ok(GetResult {
255 payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
256 range: 0..0,
257 meta,
258 attributes: Default::default(),
259 });
260 }
261
262 let reader = {
263 let mut r = self.inner.reader_with(location.as_ref());
264 if let Some(version) = options.version {
265 r = r.version(version.as_str());
266 }
267 if let Some(if_match) = options.if_match {
268 r = r.if_match(if_match.as_str());
269 }
270 if let Some(if_none_match) = options.if_none_match {
271 r = r.if_none_match(if_none_match.as_str());
272 }
273 if let Some(if_modified_since) = options.if_modified_since {
274 r = r.if_modified_since(if_modified_since);
275 }
276 if let Some(if_unmodified_since) = options.if_unmodified_since {
277 r = r.if_unmodified_since(if_unmodified_since);
278 }
279 r.into_send()
280 .await
281 .map_err(|err| format_object_store_error(err, location.as_ref()))?
282 };
283
284 let read_range = match options.range {
285 Some(GetRange::Bounded(r)) => {
286 if r.start >= r.end || r.start >= meta.size {
287 0..0
288 } else {
289 let end = r.end.min(meta.size);
290 r.start..end
291 }
292 }
293 Some(GetRange::Offset(r)) => {
294 if r < meta.size {
295 r..meta.size
296 } else {
297 0..0
298 }
299 }
300 Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
301 _ => 0..meta.size,
302 };
303
304 let stream = reader
305 .into_bytes_stream(read_range.start as u64..read_range.end as u64)
306 .into_send()
307 .await
308 .map_err(|err| format_object_store_error(err, location.as_ref()))?
309 .into_send()
310 .map_err(|err: io::Error| object_store::Error::Generic {
311 store: "IoError",
312 source: Box::new(err),
313 });
314
315 Ok(GetResult {
316 payload: GetResultPayload::Stream(Box::pin(stream)),
317 range: read_range.start..read_range.end,
318 meta,
319 attributes: Default::default(),
320 })
321 }
322
323 async fn delete(&self, location: &Path) -> object_store::Result<()> {
324 self.inner
325 .delete(location.as_ref())
326 .into_send()
327 .await
328 .map_err(|err| format_object_store_error(err, location.as_ref()))?;
329
330 Ok(())
331 }
332
333 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
334 let path = prefix.map_or("".into(), |x| format!("{}/", x));
337
338 let fut = async move {
339 let stream = self
340 .inner
341 .lister_with(&path)
342 .recursive(true)
343 .await
344 .map_err(|err| format_object_store_error(err, &path))?;
345
346 let stream = stream.then(|res| async {
347 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
348 let meta = entry.metadata();
349
350 Ok(format_object_meta(entry.path(), meta))
351 });
352 Ok::<_, object_store::Error>(stream)
353 };
354
355 fut.into_stream().try_flatten().into_send().boxed()
356 }
357
358 fn list_with_offset(
359 &self,
360 prefix: Option<&Path>,
361 offset: &Path,
362 ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
363 let path = prefix.map_or("".into(), |x| format!("{}/", x));
364 let offset = offset.clone();
365
366 let fut = async move {
367 let list_with_start_after = self.inner.info().full_capability().list_with_start_after;
368 let mut fut = self.inner.lister_with(&path).recursive(true);
369
370 if list_with_start_after {
372 fut = fut.start_after(offset.as_ref());
373 }
374
375 let lister = fut
376 .await
377 .map_err(|err| format_object_store_error(err, &path))?
378 .then(move |entry| {
379 let path = path.clone();
380
381 async move {
382 let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
383 let (path, metadata) = entry.into_parts();
384
385 if metadata.is_dir() || metadata.last_modified().is_some() {
387 let object_meta = format_object_meta(&path, &metadata);
388 return Ok(object_meta);
389 }
390
391 let metadata = self
392 .inner
393 .stat(&path)
394 .await
395 .map_err(|err| format_object_store_error(err, &path))?;
396 let object_meta = format_object_meta(&path, &metadata);
397 Ok::<_, object_store::Error>(object_meta)
398 }
399 })
400 .into_send()
401 .boxed();
402
403 let stream = if list_with_start_after {
404 lister
405 } else {
406 lister
407 .try_filter(move |entry| futures::future::ready(entry.location > offset))
408 .into_send()
409 .boxed()
410 };
411
412 Ok::<_, object_store::Error>(stream)
413 };
414
415 fut.into_stream().into_send().try_flatten().boxed()
416 }
417
418 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
419 let path = prefix.map_or("".into(), |x| format!("{}/", x));
420 let mut stream = self
421 .inner
422 .lister_with(&path)
423 .into_future()
424 .into_send()
425 .await
426 .map_err(|err| format_object_store_error(err, &path))?
427 .into_send();
428
429 let mut common_prefixes = Vec::new();
430 let mut objects = Vec::new();
431
432 while let Some(res) = stream.next().into_send().await {
433 let entry = res.map_err(|err| format_object_store_error(err, ""))?;
434 let meta = entry.metadata();
435
436 if meta.is_dir() {
437 common_prefixes.push(entry.path().into());
438 } else if meta.last_modified().is_some() {
439 objects.push(format_object_meta(entry.path(), meta));
440 } else {
441 let meta = self
442 .inner
443 .stat(entry.path())
444 .into_send()
445 .await
446 .map_err(|err| format_object_store_error(err, entry.path()))?;
447 objects.push(format_object_meta(entry.path(), &meta));
448 }
449 }
450
451 Ok(ListResult {
452 common_prefixes,
453 objects,
454 })
455 }
456
457 async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
458 Err(object_store::Error::NotSupported {
459 source: Box::new(opendal::Error::new(
460 opendal::ErrorKind::Unsupported,
461 "copy is not implemented so far",
462 )),
463 })
464 }
465
466 async fn rename(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
467 Err(object_store::Error::NotSupported {
468 source: Box::new(opendal::Error::new(
469 opendal::ErrorKind::Unsupported,
470 "rename is not implemented so far",
471 )),
472 })
473 }
474
475 async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
476 Err(object_store::Error::NotSupported {
477 source: Box::new(opendal::Error::new(
478 opendal::ErrorKind::Unsupported,
479 "copy_if_not_exists is not implemented so far",
480 )),
481 })
482 }
483}
484
485struct OpendalMultipartUpload {
494 writer: Arc<Mutex<Writer>>,
495 location: Path,
496 next_notify: Option<Arc<Notify>>,
497}
498
499impl OpendalMultipartUpload {
500 fn new(writer: Writer, location: Path) -> Self {
501 Self {
502 writer: Arc::new(Mutex::new(writer)),
503 location,
504 next_notify: None,
505 }
506 }
507}
508
509#[async_trait]
510impl MultipartUpload for OpendalMultipartUpload {
511 fn put_part(&mut self, data: PutPayload) -> UploadPart {
512 let writer = self.writer.clone();
513 let location = self.location.clone();
514
515 let next_notify = Arc::new(Notify::new());
517 let current_notify = self.next_notify.replace(next_notify.clone());
519
520 async move {
521 if let Some(notify) = current_notify {
523 notify.notified().await;
525 }
526
527 let mut writer = writer.lock().await;
528 let result = writer
529 .write(Buffer::from_iter(data.into_iter()))
530 .await
531 .map_err(|err| format_object_store_error(err, location.as_ref()));
532
533 next_notify.notify_one();
535
536 result
537 }
538 .into_send()
539 .boxed()
540 }
541
542 async fn complete(&mut self) -> object_store::Result<PutResult> {
543 let mut writer = self.writer.lock().await;
544 writer
545 .close()
546 .into_send()
547 .await
548 .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
549
550 Ok(PutResult {
551 e_tag: None,
552 version: None,
553 })
554 }
555
556 async fn abort(&mut self) -> object_store::Result<()> {
557 let mut writer = self.writer.lock().await;
558 writer
559 .abort()
560 .into_send()
561 .await
562 .map_err(|err| format_object_store_error(err, self.location.as_ref()))
563 }
564}
565
566impl Debug for OpendalMultipartUpload {
567 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
568 f.debug_struct("OpendalMultipartUpload")
569 .field("location", &self.location)
570 .finish()
571 }
572}
573
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    /// Returns an empty store backed by OpenDAL's `Memory` service.
    fn empty_memory_store() -> Arc<dyn ObjectStore> {
        let operator = Operator::new(services::Memory::default()).unwrap().finish();
        Arc::new(OpendalStore::new(operator))
    }

    /// Downloads the whole object at `path`, panicking on any failure.
    async fn fetch(store: &Arc<dyn ObjectStore>, path: &Path) -> Bytes {
        let response = store.get(path).await.unwrap();
        response.bytes().await.unwrap()
    }

    /// Returns a memory-backed store pre-populated with `data/test.txt` and
    /// `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let store = empty_memory_store();

        let fixtures = vec![
            ("data/test.txt", "hello, world!"),
            ("data/nested/test.txt", "hello, world! I am nested."),
        ];
        for (location, content) in fixtures {
            let payload = Bytes::from_static(content.as_bytes());
            store
                .put(&Path::from(location), payload.into())
                .await
                .unwrap();
        }

        store
    }

    /// A single put is visible through both `head` and `get`.
    #[tokio::test]
    async fn test_basic() {
        let store = empty_memory_store();
        let location = Path::from("data/test.txt");
        let content = Bytes::from_static(b"hello, world!");

        store.put(&location, content.clone().into()).await.unwrap();

        let head = store.head(&location).await.unwrap();
        assert_eq!(head.size, 13);

        assert_eq!(fetch(&store, &location).await, content);
    }

    /// A completed multipart upload yields the concatenation of its parts,
    /// and an aborted one leaves no object behind.
    #[tokio::test]
    async fn test_put_multipart() {
        let store = empty_memory_store();
        let mut rng = thread_rng();

        // Upload a random number of randomly sized parts and complete it.
        let completed = Path::from("data/test_complete.txt");
        let upload = store.put_multipart(&completed).await.unwrap();
        let mut writer = WriteMultipart::new(upload);

        let mut expected = vec![];
        let rounds = rng.gen_range(1..=1024);
        for _ in 0..rounds {
            let len = rng.gen_range(1..=1024);
            let mut chunk = vec![0; len];
            rng.fill_bytes(&mut chunk);

            expected.extend_from_slice(&chunk);
            writer.put(chunk.into());
        }

        let _ = writer.finish().await.unwrap();

        let head = store.head(&completed).await.unwrap();
        assert_eq!(head.size, expected.len());
        assert_eq!(fetch(&store, &completed).await, Bytes::from(expected));

        // Upload one part and abort: the object must not exist afterwards.
        let aborted = Path::from("data/test_abort.txt");
        let mut upload = store.put_multipart(&aborted).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let err = store.head(&aborted).await.unwrap_err();
        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    /// `list` is recursive: both the direct and the nested file show up, and
    /// both remain readable.
    #[tokio::test]
    async fn test_list() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");

        let listed = store.list(Some(&prefix)).collect::<Vec<_>>().await;
        assert_eq!(listed.len(), 2);

        let mut locations: Vec<&str> = listed
            .iter()
            .map(|item| item.as_ref().unwrap().location.as_ref())
            .collect();
        locations.sort();

        let expected = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];
        let expected_locations: Vec<&str> =
            expected.iter().map(|(location, _)| *location).collect();
        assert_eq!(locations, expected_locations);

        for (location, content) in expected {
            assert_eq!(fetch(&store, &Path::from(location)).await, content);
        }
    }

    /// `list_with_delimiter` returns the direct file as an object and the
    /// nested directory as a common prefix.
    #[tokio::test]
    async fn test_list_with_delimiter() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");

        let listing = store.list_with_delimiter(Some(&prefix)).await.unwrap();

        assert_eq!(listing.objects.len(), 1);
        assert_eq!(listing.common_prefixes.len(), 1);
        assert_eq!(listing.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(listing.common_prefixes[0].as_ref(), "data/nested");
    }

    /// `list_with_offset` returns only the entries located after the offset.
    #[tokio::test]
    async fn test_list_with_offset() {
        let store = create_test_object_store().await;
        let prefix = Path::from("data/");
        let offset = Path::from("data/nested/test.txt");

        let listed = store
            .list_with_offset(Some(&prefix), &offset)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(listed.len(), 1);
        assert_eq!(
            listed[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}