1use std::fmt::{Debug, Formatter};
19use std::future::Future;
20use std::sync::Arc;
21
22use crate::raw::*;
23use crate::*;
24
/// Layer that validates operation arguments against the capabilities reported
/// by the wrapped service.
///
/// When an argument (for example `version` on a read) is requested but the
/// service's full capability does not list support for it, the layered
/// accessor returns an [`ErrorKind::Unsupported`] error instead of forwarding
/// the call to the inner accessor.
#[derive(Default)]
pub struct CorrectnessCheckLayer;
41
42impl<A: Access> Layer<A> for CorrectnessCheckLayer {
43 type LayeredAccess = CorrectnessAccessor<A>;
44
45 fn layer(&self, inner: A) -> Self::LayeredAccess {
46 CorrectnessAccessor {
47 info: inner.info(),
48 inner,
49 }
50 }
51}
52
53pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
54 let scheme = info.scheme();
55 let op = op.into_static();
56
57 Error::new(
58 ErrorKind::Unsupported,
59 format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
60 )
61 .with_operation(op)
62}
63
/// Accessor produced by [`CorrectnessCheckLayer`].
///
/// Checks operation arguments against the stored accessor info before
/// delegating to the wrapped accessor.
pub struct CorrectnessAccessor<A: Access> {
    /// Accessor info obtained from `inner` when the layer was applied.
    info: Arc<AccessorInfo>,
    /// The wrapped accessor that performs the actual operations.
    inner: A,
}
68
69impl<A: Access> Debug for CorrectnessAccessor<A> {
70 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("CorrectnessCheckAccessor")
72 .field("inner", &self.inner)
73 .finish_non_exhaustive()
74 }
75}
76
77impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
78 type Inner = A;
79 type Reader = A::Reader;
80 type Writer = A::Writer;
81 type Lister = A::Lister;
82 type Deleter = CheckWrapper<A::Deleter>;
83 type BlockingReader = A::BlockingReader;
84 type BlockingWriter = A::BlockingWriter;
85 type BlockingLister = A::BlockingLister;
86 type BlockingDeleter = CheckWrapper<A::BlockingDeleter>;
87
88 fn inner(&self) -> &Self::Inner {
89 &self.inner
90 }
91
92 fn info(&self) -> Arc<AccessorInfo> {
93 self.info.clone()
94 }
95
96 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
97 let capability = self.info.full_capability();
98 if !capability.read_with_version && args.version().is_some() {
99 return Err(new_unsupported_error(
100 self.info.as_ref(),
101 Operation::Read,
102 "version",
103 ));
104 }
105 if !capability.read_with_if_match && args.if_match().is_some() {
106 return Err(new_unsupported_error(
107 self.info.as_ref(),
108 Operation::Read,
109 "if_match",
110 ));
111 }
112 if !capability.read_with_if_none_match && args.if_none_match().is_some() {
113 return Err(new_unsupported_error(
114 self.info.as_ref(),
115 Operation::Read,
116 "if_none_match",
117 ));
118 }
119 if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
120 return Err(new_unsupported_error(
121 self.info.as_ref(),
122 Operation::Read,
123 "if_modified_since",
124 ));
125 }
126 if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
127 return Err(new_unsupported_error(
128 self.info.as_ref(),
129 Operation::Read,
130 "if_unmodified_since",
131 ));
132 }
133
134 self.inner.read(path, args).await
135 }
136
137 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
138 let capability = self.info.full_capability();
139 if args.append() && !capability.write_can_append {
140 return Err(new_unsupported_error(
141 &self.info,
142 Operation::Write,
143 "append",
144 ));
145 }
146 if args.if_not_exists() && !capability.write_with_if_not_exists {
147 return Err(new_unsupported_error(
148 &self.info,
149 Operation::Write,
150 "if_not_exists",
151 ));
152 }
153 if let Some(if_none_match) = args.if_none_match() {
154 if !capability.write_with_if_none_match {
155 let mut err =
156 new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
157 if if_none_match == "*" && capability.write_with_if_not_exists {
158 err = err.with_context("hint", "use if_not_exists instead");
159 }
160
161 return Err(err);
162 }
163 }
164
165 self.inner.write(path, args).await
166 }
167
168 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
169 let capability = self.info.full_capability();
170 if !capability.stat_with_version && args.version().is_some() {
171 return Err(new_unsupported_error(
172 self.info.as_ref(),
173 Operation::Stat,
174 "version",
175 ));
176 }
177 if !capability.stat_with_if_match && args.if_match().is_some() {
178 return Err(new_unsupported_error(
179 self.info.as_ref(),
180 Operation::Stat,
181 "if_match",
182 ));
183 }
184 if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
185 return Err(new_unsupported_error(
186 self.info.as_ref(),
187 Operation::Stat,
188 "if_none_match",
189 ));
190 }
191 if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
192 return Err(new_unsupported_error(
193 self.info.as_ref(),
194 Operation::Stat,
195 "if_modified_since",
196 ));
197 }
198 if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
199 return Err(new_unsupported_error(
200 self.info.as_ref(),
201 Operation::Stat,
202 "if_unmodified_since",
203 ));
204 }
205
206 self.inner.stat(path, args).await
207 }
208
209 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
210 self.inner.delete().await.map(|(rp, deleter)| {
211 let deleter = CheckWrapper::new(deleter, self.info.clone());
212 (rp, deleter)
213 })
214 }
215
216 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
217 self.inner.list(path, args).await
218 }
219
220 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
221 let capability = self.info.full_capability();
222 if !capability.read_with_version && args.version().is_some() {
223 return Err(new_unsupported_error(
224 self.info.as_ref(),
225 Operation::Read,
226 "version",
227 ));
228 }
229
230 self.inner.blocking_read(path, args)
231 }
232
233 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
234 let capability = self.info.full_capability();
235 if args.append() && !capability.write_can_append {
236 return Err(new_unsupported_error(
237 &self.info,
238 Operation::Write,
239 "append",
240 ));
241 }
242 if args.if_not_exists() && !capability.write_with_if_not_exists {
243 return Err(new_unsupported_error(
244 &self.info,
245 Operation::Write,
246 "if_not_exists",
247 ));
248 }
249 if args.if_none_match().is_some() && !capability.write_with_if_none_match {
250 return Err(new_unsupported_error(
251 self.info.as_ref(),
252 Operation::Write,
253 "if_none_match",
254 ));
255 }
256
257 self.inner.blocking_write(path, args)
258 }
259
260 fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
261 let capability = self.info.full_capability();
262 if !capability.stat_with_version && args.version().is_some() {
263 return Err(new_unsupported_error(
264 self.info.as_ref(),
265 Operation::Stat,
266 "version",
267 ));
268 }
269
270 self.inner.blocking_stat(path, args)
271 }
272
273 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
274 self.inner.blocking_delete().map(|(rp, deleter)| {
275 let deleter = CheckWrapper::new(deleter, self.info.clone());
276 (rp, deleter)
277 })
278 }
279
280 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
281 self.inner.blocking_list(path, args)
282 }
283}
284
/// Wrapper around a deleter that checks `OpDelete` arguments against the
/// service capabilities before delegating to the wrapped deleter.
pub struct CheckWrapper<T> {
    /// Accessor info whose full capability is consulted by the checks.
    info: Arc<AccessorInfo>,
    /// The wrapped deleter.
    inner: T,
}
289
290impl<T> CheckWrapper<T> {
291 fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
292 Self { inner, info }
293 }
294
295 fn check_delete(&self, args: &OpDelete) -> Result<()> {
296 if args.version().is_some() && !self.info.full_capability().delete_with_version {
297 return Err(new_unsupported_error(
298 &self.info,
299 Operation::Delete,
300 "version",
301 ));
302 }
303
304 Ok(())
305 }
306}
307
308impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
309 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
310 self.check_delete(&args)?;
311 self.inner.delete(path, args)
312 }
313
314 fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend {
315 self.inner.flush()
316 }
317}
318
319impl<T: oio::BlockingDelete> oio::BlockingDelete for CheckWrapper<T> {
320 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
321 self.check_delete(&args)?;
322 self.inner.delete(path, args)
323 }
324
325 fn flush(&mut self) -> Result<usize> {
326 self.inner.flush()
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raw::oio;
    use crate::{Capability, EntryMode, Metadata, Operator};

    /// Mock backend whose reported native capability is chosen per test.
    #[derive(Debug)]
    struct MockService {
        capability: Capability,
    }

    impl Access for MockService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;
        type BlockingReader = oio::BlockingReader;
        type BlockingWriter = oio::BlockingWriter;
        type BlockingLister = oio::BlockingLister;
        type BlockingDeleter = oio::BlockingDeleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_native_capability(self.capability);

            Arc::new(info)
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            let metadata = Metadata::new(EntryMode::Unknown);
            Ok(RpStat::new(metadata))
        }

        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), Box::new(MockWriter)))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), Box::new(())))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((RpDelete::default(), Box::new(MockDeleter)))
        }
    }

    /// Writer that accepts every call and stores nothing.
    struct MockWriter;

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Deleter that accepts every call and reports one flushed entry.
    struct MockDeleter;

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            Ok(1)
        }
    }

    /// Builds an operator over `MockService` with the correctness layer applied.
    fn new_test_operator(capability: Capability) -> Operator {
        let service = Arc::new(MockService { capability });

        Operator::from_inner(service).layer(CorrectnessCheckLayer)
    }

    /// Asserts that `outcome` failed with `ErrorKind::Unsupported`.
    fn assert_unsupported<T: Debug>(outcome: Result<T>) {
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().kind(), ErrorKind::Unsupported);
    }

    #[tokio::test]
    async fn test_read() {
        // Without `read_with_version`, a versioned read must be rejected.
        let plain = new_test_operator(Capability {
            read: true,
            ..Default::default()
        });
        assert_unsupported(plain.read_with("path").version("version").await);

        // With the capability enabled the same call goes through.
        let versioned = new_test_operator(Capability {
            read: true,
            read_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.read_with("path").version("version").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_stat() {
        // Without `stat_with_version`, a versioned stat must be rejected.
        let plain = new_test_operator(Capability {
            stat: true,
            ..Default::default()
        });
        assert_unsupported(plain.stat_with("path").version("version").await);

        // With the capability enabled the same call goes through.
        let versioned = new_test_operator(Capability {
            stat: true,
            stat_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.stat_with("path").version("version").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_write_with() {
        let limited = new_test_operator(Capability {
            write: true,
            write_with_if_not_exists: true,
            ..Default::default()
        });

        // Append is not part of the capability set.
        assert_unsupported(limited.write_with("path", "".as_bytes()).append(true).await);

        // A concrete etag gets the plain unsupported message.
        let outcome = limited
            .write_with("path", "".as_bytes())
            .if_none_match("etag")
            .await;
        assert!(outcome.is_err());
        assert_eq!(
            outcome.unwrap_err().to_string(),
            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // `*` additionally carries the hint because `if_not_exists` is supported.
        let outcome = limited
            .write_with("path", "".as_bytes())
            .if_none_match("*")
            .await;
        assert!(outcome.is_err());
        assert_eq!(
            outcome.unwrap_err().to_string(),
            "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        let outcome = limited
            .write_with("path", "".as_bytes())
            .if_not_exists(true)
            .await;
        assert!(outcome.is_ok());

        // With every write capability enabled, append is accepted.
        let full = new_test_operator(Capability {
            write: true,
            write_can_append: true,
            write_with_if_not_exists: true,
            write_with_if_none_match: true,
            ..Default::default()
        });
        let outcome = full.writer_with("path").append(true).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_delete() {
        // Without `delete_with_version`, a versioned delete must be rejected.
        let plain = new_test_operator(Capability {
            delete: true,
            ..Default::default()
        });
        assert_unsupported(plain.delete_with("path").version("version").await);

        // With the capability enabled the same call goes through.
        let versioned = new_test_operator(Capability {
            delete: true,
            delete_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.delete_with("path").version("version").await;
        assert!(outcome.is_ok())
    }
}