opendal/services/aliyun_drive/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use tokio::sync::Mutex;
26
27use super::ALIYUN_DRIVE_SCHEME;
28use super::config::AliyunDriveConfig;
29use super::core::*;
30use super::deleter::AliyunDriveDeleter;
31use super::error::parse_error;
32use super::lister::AliyunDriveLister;
33use super::lister::AliyunDriveParent;
34use super::writer::AliyunDriveWriter;
35use crate::raw::*;
36use crate::*;
37
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct AliyunDriveBuilder {
    /// Service configuration filled in by the builder's setter methods and
    /// validated in `Builder::build`.
    pub(super) config: AliyunDriveConfig,

    /// Optional caller-supplied HTTP client. When set, `Builder::build`
    /// installs it on the accessor info. Deprecated in favor of
    /// `Operator::update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
}
46
47impl Debug for AliyunDriveBuilder {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("AliyunDriveBuilder")
50 .field("config", &self.config)
51 .finish_non_exhaustive()
52 }
53}
54
55impl AliyunDriveBuilder {
56 pub fn root(mut self, root: &str) -> Self {
60 self.config.root = if root.is_empty() {
61 None
62 } else {
63 Some(root.to_string())
64 };
65
66 self
67 }
68
69 pub fn access_token(mut self, access_token: &str) -> Self {
71 self.config.access_token = Some(access_token.to_string());
72
73 self
74 }
75
76 pub fn client_id(mut self, client_id: &str) -> Self {
78 self.config.client_id = Some(client_id.to_string());
79
80 self
81 }
82
83 pub fn client_secret(mut self, client_secret: &str) -> Self {
85 self.config.client_secret = Some(client_secret.to_string());
86
87 self
88 }
89
90 pub fn refresh_token(mut self, refresh_token: &str) -> Self {
92 self.config.refresh_token = Some(refresh_token.to_string());
93
94 self
95 }
96
97 pub fn drive_type(mut self, drive_type: &str) -> Self {
99 self.config.drive_type = drive_type.to_string();
100
101 self
102 }
103
104 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
111 #[allow(deprecated)]
112 pub fn http_client(mut self, client: HttpClient) -> Self {
113 self.http_client = Some(client);
114 self
115 }
116}
117
118impl Builder for AliyunDriveBuilder {
119 type Config = AliyunDriveConfig;
120
121 fn build(self) -> Result<impl Access> {
122 debug!("backend build started: {:?}", &self);
123
124 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
125 debug!("backend use root {}", &root);
126
127 let sign = match self.config.access_token.clone() {
128 Some(access_token) if !access_token.is_empty() => {
129 AliyunDriveSign::Access(access_token)
130 }
131 _ => match (
132 self.config.client_id.clone(),
133 self.config.client_secret.clone(),
134 self.config.refresh_token.clone(),
135 ) {
136 (Some(client_id), Some(client_secret), Some(refresh_token)) if
137 !client_id.is_empty() && !client_secret.is_empty() && !refresh_token.is_empty() => {
138 AliyunDriveSign::Refresh(client_id, client_secret, refresh_token, None, 0)
139 }
140 _ => return Err(Error::new(
141 ErrorKind::ConfigInvalid,
142 "access_token and a set of client_id, client_secret, and refresh_token are both missing.")
143 .with_operation("Builder::build")
144 .with_context("service", ALIYUN_DRIVE_SCHEME)),
145 },
146 };
147
148 let drive_type = match self.config.drive_type.as_str() {
149 "" | "default" => DriveType::Default,
150 "resource" => DriveType::Resource,
151 "backup" => DriveType::Backup,
152 _ => {
153 return Err(Error::new(
154 ErrorKind::ConfigInvalid,
155 "drive_type is invalid.",
156 ));
157 }
158 };
159 debug!("backend use drive_type {drive_type:?}");
160
161 Ok(AliyunDriveBackend {
162 core: Arc::new(AliyunDriveCore {
163 info: {
164 let am = AccessorInfo::default();
165 am.set_scheme(ALIYUN_DRIVE_SCHEME)
166 .set_root(&root)
167 .set_native_capability(Capability {
168 stat: true,
169 create_dir: true,
170 read: true,
171 write: true,
172 write_can_multi: true,
173 write_multi_min_size: Some(100 * 1024),
175 write_multi_max_size: if cfg!(target_pointer_width = "64") {
177 Some(5 * 1024 * 1024 * 1024)
178 } else {
179 Some(usize::MAX)
180 },
181 delete: true,
182 copy: true,
183 rename: true,
184 list: true,
185 list_with_limit: true,
186 shared: true,
187 ..Default::default()
188 });
189
190 #[allow(deprecated)]
192 if let Some(client) = self.http_client {
193 am.update_http_client(|_| client);
194 }
195
196 am.into()
197 },
198 endpoint: "https://openapi.alipan.com".to_string(),
199 root,
200 drive_type,
201 signer: Arc::new(Mutex::new(AliyunDriveSigner {
202 drive_id: None,
203 sign,
204 })),
205 dir_lock: Arc::new(Mutex::new(())),
206 }),
207 })
208 }
209}
210
/// Backend for the Aliyun Drive service, produced by
/// `AliyunDriveBuilder::build`.
#[derive(Clone, Debug)]
pub struct AliyunDriveBackend {
    /// Shared service core; every operation of this backend is delegated to
    /// it, and it is handed (by `Arc` clone) to writers, listers and deleters.
    core: Arc<AliyunDriveCore>,
}
215
216impl Access for AliyunDriveBackend {
217 type Reader = HttpBody;
218 type Writer = AliyunDriveWriter;
219 type Lister = oio::PageLister<AliyunDriveLister>;
220 type Deleter = oio::OneShotDeleter<AliyunDriveDeleter>;
221
222 fn info(&self) -> Arc<AccessorInfo> {
223 self.core.info.clone()
224 }
225
226 async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
227 self.core.ensure_dir_exists(path).await?;
228
229 Ok(RpCreateDir::default())
230 }
231
232 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
233 if from == to {
234 return Ok(RpRename::default());
235 }
236 let res = self.core.get_by_path(from).await?;
237 let file: AliyunDriveFile =
238 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
239 match self.core.get_by_path(to).await {
241 Err(err) if err.kind() == ErrorKind::NotFound => {}
242 Err(err) => return Err(err),
243 Ok(res) => {
244 let file: AliyunDriveFile =
245 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
246 self.core.delete_path(&file.file_id).await?;
247 }
248 };
249
250 let parent_file_id = self.core.ensure_dir_exists(get_parent(to)).await?;
251 self.core.move_path(&file.file_id, &parent_file_id).await?;
252
253 let from_name = get_basename(from);
254 let to_name = get_basename(to);
255
256 if from_name != to_name {
257 self.core.update_path(&file.file_id, to_name).await?;
258 }
259
260 Ok(RpRename::default())
261 }
262
263 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
264 if from == to {
265 return Ok(RpCopy::default());
266 }
267 let res = self.core.get_by_path(from).await?;
268 let file: AliyunDriveFile =
269 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
270 match self.core.get_by_path(to).await {
272 Err(err) if err.kind() == ErrorKind::NotFound => {}
273 Err(err) => return Err(err),
274 Ok(res) => {
275 let file: AliyunDriveFile =
276 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
277 self.core.delete_path(&file.file_id).await?;
278 }
279 };
280 let parent_path = get_parent(to);
283 let parent_file_id = self.core.ensure_dir_exists(parent_path).await?;
284
285 let auto_rename = file.parent_file_id == parent_file_id;
289 let res = self
290 .core
291 .copy_path(&file.file_id, &parent_file_id, auto_rename)
292 .await?;
293 let file: CopyResponse =
294 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
295 let file_id = file.file_id;
296
297 let from_name = get_basename(from);
298 let to_name = get_basename(to);
299
300 if from_name != to_name {
301 self.core.update_path(&file_id, to_name).await?;
302 }
303
304 Ok(RpCopy::default())
305 }
306
307 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
308 let res = self.core.get_by_path(path).await?;
309 let file: AliyunDriveFile =
310 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
311
312 if file.path_type == "folder" {
313 let meta = Metadata::new(EntryMode::DIR).with_last_modified(
314 file.updated_at.parse::<Timestamp>().map_err(|e| {
315 Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
316 })?,
317 );
318
319 return Ok(RpStat::new(meta));
320 }
321
322 let mut meta = Metadata::new(EntryMode::FILE).with_last_modified(
323 file.updated_at.parse::<Timestamp>().map_err(|e| {
324 Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
325 })?,
326 );
327 if let Some(v) = file.size {
328 meta = meta.with_content_length(v);
329 }
330 if let Some(v) = file.content_type {
331 meta = meta.with_content_type(v);
332 }
333
334 Ok(RpStat::new(meta))
335 }
336
337 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
338 let res = self.core.get_by_path(path).await?;
339 let file: AliyunDriveFile =
340 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
341 let resp = self.core.download(&file.file_id, args.range()).await?;
342
343 let status = resp.status();
344 match status {
345 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
346 Ok((RpRead::default(), resp.into_body()))
347 }
348 _ => {
349 let (part, mut body) = resp.into_parts();
350 let buf = body.to_buffer().await?;
351 Err(parse_error(Response::from_parts(part, buf)))
352 }
353 }
354 }
355
356 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
357 Ok((
358 RpDelete::default(),
359 oio::OneShotDeleter::new(AliyunDriveDeleter::new(self.core.clone())),
360 ))
361 }
362
363 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
364 let parent = match self.core.get_by_path(path).await {
365 Err(err) if err.kind() == ErrorKind::NotFound => None,
366 Err(err) => return Err(err),
367 Ok(res) => {
368 let file: AliyunDriveFile =
369 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
370 Some(AliyunDriveParent {
371 file_id: file.file_id,
372 path: path.to_string(),
373 updated_at: file.updated_at,
374 })
375 }
376 };
377
378 let l = AliyunDriveLister::new(self.core.clone(), parent, args.limit());
379
380 Ok((RpList::default(), oio::PageLister::new(l)))
381 }
382
383 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
384 let parent_path = get_parent(path);
385 let parent_file_id = self.core.ensure_dir_exists(parent_path).await?;
386
387 match self.core.get_by_path(path).await {
389 Err(err) if err.kind() == ErrorKind::NotFound => {}
390 Err(err) => return Err(err),
391 Ok(res) => {
392 let file: AliyunDriveFile =
393 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
394 self.core.delete_path(&file.file_id).await?;
395 }
396 };
397
398 let writer =
399 AliyunDriveWriter::new(self.core.clone(), &parent_file_id, get_basename(path), args);
400
401 Ok((RpWrite::default(), writer))
402 }
403}