opendal/raw/accessor.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::future::ready;
20use std::hash::Hash;
21use std::hash::Hasher;
22use std::mem;
23use std::sync::Arc;
24
25use futures::Future;
26
27use crate::raw::*;
28use crate::*;
29
30/// Underlying trait of all backends for implementers.
31///
32/// The actual data access of storage service happens in Accessor layer.
33/// Every storage supported by OpenDAL must implement [`Access`] but not all
34/// methods of [`Access`] will be implemented according to how the storage service is.
35///
36/// For example, user can not modify the content from one HTTP file server directly.
37/// So [`Http`][crate::services::Http] implements and provides only read related actions.
38///
39/// [`Access`] gives default implementation for all methods which will raise [`ErrorKind::Unsupported`] error.
40/// And what action this [`Access`] supports will be pointed out in [`AccessorInfo`].
41///
42/// # Note
43///
44/// Visit [`internals`][crate::docs::internals] for more tutorials.
45///
46/// # Operations
47///
48/// - Path in args will all be normalized into the same style, services
49/// should handle them based on services' requirement.
50/// - Path that ends with `/` means it's Dir, otherwise, it's File.
51/// - Root dir is `/`
52/// - Path will never be empty.
53/// - Operations without capability requirement like `metadata`, `create` are
54/// basic operations.
55/// - All services must implement them.
56/// - Use `unimplemented!()` if not implemented or can't implement.
57/// - Operations with capability requirement like `presign` are optional operations.
58/// - Services can implement them based on services capabilities.
59/// - The default implementation should return [`ErrorKind::Unsupported`].
60pub trait Access: Send + Sync + Debug + Unpin + 'static {
61 /// Reader is the associated reader returned in `read` operation.
62 type Reader: oio::Read;
63 /// Writer is the associated writer returned in `write` operation.
64 type Writer: oio::Write;
65 /// Lister is the associated lister returned in `list` operation.
66 type Lister: oio::List;
67 /// Deleter is the associated deleter returned in `delete` operation.
68 type Deleter: oio::Delete;
69
70 /// BlockingReader is the associated reader returned `blocking_read` operation.
71 type BlockingReader: oio::BlockingRead;
72 /// BlockingWriter is the associated writer returned `blocking_write` operation.
73 type BlockingWriter: oio::BlockingWrite;
74 /// BlockingLister is the associated lister returned `blocking_list` operation.
75 type BlockingLister: oio::BlockingList;
76 /// BlockingDeleter is the associated deleter returned `blocking_delete` operation.
77 type BlockingDeleter: oio::BlockingDelete;
78
79 /// Invoke the `info` operation to get metadata of accessor.
80 ///
81 /// # Notes
82 ///
83 /// This function is required to be implemented.
84 ///
85 /// By returning AccessorInfo, underlying services can declare
86 /// some useful information about itself.
87 ///
88 /// - scheme: declare the scheme of backend.
89 /// - capabilities: declare the capabilities of current backend.
90 fn info(&self) -> Arc<AccessorInfo>;
91
92 /// Invoke the `create` operation on the specified path
93 ///
94 /// Require [`Capability::create_dir`]
95 ///
96 /// # Behavior
97 ///
98 /// - Input path MUST match with EntryMode, DON'T NEED to check mode.
99 /// - Create on existing dir SHOULD succeed.
100 fn create_dir(
101 &self,
102 path: &str,
103 args: OpCreateDir,
104 ) -> impl Future<Output = Result<RpCreateDir>> + MaybeSend {
105 let (_, _) = (path, args);
106
107 ready(Err(Error::new(
108 ErrorKind::Unsupported,
109 "operation is not supported",
110 )))
111 }
112
113 /// Invoke the `stat` operation on the specified path.
114 ///
115 /// Require [`Capability::stat`]
116 ///
117 /// # Behavior
118 ///
119 /// - `stat` empty path means stat backend's root path.
120 /// - `stat` a path endswith "/" means stating a dir.
121 /// - `mode` and `content_length` must be set.
122 fn stat(&self, path: &str, args: OpStat) -> impl Future<Output = Result<RpStat>> + MaybeSend {
123 let (_, _) = (path, args);
124
125 ready(Err(Error::new(
126 ErrorKind::Unsupported,
127 "operation is not supported",
128 )))
129 }
130
131 /// Invoke the `read` operation on the specified path, returns a
132 /// [`Reader`][crate::Reader] if operate successful.
133 ///
134 /// Require [`Capability::read`]
135 ///
136 /// # Behavior
137 ///
138 /// - Input path MUST be file path, DON'T NEED to check mode.
139 /// - The returning content length may be smaller than the range specified.
140 fn read(
141 &self,
142 path: &str,
143 args: OpRead,
144 ) -> impl Future<Output = Result<(RpRead, Self::Reader)>> + MaybeSend {
145 let (_, _) = (path, args);
146
147 ready(Err(Error::new(
148 ErrorKind::Unsupported,
149 "operation is not supported",
150 )))
151 }
152
153 /// Invoke the `write` operation on the specified path, returns a
154 /// written size if operate successful.
155 ///
156 /// Require [`Capability::write`]
157 ///
158 /// # Behavior
159 ///
160 /// - Input path MUST be file path, DON'T NEED to check mode.
161 fn write(
162 &self,
163 path: &str,
164 args: OpWrite,
165 ) -> impl Future<Output = Result<(RpWrite, Self::Writer)>> + MaybeSend {
166 let (_, _) = (path, args);
167
168 ready(Err(Error::new(
169 ErrorKind::Unsupported,
170 "operation is not supported",
171 )))
172 }
173
174 /// Invoke the `delete` operation on the specified path.
175 ///
176 /// Require [`Capability::delete`]
177 ///
178 /// # Behavior
179 ///
180 /// - `delete` is an idempotent operation, it's safe to call `Delete` on the same path multiple times.
181 /// - `delete` SHOULD return `Ok(())` if the path is deleted successfully or not exist.
182 fn delete(&self) -> impl Future<Output = Result<(RpDelete, Self::Deleter)>> + MaybeSend {
183 ready(Err(Error::new(
184 ErrorKind::Unsupported,
185 "operation is not supported",
186 )))
187 }
188
189 /// Invoke the `list` operation on the specified path.
190 ///
191 /// Require [`Capability::list`]
192 ///
193 /// # Behavior
194 ///
195 /// - Input path MUST be dir path, DON'T NEED to check mode.
196 /// - List non-exist dir should return Empty.
197 fn list(
198 &self,
199 path: &str,
200 args: OpList,
201 ) -> impl Future<Output = Result<(RpList, Self::Lister)>> + MaybeSend {
202 let (_, _) = (path, args);
203
204 ready(Err(Error::new(
205 ErrorKind::Unsupported,
206 "operation is not supported",
207 )))
208 }
209
210 /// Invoke the `copy` operation on the specified `from` path and `to` path.
211 ///
212 /// Require [Capability::copy]
213 ///
214 /// # Behaviour
215 ///
216 /// - `from` and `to` MUST be file path, DON'T NEED to check mode.
217 /// - Copy on existing file SHOULD succeed.
218 /// - Copy on existing file SHOULD overwrite and truncate.
219 fn copy(
220 &self,
221 from: &str,
222 to: &str,
223 args: OpCopy,
224 ) -> impl Future<Output = Result<RpCopy>> + MaybeSend {
225 let (_, _, _) = (from, to, args);
226
227 ready(Err(Error::new(
228 ErrorKind::Unsupported,
229 "operation is not supported",
230 )))
231 }
232
233 /// Invoke the `rename` operation on the specified `from` path and `to` path.
234 ///
235 /// Require [Capability::rename]
236 fn rename(
237 &self,
238 from: &str,
239 to: &str,
240 args: OpRename,
241 ) -> impl Future<Output = Result<RpRename>> + MaybeSend {
242 let (_, _, _) = (from, to, args);
243
244 ready(Err(Error::new(
245 ErrorKind::Unsupported,
246 "operation is not supported",
247 )))
248 }
249
250 /// Invoke the `presign` operation on the specified path.
251 ///
252 /// Require [`Capability::presign`]
253 ///
254 /// # Behavior
255 ///
256 /// - This API is optional, return [`std::io::ErrorKind::Unsupported`] if not supported.
257 fn presign(
258 &self,
259 path: &str,
260 args: OpPresign,
261 ) -> impl Future<Output = Result<RpPresign>> + MaybeSend {
262 let (_, _) = (path, args);
263
264 ready(Err(Error::new(
265 ErrorKind::Unsupported,
266 "operation is not supported",
267 )))
268 }
269
270 /// Invoke the `blocking_create` operation on the specified path.
271 ///
272 /// This operation is the blocking version of [`Accessor::create_dir`]
273 ///
274 /// Require [`Capability::create_dir`] and [`Capability::blocking`]
275 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
276 let (_, _) = (path, args);
277
278 Err(Error::new(
279 ErrorKind::Unsupported,
280 "operation is not supported",
281 ))
282 }
283
284 /// Invoke the `blocking_stat` operation on the specified path.
285 ///
286 /// This operation is the blocking version of [`Accessor::stat`]
287 ///
288 /// Require [`Capability::stat`] and [`Capability::blocking`]
289 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
290 let (_, _) = (path, args);
291
292 Err(Error::new(
293 ErrorKind::Unsupported,
294 "operation is not supported",
295 ))
296 }
297
298 /// Invoke the `blocking_read` operation on the specified path.
299 ///
300 /// This operation is the blocking version of [`Accessor::read`]
301 ///
302 /// Require [`Capability::read`] and [`Capability::blocking`]
303 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
304 let (_, _) = (path, args);
305
306 Err(Error::new(
307 ErrorKind::Unsupported,
308 "operation is not supported",
309 ))
310 }
311
312 /// Invoke the `blocking_write` operation on the specified path.
313 ///
314 /// This operation is the blocking version of [`Accessor::write`]
315 ///
316 /// Require [`Capability::write`] and [`Capability::blocking`]
317 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
318 let (_, _) = (path, args);
319
320 Err(Error::new(
321 ErrorKind::Unsupported,
322 "operation is not supported",
323 ))
324 }
325
326 /// Invoke the `blocking_delete` operation on the specified path.
327 ///
328 /// This operation is the blocking version of [`Accessor::delete`]
329 ///
330 /// Require [`Capability::write`] and [`Capability::blocking`]
331 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
332 Err(Error::new(
333 ErrorKind::Unsupported,
334 "operation is not supported",
335 ))
336 }
337
338 /// Invoke the `blocking_list` operation on the specified path.
339 ///
340 /// This operation is the blocking version of [`Accessor::list`]
341 ///
342 /// Require [`Capability::list`] and [`Capability::blocking`]
343 ///
344 /// # Behavior
345 ///
346 /// - List non-exist dir should return Empty.
347 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
348 let (_, _) = (path, args);
349
350 Err(Error::new(
351 ErrorKind::Unsupported,
352 "operation is not supported",
353 ))
354 }
355
356 /// Invoke the `blocking_copy` operation on the specified `from` path and `to` path.
357 ///
358 /// This operation is the blocking version of [`Accessor::copy`]
359 ///
360 /// Require [`Capability::copy`] and [`Capability::blocking`]
361 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
362 let (_, _, _) = (from, to, args);
363
364 Err(Error::new(
365 ErrorKind::Unsupported,
366 "operation is not supported",
367 ))
368 }
369
370 /// Invoke the `blocking_rename` operation on the specified `from` path and `to` path.
371 ///
372 /// This operation is the blocking version of [`Accessor::rename`]
373 ///
374 /// Require [`Capability::rename`] and [`Capability::blocking`]
375 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
376 let (_, _, _) = (from, to, args);
377
378 Err(Error::new(
379 ErrorKind::Unsupported,
380 "operation is not supported",
381 ))
382 }
383}
384
385/// `AccessDyn` is the dyn version of [`Access`] make it possible to use as
386/// `Box<dyn AccessDyn>`.
387pub trait AccessDyn: Send + Sync + Debug + Unpin {
388 /// Dyn version of [`Accessor::info`]
389 fn info_dyn(&self) -> Arc<AccessorInfo>;
390 /// Dyn version of [`Accessor::create_dir`]
391 fn create_dir_dyn<'a>(
392 &'a self,
393 path: &'a str,
394 args: OpCreateDir,
395 ) -> BoxedFuture<'a, Result<RpCreateDir>>;
396 /// Dyn version of [`Accessor::stat`]
397 fn stat_dyn<'a>(&'a self, path: &'a str, args: OpStat) -> BoxedFuture<'a, Result<RpStat>>;
398 /// Dyn version of [`Accessor::read`]
399 fn read_dyn<'a>(
400 &'a self,
401 path: &'a str,
402 args: OpRead,
403 ) -> BoxedFuture<'a, Result<(RpRead, oio::Reader)>>;
404 /// Dyn version of [`Accessor::write`]
405 fn write_dyn<'a>(
406 &'a self,
407 path: &'a str,
408 args: OpWrite,
409 ) -> BoxedFuture<'a, Result<(RpWrite, oio::Writer)>>;
410 /// Dyn version of [`Accessor::delete`]
411 fn delete_dyn(&self) -> BoxedFuture<Result<(RpDelete, oio::Deleter)>>;
412 /// Dyn version of [`Accessor::list`]
413 fn list_dyn<'a>(
414 &'a self,
415 path: &'a str,
416 args: OpList,
417 ) -> BoxedFuture<'a, Result<(RpList, oio::Lister)>>;
418 /// Dyn version of [`Accessor::copy`]
419 fn copy_dyn<'a>(
420 &'a self,
421 from: &'a str,
422 to: &'a str,
423 args: OpCopy,
424 ) -> BoxedFuture<'a, Result<RpCopy>>;
425 /// Dyn version of [`Accessor::rename`]
426 fn rename_dyn<'a>(
427 &'a self,
428 from: &'a str,
429 to: &'a str,
430 args: OpRename,
431 ) -> BoxedFuture<'a, Result<RpRename>>;
432 /// Dyn version of [`Accessor::presign`]
433 fn presign_dyn<'a>(
434 &'a self,
435 path: &'a str,
436 args: OpPresign,
437 ) -> BoxedFuture<'a, Result<RpPresign>>;
438 /// Dyn version of [`Accessor::blocking_create_dir`]
439 fn blocking_create_dir_dyn(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir>;
440 /// Dyn version of [`Accessor::blocking_stat`]
441 fn blocking_stat_dyn(&self, path: &str, args: OpStat) -> Result<RpStat>;
442 /// Dyn version of [`Accessor::blocking_read`]
443 fn blocking_read_dyn(&self, path: &str, args: OpRead) -> Result<(RpRead, oio::BlockingReader)>;
444 /// Dyn version of [`Accessor::blocking_write`]
445 fn blocking_write_dyn(
446 &self,
447 path: &str,
448 args: OpWrite,
449 ) -> Result<(RpWrite, oio::BlockingWriter)>;
450 /// Dyn version of [`Accessor::blocking_delete`]
451 fn blocking_delete_dyn(&self) -> Result<(RpDelete, oio::BlockingDeleter)>;
452 /// Dyn version of [`Accessor::blocking_list`]
453 fn blocking_list_dyn(&self, path: &str, args: OpList) -> Result<(RpList, oio::BlockingLister)>;
454 /// Dyn version of [`Accessor::blocking_copy`]
455 fn blocking_copy_dyn(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>;
456 /// Dyn version of [`Accessor::blocking_rename`]
457 fn blocking_rename_dyn(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename>;
458}
459
460impl<A: ?Sized> AccessDyn for A
461where
462 A: Access<
463 Reader = oio::Reader,
464 BlockingReader = oio::BlockingReader,
465 Writer = oio::Writer,
466 BlockingWriter = oio::BlockingWriter,
467 Lister = oio::Lister,
468 BlockingLister = oio::BlockingLister,
469 Deleter = oio::Deleter,
470 BlockingDeleter = oio::BlockingDeleter,
471 >,
472{
473 fn info_dyn(&self) -> Arc<AccessorInfo> {
474 self.info()
475 }
476
477 fn create_dir_dyn<'a>(
478 &'a self,
479 path: &'a str,
480 args: OpCreateDir,
481 ) -> BoxedFuture<'a, Result<RpCreateDir>> {
482 Box::pin(self.create_dir(path, args))
483 }
484
485 fn stat_dyn<'a>(&'a self, path: &'a str, args: OpStat) -> BoxedFuture<'a, Result<RpStat>> {
486 Box::pin(self.stat(path, args))
487 }
488
489 fn read_dyn<'a>(
490 &'a self,
491 path: &'a str,
492 args: OpRead,
493 ) -> BoxedFuture<'a, Result<(RpRead, oio::Reader)>> {
494 Box::pin(self.read(path, args))
495 }
496
497 fn write_dyn<'a>(
498 &'a self,
499 path: &'a str,
500 args: OpWrite,
501 ) -> BoxedFuture<'a, Result<(RpWrite, oio::Writer)>> {
502 Box::pin(self.write(path, args))
503 }
504
505 fn delete_dyn(&self) -> BoxedFuture<Result<(RpDelete, oio::Deleter)>> {
506 Box::pin(self.delete())
507 }
508
509 fn list_dyn<'a>(
510 &'a self,
511 path: &'a str,
512 args: OpList,
513 ) -> BoxedFuture<'a, Result<(RpList, oio::Lister)>> {
514 Box::pin(self.list(path, args))
515 }
516
517 fn copy_dyn<'a>(
518 &'a self,
519 from: &'a str,
520 to: &'a str,
521 args: OpCopy,
522 ) -> BoxedFuture<'a, Result<RpCopy>> {
523 Box::pin(self.copy(from, to, args))
524 }
525
526 fn rename_dyn<'a>(
527 &'a self,
528 from: &'a str,
529 to: &'a str,
530 args: OpRename,
531 ) -> BoxedFuture<'a, Result<RpRename>> {
532 Box::pin(self.rename(from, to, args))
533 }
534
535 fn presign_dyn<'a>(
536 &'a self,
537 path: &'a str,
538 args: OpPresign,
539 ) -> BoxedFuture<'a, Result<RpPresign>> {
540 Box::pin(self.presign(path, args))
541 }
542
543 fn blocking_create_dir_dyn(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
544 self.blocking_create_dir(path, args)
545 }
546
547 fn blocking_stat_dyn(&self, path: &str, args: OpStat) -> Result<RpStat> {
548 self.blocking_stat(path, args)
549 }
550
551 fn blocking_read_dyn(&self, path: &str, args: OpRead) -> Result<(RpRead, oio::BlockingReader)> {
552 self.blocking_read(path, args)
553 }
554
555 fn blocking_write_dyn(
556 &self,
557 path: &str,
558 args: OpWrite,
559 ) -> Result<(RpWrite, oio::BlockingWriter)> {
560 self.blocking_write(path, args)
561 }
562
563 fn blocking_delete_dyn(&self) -> Result<(RpDelete, oio::BlockingDeleter)> {
564 self.blocking_delete()
565 }
566
567 fn blocking_list_dyn(&self, path: &str, args: OpList) -> Result<(RpList, oio::BlockingLister)> {
568 self.blocking_list(path, args)
569 }
570
571 fn blocking_copy_dyn(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
572 self.blocking_copy(from, to, args)
573 }
574
575 fn blocking_rename_dyn(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
576 self.blocking_rename(from, to, args)
577 }
578}
579
580impl Access for dyn AccessDyn {
581 type Reader = oio::Reader;
582 type BlockingReader = oio::BlockingReader;
583 type Writer = oio::Writer;
584 type Deleter = oio::Deleter;
585 type BlockingWriter = oio::BlockingWriter;
586 type Lister = oio::Lister;
587 type BlockingLister = oio::BlockingLister;
588 type BlockingDeleter = oio::BlockingDeleter;
589
590 fn info(&self) -> Arc<AccessorInfo> {
591 self.info_dyn()
592 }
593
594 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
595 self.create_dir_dyn(path, args).await
596 }
597
598 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
599 self.stat_dyn(path, args).await
600 }
601
602 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
603 self.read_dyn(path, args).await
604 }
605
606 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
607 self.write_dyn(path, args).await
608 }
609
610 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
611 self.delete_dyn().await
612 }
613
614 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
615 self.list_dyn(path, args).await
616 }
617
618 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
619 self.copy_dyn(from, to, args).await
620 }
621
622 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
623 self.rename_dyn(from, to, args).await
624 }
625
626 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
627 self.presign_dyn(path, args).await
628 }
629
630 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
631 self.blocking_create_dir_dyn(path, args)
632 }
633
634 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
635 self.blocking_read_dyn(path, args)
636 }
637
638 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
639 self.blocking_stat_dyn(path, args)
640 }
641
642 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
643 self.blocking_write_dyn(path, args)
644 }
645
646 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
647 self.blocking_delete_dyn()
648 }
649
650 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
651 self.blocking_list_dyn(path, args)
652 }
653
654 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
655 self.blocking_copy_dyn(from, to, args)
656 }
657
658 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
659 self.blocking_rename_dyn(from, to, args)
660 }
661}
662
663/// Dummy implementation of accessor.
664impl Access for () {
665 type Reader = ();
666 type Writer = ();
667 type Lister = ();
668 type Deleter = ();
669 type BlockingReader = ();
670 type BlockingWriter = ();
671 type BlockingLister = ();
672 type BlockingDeleter = ();
673
674 fn info(&self) -> Arc<AccessorInfo> {
675 let ai = AccessorInfo::default();
676 ai.set_scheme(Scheme::Custom("dummy"))
677 .set_root("")
678 .set_name("dummy")
679 .set_native_capability(Capability::default());
680 ai.into()
681 }
682}
683
684/// All functions in `Accessor` only requires `&self`, so it's safe to implement
685/// `Accessor` for `Arc<impl Access>`.
686// If we use async fn directly, some weird higher rank trait bound error (`Send`/`Accessor` impl not general enough) will happen.
687// Probably related to https://github.com/rust-lang/rust/issues/96865
688#[allow(clippy::manual_async_fn)]
689impl<T: Access + ?Sized> Access for Arc<T> {
690 type Reader = T::Reader;
691 type Writer = T::Writer;
692 type Lister = T::Lister;
693 type Deleter = T::Deleter;
694 type BlockingReader = T::BlockingReader;
695 type BlockingWriter = T::BlockingWriter;
696 type BlockingLister = T::BlockingLister;
697 type BlockingDeleter = T::BlockingDeleter;
698
699 fn info(&self) -> Arc<AccessorInfo> {
700 self.as_ref().info()
701 }
702
703 fn create_dir(
704 &self,
705 path: &str,
706 args: OpCreateDir,
707 ) -> impl Future<Output = Result<RpCreateDir>> + MaybeSend {
708 async move { self.as_ref().create_dir(path, args).await }
709 }
710
711 fn stat(&self, path: &str, args: OpStat) -> impl Future<Output = Result<RpStat>> + MaybeSend {
712 async move { self.as_ref().stat(path, args).await }
713 }
714
715 fn read(
716 &self,
717 path: &str,
718 args: OpRead,
719 ) -> impl Future<Output = Result<(RpRead, Self::Reader)>> + MaybeSend {
720 async move { self.as_ref().read(path, args).await }
721 }
722
723 fn write(
724 &self,
725 path: &str,
726 args: OpWrite,
727 ) -> impl Future<Output = Result<(RpWrite, Self::Writer)>> + MaybeSend {
728 async move { self.as_ref().write(path, args).await }
729 }
730
731 fn delete(&self) -> impl Future<Output = Result<(RpDelete, Self::Deleter)>> + MaybeSend {
732 async move { self.as_ref().delete().await }
733 }
734
735 fn list(
736 &self,
737 path: &str,
738 args: OpList,
739 ) -> impl Future<Output = Result<(RpList, Self::Lister)>> + MaybeSend {
740 async move { self.as_ref().list(path, args).await }
741 }
742
743 fn copy(
744 &self,
745 from: &str,
746 to: &str,
747 args: OpCopy,
748 ) -> impl Future<Output = Result<RpCopy>> + MaybeSend {
749 async move { self.as_ref().copy(from, to, args).await }
750 }
751
752 fn rename(
753 &self,
754 from: &str,
755 to: &str,
756 args: OpRename,
757 ) -> impl Future<Output = Result<RpRename>> + MaybeSend {
758 async move { self.as_ref().rename(from, to, args).await }
759 }
760
761 fn presign(
762 &self,
763 path: &str,
764 args: OpPresign,
765 ) -> impl Future<Output = Result<RpPresign>> + MaybeSend {
766 async move { self.as_ref().presign(path, args).await }
767 }
768
769 fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
770 self.as_ref().blocking_create_dir(path, args)
771 }
772
773 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
774 self.as_ref().blocking_stat(path, args)
775 }
776
777 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
778 self.as_ref().blocking_read(path, args)
779 }
780
781 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
782 self.as_ref().blocking_write(path, args)
783 }
784
785 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
786 self.as_ref().blocking_delete()
787 }
788
789 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
790 self.as_ref().blocking_list(path, args)
791 }
792
793 fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
794 self.as_ref().blocking_copy(from, to, args)
795 }
796
797 fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
798 self.as_ref().blocking_rename(from, to, args)
799 }
800}
801
/// Accessor is the type erased accessor: an `Arc<dyn AccessDyn>`.
pub type Accessor = Arc<dyn AccessDyn>;
804
/// The lock-protected state behind [`AccessorInfo`].
///
/// Each field is read and written through the accessor methods of the same
/// name on [`AccessorInfo`].
#[derive(Debug, Default)]
struct AccessorInfoInner {
    /// Scheme of the backend.
    scheme: Scheme,
    /// Root of the backend, in a format like `/path/to/dir/`.
    root: Arc<str>,
    /// Name of the backend; may be empty.
    name: Arc<str>,

    /// Capabilities declared by the service itself.
    native_capability: Capability,
    /// Full capabilities; reset to the native value by `set_native_capability`
    /// and otherwise changed only through `update_full_capability`.
    full_capability: Capability,

    /// HTTP client shared through this info.
    http_client: HttpClient,
    /// Executor shared through this info.
    executor: Executor,
}
817
/// Info for the accessor. Users can use this struct to retrieve information about the underlying backend.
///
/// This struct is intentionally not implemented with `Clone` to ensure that all accesses
/// within the same operator, access layers, and services use the same instance of `AccessorInfo`.
/// This is especially important for `HttpClient` and `Executor`.
///
/// ## Panic Safety
///
/// All methods provided by `AccessorInfo` will safely handle lock poisoning scenarios.
/// If the inner `RwLock` is poisoned (which happens when another thread panicked while holding
/// the write lock), the methods will gracefully continue execution.
///
/// - For read operations, the method will return the current state.
/// - For write operations, the method will do nothing.
///
/// ## Maintain Notes
///
/// We are using `std::sync::RwLock` to provide thread-safe access to the inner data.
///
/// I have performed [the bench across different arc-swap alike crates](https://github.com/krdln/arc-swap-benches):
///
/// ```txt
/// test arcswap                    ... bench:  14.85 ns/iter (+/- 0.33)
/// test arcswap_full               ... bench: 128.27 ns/iter (+/- 4.30)
/// test baseline                   ... bench:  11.33 ns/iter (+/- 0.76)
/// test mutex_4                    ... bench: 296.73 ns/iter (+/- 49.96)
/// test mutex_unconteded           ... bench:  13.26 ns/iter (+/- 0.56)
/// test rwlock_fast_4              ... bench: 201.60 ns/iter (+/- 7.47)
/// test rwlock_fast_uncontended    ... bench:  12.77 ns/iter (+/- 0.37)
/// test rwlock_parking_4           ... bench: 232.02 ns/iter (+/- 11.14)
/// test rwlock_parking_uncontended ... bench:  13.18 ns/iter (+/- 0.39)
/// test rwlock_std_4               ... bench: 219.56 ns/iter (+/- 5.56)
/// test rwlock_std_uncontended     ... bench:  13.55 ns/iter (+/- 0.33)
/// ```
///
/// The results show that as long as there aren't too many contended accesses, `RwLock` is the
/// best choice, allowing for fast access and the ability to modify partial data without cloning
/// everything.
///
/// And it's true: we only update and modify the internal data in a few instances, such as when
/// building an operator or applying new layers.
#[derive(Debug, Default)]
pub struct AccessorInfo {
    inner: std::sync::RwLock<AccessorInfoInner>,
}
863
864impl PartialEq for AccessorInfo {
865 fn eq(&self, other: &Self) -> bool {
866 self.scheme() == other.scheme()
867 && self.root() == other.root()
868 && self.name() == other.name()
869 }
870}
871
// Marker impl: equality is defined by the `PartialEq` impl for `AccessorInfo`.
impl Eq for AccessorInfo {}
873
874impl Hash for AccessorInfo {
875 fn hash<H: Hasher>(&self, state: &mut H) {
876 self.scheme().hash(state);
877 self.root().hash(state);
878 self.name().hash(state);
879 }
880}
881
882impl AccessorInfo {
883 /// [`Scheme`] of backend.
884 ///
885 /// # Panic Safety
886 ///
887 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
888 /// this method will gracefully continue execution by simply returning the current scheme.
889 pub fn scheme(&self) -> Scheme {
890 match self.inner.read() {
891 Ok(v) => v.scheme,
892 Err(err) => err.get_ref().scheme,
893 }
894 }
895
896 /// Set [`Scheme`] for backend.
897 ///
898 /// # Panic Safety
899 ///
900 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
901 /// this method will gracefully continue execution by simply skipping the update operation
902 /// rather than propagating the panic.
903 pub fn set_scheme(&self, scheme: Scheme) -> &Self {
904 if let Ok(mut v) = self.inner.write() {
905 v.scheme = scheme;
906 }
907
908 self
909 }
910
911 /// Root of backend, will be in format like `/path/to/dir/`
912 ///
913 /// # Panic Safety
914 ///
915 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
916 /// this method will gracefully continue execution by simply returning the current root.
917 pub fn root(&self) -> Arc<str> {
918 match self.inner.read() {
919 Ok(v) => v.root.clone(),
920 Err(err) => err.get_ref().root.clone(),
921 }
922 }
923
924 /// Set root for backend.
925 ///
926 /// Note: input root must be normalized.
927 ///
928 /// # Panic Safety
929 ///
930 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
931 /// this method will gracefully continue execution by simply skipping the update operation
932 /// rather than propagating the panic.
933 pub fn set_root(&self, root: &str) -> &Self {
934 if let Ok(mut v) = self.inner.write() {
935 v.root = Arc::from(root);
936 }
937
938 self
939 }
940
941 /// Name of backend, could be empty if underlying backend doesn't have namespace concept.
942 ///
943 /// For example:
944 ///
945 /// - name for `s3` => bucket name
946 /// - name for `azblob` => container name
947 ///
948 /// # Panic Safety
949 ///
950 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
951 /// this method will gracefully continue execution by simply returning the current scheme.
952 pub fn name(&self) -> Arc<str> {
953 match self.inner.read() {
954 Ok(v) => v.name.clone(),
955 Err(err) => err.get_ref().name.clone(),
956 }
957 }
958
959 /// Set name of this backend.
960 ///
961 /// # Panic Safety
962 ///
963 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
964 /// this method will gracefully continue execution by simply skipping the update operation
965 /// rather than propagating the panic.
966 pub fn set_name(&self, name: &str) -> &Self {
967 if let Ok(mut v) = self.inner.write() {
968 v.name = Arc::from(name)
969 }
970
971 self
972 }
973
974 /// Get backend's native capabilities.
975 ///
976 /// # Panic Safety
977 ///
978 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
979 /// this method will gracefully continue execution by simply returning the current native capability.
980 pub fn native_capability(&self) -> Capability {
981 match self.inner.read() {
982 Ok(v) => v.native_capability,
983 Err(err) => err.get_ref().native_capability,
984 }
985 }
986
987 /// Set native capabilities for service.
988 ///
989 /// # NOTES
990 ///
991 /// Set native capability will also flush the full capability. The only way to change
992 /// full_capability is via `update_full_capability`.
993 ///
994 /// # Panic Safety
995 ///
996 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
997 /// this method will gracefully continue execution by simply skipping the update operation
998 /// rather than propagating the panic.
999 pub fn set_native_capability(&self, capability: Capability) -> &Self {
1000 if let Ok(mut v) = self.inner.write() {
1001 v.native_capability = capability;
1002 v.full_capability = capability;
1003 }
1004
1005 self
1006 }
1007
1008 /// Get service's full capabilities.
1009 ///
1010 /// # Panic Safety
1011 ///
1012 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
1013 /// this method will gracefully continue execution by simply returning the current native capability.
1014 pub fn full_capability(&self) -> Capability {
1015 match self.inner.read() {
1016 Ok(v) => v.full_capability,
1017 Err(err) => err.get_ref().full_capability,
1018 }
1019 }
1020
1021 /// Update service's full capabilities.
1022 ///
1023 /// # Panic Safety
1024 ///
1025 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
1026 /// this method will gracefully continue execution by simply skipping the update operation
1027 /// rather than propagating the panic.
1028 pub fn update_full_capability(&self, f: impl FnOnce(Capability) -> Capability) -> &Self {
1029 if let Ok(mut v) = self.inner.write() {
1030 v.full_capability = f(v.full_capability);
1031 }
1032
1033 self
1034 }
1035
1036 /// Get http client from the context.
1037 ///
1038 /// # Panic Safety
1039 ///
1040 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
1041 /// this method will gracefully continue execution by simply returning the current http client.
1042 pub fn http_client(&self) -> HttpClient {
1043 match self.inner.read() {
1044 Ok(v) => v.http_client.clone(),
1045 Err(err) => err.get_ref().http_client.clone(),
1046 }
1047 }
1048
1049 /// Update http client for the context.
1050 ///
1051 /// # Note
1052 ///
1053 /// Requests must be forwarded to the old HTTP client after the update. Otherwise, features such as retry, tracing, and metrics may not function properly.
1054 ///
1055 /// # Panic Safety
1056 ///
1057 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
1058 /// this method will gracefully continue execution by simply skipping the update operation.
1059 pub fn update_http_client(&self, f: impl FnOnce(HttpClient) -> HttpClient) -> &Self {
1060 if let Ok(mut v) = self.inner.write() {
1061 let client = mem::take(&mut v.http_client);
1062 v.http_client = f(client);
1063 }
1064
1065 self
1066 }
1067
1068 /// Get executor from the context.
1069 ///
1070 /// # Panic Safety
1071 ///
1072 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
1073 /// this method will gracefully continue execution by simply returning the current executor.
1074 pub fn executor(&self) -> Executor {
1075 match self.inner.read() {
1076 Ok(v) => v.executor.clone(),
1077 Err(err) => err.get_ref().executor.clone(),
1078 }
1079 }
1080
1081 /// Update executor for the context.
1082 ///
1083 /// # Note
1084 ///
1085 /// Tasks must be forwarded to the old executor after the update. Otherwise, features such as retry, timeout, and metrics may not function properly.
1086 ///
1087 /// # Panic Safety
1088 ///
1089 /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned,
1090 /// this method will gracefully continue execution by simply skipping the update operation.
1091 pub fn update_executor(&self, f: impl FnOnce(Executor) -> Executor) -> &Self {
1092 if let Ok(mut v) = self.inner.write() {
1093 let executor = mem::take(&mut v.executor);
1094 v.executor = f(executor);
1095 }
1096
1097 self
1098 }
1099}