object_store_opendal/service/
mod.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use object_store::ObjectStore;
22use object_store::path::Path as ObjectStorePath;
23use opendal::Error;
24use opendal::ErrorKind;
25use opendal::raw::oio::BatchDeleter;
26use opendal::raw::oio::MultipartWriter;
27use opendal::raw::*;
28use opendal::*;
29
30mod core;
31mod deleter;
32mod error;
33mod lister;
34mod reader;
35mod writer;
36
37use deleter::ObjectStoreDeleter;
38use error::parse_error;
39use lister::ObjectStoreLister;
40use reader::ObjectStoreReader;
41use writer::ObjectStoreWriter;
42
43use crate::service::core::format_metadata as parse_metadata;
44use crate::service::core::parse_op_stat;
45
46pub const OBJECT_STORE_SCHEME: &str = "object_store";
47
48#[derive(Default)]
50pub struct ObjectStoreBuilder {
51 store: Option<Arc<dyn ObjectStore + 'static>>,
52}
53
54impl Debug for ObjectStoreBuilder {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 let mut d = f.debug_struct("ObjectStoreBuilder");
57 d.finish_non_exhaustive()
58 }
59}
60
61impl ObjectStoreBuilder {
62 pub fn new(store: Arc<dyn ObjectStore + 'static>) -> Self {
64 Self { store: Some(store) }
65 }
66}
67
68impl Builder for ObjectStoreBuilder {
69 type Config = ();
70
71 fn build(self) -> Result<impl Access> {
72 let store = self.store.ok_or_else(|| {
73 Error::new(ErrorKind::ConfigInvalid, "object store is required")
74 .with_context("service", OBJECT_STORE_SCHEME)
75 })?;
76
77 Ok(ObjectStoreService { store })
78 }
79}
80
81pub struct ObjectStoreService {
83 store: Arc<dyn ObjectStore + 'static>,
84}
85
86impl Debug for ObjectStoreService {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 let mut d = f.debug_struct("ObjectStoreBackend");
89 d.finish_non_exhaustive()
90 }
91}
92
93impl Access for ObjectStoreService {
94 type Reader = ObjectStoreReader;
95 type Writer = MultipartWriter<ObjectStoreWriter>;
96 type Lister = ObjectStoreLister;
97 type Deleter = BatchDeleter<ObjectStoreDeleter>;
98 type Copier = ();
99
100 fn info(&self) -> Arc<AccessorInfo> {
101 let info = AccessorInfo::default();
102 info.set_scheme(OBJECT_STORE_SCHEME)
103 .set_root("/")
104 .set_name("object_store")
105 .set_native_capability(Capability {
106 stat: true,
107 stat_with_if_match: true,
108 stat_with_if_unmodified_since: true,
109 read: true,
110 write: true,
111 delete: true,
112 list: true,
113 list_with_limit: true,
114 list_with_start_after: true,
115 delete_with_version: false,
116 ..Default::default()
117 });
118 Arc::new(info)
119 }
120
121 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
122 let path = ObjectStorePath::from(path);
123 let opts = parse_op_stat(&args)?;
124 let result = self
125 .store
126 .get_opts(&path, opts)
127 .await
128 .map_err(parse_error)?;
129 let metadata = parse_metadata(&result.meta);
130 Ok(RpStat::new(metadata))
131 }
132
133 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
134 let reader = ObjectStoreReader::new(self.store.clone(), path, args).await?;
135 Ok((reader.rp(), reader))
136 }
137
138 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
139 let writer = ObjectStoreWriter::new(self.store.clone(), path, args);
140 Ok((
141 RpWrite::default(),
142 MultipartWriter::new(self.info(), writer, 10),
143 ))
144 }
145
146 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
147 let deleter = BatchDeleter::new(ObjectStoreDeleter::new(self.store.clone()), Some(1000));
148 Ok((RpDelete::default(), deleter))
149 }
150
151 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
152 let lister = ObjectStoreLister::new(self.store.clone(), path, args).await?;
153 Ok((RpList::default(), lister))
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160 use object_store::memory::InMemory;
161 use opendal::Buffer;
162 use opendal::raw::oio::{Delete, List, ReadStream, Write};
163
164 #[tokio::test]
165 async fn test_object_store_backend_builder() {
166 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
167 let builder = ObjectStoreBuilder::new(store);
168
169 let backend = builder.build().expect("build should succeed");
170 assert!(backend.info().scheme() == OBJECT_STORE_SCHEME);
171 }
172
173 #[tokio::test]
174 async fn test_object_store_backend_info() {
175 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
176 let backend = ObjectStoreBuilder::new(store)
177 .build()
178 .expect("build should succeed");
179
180 let info = backend.info();
181 assert_eq!(info.scheme(), "object_store");
182 assert_eq!(info.name(), "object_store".into());
183 assert_eq!(info.root(), "/".into());
184
185 let cap = info.native_capability();
186 assert!(cap.stat);
187 assert!(cap.read);
188 assert!(cap.write);
189 assert!(cap.delete);
190 assert!(cap.list);
191 }
192
193 #[tokio::test]
194 async fn test_object_store_backend_basic_operations() {
195 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
196 let backend = ObjectStoreBuilder::new(store.clone())
197 .build()
198 .expect("build should succeed");
199
200 let path = "test_file.txt";
201 let content = b"Hello, world!";
202
203 let (_, mut writer) = backend
205 .write(path, OpWrite::default())
206 .await
207 .expect("write should succeed");
208
209 writer
210 .write(Buffer::from(&content[..]))
211 .await
212 .expect("write content should succeed");
213 writer.close().await.expect("close should succeed");
214
215 let stat_result = backend
217 .stat(path, OpStat::default())
218 .await
219 .expect("stat should succeed");
220
221 assert_eq!(
222 stat_result.into_metadata().content_length(),
223 content.len() as u64
224 );
225
226 let (_, mut reader) = backend
228 .read(path, OpRead::default())
229 .await
230 .expect("read should succeed");
231
232 let buf = reader.read().await.expect("read should succeed");
233 assert_eq!(buf.to_vec(), content);
234 }
235
236 #[tokio::test]
237 async fn test_object_store_backend_multipart_upload() {
238 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
239 let backend = ObjectStoreBuilder::new(store.clone())
240 .build()
241 .expect("build should succeed");
242
243 let path = "test_file.txt";
244 let content =
245 b"Hello, multipart upload! This is a test content for multipart upload functionality.";
246 let content_len = content.len();
247
248 let (_, mut writer) = backend
250 .write(path, OpWrite::default())
251 .await
252 .expect("write should succeed");
253
254 let chunk_size = 20;
256 for chunk in content.chunks(chunk_size) {
257 writer
258 .write(Buffer::from(chunk))
259 .await
260 .expect("write chunk should succeed");
261 }
262
263 writer.close().await.expect("close should succeed");
264
265 let stat_result = backend
267 .stat(path, OpStat::default())
268 .await
269 .expect("stat should succeed");
270
271 assert_eq!(
272 stat_result.into_metadata().content_length(),
273 content_len as u64
274 );
275
276 let (_, mut reader) = backend
278 .read(path, OpRead::default())
279 .await
280 .expect("read should succeed");
281
282 let buf = reader.read().await.expect("read should succeed");
283 assert_eq!(buf.to_vec(), content);
284 }
285
286 #[tokio::test]
287 async fn test_object_store_backend_list() {
288 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
289 let backend = ObjectStoreBuilder::new(store.clone())
290 .build()
291 .expect("build should succeed");
292
293 let files = vec![
295 ("dir1/file1.txt", b"content1"),
296 ("dir1/file2.txt", b"content2"),
297 ("dir2/file3.txt", b"content3"),
298 ];
299
300 for (path, content) in &files {
301 let (_, mut writer) = backend
302 .write(path, OpWrite::default())
303 .await
304 .expect("write should succeed");
305 writer
306 .write(Buffer::from(&content[..]))
307 .await
308 .expect("write content should succeed");
309 writer.close().await.expect("close should succeed");
310 }
311
312 let (_, mut lister) = backend
314 .list("dir1/", OpList::default())
315 .await
316 .expect("list should succeed");
317
318 let mut entries = Vec::new();
319 while let Some(entry) = lister.next().await.expect("next should succeed") {
320 entries.push(entry);
321 }
322
323 assert_eq!(entries.len(), 2);
324 assert!(entries.iter().any(|e| e.path() == "dir1/file1.txt"));
325 assert!(entries.iter().any(|e| e.path() == "dir1/file2.txt"));
326 }
327
328 #[tokio::test]
329 async fn test_object_store_backend_delete() {
330 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
331 let backend = ObjectStoreBuilder::new(store)
332 .build()
333 .expect("build should succeed");
334
335 let path = "test_delete.txt";
336 let content = b"To be deleted";
337
338 let (_, mut writer) = backend
340 .write(path, OpWrite::default())
341 .await
342 .expect("write should succeed");
343 writer
344 .write(Buffer::from(&content[..]))
345 .await
346 .expect("write content should succeed");
347 writer.close().await.expect("close should succeed");
348
349 backend
351 .stat(path, OpStat::default())
352 .await
353 .expect("file should exist");
354
355 let (_, mut deleter) = backend.delete().await.expect("delete should succeed");
357 deleter
358 .delete(path, OpDelete::default())
359 .await
360 .expect("delete should succeed");
361 deleter.close().await.expect("close should succeed");
362
363 let result = backend.stat(path, OpStat::default()).await;
365 assert!(result.is_err());
366 }
367
368 #[tokio::test]
369 async fn test_object_store_backend_error_handling() {
370 let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
371 let backend = ObjectStoreBuilder::new(store)
372 .build()
373 .expect("build should succeed");
374
375 let result = backend.stat("non_existent.txt", OpStat::default()).await;
377 assert!(result.is_err());
378
379 let result = backend.read("non_existent.txt", OpRead::default()).await;
381 assert!(result.is_err());
382
383 let result = backend.list("non_existent_dir/", OpList::default()).await;
385 if let Ok((_, mut lister)) = result {
387 let entry = lister.next().await.expect("next should succeed");
388 assert!(entry.is_none());
389 }
390 }
391}