opendal_core/raw/oio/write/
block_write.rs1use std::sync::Arc;
19
20use futures::Future;
21use futures::FutureExt;
22use futures::TryFutureExt;
23use futures::select;
24use uuid::Uuid;
25
26use crate::raw::*;
27use crate::*;
28
29pub trait BlockWrite: Send + Sync + Unpin + 'static {
61 fn write_once(
67 &self,
68 size: u64,
69 body: Buffer,
70 ) -> impl Future<Output = Result<Metadata>> + MaybeSend;
71
72 fn write_block(
79 &self,
80 block_id: Uuid,
81 size: u64,
82 body: Buffer,
83 ) -> impl Future<Output = Result<()>> + MaybeSend;
84
85 fn complete_block(
88 &self,
89 block_ids: Vec<Uuid>,
90 ) -> impl Future<Output = Result<Metadata>> + MaybeSend;
91
92 fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
94}
95
96struct WriteInput<W: BlockWrite> {
97 w: Arc<W>,
98 executor: Executor,
99 block_id: Uuid,
100 bytes: Buffer,
101}
102
103pub struct BlockWriter<W: BlockWrite> {
106 w: Arc<W>,
107 executor: Executor,
108
109 started: bool,
110 block_ids: Vec<Uuid>,
111 cache: Option<Buffer>,
112 tasks: ConcurrentTasks<WriteInput<W>, Uuid>,
113}
114
115impl<W: BlockWrite> BlockWriter<W> {
116 pub fn new(info: Arc<AccessorInfo>, inner: W, concurrent: usize) -> Self {
118 let executor = info.executor();
119
120 Self {
121 w: Arc::new(inner),
122 executor: executor.clone(),
123 started: false,
124 block_ids: Vec::new(),
125 cache: None,
126
127 tasks: ConcurrentTasks::new(executor, concurrent, 8192, |input| {
128 Box::pin(async move {
129 let fut = input
130 .w
131 .write_block(
132 input.block_id,
133 input.bytes.len() as u64,
134 input.bytes.clone(),
135 )
136 .map_ok(|_| input.block_id);
137 match input.executor.timeout() {
138 None => {
139 let result = fut.await;
140 (input, result)
141 }
142 Some(timeout) => {
143 let result = select! {
144 result = fut.fuse() => {
145 result
146 }
147 _ = timeout.fuse() => {
148 Err(Error::new(
149 ErrorKind::Unexpected, "write block timeout")
150 .with_context("block_id", input.block_id.to_string())
151 .set_temporary())
152 }
153 };
154 (input, result)
155 }
156 }
157 })
158 }),
159 }
160 }
161
162 fn fill_cache(&mut self, bs: Buffer) -> usize {
163 let size = bs.len();
164 assert!(self.cache.is_none());
165 self.cache = Some(bs);
166 size
167 }
168}
169
170impl<W> oio::Write for BlockWriter<W>
171where
172 W: BlockWrite,
173{
174 async fn write(&mut self, bs: Buffer) -> Result<()> {
175 if !self.started && self.cache.is_none() {
176 self.fill_cache(bs);
177 return Ok(());
178 }
179
180 self.started = true;
182
183 let bytes = self.cache.clone().expect("pending write must exist");
184 self.tasks
185 .execute(WriteInput {
186 w: self.w.clone(),
187 executor: self.executor.clone(),
188 block_id: Uuid::new_v4(),
189 bytes,
190 })
191 .await?;
192 self.cache = None;
193 self.fill_cache(bs);
194 Ok(())
195 }
196
197 async fn close(&mut self) -> Result<Metadata> {
198 if !self.started {
199 let (size, body) = match self.cache.clone() {
200 Some(cache) => (cache.len(), cache),
201 None => (0, Buffer::new()),
202 };
203
204 let meta = self.w.write_once(size as u64, body).await?;
205 self.cache = None;
206 return Ok(meta);
207 }
208
209 if let Some(cache) = self.cache.clone() {
210 self.tasks
211 .execute(WriteInput {
212 w: self.w.clone(),
213 executor: self.executor.clone(),
214 block_id: Uuid::new_v4(),
215 bytes: cache,
216 })
217 .await?;
218 self.cache = None;
219 }
220
221 loop {
222 let Some(result) = self.tasks.next().await.transpose()? else {
223 break;
224 };
225 self.block_ids.push(result);
226 }
227
228 let block_ids = self.block_ids.clone();
229 self.w.complete_block(block_ids).await
230 }
231
232 async fn abort(&mut self) -> Result<()> {
233 if !self.started {
234 return Ok(());
235 }
236
237 self.tasks.clear();
238 self.cache = None;
239 self.w.abort_block(self.block_ids.clone()).await?;
240 Ok(())
241 }
242}
243
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use pretty_assertions::assert_eq;
    use rand::{Rng, RngExt, rng};
    use tokio::time::sleep;

    use super::*;
    use crate::raw::oio::Write;

    /// In-memory backend that randomly fails about one call in ten.
    struct TestWrite {
        /// Total number of bytes accepted so far.
        length: u64,
        /// Successfully written blocks, keyed by block id.
        bytes: HashMap<Uuid, Buffer>,
        /// Final content, set by `write_once` or `complete_block`.
        content: Option<Buffer>,
    }

    impl TestWrite {
        /// Creates an empty backend wrapped for shared mutable access.
        pub fn new() -> Arc<Mutex<Self>> {
            Arc::new(Mutex::new(Self {
                length: 0,
                bytes: HashMap::new(),
                content: None,
            }))
        }
    }

    impl BlockWrite for Arc<Mutex<TestWrite>> {
        async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
            sleep(Duration::from_nanos(50)).await;

            // Inject a temporary failure with probability 1/10.
            let should_fail = rng().random_bool(1.0 / 10.0);
            if should_fail {
                return Err(
                    Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary()
                );
            }

            let mut state = self.lock().unwrap();
            state.length = size;
            state.content = Some(body);
            Ok(Metadata::default())
        }

        async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
            // Sleep asynchronously so the task is pending for a while.
            sleep(Duration::from_millis(50)).await;

            // Inject a temporary failure with probability 1/10.
            let should_fail = rng().random_bool(1.0 / 10.0);
            if should_fail {
                return Err(
                    Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary()
                );
            }

            let mut state = self.lock().unwrap();
            state.length += size;
            state.bytes.insert(block_id, body);

            Ok(())
        }

        async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
            let mut state = self.lock().unwrap();
            // Concatenate the stored blocks in the order of the given ids.
            let merged = block_ids
                .into_iter()
                .flat_map(|id| state.bytes[&id].clone())
                .collect();
            state.content = Some(merged);

            Ok(Metadata::default())
        }

        async fn abort_block(&self, _: Vec<Uuid>) -> Result<()> {
            Ok(())
        }
    }

    /// Many chunks written through the block path must arrive complete and
    /// in order, even though the backend fails randomly.
    #[tokio::test]
    async fn test_block_writer_with_concurrent_errors() {
        let mut rng = rng();

        let mut w = BlockWriter::new(Arc::default(), TestWrite::new(), 8);
        let mut total_size = 0u64;
        let mut expected_content = Vec::new();

        for _ in 0..1000 {
            let size = rng.random_range(1..1024);
            total_size += size as u64;

            let mut bs = vec![0; size];
            rng.fill_bytes(&mut bs);

            expected_content.extend_from_slice(&bs);

            // Retry the same chunk until the writer accepts it.
            while w.write(bs.clone().into()).await.is_err() {}
        }

        // Retry closing until it succeeds.
        while w.close().await.is_err() {}

        let inner = w.w.lock().unwrap();

        assert_eq!(total_size, inner.length, "length must be the same");
        assert!(inner.content.is_some());
        assert_eq!(
            expected_content,
            inner.content.clone().unwrap().to_bytes(),
            "content must be the same"
        );
    }

    /// A single chunk goes through `write_once`; retrying `close` after a
    /// failure must still deliver the full content.
    #[tokio::test]
    async fn test_block_writer_with_retry_when_write_once_error() {
        let mut rng = rng();

        for _ in 1..100 {
            let mut w = BlockWriter::new(Arc::default(), TestWrite::new(), 8);

            let size = rng.random_range(1..1024);
            let mut bs = vec![0; size];
            rng.fill_bytes(&mut bs);

            while w.write(bs.clone().into()).await.is_err() {}

            while w.close().await.is_err() {}

            let inner = w.w.lock().unwrap();
            assert_eq!(size as u64, inner.length, "length must be the same");
            assert!(inner.content.is_some());
            assert_eq!(
                bs,
                inner.content.clone().unwrap().to_bytes(),
                "content must be the same"
            );
        }
    }
}