1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::future::Future;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
/// Layer that validates operation arguments against the capabilities
/// advertised by the wrapped accessor.
///
/// Applying this layer wraps the accessor so that `read`, `write`, `stat`
/// and `delete` requests carrying an argument the service does not declare
/// support for are rejected with an `Unsupported` error instead of being
/// forwarded to the service.
///
/// The layer carries no state, so it can be created with `Default` and
/// cloned freely.
#[derive(Debug, Clone, Default)]
pub struct CorrectnessCheckLayer;
42
43impl<A: Access> Layer<A> for CorrectnessCheckLayer {
44 type LayeredAccess = CorrectnessAccessor<A>;
45
46 fn layer(&self, inner: A) -> Self::LayeredAccess {
47 CorrectnessAccessor {
48 info: inner.info(),
49 inner,
50 }
51 }
52}
53
54pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: Operation, args: &str) -> Error {
55 let scheme = info.scheme();
56 let op = op.into_static();
57
58 Error::new(
59 ErrorKind::Unsupported,
60 format!("The service {scheme} does not support the operation {op} with the arguments {args}. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."),
61 )
62 .with_operation(op)
63}
64
65pub struct CorrectnessAccessor<A: Access> {
66 info: Arc<AccessorInfo>,
67 inner: A,
68}
69
70impl<A: Access> Debug for CorrectnessAccessor<A> {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("CorrectnessCheckAccessor")
73 .field("inner", &self.inner)
74 .finish_non_exhaustive()
75 }
76}
77
78impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
79 type Inner = A;
80 type Reader = A::Reader;
81 type Writer = A::Writer;
82 type Lister = A::Lister;
83 type Deleter = CheckWrapper<A::Deleter>;
84
85 fn inner(&self) -> &Self::Inner {
86 &self.inner
87 }
88
89 fn info(&self) -> Arc<AccessorInfo> {
90 self.info.clone()
91 }
92
93 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
94 let capability = self.info.full_capability();
95 if !capability.read_with_version && args.version().is_some() {
96 return Err(new_unsupported_error(
97 self.info.as_ref(),
98 Operation::Read,
99 "version",
100 ));
101 }
102 if !capability.read_with_if_match && args.if_match().is_some() {
103 return Err(new_unsupported_error(
104 self.info.as_ref(),
105 Operation::Read,
106 "if_match",
107 ));
108 }
109 if !capability.read_with_if_none_match && args.if_none_match().is_some() {
110 return Err(new_unsupported_error(
111 self.info.as_ref(),
112 Operation::Read,
113 "if_none_match",
114 ));
115 }
116 if !capability.read_with_if_modified_since && args.if_modified_since().is_some() {
117 return Err(new_unsupported_error(
118 self.info.as_ref(),
119 Operation::Read,
120 "if_modified_since",
121 ));
122 }
123 if !capability.read_with_if_unmodified_since && args.if_unmodified_since().is_some() {
124 return Err(new_unsupported_error(
125 self.info.as_ref(),
126 Operation::Read,
127 "if_unmodified_since",
128 ));
129 }
130
131 self.inner.read(path, args).await
132 }
133
134 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
135 let capability = self.info.full_capability();
136 if args.append() && !capability.write_can_append {
137 return Err(new_unsupported_error(
138 &self.info,
139 Operation::Write,
140 "append",
141 ));
142 }
143 if args.if_not_exists() && !capability.write_with_if_not_exists {
144 return Err(new_unsupported_error(
145 &self.info,
146 Operation::Write,
147 "if_not_exists",
148 ));
149 }
150 if let Some(if_none_match) = args.if_none_match() {
151 if !capability.write_with_if_none_match {
152 let mut err =
153 new_unsupported_error(self.info.as_ref(), Operation::Write, "if_none_match");
154 if if_none_match == "*" && capability.write_with_if_not_exists {
155 err = err.with_context("hint", "use if_not_exists instead");
156 }
157
158 return Err(err);
159 }
160 }
161
162 self.inner.write(path, args).await
163 }
164
165 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
166 let capability = self.info.full_capability();
167 if !capability.stat_with_version && args.version().is_some() {
168 return Err(new_unsupported_error(
169 self.info.as_ref(),
170 Operation::Stat,
171 "version",
172 ));
173 }
174 if !capability.stat_with_if_match && args.if_match().is_some() {
175 return Err(new_unsupported_error(
176 self.info.as_ref(),
177 Operation::Stat,
178 "if_match",
179 ));
180 }
181 if !capability.stat_with_if_none_match && args.if_none_match().is_some() {
182 return Err(new_unsupported_error(
183 self.info.as_ref(),
184 Operation::Stat,
185 "if_none_match",
186 ));
187 }
188 if !capability.stat_with_if_modified_since && args.if_modified_since().is_some() {
189 return Err(new_unsupported_error(
190 self.info.as_ref(),
191 Operation::Stat,
192 "if_modified_since",
193 ));
194 }
195 if !capability.stat_with_if_unmodified_since && args.if_unmodified_since().is_some() {
196 return Err(new_unsupported_error(
197 self.info.as_ref(),
198 Operation::Stat,
199 "if_unmodified_since",
200 ));
201 }
202
203 self.inner.stat(path, args).await
204 }
205
206 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
207 self.inner.delete().await.map(|(rp, deleter)| {
208 let deleter = CheckWrapper::new(deleter, self.info.clone());
209 (rp, deleter)
210 })
211 }
212
213 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
214 self.inner.list(path, args).await
215 }
216}
217
218pub struct CheckWrapper<T> {
219 info: Arc<AccessorInfo>,
220 inner: T,
221}
222
223impl<T> CheckWrapper<T> {
224 fn new(inner: T, info: Arc<AccessorInfo>) -> Self {
225 Self { inner, info }
226 }
227
228 fn check_delete(&self, args: &OpDelete) -> Result<()> {
229 if args.version().is_some() && !self.info.full_capability().delete_with_version {
230 return Err(new_unsupported_error(
231 &self.info,
232 Operation::Delete,
233 "version",
234 ));
235 }
236
237 Ok(())
238 }
239}
240
241impl<T: oio::Delete> oio::Delete for CheckWrapper<T> {
242 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
243 self.check_delete(&args)?;
244 self.inner.delete(path, args)
245 }
246
247 fn flush(&mut self) -> impl Future<Output = Result<usize>> + MaybeSend {
248 self.inner.flush()
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raw::oio;
    use crate::Capability;
    use crate::EntryMode;
    use crate::Metadata;
    use crate::Operator;

    /// Backend stub whose every operation succeeds; only the advertised
    /// capability varies between tests.
    #[derive(Debug)]
    struct MockService {
        capability: Capability,
    }

    impl Access for MockService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_native_capability(self.capability);

            Arc::new(info)
        }

        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
            let meta = Metadata::new(EntryMode::Unknown);
            Ok(RpStat::new(meta))
        }

        async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
            let reader: Self::Reader = Box::new(bytes::Bytes::new());
            Ok((RpRead::new(), reader))
        }

        async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
            let writer: Self::Writer = Box::new(MockWriter);
            Ok((RpWrite::new(), writer))
        }

        async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
            let lister: Self::Lister = Box::new(());
            Ok((RpList::default(), lister))
        }

        async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
            let deleter: Self::Deleter = Box::new(MockDeleter);
            Ok((RpDelete::default(), deleter))
        }
    }

    /// Writer stub that accepts everything.
    struct MockWriter;

    impl oio::Write for MockWriter {
        async fn write(&mut self, _: Buffer) -> Result<()> {
            Ok(())
        }

        async fn close(&mut self) -> Result<Metadata> {
            Ok(Metadata::default())
        }

        async fn abort(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Deleter stub that accepts everything and reports one flushed entry.
    struct MockDeleter;

    impl oio::Delete for MockDeleter {
        fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn flush(&mut self) -> Result<usize> {
            Ok(1)
        }
    }

    /// Builds an operator over `MockService` with the layer under test applied.
    fn new_test_operator(capability: Capability) -> Operator {
        let accessor = Arc::new(MockService { capability });

        Operator::from_inner(accessor).layer(CorrectnessCheckLayer)
    }

    /// Asserts that `res` is an error of kind `ErrorKind::Unsupported`.
    fn assert_unsupported<T>(res: Result<T>) {
        match res {
            Ok(_) => panic!("expected an Unsupported error, but the operation succeeded"),
            Err(err) => assert_eq!(err.kind(), ErrorKind::Unsupported),
        }
    }

    #[tokio::test]
    async fn test_read() {
        // Without `read_with_version`, a versioned read must be rejected.
        let plain = new_test_operator(Capability {
            read: true,
            ..Default::default()
        });
        assert_unsupported(plain.read_with("path").version("version").await);

        // With the capability enabled the same request goes through.
        let versioned = new_test_operator(Capability {
            read: true,
            read_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.read_with("path").version("version").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_stat() {
        // Without `stat_with_version`, a versioned stat must be rejected.
        let plain = new_test_operator(Capability {
            stat: true,
            ..Default::default()
        });
        assert_unsupported(plain.stat_with("path").version("version").await);

        // With the capability enabled the same request goes through.
        let versioned = new_test_operator(Capability {
            stat: true,
            stat_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.stat_with("path").version("version").await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_write_with() {
        let op = new_test_operator(Capability {
            write: true,
            write_with_if_not_exists: true,
            ..Default::default()
        });

        // Append is not advertised, so it is rejected.
        assert_unsupported(op.write_with("path", "".as_bytes()).append(true).await);

        // An ordinary etag gets the plain unsupported message.
        let err = op
            .write_with("path", "".as_bytes())
            .if_none_match("etag")
            .await
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Unsupported (permanent) at write => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // `*` additionally carries the hint, because `if_not_exists` is supported.
        let err = op
            .write_with("path", "".as_bytes())
            .if_none_match("*")
            .await
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Unsupported (permanent) at write, context: { hint: use if_not_exists instead } => The service memory does not support the operation write with the arguments if_none_match. Please verify if the relevant flags have been enabled, or submit an issue if you believe this is incorrect."
        );

        // `if_not_exists` itself is advertised and therefore accepted.
        let outcome = op
            .write_with("path", "".as_bytes())
            .if_not_exists(true)
            .await;
        assert!(outcome.is_ok());

        // Once append is advertised, an appending writer can be created.
        let appendable = new_test_operator(Capability {
            write: true,
            write_can_append: true,
            write_with_if_not_exists: true,
            write_with_if_none_match: true,
            ..Default::default()
        });
        let outcome = appendable.writer_with("path").append(true).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_delete() {
        // Without `delete_with_version`, a versioned delete must be rejected.
        let plain = new_test_operator(Capability {
            delete: true,
            ..Default::default()
        });
        assert_unsupported(plain.delete_with("path").version("version").await);

        // With the capability enabled the same request goes through.
        let versioned = new_test_operator(Capability {
            delete: true,
            delete_with_version: true,
            ..Default::default()
        });
        let outcome = versioned.delete_with("path").version("version").await;
        assert!(outcome.is_ok());
    }
}