opendal_core/raw/oio/copy/
block_copy.rs1use std::future::Future;
19use std::sync::Arc;
20
21use futures::FutureExt;
22use futures::select;
23use uuid::Uuid;
24
25use crate::raw::*;
26use crate::*;
27
28pub trait BlockCopy: Send + Sync + Unpin + 'static {
34 fn source_metadata(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend;
39
40 fn copy_once(&self) -> impl Future<Output = Result<()>> + MaybeSend;
45
46 fn copy_block(
48 &self,
49 block_id: Uuid,
50 range: BytesRange,
51 ) -> impl Future<Output = Result<()>> + MaybeSend;
52
53 fn complete_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
55
56 fn abort_block(&self, block_ids: Vec<Uuid>) -> impl Future<Output = Result<()>> + MaybeSend;
58}
59
60struct CopyInput<C: BlockCopy> {
61 copier: Arc<C>,
62 executor: Executor,
63 block_id: Uuid,
64 block_number: usize,
65 range: BytesRange,
66}
67
68impl<C: BlockCopy> Clone for CopyInput<C> {
69 fn clone(&self) -> Self {
70 Self {
71 copier: self.copier.clone(),
72 executor: self.executor.clone(),
73 block_id: self.block_id,
74 block_number: self.block_number,
75 range: self.range,
76 }
77 }
78}
79
80struct CopiedBlock {
81 block_id: Uuid,
82 block_number: usize,
83 size: u64,
84}
85
86pub struct BlockCopier<C: BlockCopy> {
88 copier: Arc<C>,
89 executor: Executor,
90
91 block_ids: Vec<(usize, Uuid)>,
92 scheduled_block_ids: Vec<Uuid>,
93 next_block_number: usize,
94 next_offset: u64,
95 source_size: Option<u64>,
96 copy_once_threshold: u64,
97 block_size: u64,
98 concurrent: usize,
99 completed: bool,
100
101 tasks: ConcurrentTasks<CopyInput<C>, CopiedBlock>,
102}
103
104impl<C: BlockCopy> BlockCopier<C> {
105 pub fn new(
107 info: Arc<AccessorInfo>,
108 inner: C,
109 source_content_length_hint: Option<u64>,
110 copy_once_threshold: u64,
111 block_size: u64,
112 concurrent: usize,
113 ) -> Self {
114 let copier = Arc::new(inner);
115 let executor = info.executor();
116 let concurrent = concurrent.max(1);
117
118 Self {
119 copier,
120 executor: executor.clone(),
121 block_ids: Vec::new(),
122 scheduled_block_ids: Vec::new(),
123 next_block_number: 0,
124 next_offset: 0,
125 source_size: source_content_length_hint,
126 copy_once_threshold,
127 block_size,
128 concurrent,
129 completed: false,
130
131 tasks: ConcurrentTasks::new(executor, concurrent, 8192, |input| {
132 Box::pin(async move {
133 let size = input.range.size().expect("block copy range must be sized");
134 let fut = input.copier.copy_block(input.block_id, input.range);
135
136 let result = match input.executor.timeout() {
137 None => fut.await.map(|_| CopiedBlock {
138 block_id: input.block_id,
139 block_number: input.block_number,
140 size,
141 }),
142 Some(timeout) => {
143 select! {
144 result = fut.fuse() => {
145 result.map(|_| CopiedBlock {
146 block_id: input.block_id,
147 block_number: input.block_number,
148 size,
149 })
150 }
151 _ = timeout.fuse() => {
152 Err(Error::new(
153 ErrorKind::Unexpected, "copy block timeout")
154 .with_context("block_id", input.block_id.to_string())
155 .set_temporary())
156 }
157 }
158 }
159 };
160
161 (input, result)
162 })
163 }),
164 }
165 }
166
167 async fn source_size(&mut self) -> Result<u64> {
168 match self.source_size {
169 Some(size) => Ok(size),
170 None => {
171 let size = self.copier.source_metadata().await?.content_length();
172 self.source_size = Some(size);
173 Ok(size)
174 }
175 }
176 }
177
178 async fn fill_tasks(&mut self, source_size: u64) -> Result<()> {
179 let mut scheduled = 0;
180
181 while self.next_offset < source_size
182 && self.tasks.has_remaining()
183 && scheduled < self.concurrent
184 {
185 let size = self.block_size.min(source_size - self.next_offset);
186 let range = BytesRange::new(self.next_offset, Some(size));
187
188 let input = CopyInput {
189 copier: self.copier.clone(),
190 executor: self.executor.clone(),
191 block_id: Uuid::new_v4(),
192 block_number: self.next_block_number,
193 range,
194 };
195
196 loop {
197 match self.tasks.execute(input.clone()).await {
198 Ok(()) => break,
199 Err(err) if err.is_temporary() => continue,
200 Err(err) => return Err(err),
201 }
202 }
203
204 self.scheduled_block_ids.push(input.block_id);
205 self.next_offset += size;
206 self.next_block_number += 1;
207 scheduled += 1;
208
209 if self.tasks.has_result() {
210 break;
211 }
212 }
213
214 Ok(())
215 }
216}
217
218impl<C> oio::Copy for BlockCopier<C>
219where
220 C: BlockCopy,
221{
222 async fn next(&mut self) -> Result<Option<usize>> {
223 if self.completed {
224 return Ok(None);
225 }
226
227 let source_size = self.source_size().await?;
228
229 if self.block_ids.is_empty() && source_size <= self.copy_once_threshold {
230 self.copier.copy_once().await?;
231 self.completed = true;
232 return Ok(None);
233 }
234
235 self.fill_tasks(source_size).await?;
236
237 loop {
238 match self.tasks.next().await {
239 Some(Ok(result)) => {
240 let size = result.size.try_into().map_err(|_| {
241 Error::new(ErrorKind::Unexpected, "block copy size exceeds usize")
242 })?;
243 self.block_ids.push((result.block_number, result.block_id));
244 return Ok(Some(size));
245 }
246 Some(Err(err)) if err.is_temporary() => continue,
247 Some(Err(err)) => return Err(err),
248 None => break,
249 }
250 }
251
252 if self.block_ids.len() != self.next_block_number {
253 return Err(Error::new(
254 ErrorKind::Unexpected,
255 "block copy numbers mismatch, please report bug to opendal",
256 )
257 .with_context("expected", self.next_block_number)
258 .with_context("actual", self.block_ids.len()));
259 }
260
261 self.block_ids
262 .sort_by_key(|(block_number, _)| *block_number);
263 let block_ids = self
264 .block_ids
265 .iter()
266 .map(|(_, block_id)| *block_id)
267 .collect();
268 self.copier.complete_block(block_ids).await?;
269 self.completed = true;
270 Ok(None)
271 }
272
273 async fn abort(&mut self) -> Result<()> {
274 self.tasks.clear();
275 if self.scheduled_block_ids.is_empty() {
276 self.completed = true;
277 return Ok(());
278 }
279
280 self.copier
281 .abort_block(self.scheduled_block_ids.clone())
282 .await?;
283 self.completed = true;
284 Ok(())
285 }
286}
287
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use tokio::time::Duration;
    use tokio::time::sleep;

    use super::*;
    use crate::raw::oio::Copy;

    /// Bookkeeping recorded by [`TestCopy`].
    #[derive(Default)]
    struct TestState {
        /// Range copied for each block id, filled in by `copy_block`.
        ranges: HashMap<Uuid, BytesRange>,
        /// Ranges in the order their ids were passed to `complete_block`.
        completed_ranges: Vec<BytesRange>,
    }

    /// In-memory [`BlockCopy`] that records copied and committed ranges.
    struct TestCopy {
        state: Arc<Mutex<TestState>>,
    }

    impl TestCopy {
        /// Lock the shared state; a poisoned mutex means another test thread panicked.
        fn lock(&self) -> std::sync::MutexGuard<'_, TestState> {
            self.state.lock().expect("test state mutex poisoned")
        }
    }

    impl BlockCopy for TestCopy {
        async fn source_metadata(&self) -> Result<Metadata> {
            Ok(Metadata::default().with_content_length(4))
        }

        async fn copy_once(&self) -> Result<()> {
            Ok(())
        }

        async fn copy_block(&self, block_id: Uuid, range: BytesRange) -> Result<()> {
            // Delay the first block so it finishes after later ones, forcing
            // out-of-order completion.
            if range.offset() == 0 {
                sleep(Duration::from_millis(50)).await;
            }

            self.lock().ranges.insert(block_id, range);
            Ok(())
        }

        async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
            let mut state = self.lock();
            let ordered: Vec<BytesRange> = block_ids.iter().map(|id| state.ranges[id]).collect();
            state.completed_ranges = ordered;
            Ok(())
        }

        async fn abort_block(&self, _: Vec<Uuid>) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_block_copier_completes_blocks_in_source_order() -> Result<()> {
        let state = Arc::new(Mutex::new(TestState::default()));
        let inner = TestCopy {
            state: Arc::clone(&state),
        };
        // 4-byte source (from `source_metadata`), threshold 0, 2-byte blocks,
        // concurrency 2.
        let mut copier = BlockCopier::new(Arc::default(), inner, None, 0, 2, 2);

        while copier.next().await?.is_some() {}

        let completed_ranges = state
            .lock()
            .expect("test state mutex poisoned")
            .completed_ranges
            .clone();
        let expected = vec![BytesRange::new(0, Some(2)), BytesRange::new(2, Some(2))];
        assert_eq!(completed_ranges, expected);

        Ok(())
    }
}