opendal/services/aliyun_drive/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21
22use super::core::AliyunDriveCore;
23use super::core::CheckNameMode;
24use super::core::CreateResponse;
25use super::core::CreateType;
26use crate::raw::*;
27use crate::*;
28
29pub struct AliyunDriveWriter {
30 core: Arc<AliyunDriveCore>,
31
32 _op: OpWrite,
33 parent_file_id: String,
34 name: String,
35
36 file_id: Option<String>,
37 upload_id: Option<String>,
38 part_number: usize,
39}
40
41impl AliyunDriveWriter {
42 pub fn new(core: Arc<AliyunDriveCore>, parent_file_id: &str, name: &str, op: OpWrite) -> Self {
43 AliyunDriveWriter {
44 core,
45 _op: op,
46 parent_file_id: parent_file_id.to_string(),
47 name: name.to_string(),
48 file_id: None,
49 upload_id: None,
50 part_number: 1, }
52 }
53}
54
55impl oio::Write for AliyunDriveWriter {
56 async fn write(&mut self, bs: Buffer) -> Result<()> {
57 let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) {
58 (Some(upload_id), Some(file_id)) => (upload_id, file_id),
59 _ => {
60 let res = self
61 .core
62 .create(
63 Some(&self.parent_file_id),
64 &self.name,
65 CreateType::File,
66 CheckNameMode::Refuse,
67 )
68 .await?;
69 let output: CreateResponse =
70 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
71 if output.exist.is_some_and(|x| x) {
72 return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
73 }
74 self.upload_id = output.upload_id;
75 self.file_id = Some(output.file_id);
76 (
77 self.upload_id.as_ref().expect("cannot find upload_id"),
78 self.file_id.as_ref().expect("cannot find file_id"),
79 )
80 }
81 };
82
83 if let Err(err) = self
84 .core
85 .upload(file_id, upload_id, self.part_number, bs)
86 .await
87 {
88 if err.kind() != ErrorKind::AlreadyExists {
89 return Err(err);
90 }
91 };
92
93 self.part_number += 1;
94
95 Ok(())
96 }
97
98 async fn close(&mut self) -> Result<Metadata> {
99 let (Some(upload_id), Some(file_id)) = (self.upload_id.as_ref(), self.file_id.as_ref())
100 else {
101 return Ok(Metadata::default());
102 };
103
104 self.core.complete(file_id, upload_id).await?;
105 Ok(Metadata::default())
106 }
107
108 async fn abort(&mut self) -> Result<()> {
109 let Some(file_id) = self.file_id.as_ref() else {
110 return Ok(());
111 };
112 self.core.delete_path(file_id).await
113 }
114}