1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future::Future;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
/// Layer that wraps an accessor in a [`CorrectnessAccessor`].
///
/// The resulting accessor rejects operation arguments that the service's
/// capability does not declare as supported, returning an
/// `ErrorKind::Unsupported` error instead of forwarding the call.
///
/// The layer itself is a stateless marker, so it is cheap to copy and can be
/// printed for diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct CorrectnessCheckLayer;
42
43impl<A: Access> Layer<A> for CorrectnessCheckLayer {
44 type LayeredAccess = CorrectnessAccessor<A>;
45
46 fn layer(&self, inner: A) -> Self::LayeredAccess {
47 CorrectnessAccessor {
48 info: inner.info(),
49 inner,
50 }
51 }
52}
53
/// Accessor wrapper that holds an inner accessor together with its
/// [`AccessorInfo`].
pub struct CorrectnessAccessor<A: Access> {
    /// Shared accessor info; its capability is consulted by the checks.
    info: Arc<AccessorInfo>,
    /// The wrapped accessor that operations are forwarded to.
    inner: A,
}
58
59pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
60 let scheme = info.scheme();
61 let op = op.into_static();
62
63 Error::new(
64 ErrorKind::Unsupported,
65 format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
66 )
67 .with_operation(op)
68}
69
70impl<A: Access> Debug for CorrectnessAccessor<A> {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("CorrectnessCheckAccessor")
73 .field("inner", &self.inner)
74 .finish_non_exhaustive()
75 }
76}
77
78impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
79 type Inner = A;
80 type Reader = A::Reader;
81 type Writer = A::Writer;
82 type Lister = A::Lister;
83 type Deleter = CheckWrapper<A::Deleter>;
84
85 fn inner(&self) -> &Self::Inner {
86 &self.inner
87 }
88
89 fn info(&self) -> Arc<AccessorInfo> {
90 self.info.clone()
91 }
92
93 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
94 let capability = self.info.full_capability();
95 if !capability.read_with_version && args.version().is_some() {
96 return Err(new_unsupported_error(
97 self.info.as_ref(),
98 Operation::Read,
99 "version",
100 ));
101 }
102 if !capability.read_with_if_match && args.if_match().is_some() {
103 return Err(new_unsupported_error(
104 self.info.as_ref(),
105 Operation::Read,
106 "if_match",
107 ));
108 }
109 if !capability.read_with_if_none_match && args.if_none_match().is_some() {
110 return Err(new_unsupported_error(
111 self.info.as_ref(),
112 Operation::Read,
113 "if_none_match",
114 ));
115 }
116 if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
117 return Err(new_unsupported_error(
118 self.info.as_ref(),
119 Operation::Read,
120 "if_modified_since",
121 ));
122 }
123 if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
124 return Err(new_unsupported_error(
125 self.info.as_ref(),
126 Operation::Read,
127 "if_unmodified_since",
128 ));
129 }
130
131 self.inner.read(path, args).await
132 }
133
134 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
135 let capability = self.info.full_capability();
136 if args.append() && !capability.write_can_append {
137 return Err(new_unsupported_error(
138 &self.info,
139 Operation::Write,
140 "append",
141 ));
142 }
143 if args.if_not_exists() && !capability.write_with_if_not_exists {
144 return Err(new_unsupported_error(
145 &self.info,
146 Operation::Write,
147 "if_not_exists",
148 ));
149 }
150 if args.if_match().is_some() && !capability.write_with_if_match {
151 return Err(new_unsupported_error(
152 &self.info,
153 Operation::Write,
154 "if_match",
155 ));
156 }
157 if let Some(if_none_match) = args.if_none_match() {
158 if !capability.write_with_if_none_match {
159 let mut err =
160 new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
161 if if_none_match == "*" && capability.write_with_if_not_exists {
162 err = err.with_context("hint", "use if_not_exists instead");
163 }
164
165 return Err(err);
166 }
167 }
168
169 self.inner.write(path, args).await
170 }
171
172 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
173 let capability = self.info.full_capability();
174 if !capability.stat_with_version && args.version().is_some() {
175 return Err(new_unsupported_error(
176 self.info.as_ref(),
177 Operation::Stat,
178 "version",
179 ));
180 }
181 if !capability.stat_with_if_match && args.if_match().is_some() {
182 return Err(new_unsupported_error(
183 self.info.as_ref(),
184 Operation::Stat,
185 "if_match",
186 ));
187 }
188 if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
189 return Err(new_unsupported_error(
190 self.info.as_ref(),
191 Operation::Stat,
192 "if_none_match",
193 ));
194 }
195 if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
196 return Err(new_unsupported_error(
197 self.info.as_ref(),
198 Operation::Stat,
199 "if_modified_since",
200 ));
201 }
202 if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
203 return Err(new_unsupported_error(
204 self.info.as_ref(),
205 Operation::Stat,
206 "if_unmodified_since",
207 ));
208 }
209
210 self.inner.stat(path, args).await
211 }
212
213 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
214 self.inner.delete().await.map(|(rp, deleter)| {
215 let deleter = CheckWrapper::new(deleter, self.info.clone());
216 (rp, deleter)
217 })
218 }
219
220 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
221 let capability = self.info.full_capability();
222 if args.if_not_exists() && !capability.copy_with_if_not_exists {
223 return Err(new_unsupported_error(
224 &self.info,
225 Operation::Copy,
226 "if_not_exists",
227 ));
228 }
229
230 self.inner.copy(from, to, args).await
231 }
232
233 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
234 self.inner.list(path, args).await
235 }
236}
237
/// Pairs an inner value with the accessor info used to validate requests.
///
/// Used as the `Deleter` type of [`CorrectnessAccessor`].
pub struct CheckWrapper<T> {
    /// Accessor info whose capability is consulted by the checks.
    info: Arc<AccessorInfo>,
    /// The wrapped value that calls are forwarded to.
    inner: T,
}
242
243impl<T> CheckWrapper<T> {
244 fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
245 Self { inner, info }
246 }
247
248 fn check_delete(&self, args: &OpDelete) -> Result<()> {
249 if args.version().is_some() && !self.info.full_capability().delete_with_version {
250 return Err(new_unsupported_error(
251 &self.info,
252 Operation::Delete,
253 "version",
254 ));
255 }
256
257 if args.recursive() && !self.info.full_capability().delete_with_recursive {
258 return Err(new_unsupported_error(
259 &self.info,
260 Operation::Delete,
261 "recursive",
262 ));
263 }
264
265 Ok(())
266 }
267}
268
269impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
270 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
271 self.check_delete(&args)?;
272 self.inner.delete(path, args).await
273 }
274
275 fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
276 self.inner.close()
277 }
278}
279
// Tests for the correctness check layer, run against a mock service whose
// operations always succeed so that any failure comes from the layer's checks.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Capability;
    use crate::EntryMode;
    use crate::Metadata;
    use crate::Operator;
    use crate::raw::oio;

    /// Mock accessor that reports a caller-chosen capability.
    #[derive(Debug)]
    struct MockService {
        /// Capability installed as the native capability in `info()`.
        capability: Capability,
    }

    impl Access for MockService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;

        // Builds a fresh info with scheme "memory" and the configured native capability.
        // NOTE(review): the layer reads `full_capability()`; presumably it is derived
        // from the native capability set here — confirm in `AccessorInfo`.
        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_scheme("memory");
            info.set_native_capability(self.capability);

            info.into()
        }

        // Always succeeds with metadata of unknown entry mode.
        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
        }

        // Always succeeds with an empty byte buffer as the reader.
        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
        }

        // Always succeeds with a writer that accepts everything.
        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            Ok((RpWrite::new(), Box::new(MockWriter)))
        }

        // Always succeeds with `()` as the lister.
        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            Ok((RpList::default(), Box::new(())))
        }

        // Always succeeds with a deleter that accepts everything.
        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            Ok((RpDelete::default(), Box::new(MockDeleter)))
        }
    }

    /// Writer whose every operation succeeds without doing anything.
    struct MockWriter;

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Deleter whose every operation succeeds without doing anything.
    struct MockDeleter;

    impl oio::Delete for MockDeleter {
        async fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Builds an operator over `MockService` with `CorrectnessCheckLayer` applied.
    fn new_test_operator(capability: Capability) -> Operator {
        let srv = MockService { capability };

        Operator::from_inner(Arc::new(srv)).layer(CorrectnessCheckLayer)
    }

    /// Reading with `version` fails as Unsupported unless `read_with_version` is set.
    #[tokio::test]
    async fn test_read() {
        let op = new_test_operator(Capability {
            read: true,
            ..Default::default()
        });
        let res = op.read_with("path").version("version").await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        let op = new_test_operator(Capability {
            read: true,
            read_with_version: true,
            ..Default::default()
        });
        let res = op.read_with("path").version("version").await;
        assert!(res.is_ok());
    }

    /// Stat with `version` fails as Unsupported unless `stat_with_version` is set.
    #[tokio::test]
    async fn test_stat() {
        let op = new_test_operator(Capability {
            stat: true,
            ..Default::default()
        });
        let res = op.stat_with("path").version("version").await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        let op = new_test_operator(Capability {
            stat: true,
            stat_with_version: true,
            ..Default::default()
        });
        let res = op.stat_with("path").version("version").await;
        assert!(res.is_ok());
    }

    /// Exercises the write checks: append, `if_none_match` (with and without
    /// the `if_not_exists` hint) and `if_not_exists`.
    #[tokio::test]
    async fn test_write_with() {
        // Only plain write and `if_not_exists` are supported here.
        let op = new_test_operator(Capability {
            write: true,
            write_with_if_not_exists: true,
            ..Default::default()
        });
        let res = op.write_with("path", "".as_bytes()).append(true).await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        // A concrete etag is rejected without any hint in the error context.
        let res = op
            .write_with("path", "".as_bytes())
            .if_none_match("etag")
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.unwrap_err().to_string(),
            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // "*" is rejected too, but the error suggests `if_not_exists` since it is supported.
        let res = op
            .write_with("path", "".as_bytes())
            .if_none_match("*")
            .await;
        assert!(res.is_err());
        assert_eq!(
            res.unwrap_err().to_string(),
            "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        let res = op
            .write_with("path", "".as_bytes())
            .if_not_exists(true)
            .await;
        assert!(res.is_ok());

        // With append declared as supported, creating an appending writer succeeds.
        let op = new_test_operator(Capability {
            write: true,
            write_can_append: true,
            write_with_if_not_exists: true,
            write_with_if_none_match: true,
            ..Default::default()
        });
        let res = op.writer_with("path").append(true).await;
        assert!(res.is_ok());
    }

    /// Deleting with `version` fails as Unsupported unless `delete_with_version` is set.
    #[tokio::test]
    async fn test_delete() {
        let op = new_test_operator(Capability {
            delete: true,
            ..Default::default()
        });
        let res = op.delete_with("path").version("version").await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);

        let op = new_test_operator(Capability {
            delete: true,
            delete_with_version: true,
            ..Default::default()
        });
        let res = op.delete_with("path").version("version").await;
        assert!(res.is_ok())
    }
}
468}