opendal/layers/
chaos.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19use std::sync::Mutex;
20
21use rand::prelude::*;
22use rand::rngs::StdRng;
23
24use crate::raw::*;
25use crate::*;
26
27/// Inject chaos into underlying services for robustness test.
28///
29/// # Chaos
30///
31/// Chaos tests is a part of stress test. By generating errors at specified
32/// error ratio, we can reproduce underlying services error more reliable.
33///
34/// Running tests under ChaosLayer will make your application more robust.
35///
36/// For example: If we specify an error rate of 0.5, there is a 50% chance
37/// of an EOF error for every read operation.
38///
39/// # Note
40///
41/// For now, ChaosLayer only injects read operations. More operations may
42/// be added in the future.
43///
44/// # Examples
45///
46/// ```no_run
47/// # use opendal::layers::ChaosLayer;
48/// # use opendal::services;
49/// # use opendal::Operator;
50/// # use opendal::Result;
51/// # use opendal::Scheme;
52///
53/// # fn main() -> Result<()> {
54/// let _ = Operator::new(services::Memory::default())?
55///     .layer(ChaosLayer::new(0.1))
56///     .finish();
57/// Ok(())
58/// # }
59/// ```
60#[derive(Debug, Clone)]
61pub struct ChaosLayer {
62    error_ratio: f64,
63}
64
65impl ChaosLayer {
66    /// Create a new chaos layer with specified error ratio.
67    ///
68    /// # Panics
69    ///
70    /// Input error_ratio must in [0.0..=1.0]
71    pub fn new(error_ratio: f64) -> Self {
72        assert!(
73            (0.0..=1.0).contains(&error_ratio),
74            "error_ratio must between 0.0 and 1.0"
75        );
76        Self { error_ratio }
77    }
78}
79
80impl<A: Access> Layer<A> for ChaosLayer {
81    type LayeredAccess = ChaosAccessor<A>;
82
83    fn layer(&self, inner: A) -> Self::LayeredAccess {
84        ChaosAccessor {
85            inner,
86            rng: StdRng::from_entropy(),
87            error_ratio: self.error_ratio,
88        }
89    }
90}
91
92#[derive(Debug)]
93pub struct ChaosAccessor<A> {
94    inner: A,
95    rng: StdRng,
96
97    error_ratio: f64,
98}
99
100impl<A: Access> LayeredAccess for ChaosAccessor<A> {
101    type Inner = A;
102    type Reader = ChaosReader<A::Reader>;
103    type BlockingReader = ChaosReader<A::BlockingReader>;
104    type Writer = A::Writer;
105    type BlockingWriter = A::BlockingWriter;
106    type Lister = A::Lister;
107    type BlockingLister = A::BlockingLister;
108    type Deleter = A::Deleter;
109    type BlockingDeleter = A::BlockingDeleter;
110
111    fn inner(&self) -> &Self::Inner {
112        &self.inner
113    }
114
115    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
116        self.inner
117            .read(path, args)
118            .await
119            .map(|(rp, r)| (rp, ChaosReader::new(r, self.rng.clone(), self.error_ratio)))
120    }
121
122    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
123        self.inner
124            .blocking_read(path, args)
125            .map(|(rp, r)| (rp, ChaosReader::new(r, self.rng.clone(), self.error_ratio)))
126    }
127
128    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
129        self.inner.write(path, args).await
130    }
131
132    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
133        self.inner.blocking_write(path, args)
134    }
135
136    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
137        self.inner.list(path, args).await
138    }
139
140    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
141        self.inner.blocking_list(path, args)
142    }
143
144    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
145        self.inner.delete().await
146    }
147
148    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
149        self.inner.blocking_delete()
150    }
151}
152
153/// ChaosReader will inject error into read operations.
154pub struct ChaosReader<R> {
155    inner: R,
156    rng: Arc<Mutex<StdRng>>,
157
158    error_ratio: f64,
159}
160
161impl<R> ChaosReader<R> {
162    fn new(inner: R, rng: StdRng, error_ratio: f64) -> Self {
163        Self {
164            inner,
165            rng: Arc::new(Mutex::new(rng)),
166            error_ratio,
167        }
168    }
169
170    /// If I feel lucky, we can return the correct response. Otherwise,
171    /// we need to generate an error.
172    fn i_feel_lucky(&self) -> bool {
173        let point = self.rng.lock().unwrap().gen_range(0..=100);
174        point >= (self.error_ratio * 100.0) as i32
175    }
176
177    fn unexpected_eof() -> Error {
178        Error::new(ErrorKind::Unexpected, "I am your chaos!")
179            .with_operation("chaos")
180            .set_temporary()
181    }
182}
183
184impl<R: oio::Read> oio::Read for ChaosReader<R> {
185    async fn read(&mut self) -> Result<Buffer> {
186        if self.i_feel_lucky() {
187            self.inner.read().await
188        } else {
189            Err(Self::unexpected_eof())
190        }
191    }
192}
193
194impl<R: oio::BlockingRead> oio::BlockingRead for ChaosReader<R> {
195    fn read(&mut self) -> Result<Buffer> {
196        if self.i_feel_lucky() {
197            self.inner.read()
198        } else {
199            Err(Self::unexpected_eof())
200        }
201    }
202}