1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use chrono::Utc;
23use http::header::HeaderValue;
24use http::header::{self};
25use http::Method;
26use http::Request;
27use serde::Deserialize;
28use serde::Serialize;
29use tokio::sync::Mutex;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
35#[derive(Debug, Deserialize, Default, Clone)]
37pub enum DriveType {
38 #[default]
40 Default,
41 Backup,
45 Resource,
49}
50
51pub enum AliyunDriveSign {
53 Refresh(String, String, String, Option<String>, i64),
54 Access(String),
55}
56
57pub struct AliyunDriveSigner {
58 pub drive_id: Option<String>,
59 pub sign: AliyunDriveSign,
60}
61
62pub struct AliyunDriveCore {
63 pub info: Arc<AccessorInfo>,
64
65 pub endpoint: String,
66 pub root: String,
67 pub drive_type: DriveType,
68
69 pub signer: Arc<Mutex<AliyunDriveSigner>>,
70 pub dir_lock: Arc<Mutex<()>>,
71}
72
73impl Debug for AliyunDriveCore {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("AliyunDriveCore")
76 .field("root", &self.root)
77 .field("drive_type", &self.drive_type)
78 .finish_non_exhaustive()
79 }
80}
81
82impl AliyunDriveCore {
83 async fn send(&self, mut req: Request<Buffer>, token: Option<&str>) -> Result<Buffer> {
84 req.headers_mut().insert(
86 header::USER_AGENT,
87 HeaderValue::from_str(&format!("opendal/{}", VERSION))
88 .expect("user agent must be valid header value"),
89 );
90 if req.method() == Method::POST {
91 req.headers_mut().insert(
92 header::CONTENT_TYPE,
93 HeaderValue::from_static("application/json;charset=UTF-8"),
94 );
95 }
96 if let Some(token) = token {
97 req.headers_mut().insert(
98 header::AUTHORIZATION,
99 HeaderValue::from_str(&format_authorization_by_bearer(token)?)
100 .expect("access token must be valid header value"),
101 );
102 }
103 let res = self.info.http_client().send(req).await?;
104 if !res.status().is_success() {
105 return Err(parse_error(res));
106 }
107 Ok(res.into_body())
108 }
109
110 async fn get_access_token(
111 &self,
112 client_id: &str,
113 client_secret: &str,
114 refresh_token: &str,
115 ) -> Result<Buffer> {
116 let body = serde_json::to_vec(&AccessTokenRequest {
117 refresh_token,
118 grant_type: "refresh_token",
119 client_id,
120 client_secret,
121 })
122 .map_err(new_json_serialize_error)?;
123 let req = Request::post(format!("{}/oauth/access_token", self.endpoint))
124 .body(Buffer::from(body))
125 .map_err(new_request_build_error)?;
126 self.send(req, None).await
127 }
128
129 async fn get_drive_id(&self, token: Option<&str>) -> Result<Buffer> {
130 let req = Request::post(format!("{}/adrive/v1.0/user/getDriveInfo", self.endpoint))
131 .body(Buffer::new())
132 .map_err(new_request_build_error)?;
133 self.send(req, token).await
134 }
135
136 pub async fn get_token_and_drive(&self) -> Result<(Option<String>, String)> {
137 let mut signer = self.signer.lock().await;
138 let token = match &mut signer.sign {
139 AliyunDriveSign::Access(access_token) => Some(access_token.clone()),
140 AliyunDriveSign::Refresh(
141 client_id,
142 client_secret,
143 refresh_token,
144 access_token,
145 expire_at,
146 ) => {
147 if *expire_at < Utc::now().timestamp() || access_token.is_none() {
148 let res = self
149 .get_access_token(client_id, client_secret, refresh_token)
150 .await?;
151 let output: RefreshTokenResponse = serde_json::from_reader(res.reader())
152 .map_err(new_json_deserialize_error)?;
153 *access_token = Some(output.access_token);
154 *expire_at = output.expires_in + Utc::now().timestamp();
155 *refresh_token = output.refresh_token;
156 }
157 access_token.clone()
158 }
159 };
160 let Some(drive_id) = &signer.drive_id else {
161 let res = self.get_drive_id(token.as_deref()).await?;
162 let output: DriveInfoResponse =
163 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
164 let drive_id = match self.drive_type {
165 DriveType::Default => output.default_drive_id,
166 DriveType::Backup => output.backup_drive_id.unwrap_or(output.default_drive_id),
167 DriveType::Resource => output.resource_drive_id.unwrap_or(output.default_drive_id),
168 };
169 signer.drive_id = Some(drive_id.clone());
170 return Ok((token, drive_id));
171 };
172 Ok((token, drive_id.clone()))
173 }
174
175 pub fn build_path(&self, path: &str, rooted: bool) -> String {
176 let file_path = if rooted {
177 build_rooted_abs_path(&self.root, path)
178 } else {
179 build_abs_path(&self.root, path)
180 };
181 let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
182 if file_path.is_empty() {
183 return "/".to_string();
184 }
185 file_path.to_string()
186 }
187
188 pub async fn get_by_path(&self, path: &str) -> Result<Buffer> {
189 let file_path = self.build_path(path, true);
190 let req = Request::post(format!(
191 "{}/adrive/v1.0/openFile/get_by_path",
192 self.endpoint
193 ));
194 let (token, drive_id) = self.get_token_and_drive().await?;
195 let body = serde_json::to_vec(&GetByPathRequest {
196 drive_id: &drive_id,
197 file_path: &file_path,
198 })
199 .map_err(new_json_serialize_error)?;
200 let req = req
201 .extension(Operation::Read)
203 .body(Buffer::from(body))
204 .map_err(new_request_build_error)?;
205 self.send(req, token.as_deref()).await
206 }
207
208 pub async fn ensure_dir_exists(&self, path: &str) -> Result<String> {
209 let file_path = self.build_path(path, false);
210 if file_path == "/" {
211 return Ok("root".to_string());
212 }
213 let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
214 let paths = file_path.split('/').collect::<Vec<&str>>();
215 let mut parent: Option<String> = None;
216 for path in paths {
217 let _guard = self.dir_lock.lock().await;
218 let res = self
219 .create(
220 parent.as_deref(),
221 path,
222 CreateType::Folder,
223 CheckNameMode::Refuse,
224 )
225 .await?;
226 let output: CreateResponse =
227 serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
228 parent = Some(output.file_id);
229 }
230 Ok(parent.expect("ensure_dir_exists must succeed"))
231 }
232
233 pub async fn create_with_rapid_upload(
234 &self,
235 parent_file_id: Option<&str>,
236 name: &str,
237 typ: CreateType,
238 check_name_mode: CheckNameMode,
239 size: Option<u64>,
240 rapid_upload: Option<RapidUpload>,
241 ) -> Result<Buffer> {
242 let mut content_hash = None;
243 let mut proof_code = None;
244 let mut pre_hash = None;
245 if let Some(rapid_upload) = rapid_upload {
246 content_hash = rapid_upload.content_hash;
247 proof_code = rapid_upload.proof_code;
248 pre_hash = rapid_upload.pre_hash;
249 }
250
251 let (token, drive_id) = self.get_token_and_drive().await?;
252 let body = serde_json::to_vec(&CreateRequest {
253 drive_id: &drive_id,
254 parent_file_id: parent_file_id.unwrap_or("root"),
255 name,
256 typ,
257 check_name_mode,
258 size,
259 pre_hash: pre_hash.as_deref(),
260 content_hash: content_hash.as_deref(),
261 content_hash_name: content_hash.is_some().then_some("sha1"),
262 proof_code: proof_code.as_deref(),
263 proof_version: proof_code.is_some().then_some("v1"),
264 })
265 .map_err(new_json_serialize_error)?;
266 let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
267 .extension(Operation::Write)
269 .body(Buffer::from(body))
270 .map_err(new_request_build_error)?;
271 self.send(req, token.as_deref()).await
272 }
273
274 pub async fn create(
275 &self,
276 parent_file_id: Option<&str>,
277 name: &str,
278 typ: CreateType,
279 check_name_mode: CheckNameMode,
280 ) -> Result<Buffer> {
281 self.create_with_rapid_upload(parent_file_id, name, typ, check_name_mode, None, None)
282 .await
283 }
284
285 pub async fn get_download_url(&self, file_id: &str) -> Result<String> {
286 let (token, drive_id) = self.get_token_and_drive().await?;
287 let body = serde_json::to_vec(&FileRequest {
288 drive_id: &drive_id,
289 file_id,
290 })
291 .map_err(new_json_serialize_error)?;
292 let req = Request::post(format!(
293 "{}/adrive/v1.0/openFile/getDownloadUrl",
294 self.endpoint
295 ))
296 .extension(Operation::Read)
298 .body(Buffer::from(body))
299 .map_err(new_request_build_error)?;
300 let res = self.send(req, token.as_deref()).await?;
301 let output: GetDownloadUrlResponse =
302 serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
303 Ok(output.url)
304 }
305
306 pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
307 let (token, drive_id) = self.get_token_and_drive().await?;
308 let body = serde_json::to_vec(&MovePathRequest {
309 drive_id: &drive_id,
310 file_id,
311 to_parent_file_id,
312 check_name_mode: CheckNameMode::AutoRename,
313 })
314 .map_err(new_json_serialize_error)?;
315 let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
316 .extension(Operation::Write)
318 .body(Buffer::from(body))
319 .map_err(new_request_build_error)?;
320 self.send(req, token.as_deref()).await?;
321 Ok(())
322 }
323
324 pub async fn update_path(&self, file_id: &str, name: &str) -> Result<()> {
325 let (token, drive_id) = self.get_token_and_drive().await?;
326 let body = serde_json::to_vec(&UpdatePathRequest {
327 drive_id: &drive_id,
328 file_id,
329 name,
330 check_name_mode: CheckNameMode::Refuse,
331 })
332 .map_err(new_json_serialize_error)?;
333 let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
334 .extension(Operation::Write)
336 .body(Buffer::from(body))
337 .map_err(new_request_build_error)?;
338 self.send(req, token.as_deref()).await?;
339 Ok(())
340 }
341
342 pub async fn copy_path(
343 &self,
344 file_id: &str,
345 to_parent_file_id: &str,
346 auto_rename: bool,
347 ) -> Result<Buffer> {
348 let (token, drive_id) = self.get_token_and_drive().await?;
349 let body = serde_json::to_vec(&CopyPathRequest {
350 drive_id: &drive_id,
351 file_id,
352 to_parent_file_id,
353 auto_rename,
354 })
355 .map_err(new_json_serialize_error)?;
356 let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
357 .extension(Operation::Copy)
359 .body(Buffer::from(body))
360 .map_err(new_request_build_error)?;
361 self.send(req, token.as_deref()).await
362 }
363
364 pub async fn delete_path(&self, file_id: &str) -> Result<()> {
365 let (token, drive_id) = self.get_token_and_drive().await?;
366 let body = serde_json::to_vec(&FileRequest {
367 drive_id: &drive_id,
368 file_id,
369 })
370 .map_err(new_json_serialize_error)?;
371 let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
372 .extension(Operation::Delete)
374 .body(Buffer::from(body))
375 .map_err(new_request_build_error)?;
376 self.send(req, token.as_deref()).await?;
377 Ok(())
378 }
379
380 pub async fn list(
381 &self,
382 parent_file_id: &str,
383 limit: Option<usize>,
384 marker: Option<String>,
385 ) -> Result<Buffer> {
386 let (token, drive_id) = self.get_token_and_drive().await?;
387 let body = serde_json::to_vec(&ListRequest {
388 drive_id: &drive_id,
389 parent_file_id,
390 limit,
391 marker: marker.as_deref(),
392 })
393 .map_err(new_json_serialize_error)?;
394 let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
395 .extension(Operation::List)
397 .body(Buffer::from(body))
398 .map_err(new_request_build_error)?;
399 self.send(req, token.as_deref()).await
400 }
401
402 pub async fn upload(&self, upload_url: &str, body: Buffer) -> Result<Buffer> {
403 let req = Request::put(upload_url)
404 .extension(Operation::Write)
406 .body(body)
407 .map_err(new_request_build_error)?;
408 self.send(req, None).await
409 }
410
411 pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
412 let (token, drive_id) = self.get_token_and_drive().await?;
413 let body = serde_json::to_vec(&CompleteRequest {
414 drive_id: &drive_id,
415 file_id,
416 upload_id,
417 })
418 .map_err(new_json_serialize_error)?;
419 let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
420 .extension(Operation::Write)
422 .body(Buffer::from(body))
423 .map_err(new_request_build_error)?;
424 self.send(req, token.as_deref()).await
425 }
426
427 pub async fn get_upload_url(
428 &self,
429 file_id: &str,
430 upload_id: &str,
431 part_number: Option<usize>,
432 ) -> Result<Buffer> {
433 let (token, drive_id) = self.get_token_and_drive().await?;
434 let part_info_list = part_number.map(|part_number| {
435 vec![PartInfoItem {
436 part_number: Some(part_number),
437 }]
438 });
439 let body = serde_json::to_vec(&GetUploadRequest {
440 drive_id: &drive_id,
441 file_id,
442 upload_id,
443 part_info_list,
444 })
445 .map_err(new_json_serialize_error)?;
446 let req = Request::post(format!(
447 "{}/adrive/v1.0/openFile/getUploadUrl",
448 self.endpoint
449 ))
450 .extension(Operation::Write)
452 .body(Buffer::from(body))
453 .map_err(new_request_build_error)?;
454 self.send(req, token.as_deref()).await
455 }
456}
457
458pub struct RapidUpload {
459 pub pre_hash: Option<String>,
460 pub content_hash: Option<String>,
461 pub proof_code: Option<String>,
462}
463
464#[derive(Debug, Deserialize)]
465pub struct RefreshTokenResponse {
466 pub access_token: String,
467 pub expires_in: i64,
468 pub refresh_token: String,
469}
470
471#[derive(Debug, Deserialize)]
472pub struct DriveInfoResponse {
473 pub default_drive_id: String,
474 pub resource_drive_id: Option<String>,
475 pub backup_drive_id: Option<String>,
476}
477
478#[derive(Debug, Serialize)]
479#[serde(rename_all = "snake_case")]
480pub enum CreateType {
481 File,
482 Folder,
483}
484
485#[derive(Debug, Serialize)]
486#[serde(rename_all = "snake_case")]
487pub enum CheckNameMode {
488 Refuse,
489 AutoRename,
490}
491
492#[derive(Deserialize)]
493pub struct UploadUrlResponse {
494 pub part_info_list: Option<Vec<PartInfo>>,
495}
496
497#[derive(Deserialize)]
498pub struct CreateResponse {
499 pub file_id: String,
500 pub upload_id: Option<String>,
501 pub exist: Option<bool>,
502}
503
504#[derive(Serialize, Deserialize)]
505pub struct PartInfo {
506 pub etag: Option<String>,
507 pub part_number: usize,
508 pub part_size: Option<u64>,
509 pub upload_url: String,
510 pub content_type: Option<String>,
511}
512
513#[derive(Deserialize)]
514pub struct AliyunDriveFileList {
515 pub items: Vec<AliyunDriveFile>,
516 pub next_marker: Option<String>,
517}
518
519#[derive(Deserialize)]
520pub struct CopyResponse {
521 pub file_id: String,
522}
523
524#[derive(Deserialize)]
525pub struct AliyunDriveFile {
526 pub file_id: String,
527 pub parent_file_id: String,
528 pub name: String,
529 pub size: Option<u64>,
530 pub content_type: Option<String>,
531 #[serde(rename = "type")]
532 pub path_type: String,
533 pub updated_at: String,
534}
535
536#[derive(Deserialize)]
537pub struct GetDownloadUrlResponse {
538 pub url: String,
539}
540
541#[derive(Serialize)]
542pub struct AccessTokenRequest<'a> {
543 refresh_token: &'a str,
544 grant_type: &'a str,
545 client_id: &'a str,
546 client_secret: &'a str,
547}
548
549#[derive(Serialize)]
550pub struct GetByPathRequest<'a> {
551 drive_id: &'a str,
552 file_path: &'a str,
553}
554
555#[derive(Serialize)]
556pub struct CreateRequest<'a> {
557 drive_id: &'a str,
558 parent_file_id: &'a str,
559 name: &'a str,
560 #[serde(rename = "type")]
561 typ: CreateType,
562 check_name_mode: CheckNameMode,
563 size: Option<u64>,
564 pre_hash: Option<&'a str>,
565 content_hash: Option<&'a str>,
566 content_hash_name: Option<&'a str>,
567 proof_code: Option<&'a str>,
568 proof_version: Option<&'a str>,
569}
570
571#[derive(Serialize)]
572pub struct FileRequest<'a> {
573 drive_id: &'a str,
574 file_id: &'a str,
575}
576
577#[derive(Serialize)]
578pub struct MovePathRequest<'a> {
579 drive_id: &'a str,
580 file_id: &'a str,
581 to_parent_file_id: &'a str,
582 check_name_mode: CheckNameMode,
583}
584
585#[derive(Serialize)]
586pub struct UpdatePathRequest<'a> {
587 drive_id: &'a str,
588 file_id: &'a str,
589 name: &'a str,
590 check_name_mode: CheckNameMode,
591}
592
593#[derive(Serialize)]
594pub struct CopyPathRequest<'a> {
595 drive_id: &'a str,
596 file_id: &'a str,
597 to_parent_file_id: &'a str,
598 auto_rename: bool,
599}
600
601#[derive(Serialize)]
602pub struct ListRequest<'a> {
603 drive_id: &'a str,
604 parent_file_id: &'a str,
605 limit: Option<usize>,
606 marker: Option<&'a str>,
607}
608
609#[derive(Serialize)]
610pub struct CompleteRequest<'a> {
611 drive_id: &'a str,
612 file_id: &'a str,
613 upload_id: &'a str,
614}
615
616#[derive(Serialize)]
617pub struct GetUploadRequest<'a> {
618 drive_id: &'a str,
619 file_id: &'a str,
620 upload_id: &'a str,
621 part_info_list: Option<Vec<PartInfoItem>>,
622}
623
624#[derive(Serialize)]
625pub struct PartInfoItem {
626 part_number: Option<usize>,
627}