use std::fmt::{self, Debug, Display, Formatter};
use std::future::IntoFuture;
use std::io;
use std::sync::Arc;

use crate::utils::*;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use object_store::path::Path;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::{GetOptions, UploadPart};
use object_store::{GetRange, GetResultPayload};
use object_store::{GetResult, PutMode};
use opendal::Buffer;
use opendal::Writer;
use opendal::{Operator, OperatorInfo};
use tokio::sync::{Mutex, Notify};

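/// An [`ObjectStore`] implementation backed by an OpenDAL [`Operator`], so any
/// service supported by OpenDAL can be used through the `object_store` API.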
#[derive(Clone)]
pub struct OpendalStore {
    info: Arc<OperatorInfo>,
    inner: Operator,
}

impl OpendalStore {
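    /// Create a new `OpendalStore` wrapping the given [`Operator`].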
    pub fn new(op: Operator) -> Self {
        Self {
            info: op.info().into(),
            inner: op,
        }
    }

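    /// Get the [`OperatorInfo`] of the underlying [`Operator`].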
    pub fn info(&self) -> &OperatorInfo {
        self.info.as_ref()
    }
}

impl Debug for OpendalStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpendalStore")
            .field("scheme", &self.info.scheme())
            .field("name", &self.info.name())
            .field("root", &self.info.root())
            .field("capability", &self.info.full_capability())
            .finish()
    }
}

impl Display for OpendalStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let info = self.inner.info();
        write!(
            f,
            "Opendal({}, bucket={}, root={})",
            info.scheme(),
            info.name(),
            info.root()
        )
    }
}

impl From<Operator> for OpendalStore {
    fn from(value: Operator) -> Self {
        Self::new(value)
    }
}

#[async_trait]
impl ObjectStore for OpendalStore {
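    // `PutMode` maps onto OpenDAL write conditions: `Overwrite` is a plain write,
    // `Create` sets `if_not_exists`, and `Update` requires an etag and sets `if_match`.
    // A `Precondition` failure under `Create` is reported as `AlreadyExists`.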
    async fn put_opts(
        &self,
        location: &Path,
        bytes: PutPayload,
        opts: PutOptions,
    ) -> object_store::Result<PutResult> {
        let mut future_write = self
            .inner
            .write_with(location.as_ref(), Buffer::from_iter(bytes.into_iter()));
        let opts_mode = opts.mode.clone();
        match opts.mode {
            PutMode::Overwrite => {}
            PutMode::Create => {
                future_write = future_write.if_not_exists(true);
            }
            PutMode::Update(update_version) => {
                let Some(etag) = update_version.e_tag else {
                    Err(object_store::Error::NotSupported {
                        source: Box::new(opendal::Error::new(
                            opendal::ErrorKind::Unsupported,
                            "etag is required for conditional put",
                        )),
                    })?
                };
                future_write = future_write.if_match(etag.as_str());
            }
        }
        future_write.into_send().await.map_err(|err| {
            match format_object_store_error(err, location.as_ref()) {
                object_store::Error::Precondition { path, source }
                    if opts_mode == PutMode::Create =>
                {
                    object_store::Error::AlreadyExists { path, source }
                }
                e => e,
            }
        })?;

        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

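    // Multipart uploads are served by an OpenDAL writer configured with a
    // concurrency of 8, wrapped by `OpendalMultipartUpload` below.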
    async fn put_multipart(
        &self,
        location: &Path,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        let writer = self
            .inner
            .writer_with(location.as_ref())
            .concurrent(8)
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
        let upload = OpendalMultipartUpload::new(writer, location.clone());

        Ok(Box::new(upload))
    }

    async fn put_multipart_opts(
        &self,
        _location: &Path,
        _opts: PutMultipartOpts,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "put_multipart_opts is not implemented yet",
            )),
        })
    }

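    // Stat the object first (applying any conditional headers), return early for
    // HEAD-style requests, then open a reader under the same conditions.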
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        let meta = {
            let mut s = self.inner.stat_with(location.as_ref());
            if let Some(version) = &options.version {
                s = s.version(version.as_str());
            }
            if let Some(if_match) = &options.if_match {
                s = s.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = &options.if_none_match {
                s = s.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) = options.if_modified_since {
                s = s.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) = options.if_unmodified_since {
                s = s.if_unmodified_since(if_unmodified_since);
            }
            s.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: meta.last_modified().unwrap_or_default(),
            size: meta.content_length(),
            e_tag: meta.etag().map(|x| x.to_string()),
            version: meta.version().map(|x| x.to_string()),
        };

        if options.head {
            return Ok(GetResult {
                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
                range: 0..0,
                meta,
                attributes: Default::default(),
            });
        }

        let reader = {
            let mut r = self.inner.reader_with(location.as_ref());
            if let Some(version) = options.version {
                r = r.version(version.as_str());
            }
            if let Some(if_match) = options.if_match {
                r = r.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = options.if_none_match {
                r = r.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) = options.if_modified_since {
                r = r.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) = options.if_unmodified_since {
                r = r.if_unmodified_since(if_unmodified_since);
            }
            r.into_send()
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

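        // Clamp the requested range to the object's size: invalid or out-of-bounds
        // bounded/offset ranges collapse to the empty range `0..0`, and a suffix
        // longer than the object falls back to reading the whole object.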
        let read_range = match options.range {
            Some(GetRange::Bounded(r)) => {
                if r.start >= r.end || r.start >= meta.size {
                    0..0
                } else {
                    let end = r.end.min(meta.size);
                    r.start..end
                }
            }
            Some(GetRange::Offset(r)) => {
                if r < meta.size {
                    r..meta.size
                } else {
                    0..0
                }
            }
            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
            _ => 0..meta.size,
        };

        let stream = reader
            .into_bytes_stream(read_range.start..read_range.end)
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?
            .into_send()
            .map_err(|err: io::Error| object_store::Error::Generic {
                store: "IoError",
                source: Box::new(err),
            });

        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(stream)),
            range: read_range.start..read_range.end,
            meta,
            attributes: Default::default(),
        })
    }

    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        self.inner
            .delete(location.as_ref())
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;

        Ok(())
    }

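    // List recursively under the prefix; entries are converted lazily into
    // `ObjectMeta` as the underlying lister is polled.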
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        let path = prefix.map_or("".into(), |x| format!("{}/", x));

        let lister_fut = self.inner.lister_with(&path).recursive(true);
        let fut = async move {
            let stream = lister_fut
                .await
                .map_err(|err| format_object_store_error(err, &path))?;

            let stream = stream.then(|res| async {
                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
                let meta = entry.metadata();

                Ok(format_object_meta(entry.path(), meta))
            });
            Ok::<_, object_store::Error>(stream)
        };

        fut.into_stream().try_flatten().into_send().boxed()
    }

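    // Use the service's native `start_after` when the capability is available;
    // otherwise list everything and filter out entries at or before the offset
    // on the client side.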
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        let path = prefix.map_or("".into(), |x| format!("{}/", x));
        let offset = offset.clone();

        let this = self.clone();

        let fut = async move {
            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
            let mut fut = this.inner.lister_with(&path).recursive(true);

            if list_with_start_after {
                fut = fut.start_after(offset.as_ref());
            }

            let lister = fut
                .await
                .map_err(|err| format_object_store_error(err, &path))?
                .then(move |entry| {
                    let path = path.clone();
                    let this = this.clone();
                    async move {
                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
                        let (path, metadata) = entry.into_parts();

                        if metadata.is_dir() || metadata.last_modified().is_some() {
                            let object_meta = format_object_meta(&path, &metadata);
                            return Ok(object_meta);
                        }

                        let metadata = this
                            .inner
                            .stat(&path)
                            .await
                            .map_err(|err| format_object_store_error(err, &path))?;
                        let object_meta = format_object_meta(&path, &metadata);
                        Ok::<_, object_store::Error>(object_meta)
                    }
                })
                .into_send()
                .boxed();

            let stream = if list_with_start_after {
                lister
            } else {
                lister
                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
                    .into_send()
                    .boxed()
            };

            Ok::<_, object_store::Error>(stream)
        };

        fut.into_stream().into_send().try_flatten().boxed()
    }

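    // Non-recursive listing: directory entries become `common_prefixes`, and files
    // missing a last-modified time are stat-ed individually to fill in metadata.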
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
        let path = prefix.map_or("".into(), |x| format!("{}/", x));
        let mut stream = self
            .inner
            .lister_with(&path)
            .into_future()
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, &path))?
            .into_send();

        let mut common_prefixes = Vec::new();
        let mut objects = Vec::new();

        while let Some(res) = stream.next().into_send().await {
            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
            let meta = entry.metadata();

            if meta.is_dir() {
                common_prefixes.push(entry.path().into());
            } else if meta.last_modified().is_some() {
                objects.push(format_object_meta(entry.path(), meta));
            } else {
                let meta = self
                    .inner
                    .stat(entry.path())
                    .into_send()
                    .await
                    .map_err(|err| format_object_store_error(err, entry.path()))?;
                objects.push(format_object_meta(entry.path(), &meta));
            }
        }

        Ok(ListResult {
            common_prefixes,
            objects,
        })
    }

    async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "copy is not implemented yet",
            )),
        })
    }

    async fn rename(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "rename is not implemented yet",
            )),
        })
    }

    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
        Err(object_store::Error::NotSupported {
            source: Box::new(opendal::Error::new(
                opendal::ErrorKind::Unsupported,
                "copy_if_not_exists is not implemented yet",
            )),
        })
    }
}

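/// A [`MultipartUpload`] implementation that adapts an OpenDAL [`Writer`].
///
/// Parts are written to the shared writer in submission order; the `next_notify`
/// chain makes each part wait for the previous one before acquiring the writer.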
struct OpendalMultipartUpload {
    writer: Arc<Mutex<Writer>>,
    location: Path,
    next_notify: Option<Arc<Notify>>,
}

impl OpendalMultipartUpload {
    fn new(writer: Writer, location: Path) -> Self {
        Self {
            writer: Arc::new(Mutex::new(writer)),
            location,
            next_notify: None,
        }
    }
}

#[async_trait]
impl MultipartUpload for OpendalMultipartUpload {
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let writer = self.writer.clone();
        let location = self.location.clone();

        let next_notify = Arc::new(Notify::new());
        let current_notify = self.next_notify.replace(next_notify.clone());

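        // Chain a `Notify` from the previous part to this one so that parts are
        // written in the order `put_part` was called, even when the returned
        // futures are polled concurrently.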
        async move {
            if let Some(notify) = current_notify {
                notify.notified().await;
            }

            let mut writer = writer.lock().await;
            let result = writer
                .write(Buffer::from_iter(data.into_iter()))
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()));

            next_notify.notify_one();

            result
        }
        .into_send()
        .boxed()
    }

    async fn complete(&mut self) -> object_store::Result<PutResult> {
        let mut writer = self.writer.lock().await;
        writer
            .close()
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;

        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

    async fn abort(&mut self) -> object_store::Result<()> {
        let mut writer = self.writer.lock().await;
        writer
            .abort()
            .into_send()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
    }
}

impl Debug for OpendalMultipartUpload {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("OpendalMultipartUpload")
            .field("location", &self.location)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};
    use opendal::services;
    use rand::prelude::*;
    use std::sync::Arc;

    use super::*;

    async fn create_test_object_store() -> Arc<dyn ObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();
        let bytes = Bytes::from_static(b"hello, world!");
        object_store.put(&path, bytes.into()).await.unwrap();

        let path: Path = "data/nested/test.txt".into();
        let bytes = Bytes::from_static(b"hello, world! I am nested.");
        object_store.put(&path, bytes.into()).await.unwrap();

        object_store
    }

    #[tokio::test]
    async fn test_basic() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();

        let bytes = Bytes::from_static(b"hello, world!");
        object_store.put(&path, bytes.clone().into()).await.unwrap();

        let meta = object_store.head(&path).await.unwrap();

        assert_eq!(meta.size, 13);

        assert_eq!(
            object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap(),
            bytes
        );
    }

    #[tokio::test]
    async fn test_put_multipart() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ObjectStore> = Arc::new(OpendalStore::new(op));

        let mut rng = thread_rng();

        let path: Path = "data/test_complete.txt".into();
        let upload = object_store.put_multipart(&path).await.unwrap();

        let mut write = WriteMultipart::new(upload);

        let mut all_bytes = vec![];
        let round = rng.gen_range(1..=1024);
        for _ in 0..round {
            let size = rng.gen_range(1..=1024);
            let mut bytes = vec![0; size];
            rng.fill_bytes(&mut bytes);

            all_bytes.extend_from_slice(&bytes);
            write.put(bytes.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = object_store.head(&path).await.unwrap();

        assert_eq!(meta.size, all_bytes.len() as u64);

        assert_eq!(
            object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap(),
            Bytes::from(all_bytes)
        );

        let path: Path = "data/test_abort.txt".into();
        let mut upload = object_store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let res = object_store.head(&path).await;
        let err = res.unwrap_err();

        assert!(matches!(err, object_store::Error::NotFound { .. }))
    }

    #[tokio::test]
    async fn test_list() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);
        let mut locations = results
            .iter()
            .map(|x| x.as_ref().unwrap().location.as_ref())
            .collect::<Vec<_>>();

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];

        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();

        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, bytes) in expected_files {
            let path: Path = location.into();
            assert_eq!(
                object_store
                    .get(&path)
                    .await
                    .unwrap()
                    .bytes()
                    .await
                    .unwrap(),
                bytes
            );
        }
    }

    #[tokio::test]
    async fn test_list_with_delimiter() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
        assert_eq!(result.objects.len(), 1);
        assert_eq!(result.common_prefixes.len(), 1);
        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
    }

    #[tokio::test]
    async fn test_list_with_offset() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let offset: Path = "data/nested/test.txt".into();
        let result = object_store
            .list_with_offset(Some(&path), &offset)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(result.len(), 1);
        assert_eq!(
            result[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }
}