1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use chrono::Utc;
23use http::header::HeaderValue;
24use http::header::{self};
25use http::Method;
26use http::Request;
27use http::Response;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::Mutex;
31
32use super::error::parse_error;
33use crate::raw::*;
34use crate::*;
35
/// Selects which drive of the account is used when the drive id is resolved.
#[derive(Debug, Deserialize, Default, Clone)]
pub enum DriveType {
    /// Use the account's default drive. This is the `Default::default()` value.
    #[default]
    Default,
    /// Use the backup drive, falling back to the default drive when the
    /// drive-info response carries no backup drive id.
    Backup,
    /// Use the resource drive, falling back to the default drive when the
    /// drive-info response carries no resource drive id.
    Resource,
}
51
/// Credentials from which the access token for API requests is obtained.
pub enum AliyunDriveSign {
    /// Refresh-token credentials. In order, the fields are: client id, client
    /// secret, refresh token, the cached access token (`None` forces a refresh
    /// on next use) and the Unix timestamp, in seconds, after which the cached
    /// access token is treated as expired.
    Refresh(String, String, String, Option<String>, i64),
    /// A fixed access token that is used as-is and never refreshed.
    Access(String),
}
57
/// Mutable signing state: the credentials plus the cached drive id.
pub struct AliyunDriveSigner {
    /// Cached drive id; when `None` it is fetched from the drive-info API and
    /// stored here by `get_token_and_drive`.
    pub drive_id: Option<String>,
    /// Credentials used to produce the access token.
    pub sign: AliyunDriveSign,
}
62
/// Shared state and low-level API calls for the Aliyun Drive service.
pub struct AliyunDriveCore {
    /// Accessor info; its HTTP client is used to send every request.
    pub info: Arc<AccessorInfo>,

    /// Base URL that every API path is appended to.
    pub endpoint: String,
    /// Root directory that user paths are resolved against in `build_path`.
    pub root: String,
    /// Which drive of the account to operate on.
    pub drive_type: DriveType,

    /// Credentials and cached drive id, guarded by an async mutex.
    pub signer: Arc<Mutex<AliyunDriveSigner>>,
    /// Lock held while `ensure_dir_exists` creates each directory segment.
    pub dir_lock: Arc<Mutex<()>>,
}
73
74impl Debug for AliyunDriveCore {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("AliyunDriveCore")
77 .field("root", &self.root)
78 .field("drive_type", &self.drive_type)
79 .finish_non_exhaustive()
80 }
81}
82
83impl AliyunDriveCore {
84 async fn send(&self, mut req: Request<Buffer>, token: Option<&str>) -> Result<Buffer> {
85 req.headers_mut().insert(
87 header::USER_AGENT,
88 HeaderValue::from_str(&format!("opendal/{}", VERSION))
89 .expect("user agent must be valid header value"),
90 );
91 if req.method() == Method::POST {
92 req.headers_mut().insert(
93 header::CONTENT_TYPE,
94 HeaderValue::from_static("application/json;charset=UTF-8"),
95 );
96 }
97 if let Some(token) = token {
98 req.headers_mut().insert(
99 header::AUTHORIZATION,
100 HeaderValue::from_str(&format_authorization_by_bearer(token)?)
101 .expect("access token must be valid header value"),
102 );
103 }
104 let res = self.info.http_client().send(req).await?;
105 if !res.status().is_success() {
106 return Err(parse_error(res));
107 }
108 Ok(res.into_body())
109 }
110
111 async fn get_access_token(
112 &self,
113 client_id: &str,
114 client_secret: &str,
115 refresh_token: &str,
116 ) -> Result<Buffer> {
117 let body = serde_json::to_vec(&AccessTokenRequest {
118 refresh_token,
119 grant_type: "refresh_token",
120 client_id,
121 client_secret,
122 })
123 .map_err(new_json_serialize_error)?;
124 let req = Request::post(format!("{}/oauth/access_token", self.endpoint))
125 .body(Buffer::from(body))
126 .map_err(new_request_build_error)?;
127 self.send(req, None).await
128 }
129
130 async fn get_drive_id(&self, token: Option<&str>) -> Result<Buffer> {
131 let req = Request::post(format!("{}/adrive/v1.0/user/getDriveInfo", self.endpoint))
132 .body(Buffer::new())
133 .map_err(new_request_build_error)?;
134 self.send(req, token).await
135 }
136
137 pub async fn get_token_and_drive(&self) -> Result<(Option<String>, String)> {
138 let mut signer = self.signer.lock().await;
139 let token = match &mut signer.sign {
140 AliyunDriveSign::Access(access_token) => Some(access_token.clone()),
141 AliyunDriveSign::Refresh(
142 client_id,
143 client_secret,
144 refresh_token,
145 access_token,
146 expire_at,
147 ) => {
148 if *expire_at < Utc::now().timestamp() || access_token.is_none() {
149 let res = self
150 .get_access_token(client_id, client_secret, refresh_token)
151 .await?;
152 let output: RefreshTokenResponse = serde_json::from_reader(res.reader())
153 .map_err(new_json_deserialize_error)?;
154 *access_token = Some(output.access_token);
155 *expire_at = output.expires_in + Utc::now().timestamp();
156 *refresh_token = output.refresh_token;
157 }
158 access_token.clone()
159 }
160 };
161 let Some(drive_id) = &signer.drive_id else {
162 let res = self.get_drive_id(token.as_deref()).await?;
163 let output: DriveInfoResponse =
164 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
165 let drive_id = match self.drive_type {
166 DriveType::Default => output.default_drive_id,
167 DriveType::Backup => output.backup_drive_id.unwrap_or(output.default_drive_id),
168 DriveType::Resource => output.resource_drive_id.unwrap_or(output.default_drive_id),
169 };
170 signer.drive_id = Some(drive_id.clone());
171 return Ok((token, drive_id));
172 };
173 Ok((token, drive_id.clone()))
174 }
175
176 pub fn build_path(&self, path: &str, rooted: bool) -> String {
177 let file_path = if rooted {
178 build_rooted_abs_path(&self.root, path)
179 } else {
180 build_abs_path(&self.root, path)
181 };
182 let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
183 if file_path.is_empty() {
184 return "/".to_string();
185 }
186 file_path.to_string()
187 }
188
189 pub async fn get_by_path(&self, path: &str) -> Result<Buffer> {
190 let file_path = self.build_path(path, true);
191 let req = Request::post(format!(
192 "{}/adrive/v1.0/openFile/get_by_path",
193 self.endpoint
194 ));
195 let (token, drive_id) = self.get_token_and_drive().await?;
196 let body = serde_json::to_vec(&GetByPathRequest {
197 drive_id: &drive_id,
198 file_path: &file_path,
199 })
200 .map_err(new_json_serialize_error)?;
201 let req = req
202 .extension(Operation::Read)
203 .body(Buffer::from(body))
204 .map_err(new_request_build_error)?;
205 self.send(req, token.as_deref()).await
206 }
207
208 pub async fn ensure_dir_exists(&self, path: &str) -> Result<String> {
209 let file_path = self.build_path(path, false);
210 if file_path == "/" {
211 return Ok("root".to_string());
212 }
213 let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
214 let paths = file_path.split('/').collect::<Vec<&str>>();
215 let mut parent: Option<String> = None;
216 for path in paths {
217 let _guard = self.dir_lock.lock().await;
218 let res = self
219 .create(
220 parent.as_deref(),
221 path,
222 CreateType::Folder,
223 CheckNameMode::Refuse,
224 )
225 .await?;
226 let output: CreateResponse =
227 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
228 parent = Some(output.file_id);
229 }
230 Ok(parent.expect("ensure_dir_exists must succeed"))
231 }
232
233 pub async fn create_with_rapid_upload(
234 &self,
235 parent_file_id: Option<&str>,
236 name: &str,
237 typ: CreateType,
238 check_name_mode: CheckNameMode,
239 size: Option<u64>,
240 rapid_upload: Option<RapidUpload>,
241 ) -> Result<Buffer> {
242 let mut content_hash = None;
243 let mut proof_code = None;
244 let mut pre_hash = None;
245 if let Some(rapid_upload) = rapid_upload {
246 content_hash = rapid_upload.content_hash;
247 proof_code = rapid_upload.proof_code;
248 pre_hash = rapid_upload.pre_hash;
249 }
250
251 let (token, drive_id) = self.get_token_and_drive().await?;
252 let body = serde_json::to_vec(&CreateRequest {
253 drive_id: &drive_id,
254 parent_file_id: parent_file_id.unwrap_or("root"),
255 name,
256 typ,
257 check_name_mode,
258 size,
259 pre_hash: pre_hash.as_deref(),
260 content_hash: content_hash.as_deref(),
261 content_hash_name: content_hash.is_some().then_some("sha1"),
262 proof_code: proof_code.as_deref(),
263 proof_version: proof_code.is_some().then_some("v1"),
264 })
265 .map_err(new_json_serialize_error)?;
266 let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
267 .extension(Operation::Write)
268 .body(Buffer::from(body))
269 .map_err(new_request_build_error)?;
270 self.send(req, token.as_deref()).await
271 }
272
273 pub async fn create(
274 &self,
275 parent_file_id: Option<&str>,
276 name: &str,
277 typ: CreateType,
278 check_name_mode: CheckNameMode,
279 ) -> Result<Buffer> {
280 self.create_with_rapid_upload(parent_file_id, name, typ, check_name_mode, None, None)
281 .await
282 }
283
284 async fn get_download_url(&self, file_id: &str) -> Result<String> {
285 let (token, drive_id) = self.get_token_and_drive().await?;
286 let body = serde_json::to_vec(&FileRequest {
287 drive_id: &drive_id,
288 file_id,
289 })
290 .map_err(new_json_serialize_error)?;
291
292 let req = Request::post(format!(
293 "{}/adrive/v1.0/openFile/getDownloadUrl",
294 self.endpoint
295 ))
296 .extension(Operation::Read)
297 .body(Buffer::from(body))
298 .map_err(new_request_build_error)?;
299
300 let res = self.send(req, token.as_deref()).await?;
301
302 let output: GetDownloadUrlResponse =
303 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
304
305 Ok(output.url)
306 }
307
308 pub async fn download(&self, file_id: &str, range: BytesRange) -> Result<Response<HttpBody>> {
309 let download_url = self.get_download_url(file_id).await?;
310 let req = Request::get(download_url)
311 .extension(Operation::Read)
312 .header(header::RANGE, range.to_header())
313 .body(Buffer::new())
314 .map_err(new_request_build_error)?;
315 self.info.http_client().fetch(req).await
316 }
317
318 pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
319 let (token, drive_id) = self.get_token_and_drive().await?;
320 let body = serde_json::to_vec(&MovePathRequest {
321 drive_id: &drive_id,
322 file_id,
323 to_parent_file_id,
324 check_name_mode: CheckNameMode::AutoRename,
325 })
326 .map_err(new_json_serialize_error)?;
327 let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
328 .extension(Operation::Write)
329 .body(Buffer::from(body))
330 .map_err(new_request_build_error)?;
331 self.send(req, token.as_deref()).await?;
332 Ok(())
333 }
334
335 pub async fn update_path(&self, file_id: &str, name: &str) -> Result<()> {
336 let (token, drive_id) = self.get_token_and_drive().await?;
337 let body = serde_json::to_vec(&UpdatePathRequest {
338 drive_id: &drive_id,
339 file_id,
340 name,
341 check_name_mode: CheckNameMode::Refuse,
342 })
343 .map_err(new_json_serialize_error)?;
344 let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
345 .extension(Operation::Write)
346 .body(Buffer::from(body))
347 .map_err(new_request_build_error)?;
348 self.send(req, token.as_deref()).await?;
349 Ok(())
350 }
351
352 pub async fn copy_path(
353 &self,
354 file_id: &str,
355 to_parent_file_id: &str,
356 auto_rename: bool,
357 ) -> Result<Buffer> {
358 let (token, drive_id) = self.get_token_and_drive().await?;
359 let body = serde_json::to_vec(&CopyPathRequest {
360 drive_id: &drive_id,
361 file_id,
362 to_parent_file_id,
363 auto_rename,
364 })
365 .map_err(new_json_serialize_error)?;
366 let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
367 .extension(Operation::Copy)
368 .body(Buffer::from(body))
369 .map_err(new_request_build_error)?;
370 self.send(req, token.as_deref()).await
371 }
372
373 pub async fn delete_path(&self, file_id: &str) -> Result<()> {
374 let (token, drive_id) = self.get_token_and_drive().await?;
375 let body = serde_json::to_vec(&FileRequest {
376 drive_id: &drive_id,
377 file_id,
378 })
379 .map_err(new_json_serialize_error)?;
380 let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
381 .extension(Operation::Delete)
382 .body(Buffer::from(body))
383 .map_err(new_request_build_error)?;
384 self.send(req, token.as_deref()).await?;
385 Ok(())
386 }
387
388 pub async fn list(
389 &self,
390 parent_file_id: &str,
391 limit: Option<usize>,
392 marker: Option<String>,
393 ) -> Result<Buffer> {
394 let (token, drive_id) = self.get_token_and_drive().await?;
395 let body = serde_json::to_vec(&ListRequest {
396 drive_id: &drive_id,
397 parent_file_id,
398 limit,
399 marker: marker.as_deref(),
400 })
401 .map_err(new_json_serialize_error)?;
402 let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
403 .extension(Operation::List)
404 .body(Buffer::from(body))
405 .map_err(new_request_build_error)?;
406 self.send(req, token.as_deref()).await
407 }
408
409 pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
410 let (token, drive_id) = self.get_token_and_drive().await?;
411 let body = serde_json::to_vec(&CompleteRequest {
412 drive_id: &drive_id,
413 file_id,
414 upload_id,
415 })
416 .map_err(new_json_serialize_error)?;
417 let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
418 .extension(Operation::Write)
419 .body(Buffer::from(body))
420 .map_err(new_request_build_error)?;
421 self.send(req, token.as_deref()).await
422 }
423
424 async fn get_upload_url(
425 &self,
426 file_id: &str,
427 upload_id: &str,
428 part_number: usize,
429 ) -> Result<String> {
430 let (token, drive_id) = self.get_token_and_drive().await?;
431 let part_info_list = vec![PartInfoItem {
432 part_number: Some(part_number),
433 }];
434 let body = serde_json::to_vec(&GetUploadRequest {
435 drive_id: &drive_id,
436 file_id,
437 upload_id,
438 part_info_list: Some(part_info_list),
439 })
440 .map_err(new_json_serialize_error)?;
441
442 let req = Request::post(format!(
443 "{}/adrive/v1.0/openFile/getUploadUrl",
444 self.endpoint
445 ))
446 .extension(Operation::Write)
447 .body(Buffer::from(body))
448 .map_err(new_request_build_error)?;
449
450 let res = self.send(req, token.as_deref()).await?;
451
452 let mut output: UploadUrlResponse =
453 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
454
455 let Some(upload_url) = output
456 .part_info_list
457 .take()
458 .map(|mut list| list.swap_remove(0))
459 .map(|part_info| part_info.upload_url)
460 else {
461 return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
462 };
463
464 Ok(upload_url)
465 }
466 pub async fn upload(
467 &self,
468 file_id: &str,
469 upload_id: &str,
470 part_number: usize,
471 body: Buffer,
472 ) -> Result<Buffer> {
473 let upload_url = self.get_upload_url(file_id, upload_id, part_number).await?;
474 let req = Request::put(upload_url)
475 .extension(Operation::Write)
476 .body(body)
477 .map_err(new_request_build_error)?;
478 self.send(req, None).await
479 }
480}
481
/// Optional hash data forwarded to the create API by
/// `create_with_rapid_upload`.
pub struct RapidUpload {
    /// Sent as `pre_hash` in the create request.
    pub pre_hash: Option<String>,
    /// Sent as `content_hash`; when present, `content_hash_name` is `sha1`.
    pub content_hash: Option<String>,
    /// Sent as `proof_code`; when present, `proof_version` is `v1`.
    pub proof_code: Option<String>,
}
487
/// Response of the access-token endpoint, parsed in `get_token_and_drive`.
#[derive(Debug, Deserialize)]
pub struct RefreshTokenResponse {
    /// The new access token.
    pub access_token: String,
    /// Lifetime of the token; it is added to the current Unix timestamp in
    /// seconds to compute the expiry.
    pub expires_in: i64,
    /// The refresh token to use for the next refresh.
    pub refresh_token: String,
}
494
/// Response of the drive-info endpoint, parsed in `get_token_and_drive`.
#[derive(Debug, Deserialize)]
pub struct DriveInfoResponse {
    /// Id of the default drive; also the fallback for the optional drives.
    pub default_drive_id: String,
    /// Id of the resource drive, if the response contains one.
    pub resource_drive_id: Option<String>,
    /// Id of the backup drive, if the response contains one.
    pub backup_drive_id: Option<String>,
}
501
/// Kind of entry to create; serialized in snake case (`file`, `folder`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CreateType {
    /// Create a file.
    File,
    /// Create a folder.
    Folder,
}
508
/// Name-conflict mode sent as `check_name_mode`; serialized in snake case
/// (`refuse`, `auto_rename`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CheckNameMode {
    /// Serialized as `refuse`; used for folder creation and renames.
    Refuse,
    /// Serialized as `auto_rename`; used for moves.
    AutoRename,
}
515
/// Response of the get-upload-url endpoint.
#[derive(Deserialize)]
pub struct UploadUrlResponse {
    /// Part entries; `get_upload_url` reads the upload URL of the first one.
    pub part_info_list: Option<Vec<PartInfo>>,
}
520
/// Response of the create endpoint.
#[derive(Deserialize)]
pub struct CreateResponse {
    /// Id of the entry; `ensure_dir_exists` uses it as the next parent id.
    pub file_id: String,
    /// Upload id, if the response contains one.
    // NOTE(review): presumably only set for file creation — confirm.
    pub upload_id: Option<String>,
    /// `exist` flag, if the response contains one.
    // NOTE(review): presumably reports that the name already existed — confirm.
    pub exist: Option<bool>,
}
527
/// One part entry of an upload, as found in `UploadUrlResponse`.
#[derive(Serialize, Deserialize)]
pub struct PartInfo {
    /// Etag of the part, if present.
    pub etag: Option<String>,
    /// Number of the part.
    pub part_number: usize,
    /// Size of the part, if present.
    pub part_size: Option<u64>,
    /// URL that the part's data is uploaded to.
    pub upload_url: String,
    /// Content type of the part, if present.
    pub content_type: Option<String>,
}
536
/// A page of file entries.
// NOTE(review): presumably the parsed response of `list` — confirm at the caller.
#[derive(Deserialize)]
pub struct AliyunDriveFileList {
    /// Entries of this page.
    pub items: Vec<AliyunDriveFile>,
    /// Marker for the next page, if present.
    pub next_marker: Option<String>,
}
542
/// Response carrying a single file id.
// NOTE(review): presumably the parsed response of `copy_path` — confirm at the caller.
#[derive(Deserialize)]
pub struct CopyResponse {
    /// File id returned by the API.
    pub file_id: String,
}
547
/// Metadata of one file or directory entry as returned by the API.
#[derive(Deserialize)]
pub struct AliyunDriveFile {
    /// Id of the entry.
    pub file_id: String,
    /// Id of the parent entry.
    pub parent_file_id: String,
    /// Name of the entry.
    pub name: String,
    /// Size of the entry, if present.
    pub size: Option<u64>,
    /// Content type of the entry, if present.
    pub content_type: Option<String>,
    /// Entry type; read from the JSON key `type`.
    #[serde(rename = "type")]
    pub path_type: String,
    /// Last-update time, kept as the raw string from the response.
    pub updated_at: String,
}
559
/// Response of the get-download-url endpoint.
#[derive(Deserialize)]
pub struct GetDownloadUrlResponse {
    /// URL that `download` issues its `GET` request against.
    pub url: String,
}
564
/// JSON payload of the access-token request built in `get_access_token`.
#[derive(Serialize)]
pub struct AccessTokenRequest<'a> {
    /// Refresh token to exchange.
    refresh_token: &'a str,
    /// Grant type; `get_access_token` always sends `refresh_token`.
    grant_type: &'a str,
    /// Client id of the application.
    client_id: &'a str,
    /// Client secret of the application.
    client_secret: &'a str,
}
572
/// JSON payload of the get-by-path request built in `get_by_path`.
#[derive(Serialize)]
pub struct GetByPathRequest<'a> {
    /// Drive to look in.
    drive_id: &'a str,
    /// Path produced by `build_path(path, true)`.
    file_path: &'a str,
}
578
/// JSON payload of the create request built in `create_with_rapid_upload`.
#[derive(Serialize)]
pub struct CreateRequest<'a> {
    /// Drive to create the entry in.
    drive_id: &'a str,
    /// Id of the parent entry; `root` when the caller passes none.
    parent_file_id: &'a str,
    /// Name of the new entry.
    name: &'a str,
    /// Kind of entry; written under the JSON key `type`.
    #[serde(rename = "type")]
    typ: CreateType,
    /// Name-conflict mode.
    check_name_mode: CheckNameMode,
    /// Size of the entry, if given by the caller.
    size: Option<u64>,
    /// Pre-hash from the rapid-upload data, if any.
    pre_hash: Option<&'a str>,
    /// Content hash from the rapid-upload data, if any.
    content_hash: Option<&'a str>,
    /// `sha1` when a content hash is sent, otherwise `None`.
    content_hash_name: Option<&'a str>,
    /// Proof code from the rapid-upload data, if any.
    proof_code: Option<&'a str>,
    /// `v1` when a proof code is sent, otherwise `None`.
    proof_version: Option<&'a str>,
}
594
/// JSON payload identifying one file; used by the get-download-url and delete
/// requests.
#[derive(Serialize)]
pub struct FileRequest<'a> {
    /// Drive the file lives in.
    drive_id: &'a str,
    /// Id of the file.
    file_id: &'a str,
}
600
/// JSON payload of the move request built in `move_path`.
#[derive(Serialize)]
pub struct MovePathRequest<'a> {
    /// Drive the file lives in.
    drive_id: &'a str,
    /// Id of the file to move.
    file_id: &'a str,
    /// Id of the destination parent.
    to_parent_file_id: &'a str,
    /// Name-conflict mode; `move_path` sends `AutoRename`.
    check_name_mode: CheckNameMode,
}
608
/// JSON payload of the update (rename) request built in `update_path`.
#[derive(Serialize)]
pub struct UpdatePathRequest<'a> {
    /// Drive the file lives in.
    drive_id: &'a str,
    /// Id of the file to rename.
    file_id: &'a str,
    /// New name of the file.
    name: &'a str,
    /// Name-conflict mode; `update_path` sends `Refuse`.
    check_name_mode: CheckNameMode,
}
616
/// JSON payload of the copy request built in `copy_path`.
#[derive(Serialize)]
pub struct CopyPathRequest<'a> {
    /// Drive the file lives in.
    drive_id: &'a str,
    /// Id of the file to copy.
    file_id: &'a str,
    /// Id of the destination parent.
    to_parent_file_id: &'a str,
    /// `auto_rename` flag, forwarded from the caller unchanged.
    auto_rename: bool,
}
624
/// JSON payload of the list request built in `list`.
#[derive(Serialize)]
pub struct ListRequest<'a> {
    /// Drive to list in.
    drive_id: &'a str,
    /// Id of the directory whose children are listed.
    parent_file_id: &'a str,
    /// Page size limit, if given by the caller.
    limit: Option<usize>,
    /// Pagination marker, if given by the caller.
    marker: Option<&'a str>,
}
632
/// JSON payload of the complete request built in `complete`.
#[derive(Serialize)]
pub struct CompleteRequest<'a> {
    /// Drive the file lives in.
    drive_id: &'a str,
    /// Id of the file being uploaded.
    file_id: &'a str,
    /// Id of the upload to complete.
    upload_id: &'a str,
}
639
/// JSON payload of the get-upload-url request built in `get_upload_url`.
#[derive(Serialize)]
pub struct GetUploadRequest<'a> {
    /// Drive the file lives in.
    drive_id: &'a str,
    /// Id of the file being uploaded.
    file_id: &'a str,
    /// Id of the upload.
    upload_id: &'a str,
    /// Parts to get upload URLs for; `get_upload_url` sends exactly one.
    part_info_list: Option<Vec<PartInfoItem>>,
}
647
/// One requested part inside `GetUploadRequest`.
#[derive(Serialize)]
pub struct PartInfoItem {
    /// Number of the part an upload URL is requested for.
    part_number: Option<usize>,
}