1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use chrono::Utc;
23use http::header::HeaderValue;
24use http::header::{self};
25use http::Request;
26use http::{Method, Response};
27use serde::Deserialize;
28use serde::Serialize;
29use tokio::sync::Mutex;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
35#[derive(Debug, Deserialize, Default, Clone)]
37pub enum DriveType {
38 #[default]
40 Default,
41 Backup,
45 Resource,
49}
50
51pub enum AliyunDriveSign {
53 Refresh(String, String, String, Option<String>, i64),
54 Access(String),
55}
56
57pub struct AliyunDriveSigner {
58 pub drive_id: Option<String>,
59 pub sign: AliyunDriveSign,
60}
61
62pub struct AliyunDriveCore {
63 pub info: Arc<AccessorInfo>,
64
65 pub endpoint: String,
66 pub root: String,
67 pub drive_type: DriveType,
68
69 pub signer: Arc<Mutex<AliyunDriveSigner>>,
70 pub dir_lock: Arc<Mutex<()>>,
71}
72
73impl Debug for AliyunDriveCore {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("AliyunDriveCore")
76 .field("root", &self.root)
77 .field("drive_type", &self.drive_type)
78 .finish_non_exhaustive()
79 }
80}
81
82impl AliyunDriveCore {
83 async fn send(&self, mut req: Request<Buffer>, token: Option<&str>) -> Result<Buffer> {
84 req.headers_mut().insert(
86 header::USER_AGENT,
87 HeaderValue::from_str(&format!("opendal/{}", VERSION))
88 .expect("user agent must be valid header value"),
89 );
90 if req.method() == Method::POST {
91 req.headers_mut().insert(
92 header::CONTENT_TYPE,
93 HeaderValue::from_static("application/json;charset=UTF-8"),
94 );
95 }
96 if let Some(token) = token {
97 req.headers_mut().insert(
98 header::AUTHORIZATION,
99 HeaderValue::from_str(&format_authorization_by_bearer(token)?)
100 .expect("access token must be valid header value"),
101 );
102 }
103 let res = self.info.http_client().send(req).await?;
104 if !res.status().is_success() {
105 return Err(parse_error(res));
106 }
107 Ok(res.into_body())
108 }
109
110 async fn get_access_token(
111 &self,
112 client_id: &str,
113 client_secret: &str,
114 refresh_token: &str,
115 ) -> Result<Buffer> {
116 let body = serde_json::to_vec(&AccessTokenRequest {
117 refresh_token,
118 grant_type: "refresh_token",
119 client_id,
120 client_secret,
121 })
122 .map_err(new_json_serialize_error)?;
123 let req = Request::post(format!("{}/oauth/access_token", self.endpoint))
124 .body(Buffer::from(body))
125 .map_err(new_request_build_error)?;
126 self.send(req, None).await
127 }
128
129 async fn get_drive_id(&self, token: Option<&str>) -> Result<Buffer> {
130 let req = Request::post(format!("{}/adrive/v1.0/user/getDriveInfo", self.endpoint))
131 .body(Buffer::new())
132 .map_err(new_request_build_error)?;
133 self.send(req, token).await
134 }
135
136 pub async fn get_token_and_drive(&self) -> Result<(Option<String>, String)> {
137 let mut signer = self.signer.lock().await;
138 let token = match &mut signer.sign {
139 AliyunDriveSign::Access(access_token) => Some(access_token.clone()),
140 AliyunDriveSign::Refresh(
141 client_id,
142 client_secret,
143 refresh_token,
144 access_token,
145 expire_at,
146 ) => {
147 if *expire_at < Utc::now().timestamp() || access_token.is_none() {
148 let res = self
149 .get_access_token(client_id, client_secret, refresh_token)
150 .await?;
151 let output: RefreshTokenResponse = serde_json::from_reader(res.reader())
152 .map_err(new_json_deserialize_error)?;
153 *access_token = Some(output.access_token);
154 *expire_at = output.expires_in + Utc::now().timestamp();
155 *refresh_token = output.refresh_token;
156 }
157 access_token.clone()
158 }
159 };
160 let Some(drive_id) = &signer.drive_id else {
161 let res = self.get_drive_id(token.as_deref()).await?;
162 let output: DriveInfoResponse =
163 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
164 let drive_id = match self.drive_type {
165 DriveType::Default => output.default_drive_id,
166 DriveType::Backup => output.backup_drive_id.unwrap_or(output.default_drive_id),
167 DriveType::Resource => output.resource_drive_id.unwrap_or(output.default_drive_id),
168 };
169 signer.drive_id = Some(drive_id.clone());
170 return Ok((token, drive_id));
171 };
172 Ok((token, drive_id.clone()))
173 }
174
175 pub fn build_path(&self, path: &str, rooted: bool) -> String {
176 let file_path = if rooted {
177 build_rooted_abs_path(&self.root, path)
178 } else {
179 build_abs_path(&self.root, path)
180 };
181 let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
182 if file_path.is_empty() {
183 return "/".to_string();
184 }
185 file_path.to_string()
186 }
187
188 pub async fn get_by_path(&self, path: &str) -> Result<Buffer> {
189 let file_path = self.build_path(path, true);
190 let req = Request::post(format!(
191 "{}/adrive/v1.0/openFile/get_by_path",
192 self.endpoint
193 ));
194 let (token, drive_id) = self.get_token_and_drive().await?;
195 let body = serde_json::to_vec(&GetByPathRequest {
196 drive_id: &drive_id,
197 file_path: &file_path,
198 })
199 .map_err(new_json_serialize_error)?;
200 let req = req
201 .extension(Operation::Read)
202 .body(Buffer::from(body))
203 .map_err(new_request_build_error)?;
204 self.send(req, token.as_deref()).await
205 }
206
207 pub async fn ensure_dir_exists(&self, path: &str) -> Result<String> {
208 let file_path = self.build_path(path, false);
209 if file_path == "/" {
210 return Ok("root".to_string());
211 }
212 let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
213 let paths = file_path.split('/').collect::<Vec<&str>>();
214 let mut parent: Option<String> = None;
215 for path in paths {
216 let _guard = self.dir_lock.lock().await;
217 let res = self
218 .create(
219 parent.as_deref(),
220 path,
221 CreateType::Folder,
222 CheckNameMode::Refuse,
223 )
224 .await?;
225 let output: CreateResponse =
226 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
227 parent = Some(output.file_id);
228 }
229 Ok(parent.expect("ensure_dir_exists must succeed"))
230 }
231
232 pub async fn create_with_rapid_upload(
233 &self,
234 parent_file_id: Option<&str>,
235 name: &str,
236 typ: CreateType,
237 check_name_mode: CheckNameMode,
238 size: Option<u64>,
239 rapid_upload: Option<RapidUpload>,
240 ) -> Result<Buffer> {
241 let mut content_hash = None;
242 let mut proof_code = None;
243 let mut pre_hash = None;
244 if let Some(rapid_upload) = rapid_upload {
245 content_hash = rapid_upload.content_hash;
246 proof_code = rapid_upload.proof_code;
247 pre_hash = rapid_upload.pre_hash;
248 }
249
250 let (token, drive_id) = self.get_token_and_drive().await?;
251 let body = serde_json::to_vec(&CreateRequest {
252 drive_id: &drive_id,
253 parent_file_id: parent_file_id.unwrap_or("root"),
254 name,
255 typ,
256 check_name_mode,
257 size,
258 pre_hash: pre_hash.as_deref(),
259 content_hash: content_hash.as_deref(),
260 content_hash_name: content_hash.is_some().then_some("sha1"),
261 proof_code: proof_code.as_deref(),
262 proof_version: proof_code.is_some().then_some("v1"),
263 })
264 .map_err(new_json_serialize_error)?;
265 let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
266 .extension(Operation::Write)
267 .body(Buffer::from(body))
268 .map_err(new_request_build_error)?;
269 self.send(req, token.as_deref()).await
270 }
271
272 pub async fn create(
273 &self,
274 parent_file_id: Option<&str>,
275 name: &str,
276 typ: CreateType,
277 check_name_mode: CheckNameMode,
278 ) -> Result<Buffer> {
279 self.create_with_rapid_upload(parent_file_id, name, typ, check_name_mode, None, None)
280 .await
281 }
282
283 async fn get_download_url(&self, file_id: &str) -> Result<String> {
284 let (token, drive_id) = self.get_token_and_drive().await?;
285 let body = serde_json::to_vec(&FileRequest {
286 drive_id: &drive_id,
287 file_id,
288 })
289 .map_err(new_json_serialize_error)?;
290
291 let req = Request::post(format!(
292 "{}/adrive/v1.0/openFile/getDownloadUrl",
293 self.endpoint
294 ))
295 .extension(Operation::Read)
296 .body(Buffer::from(body))
297 .map_err(new_request_build_error)?;
298
299 let res = self.send(req, token.as_deref()).await?;
300
301 let output: GetDownloadUrlResponse =
302 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
303
304 Ok(output.url)
305 }
306
307 pub async fn download(&self, file_id: &str, range: BytesRange) -> Result<Response<HttpBody>> {
308 let download_url = self.get_download_url(file_id).await?;
309 let req = Request::get(download_url)
310 .extension(Operation::Read)
311 .header(header::RANGE, range.to_header())
312 .body(Buffer::new())
313 .map_err(new_request_build_error)?;
314 self.info.http_client().fetch(req).await
315 }
316
317 pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
318 let (token, drive_id) = self.get_token_and_drive().await?;
319 let body = serde_json::to_vec(&MovePathRequest {
320 drive_id: &drive_id,
321 file_id,
322 to_parent_file_id,
323 check_name_mode: CheckNameMode::AutoRename,
324 })
325 .map_err(new_json_serialize_error)?;
326 let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
327 .extension(Operation::Write)
328 .body(Buffer::from(body))
329 .map_err(new_request_build_error)?;
330 self.send(req, token.as_deref()).await?;
331 Ok(())
332 }
333
334 pub async fn update_path(&self, file_id: &str, name: &str) -> Result<()> {
335 let (token, drive_id) = self.get_token_and_drive().await?;
336 let body = serde_json::to_vec(&UpdatePathRequest {
337 drive_id: &drive_id,
338 file_id,
339 name,
340 check_name_mode: CheckNameMode::Refuse,
341 })
342 .map_err(new_json_serialize_error)?;
343 let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
344 .extension(Operation::Write)
345 .body(Buffer::from(body))
346 .map_err(new_request_build_error)?;
347 self.send(req, token.as_deref()).await?;
348 Ok(())
349 }
350
351 pub async fn copy_path(
352 &self,
353 file_id: &str,
354 to_parent_file_id: &str,
355 auto_rename: bool,
356 ) -> Result<Buffer> {
357 let (token, drive_id) = self.get_token_and_drive().await?;
358 let body = serde_json::to_vec(&CopyPathRequest {
359 drive_id: &drive_id,
360 file_id,
361 to_parent_file_id,
362 auto_rename,
363 })
364 .map_err(new_json_serialize_error)?;
365 let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
366 .extension(Operation::Copy)
367 .body(Buffer::from(body))
368 .map_err(new_request_build_error)?;
369 self.send(req, token.as_deref()).await
370 }
371
372 pub async fn delete_path(&self, file_id: &str) -> Result<()> {
373 let (token, drive_id) = self.get_token_and_drive().await?;
374 let body = serde_json::to_vec(&FileRequest {
375 drive_id: &drive_id,
376 file_id,
377 })
378 .map_err(new_json_serialize_error)?;
379 let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
380 .extension(Operation::Delete)
381 .body(Buffer::from(body))
382 .map_err(new_request_build_error)?;
383 self.send(req, token.as_deref()).await?;
384 Ok(())
385 }
386
387 pub async fn list(
388 &self,
389 parent_file_id: &str,
390 limit: Option<usize>,
391 marker: Option<String>,
392 ) -> Result<Buffer> {
393 let (token, drive_id) = self.get_token_and_drive().await?;
394 let body = serde_json::to_vec(&ListRequest {
395 drive_id: &drive_id,
396 parent_file_id,
397 limit,
398 marker: marker.as_deref(),
399 })
400 .map_err(new_json_serialize_error)?;
401 let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
402 .extension(Operation::List)
403 .body(Buffer::from(body))
404 .map_err(new_request_build_error)?;
405 self.send(req, token.as_deref()).await
406 }
407
408 pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
409 let (token, drive_id) = self.get_token_and_drive().await?;
410 let body = serde_json::to_vec(&CompleteRequest {
411 drive_id: &drive_id,
412 file_id,
413 upload_id,
414 })
415 .map_err(new_json_serialize_error)?;
416 let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
417 .extension(Operation::Write)
418 .body(Buffer::from(body))
419 .map_err(new_request_build_error)?;
420 self.send(req, token.as_deref()).await
421 }
422
423 async fn get_upload_url(
424 &self,
425 file_id: &str,
426 upload_id: &str,
427 part_number: usize,
428 ) -> Result<String> {
429 let (token, drive_id) = self.get_token_and_drive().await?;
430 let part_info_list = vec![PartInfoItem {
431 part_number: Some(part_number),
432 }];
433 let body = serde_json::to_vec(&GetUploadRequest {
434 drive_id: &drive_id,
435 file_id,
436 upload_id,
437 part_info_list: Some(part_info_list),
438 })
439 .map_err(new_json_serialize_error)?;
440
441 let req = Request::post(format!(
442 "{}/adrive/v1.0/openFile/getUploadUrl",
443 self.endpoint
444 ))
445 .extension(Operation::Write)
446 .body(Buffer::from(body))
447 .map_err(new_request_build_error)?;
448
449 let res = self.send(req, token.as_deref()).await?;
450
451 let mut output: UploadUrlResponse =
452 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
453
454 let Some(upload_url) = output
455 .part_info_list
456 .take()
457 .map(|mut list| list.swap_remove(0))
458 .map(|part_info| part_info.upload_url)
459 else {
460 return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
461 };
462
463 Ok(upload_url)
464 }
465 pub async fn upload(
466 &self,
467 file_id: &str,
468 upload_id: &str,
469 part_number: usize,
470 body: Buffer,
471 ) -> Result<Buffer> {
472 let upload_url = self.get_upload_url(file_id, upload_id, part_number).await?;
473 let req = Request::put(upload_url)
474 .extension(Operation::Write)
475 .body(body)
476 .map_err(new_request_build_error)?;
477 self.send(req, None).await
478 }
479}
480
481pub struct RapidUpload {
482 pub pre_hash: Option<String>,
483 pub content_hash: Option<String>,
484 pub proof_code: Option<String>,
485}
486
487#[derive(Debug, Deserialize)]
488pub struct RefreshTokenResponse {
489 pub access_token: String,
490 pub expires_in: i64,
491 pub refresh_token: String,
492}
493
494#[derive(Debug, Deserialize)]
495pub struct DriveInfoResponse {
496 pub default_drive_id: String,
497 pub resource_drive_id: Option<String>,
498 pub backup_drive_id: Option<String>,
499}
500
501#[derive(Debug, Serialize)]
502#[serde(rename_all = "snake_case")]
503pub enum CreateType {
504 File,
505 Folder,
506}
507
508#[derive(Debug, Serialize)]
509#[serde(rename_all = "snake_case")]
510pub enum CheckNameMode {
511 Refuse,
512 AutoRename,
513}
514
515#[derive(Deserialize)]
516pub struct UploadUrlResponse {
517 pub part_info_list: Option<Vec<PartInfo>>,
518}
519
520#[derive(Deserialize)]
521pub struct CreateResponse {
522 pub file_id: String,
523 pub upload_id: Option<String>,
524 pub exist: Option<bool>,
525}
526
527#[derive(Serialize, Deserialize)]
528pub struct PartInfo {
529 pub etag: Option<String>,
530 pub part_number: usize,
531 pub part_size: Option<u64>,
532 pub upload_url: String,
533 pub content_type: Option<String>,
534}
535
536#[derive(Deserialize)]
537pub struct AliyunDriveFileList {
538 pub items: Vec<AliyunDriveFile>,
539 pub next_marker: Option<String>,
540}
541
542#[derive(Deserialize)]
543pub struct CopyResponse {
544 pub file_id: String,
545}
546
547#[derive(Deserialize)]
548pub struct AliyunDriveFile {
549 pub file_id: String,
550 pub parent_file_id: String,
551 pub name: String,
552 pub size: Option<u64>,
553 pub content_type: Option<String>,
554 #[serde(rename = "type")]
555 pub path_type: String,
556 pub updated_at: String,
557}
558
559#[derive(Deserialize)]
560pub struct GetDownloadUrlResponse {
561 pub url: String,
562}
563
564#[derive(Serialize)]
565pub struct AccessTokenRequest<'a> {
566 refresh_token: &'a str,
567 grant_type: &'a str,
568 client_id: &'a str,
569 client_secret: &'a str,
570}
571
572#[derive(Serialize)]
573pub struct GetByPathRequest<'a> {
574 drive_id: &'a str,
575 file_path: &'a str,
576}
577
578#[derive(Serialize)]
579pub struct CreateRequest<'a> {
580 drive_id: &'a str,
581 parent_file_id: &'a str,
582 name: &'a str,
583 #[serde(rename = "type")]
584 typ: CreateType,
585 check_name_mode: CheckNameMode,
586 size: Option<u64>,
587 pre_hash: Option<&'a str>,
588 content_hash: Option<&'a str>,
589 content_hash_name: Option<&'a str>,
590 proof_code: Option<&'a str>,
591 proof_version: Option<&'a str>,
592}
593
594#[derive(Serialize)]
595pub struct FileRequest<'a> {
596 drive_id: &'a str,
597 file_id: &'a str,
598}
599
600#[derive(Serialize)]
601pub struct MovePathRequest<'a> {
602 drive_id: &'a str,
603 file_id: &'a str,
604 to_parent_file_id: &'a str,
605 check_name_mode: CheckNameMode,
606}
607
608#[derive(Serialize)]
609pub struct UpdatePathRequest<'a> {
610 drive_id: &'a str,
611 file_id: &'a str,
612 name: &'a str,
613 check_name_mode: CheckNameMode,
614}
615
616#[derive(Serialize)]
617pub struct CopyPathRequest<'a> {
618 drive_id: &'a str,
619 file_id: &'a str,
620 to_parent_file_id: &'a str,
621 auto_rename: bool,
622}
623
624#[derive(Serialize)]
625pub struct ListRequest<'a> {
626 drive_id: &'a str,
627 parent_file_id: &'a str,
628 limit: Option<usize>,
629 marker: Option<&'a str>,
630}
631
632#[derive(Serialize)]
633pub struct CompleteRequest<'a> {
634 drive_id: &'a str,
635 file_id: &'a str,
636 upload_id: &'a str,
637}
638
639#[derive(Serialize)]
640pub struct GetUploadRequest<'a> {
641 drive_id: &'a str,
642 file_id: &'a str,
643 upload_id: &'a str,
644 part_info_list: Option<Vec<PartInfoItem>>,
645}
646
647#[derive(Serialize)]
648pub struct PartInfoItem {
649 part_number: Option<usize>,
650}