opendal_core/raw/oio/write/
block_write.rs1use std::sync::Arc;
19
20use futures::Future;
21use futures::FutureExt;
22use futures::TryFutureExt;
23use futures::select;
24use uuid::Uuid;
25
26use crate::raw::*;
27use crate::*;
28
29pub trait BlockWrite: Send + Sync + Unpin + 'static {
61 fn write_once(
67 &self,
68 size: u64,
69 body: Buffer,
70 ) -> impl Future<Output = Result<Metadata>> + MaybeSend;
71
72 fn write_block(
79 &self,
80 block_id: Uuid,
81 size: u64,
82 body: Buffer,
83 ) -> impl Future<Output = Result<()>> + MaybeSend;
84
85 fn complete_block(
88 &self,
89 block_ids: Vec<Uuid>,
90 ) -> impl Future<Output = Result<Metadata>> + MaybeSend;
91
92 fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
94}
95
96struct WriteInput<W: BlockWrite> {
97 w: Arc<W>,
98 executor: Executor,
99 block_id: Uuid,
100 bytes: Buffer,
101}
102
103pub struct BlockWriter<W: BlockWrite> {
106 w: Arc<W>,
107 executor: Executor,
108
109 started: bool,
110 block_ids: Vec<Uuid>,
111 cache: Option<Buffer>,
112 tasks: ConcurrentTasks<WriteInput<W>, Uuid>,
113}
114
115impl<W: BlockWrite> BlockWriter<W> {
116 pub fn new(info: Arc<AccessorInfo>, inner: W, concurrent: usize) -> Self {
118 let executor = info.executor();
119
120 Self {
121 w: Arc::new(inner),
122 executor: executor.clone(),
123 started: false,
124 block_ids: Vec::new(),
125 cache: None,
126
127 tasks: ConcurrentTasks::new(executor, concurrent, 8192, |input| {
128 Box::pin(async move {
129 let fut = input
130 .w
131 .write_block(
132 input.block_id,
133 input.bytes.len() as u64,
134 input.bytes.clone(),
135 )
136 .map_ok(|_| input.block_id);
137 match input.executor.timeout() {
138 None => {
139 let result = fut.await;
140 (input, result)
141 }
142 Some(timeout) => {
143 let result = select! {
144 result = fut.fuse() => {
145 result
146 }
147 _ = timeout.fuse() => {
148 Err(Error::new(
149 ErrorKind::Unexpected, "write block timeout")
150 .with_context("block_id", input.block_id.to_string())
151 .set_temporary())
152 }
153 };
154 (input, result)
155 }
156 }
157 })
158 }),
159 }
160 }
161
162 fn fill_cache(&mut self, bs: Buffer) -> usize {
163 let size = bs.len();
164 assert!(self.cache.is_none());
165 self.cache = Some(bs);
166 size
167 }
168}
169
170impl<W> oio::Write for BlockWriter<W>
171where
172 W: BlockWrite,
173{
174 async fn write(&mut self, bs: Buffer) -> Result<()> {
175 if !self.started && self.cache.is_none() {
176 self.fill_cache(bs);
177 return Ok(());
178 }
179
180 self.started = true;
182
183 let bytes = self.cache.clone().expect("pending write must exist");
184 self.tasks
185 .execute(WriteInput {
186 w: self.w.clone(),
187 executor: self.executor.clone(),
188 block_id: Uuid::new_v4(),
189 bytes,
190 })
191 .await?;
192 self.cache = None;
193 self.fill_cache(bs);
194 Ok(())
195 }
196
197 async fn close(&mut self) -> Result<Metadata> {
198 if !self.started {
199 let (size, body) = match self.cache.clone() {
200 Some(cache) => (cache.len(), cache),
201 None => (0, Buffer::new()),
202 };
203
204 let meta = self.w.write_once(size as u64, body).await?;
205 self.cache = None;
206 return Ok(meta);
207 }
208
209 if let Some(cache) = self.cache.clone() {
210 self.tasks
211 .execute(WriteInput {
212 w: self.w.clone(),
213 executor: self.executor.clone(),
214 block_id: Uuid::new_v4(),
215 bytes: cache,
216 })
217 .await?;
218 self.cache = None;
219 }
220
221 loop {
222 let Some(result) = self.tasks.next().await.transpose()? else {
223 break;
224 };
225 self.block_ids.push(result);
226 }
227
228 let block_ids = self.block_ids.clone();
229 self.w.complete_block(block_ids).await
230 }
231
232 async fn abort(&mut self) -> Result<()> {
233 if !self.started {
234 return Ok(());
235 }
236
237 self.tasks.clear();
238 self.cache = None;
239 self.w.abort_block(self.block_ids.clone()).await?;
240 Ok(())
241 }
242}
243
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use pretty_assertions::assert_eq;
    use rand::Rng;
    use rand::RngCore;
    use rand::thread_rng;
    use tokio::time::sleep;

    use super::*;
    use crate::raw::oio::Write;

    /// In-memory backend that fails at random to exercise retries.
    struct TestWrite {
        /// Set by `write_once`, accumulated by `write_block`.
        length: u64,
        /// Bodies of the uploaded blocks, keyed by block id.
        bytes: HashMap<Uuid, Buffer>,
        /// Final object content, set by `write_once` or `complete_block`.
        content: Option<Buffer>,
    }

    impl TestWrite {
        /// Create an empty backend behind a shared mutex.
        pub fn new() -> Arc<Mutex<Self>> {
            Arc::new(Mutex::new(Self {
                length: 0,
                bytes: HashMap::new(),
                content: None,
            }))
        }
    }

    /// Return a temporary error with a probability of one in ten.
    fn maybe_fail() -> Result<()> {
        if thread_rng().gen_bool(1.0 / 10.0) {
            return Err(Error::new(ErrorKind::Unexpected, "I'm a crazy monkey!").set_temporary());
        }

        Ok(())
    }

    impl BlockWrite for Arc<Mutex<TestWrite>> {
        async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
            sleep(Duration::from_nanos(50)).await;

            maybe_fail()?;

            let mut state = self.lock().unwrap();
            state.length = size;
            state.content = Some(body);
            Ok(Metadata::default())
        }

        async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
            // Add an async delay to enlarge the possibility of concurrent
            // uploads overlapping.
            sleep(Duration::from_millis(50)).await;

            maybe_fail()?;

            let mut state = self.lock().unwrap();
            state.length += size;
            state.bytes.insert(block_id, body);

            Ok(())
        }

        async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
            let mut state = self.lock().unwrap();
            // Concatenate the blocks in the order the writer reported them.
            let blocks: Vec<Buffer> = block_ids
                .iter()
                .map(|id| state.bytes[id].clone())
                .collect();
            state.content = Some(blocks.into_iter().flatten().collect());

            Ok(Metadata::default())
        }

        async fn abort_block(&self, _: Vec<Uuid>) -> Result<()> {
            Ok(())
        }
    }

    /// Many random chunks written through a flaky backend must still end up
    /// complete and in order.
    #[tokio::test]
    async fn test_block_writer_with_concurrent_errors() {
        let mut rng = thread_rng();

        let mut w = BlockWriter::new(Arc::default(), TestWrite::new(), 8);
        let mut total_size = 0u64;
        let mut expected_content = Vec::new();

        for _ in 0..1000 {
            let size = rng.gen_range(1..1024);
            total_size += size as u64;

            let mut bs = vec![0; size];
            rng.fill_bytes(&mut bs);

            expected_content.extend_from_slice(&bs);

            // Retry until the chunk is accepted.
            while w.write(bs.clone().into()).await.is_err() {}
        }

        // Retry until the writer closes successfully.
        while w.close().await.is_err() {}

        let inner = w.w.lock().unwrap();

        assert_eq!(total_size, inner.length, "length must be the same");
        assert!(inner.content.is_some());
        assert_eq!(
            expected_content,
            inner.content.clone().unwrap().to_bytes(),
            "content must be the same"
        );
    }

    /// A single chunk goes through `write_once`; retrying `close` after a
    /// failure must still store the chunk.
    #[tokio::test]
    async fn test_block_writer_with_retry_when_write_once_error() {
        let mut rng = thread_rng();

        for _ in 1..100 {
            let mut w = BlockWriter::new(Arc::default(), TestWrite::new(), 8);

            let size = rng.gen_range(1..1024);
            let mut bs = vec![0; size];
            rng.fill_bytes(&mut bs);

            while w.write(bs.clone().into()).await.is_err() {}

            while w.close().await.is_err() {}

            let inner = w.w.lock().unwrap();
            assert_eq!(size as u64, inner.length, "length must be the same");
            assert!(inner.content.is_some());
            assert_eq!(
                bs,
                inner.content.clone().unwrap().to_bytes(),
                "content must be the same"
            );
        }
    }
}