opendal_core/raw/oio/write/
block_write.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use futures::Future;
21use futures::FutureExt;
22use futures::TryFutureExt;
23use futures::select;
24use uuid::Uuid;
25
26use crate::raw::*;
27use crate::*;
28
29/// BlockWrite is used to implement [`oio::Write`] based on block
30/// uploads. By implementing BlockWrite, services don't need to
31/// care about the details of uploading blocks.
32///
33/// # Architecture
34///
35/// The architecture after adopting [`BlockWrite`]:
36///
37/// - Services impl `BlockWrite`
38/// - `BlockWriter` impl `Write`
39/// - Expose `BlockWriter` as `Accessor::Writer`
40///
41/// # Notes
42///
43/// `BlockWrite` has an oneshot optimization when `write` has been called only once:
44///
45/// ```no_build
46/// w.write(bs).await?;
47/// w.close().await?;
48/// ```
49///
50/// We will use `write_once` instead of starting a new block upload.
51///
52/// # Requirements
53///
54/// Services that implement `BlockWrite` must fulfill the following requirements:
55///
56/// - Must be a http service that could accept `AsyncBody`.
57/// - Don't need initialization before writing.
58/// - Block ID is generated by caller `BlockWrite` instead of services.
59/// - Complete block by an ordered block id list.
60pub trait BlockWrite: Send + Sync + Unpin + 'static {
61    /// write_once is used to write the data to underlying storage at once.
62    ///
63    /// BlockWriter will call this API when:
64    ///
65    /// - All the data has been written to the buffer and we can perform the upload at once.
66    fn write_once(
67        &self,
68        size: u64,
69        body: Buffer,
70    ) -> impl Future<Output = Result<Metadata>> + MaybeSend;
71
72    /// write_block will write a block of the data.
73    ///
74    /// BlockWriter will call this API and stores the result in
75    /// order.
76    ///
77    /// - block_id is the id of the block.
78    fn write_block(
79        &self,
80        block_id: Uuid,
81        size: u64,
82        body: Buffer,
83    ) -> impl Future<Output = Result<()>> + MaybeSend;
84
85    /// complete_block will complete the block upload to build the final
86    /// file.
87    fn complete_block(
88        &self,
89        block_ids: Vec<Uuid>,
90    ) -> impl Future<Output = Result<Metadata>> + MaybeSend;
91
92    /// abort_block will cancel the block upload and purge all data.
93    fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
94}
95
96struct WriteInput<W: BlockWrite> {
97    w: Arc<W>,
98    executor: Executor,
99    block_id: Uuid,
100    bytes: Buffer,
101}
102
103/// BlockWriter will implement [`oio::Write`] based on block
104/// uploads.
105pub struct BlockWriter<W: BlockWrite> {
106    w: Arc<W>,
107    executor: Executor,
108
109    started: bool,
110    block_ids: Vec<Uuid>,
111    cache: Option<Buffer>,
112    tasks: ConcurrentTasks<WriteInput<W>, Uuid>,
113}
114
115impl<W: BlockWrite> BlockWriter<W> {
116    /// Create a new BlockWriter.
117    pub fn new(info: Arc<AccessorInfo>, inner: W, concurrent: usize) -> Self {
118        let executor = info.executor();
119
120        Self {
121            w: Arc::new(inner),
122            executor: executor.clone(),
123            started: false,
124            block_ids: Vec::new(),
125            cache: None,
126
127            tasks: ConcurrentTasks::new(executor, concurrent, 8192, |input| {
128                Box::pin(async move {
129                    let fut = input
130                        .w
131                        .write_block(
132                            input.block_id,
133                            input.bytes.len() as u64,
134                            input.bytes.clone(),
135                        )
136                        .map_ok(|_| input.block_id);
137                    match input.executor.timeout() {
138                        None => {
139                            let result = fut.await;
140                            (input, result)
141                        }
142                        Some(timeout) => {
143                            let result = select! {
144                                result = fut.fuse() => {
145                                    result
146                                }
147                                _ = timeout.fuse() => {
148                                      Err(Error::new(
149                                            ErrorKind::Unexpected, "write block timeout")
150                                                .with_context("block_id", input.block_id.to_string())
151                                                .set_temporary())
152                                }
153                            };
154                            (input, result)
155                        }
156                    }
157                })
158            }),
159        }
160    }
161
162    fn fill_cache(&mut self, bs: Buffer) -> usize {
163        let size = bs.len();
164        assert!(self.cache.is_none());
165        self.cache = Some(bs);
166        size
167    }
168}
169
170impl<W> oio::Write for BlockWriter<W>
171where
172    W: BlockWrite,
173{
174    async fn write(&mut self, bs: Buffer) -> Result<()> {
175        if !self.started && self.cache.is_none() {
176            self.fill_cache(bs);
177            return Ok(());
178        }
179
180        // The block upload process has been started.
181        self.started = true;
182
183        let bytes = self.cache.clone().expect("pending write must exist");
184        self.tasks
185            .execute(WriteInput {
186                w: self.w.clone(),
187                executor: self.executor.clone(),
188                block_id: Uuid::new_v4(),
189                bytes,
190            })
191            .await?;
192        self.cache = None;
193        self.fill_cache(bs);
194        Ok(())
195    }
196
197    async fn close(&mut self) -> Result<Metadata> {
198        if !self.started {
199            let (size, body) = match self.cache.clone() {
200                Some(cache) => (cache.len(), cache),
201                None => (0, Buffer::new()),
202            };
203
204            let meta = self.w.write_once(size as u64, body).await?;
205            self.cache = None;
206            return Ok(meta);
207        }
208
209        if let Some(cache) = self.cache.clone() {
210            self.tasks
211                .execute(WriteInput {
212                    w: self.w.clone(),
213                    executor: self.executor.clone(),
214                    block_id: Uuid::new_v4(),
215                    bytes: cache,
216                })
217                .await?;
218            self.cache = None;
219        }
220
221        loop {
222            let Some(result) = self.tasks.next().await.transpose()? else {
223                break;
224            };
225            self.block_ids.push(result);
226        }
227
228        let block_ids = self.block_ids.clone();
229        self.w.complete_block(block_ids).await
230    }
231
232    async fn abort(&mut self) -> Result<()> {
233        if !self.started {
234            return Ok(());
235        }
236
237        self.tasks.clear();
238        self.cache = None;
239        self.w.abort_block(self.block_ids.clone()).await?;
240        Ok(())
241    }
242}
243
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use pretty_assertions::assert_eq;
    use rand::Rng;
    use rand::RngCore;
    use rand::thread_rng;
    use tokio::time::sleep;

    use super::*;
    use crate::raw::oio::Write;

    /// In-memory [`BlockWrite`] backend whose `write_once` and `write_block`
    /// fail with a temporary error roughly 10% of the time.
    struct TestWrite {
        /// Bytes recorded: set by `write_once`, accumulated by `write_block`.
        length: u64,
        /// Uploaded blocks, keyed by block id.
        bytes: HashMap<Uuid, Buffer>,
        /// Final content produced by `write_once` or `complete_block`.
        content: Option<Buffer>,
    }

    impl TestWrite {
        pub fn new() -> Arc<Mutex<Self>> {
            let v = Self {
                length: 0,
                bytes: HashMap::new(),
                content: None,
            };

            Arc::new(Mutex::new(v))
        }
    }

    impl BlockWrite for Arc<Mutex<TestWrite>> {
        async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
            sleep(Duration::from_nanos(50)).await;

            if thread_rng().gen_bool(1.0 / 10.0) {
                return Err(
                    Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary()
                );
            }

            let mut this = self.lock().unwrap();
            this.length = size;
            this.content = Some(body);
            Ok(Metadata::default())
        }

        async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
            // Add an async sleep here to enforce some pending.
            sleep(Duration::from_millis(50)).await;

            // Fail about 10% of block writes so the retry path is exercised.
            if thread_rng().gen_bool(1.0 / 10.0) {
                return Err(
                    Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary()
                );
            }

            let mut this = self.lock().unwrap();
            this.length += size;
            this.bytes.insert(block_id, body);

            Ok(())
        }

        async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
            let mut this = self.lock().unwrap();
            let mut bs = Vec::new();
            for id in block_ids {
                bs.push(this.bytes[&id].clone());
            }
            this.content = Some(bs.into_iter().flatten().collect());

            Ok(Metadata::default())
        }

        async fn abort_block(&self, _: Vec<Uuid>) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_block_writer_with_concurrent_errors() {
        let mut rng = thread_rng();

        let mut w = BlockWriter::new(Arc::default(), TestWrite::new(), 8);
        let mut total_size = 0u64;
        let mut expected_content = Vec::new();

        for _ in 0..1000 {
            let size = rng.gen_range(1..1024);
            total_size += size as u64;

            let mut bs = vec![0; size];
            rng.fill_bytes(&mut bs);

            expected_content.extend_from_slice(&bs);

            // Temporary errors are expected; retry until the chunk is accepted.
            while w.write(bs.clone().into()).await.is_err() {}
        }

        while w.close().await.is_err() {}

        let inner = w.w.lock().unwrap();

        assert_eq!(total_size, inner.length, "length must be the same");
        assert!(inner.content.is_some());
        assert_eq!(
            expected_content,
            inner.content.clone().unwrap().to_bytes(),
            "content must be the same"
        );
    }

    #[tokio::test]
    async fn test_block_writer_with_retry_when_write_once_error() {
        let mut rng = thread_rng();

        for _ in 1..100 {
            let mut w = BlockWriter::new(Arc::default(), TestWrite::new(), 8);

            let size = rng.gen_range(1..1024);
            let mut bs = vec![0; size];
            rng.fill_bytes(&mut bs);

            while w.write(bs.clone().into()).await.is_err() {}

            while w.close().await.is_err() {}

            let inner = w.w.lock().unwrap();
            assert_eq!(size as u64, inner.length, "length must be the same");
            assert!(inner.content.is_some());
            assert_eq!(
                bs,
                inner.content.clone().unwrap().to_bytes(),
                "content must be the same"
            );
        }
    }

    #[tokio::test]
    async fn test_block_writer_close_without_write() {
        let mut w = BlockWriter::new(Arc::default(), TestWrite::new(), 8);

        // With no prior `write`, `close` must fall back to `write_once` with
        // an empty body instead of starting a block upload.
        while w.close().await.is_err() {}

        let inner = w.w.lock().unwrap();
        assert_eq!(0, inner.length, "length must be zero");
        assert!(inner.bytes.is_empty(), "no block must be uploaded");
        assert!(inner.content.is_some());
        assert_eq!(
            0,
            inner.content.clone().unwrap().len(),
            "content must be empty"
        );
    }
}