object_store_opendal/service/
mod.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use object_store::ObjectStore;
23use object_store::path::Path as ObjectStorePath;
24use opendal::Error;
25use opendal::ErrorKind;
26use opendal::raw::oio::BatchDeleter;
27use opendal::raw::oio::MultipartWriter;
28use opendal::raw::*;
29use opendal::*;
30
31mod core;
32mod deleter;
33mod error;
34mod lister;
35mod reader;
36mod writer;
37
38use deleter::ObjectStoreDeleter;
39use error::parse_error;
40use lister::ObjectStoreLister;
41use reader::ObjectStoreReader;
42use writer::ObjectStoreWriter;
43
44use crate::service::core::format_metadata as parse_metadata;
45use crate::service::core::parse_op_stat;
46
47#[derive(Default)]
49pub struct ObjectStoreBuilder {
50 store: Option<Arc<dyn ObjectStore + 'static>>,
51}
52
53impl Debug for ObjectStoreBuilder {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 let mut d = f.debug_struct("ObjectStoreBuilder");
56 d.finish_non_exhaustive()
57 }
58}
59
60impl ObjectStoreBuilder {
61 pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
63 Self { store: Some(store) }
64 }
65}
66
67impl Builder for ObjectStoreBuilder {
68 type Config = ();
69
70 fn build(self) -> Result<impl Access> {
71 let store = self.store.ok_or_else(|| {
72 Error::new(ErrorKind::ConfigInvalid, "object store is required")
73 .with_context("service", Scheme::Custom("object_store"))
74 })?;
75
76 Ok(ObjectStoreService { store })
77 }
78}
79
80pub struct ObjectStoreService {
82 store: Arc<dyn ObjectStore + 'static>,
83}
84
85impl Debug for ObjectStoreService {
86 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
87 let mut d = f.debug_struct("ObjectStoreBackend");
88 d.finish_non_exhaustive()
89 }
90}
91
92impl Access for ObjectStoreService {
93 type Reader = ObjectStoreReader;
94 type Writer = MultipartWriter<ObjectStoreWriter>;
95 type Lister = ObjectStoreLister;
96 type Deleter = BatchDeleter<ObjectStoreDeleter>;
97
98 fn info(&self) -> Arc<AccessorInfo> {
99 let info = AccessorInfo::default();
100 info.set_scheme("object_store")
101 .set_root("/")
102 .set_name("object_store")
103 .set_native_capability(Capability {
104 stat: true,
105 stat_with_if_match: true,
106 stat_with_if_unmodified_since: true,
107 read: true,
108 write: true,
109 delete: true,
110 list: true,
111 list_with_limit: true,
112 list_with_start_after: true,
113 delete_with_version: false,
114 ..Default::default()
115 });
116 Arc::new(info)
117 }
118
119 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
120 let path = ObjectStorePath::from(path);
121 let opts = parse_op_stat(&args)?;
122 let result = self
123 .store
124 .get_opts(&path, opts)
125 .await
126 .map_err(parse_error)?;
127 let metadata = parse_metadata(&result.meta);
128 Ok(RpStat::new(metadata))
129 }
130
131 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
132 let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
133 Ok((reader.rp(), reader))
134 }
135
136 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
137 let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
138 Ok((
139 RpWrite::default(),
140 MultipartWriter::new(self.info(), writer, 10),
141 ))
142 }
143
144 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
145 let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()));
146 Ok((RpDelete::default(), deleter))
147 }
148
149 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
150 let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
151 Ok((RpList::default(), lister))
152 }
153}
154
155#[cfg(test)]
156mod tests {
157 use super::*;
158 use object_store::memory::InMemory;
159 use opendal::Buffer;
160 use opendal::raw::oio::{Delete, List, Read, Write};
161
162 #[tokio::test]
163 async fn test_object_store_backend_builder() {
164 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
165 let builder = ObjectStoreBuilder::new(store);
166
167 let backend = builder.build().expect("build should succeed");
168 assert!(backend.info().scheme() == "object_store");
169 }
170
171 #[tokio::test]
172 async fn test_object_store_backend_info() {
173 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
174 let backend = ObjectStoreBuilder::new(store)
175 .build()
176 .expect("build should succeed");
177
178 let info = backend.info();
179 assert_eq!(info.scheme(), "object_store");
180 assert_eq!(info.name(), "object_store".into());
181 assert_eq!(info.root(), "/".into());
182
183 let cap = info.native_capability();
184 assert!(cap.stat);
185 assert!(cap.read);
186 assert!(cap.write);
187 assert!(cap.delete);
188 assert!(cap.list);
189 }
190
191 #[tokio::test]
192 async fn test_object_store_backend_basic_operations() {
193 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
194 let backend = ObjectStoreBuilder::new(store.clone())
195 .build()
196 .expect("build should succeed");
197
198 let path = "test_file.txt";
199 let content = b"Hello, world!";
200
201 let (_, mut writer) = backend
203 .write(path, OpWrite::default())
204 .await
205 .expect("write should succeed");
206
207 writer
208 .write(Buffer::from(&content[..]))
209 .await
210 .expect("write content should succeed");
211 writer.close().await.expect("close should succeed");
212
213 let stat_result = backend
215 .stat(path, OpStat::default())
216 .await
217 .expect("stat should succeed");
218
219 assert_eq!(
220 stat_result.into_metadata().content_length(),
221 content.len() as u64
222 );
223
224 let (_, mut reader) = backend
226 .read(path, OpRead::default())
227 .await
228 .expect("read should succeed");
229
230 let buf = reader.read().await.expect("read should succeed");
231 assert_eq!(buf.to_vec(), content);
232 }
233
234 #[tokio::test]
235 async fn test_object_store_backend_multipart_upload() {
236 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
237 let backend = ObjectStoreBuilder::new(store.clone())
238 .build()
239 .expect("build should succeed");
240
241 let path = "test_file.txt";
242 let content =
243 b"Hello, multipart upload! This is a test content for multipart upload functionality.";
244 let content_len = content.len();
245
246 let (_, mut writer) = backend
248 .write(path, OpWrite::default())
249 .await
250 .expect("write should succeed");
251
252 let chunk_size = 20;
254 for chunk in content.chunks(chunk_size) {
255 writer
256 .write(Buffer::from(chunk))
257 .await
258 .expect("write chunk should succeed");
259 }
260
261 writer.close().await.expect("close should succeed");
262
263 let stat_result = backend
265 .stat(path, OpStat::default())
266 .await
267 .expect("stat should succeed");
268
269 assert_eq!(
270 stat_result.into_metadata().content_length(),
271 content_len as u64
272 );
273
274 let (_, mut reader) = backend
276 .read(path, OpRead::default())
277 .await
278 .expect("read should succeed");
279
280 let buf = reader.read().await.expect("read should succeed");
281 assert_eq!(buf.to_vec(), content);
282 }
283
284 #[tokio::test]
285 async fn test_object_store_backend_list() {
286 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
287 let backend = ObjectStoreBuilder::new(store.clone())
288 .build()
289 .expect("build should succeed");
290
291 let files = vec![
293 ("dir1/file1.txt", b"content1"),
294 ("dir1/file2.txt", b"content2"),
295 ("dir2/file3.txt", b"content3"),
296 ];
297
298 for (path, content) in &files {
299 let (_, mut writer) = backend
300 .write(path, OpWrite::default())
301 .await
302 .expect("write should succeed");
303 writer
304 .write(Buffer::from(&content[..]))
305 .await
306 .expect("write content should succeed");
307 writer.close().await.expect("close should succeed");
308 }
309
310 let (_, mut lister) = backend
312 .list("dir1/", OpList::default())
313 .await
314 .expect("list should succeed");
315
316 let mut entries = Vec::new();
317 while let Some(entry) = lister.next().await.expect("next should succeed") {
318 entries.push(entry);
319 }
320
321 assert_eq!(entries.len(), 2);
322 assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
323 assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
324 }
325
326 #[tokio::test]
327 async fn test_object_store_backend_delete() {
328 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
329 let backend = ObjectStoreBuilder::new(store)
330 .build()
331 .expect("build should succeed");
332
333 let path = "test_delete.txt";
334 let content = b"To be deleted";
335
336 let (_, mut writer) = backend
338 .write(path, OpWrite::default())
339 .await
340 .expect("write should succeed");
341 writer
342 .write(Buffer::from(&content[..]))
343 .await
344 .expect("write content should succeed");
345 writer.close().await.expect("close should succeed");
346
347 backend
349 .stat(path, OpStat::default())
350 .await
351 .expect("file should exist");
352
353 let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
355 deleter
356 .delete(path, OpDelete::default())
357 .expect("delete should succeed");
358 deleter.flush().await.expect("flush should succeed");
359
360 let result = backend.stat(path, OpStat::default()).await;
362 assert!(result.is_err());
363 }
364
365 #[tokio::test]
366 async fn test_object_store_backend_error_handling() {
367 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
368 let backend = ObjectStoreBuilder::new(store)
369 .build()
370 .expect("build should succeed");
371
372 let result = backend.stat("non_existent.txt", OpStat::default()).await;
374 assert!(result.is_err());
375
376 let result = backend.read("non_existent.txt", OpRead::default()).await;
378 assert!(result.is_err());
379
380 let result = backend.list("non_existent_dir/", OpList::default()).await;
382 if let Ok((_, mut lister)) = result {
384 let entry = lister.next().await.expect("next should succeed");
385 assert!(entry.is_none());
386 }
387 }
388}