opendal/services/aliyun_drive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use chrono::Utc;
23use http::header::HeaderValue;
24use http::header::{self};
25use http::Method;
26use http::Request;
27use serde::Deserialize;
28use serde::Serialize;
29use tokio::sync::Mutex;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
/// Available Aliyun Drive Type.
///
/// Selects which drive id reported by the `getDriveInfo` endpoint is used
/// for every request; the choice is resolved once in
/// `AliyunDriveCore::get_token_and_drive` and then cached in the signer.
#[derive(Debug, Deserialize, Default, Clone)]
pub enum DriveType {
    /// Use the default type of Aliyun Drive.
    #[default]
    Default,
    /// Use the backup type of Aliyun Drive.
    ///
    /// Fallback to the default type if no backup drive is found.
    Backup,
    /// Use the resource type of Aliyun Drive.
    ///
    /// Fallback to the default type if no resource drive is found.
    Resource,
}
50
/// Available Aliyun Drive Signer Set
///
/// Describes how the access token used to authorize requests is obtained.
pub enum AliyunDriveSign {
    /// Refresh-token based signing.
    ///
    /// Positional fields, in order (as destructured by
    /// `AliyunDriveCore::get_token_and_drive`):
    ///
    /// 0. `client_id`
    /// 1. `client_secret`
    /// 2. `refresh_token` (replaced after every successful refresh)
    /// 3. `access_token` (cached; `None` until the first refresh)
    /// 4. `expire_at` (unix timestamp in seconds; the cached access token
    ///    is refreshed once this is in the past)
    Refresh(String, String, String, Option<String>, i64),
    /// A fixed access token that is used as-is and never refreshed.
    Access(String),
}
56
/// Mutable signing state shared by all requests of one `AliyunDriveCore`.
pub struct AliyunDriveSigner {
    /// Cached drive id; `None` until it has been looked up via the
    /// `getDriveInfo` endpoint in `AliyunDriveCore::get_token_and_drive`.
    pub drive_id: Option<String>,
    /// Credentials used to obtain the access token.
    pub sign: AliyunDriveSign,
}
61
/// Shared state and low-level HTTP helpers for the Aliyun Drive service.
pub struct AliyunDriveCore {
    /// Accessor info; its HTTP client is used to send every request.
    pub info: Arc<AccessorInfo>,

    /// Base URL that all API paths (e.g. `/adrive/v1.0/...`) are appended to.
    pub endpoint: String,
    /// Root directory that user supplied paths are resolved against.
    pub root: String,
    /// Which drive of the account should be used.
    pub drive_type: DriveType,

    /// Token / drive id state, guarded by an async mutex because it is
    /// updated while awaiting network calls.
    pub signer: Arc<Mutex<AliyunDriveSigner>>,
    /// Held around each folder creation performed by `ensure_dir_exists`.
    pub dir_lock: Arc<Mutex<()>>,
}
72
73impl Debug for AliyunDriveCore {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("AliyunDriveCore")
76            .field("root", &self.root)
77            .field("drive_type", &self.drive_type)
78            .finish_non_exhaustive()
79    }
80}
81
82impl AliyunDriveCore {
83    async fn send(&self, mut req: Request<Buffer>, token: Option<&str>) -> Result<Buffer> {
84        // AliyunDrive raise NullPointerException if you haven't set a user-agent.
85        req.headers_mut().insert(
86            header::USER_AGENT,
87            HeaderValue::from_str(&format!("opendal/{}", VERSION))
88                .expect("user agent must be valid header value"),
89        );
90        if req.method() == Method::POST {
91            req.headers_mut().insert(
92                header::CONTENT_TYPE,
93                HeaderValue::from_static("application/json;charset=UTF-8"),
94            );
95        }
96        if let Some(token) = token {
97            req.headers_mut().insert(
98                header::AUTHORIZATION,
99                HeaderValue::from_str(&format_authorization_by_bearer(token)?)
100                    .expect("access token must be valid header value"),
101            );
102        }
103        let res = self.info.http_client().send(req).await?;
104        if !res.status().is_success() {
105            return Err(parse_error(res));
106        }
107        Ok(res.into_body())
108    }
109
110    async fn get_access_token(
111        &self,
112        client_id: &str,
113        client_secret: &str,
114        refresh_token: &str,
115    ) -> Result<Buffer> {
116        let body = serde_json::to_vec(&AccessTokenRequest {
117            refresh_token,
118            grant_type: "refresh_token",
119            client_id,
120            client_secret,
121        })
122        .map_err(new_json_serialize_error)?;
123        let req = Request::post(format!("{}/oauth/access_token", self.endpoint))
124            .body(Buffer::from(body))
125            .map_err(new_request_build_error)?;
126        self.send(req, None).await
127    }
128
129    async fn get_drive_id(&self, token: Option<&str>) -> Result<Buffer> {
130        let req = Request::post(format!("{}/adrive/v1.0/user/getDriveInfo", self.endpoint))
131            .body(Buffer::new())
132            .map_err(new_request_build_error)?;
133        self.send(req, token).await
134    }
135
136    pub async fn get_token_and_drive(&self) -> Result<(Option<String>, String)> {
137        let mut signer = self.signer.lock().await;
138        let token = match &mut signer.sign {
139            AliyunDriveSign::Access(access_token) => Some(access_token.clone()),
140            AliyunDriveSign::Refresh(
141                client_id,
142                client_secret,
143                refresh_token,
144                access_token,
145                expire_at,
146            ) => {
147                if *expire_at < Utc::now().timestamp() || access_token.is_none() {
148                    let res = self
149                        .get_access_token(client_id, client_secret, refresh_token)
150                        .await?;
151                    let output: RefreshTokenResponse = serde_json::from_reader(res.reader())
152                        .map_err(new_json_deserialize_error)?;
153                    *access_token = Some(output.access_token);
154                    *expire_at = output.expires_in + Utc::now().timestamp();
155                    *refresh_token = output.refresh_token;
156                }
157                access_token.clone()
158            }
159        };
160        let Some(drive_id) = &signer.drive_id else {
161            let res = self.get_drive_id(token.as_deref()).await?;
162            let output: DriveInfoResponse =
163                serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
164            let drive_id = match self.drive_type {
165                DriveType::Default => output.default_drive_id,
166                DriveType::Backup => output.backup_drive_id.unwrap_or(output.default_drive_id),
167                DriveType::Resource => output.resource_drive_id.unwrap_or(output.default_drive_id),
168            };
169            signer.drive_id = Some(drive_id.clone());
170            return Ok((token, drive_id));
171        };
172        Ok((token, drive_id.clone()))
173    }
174
175    pub fn build_path(&self, path: &str, rooted: bool) -> String {
176        let file_path = if rooted {
177            build_rooted_abs_path(&self.root, path)
178        } else {
179            build_abs_path(&self.root, path)
180        };
181        let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
182        if file_path.is_empty() {
183            return "/".to_string();
184        }
185        file_path.to_string()
186    }
187
188    pub async fn get_by_path(&self, path: &str) -> Result<Buffer> {
189        let file_path = self.build_path(path, true);
190        let req = Request::post(format!(
191            "{}/adrive/v1.0/openFile/get_by_path",
192            self.endpoint
193        ));
194        let (token, drive_id) = self.get_token_and_drive().await?;
195        let body = serde_json::to_vec(&GetByPathRequest {
196            drive_id: &drive_id,
197            file_path: &file_path,
198        })
199        .map_err(new_json_serialize_error)?;
200        let req = req
201            // Inject operation to the request.
202            .extension(Operation::Read)
203            .body(Buffer::from(body))
204            .map_err(new_request_build_error)?;
205        self.send(req, token.as_deref()).await
206    }
207
208    pub async fn ensure_dir_exists(&self, path: &str) -> Result<String> {
209        let file_path = self.build_path(path, false);
210        if file_path == "/" {
211            return Ok("root".to_string());
212        }
213        let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
214        let paths = file_path.split('/').collect::<Vec<&str>>();
215        let mut parent: Option<String> = None;
216        for path in paths {
217            let _guard = self.dir_lock.lock().await;
218            let res = self
219                .create(
220                    parent.as_deref(),
221                    path,
222                    CreateType::Folder,
223                    CheckNameMode::Refuse,
224                )
225                .await?;
226            let output: CreateResponse =
227                serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
228            parent = Some(output.file_id);
229        }
230        Ok(parent.expect("ensure_dir_exists must succeed"))
231    }
232
233    pub async fn create_with_rapid_upload(
234        &self,
235        parent_file_id: Option<&str>,
236        name: &str,
237        typ: CreateType,
238        check_name_mode: CheckNameMode,
239        size: Option<u64>,
240        rapid_upload: Option<RapidUpload>,
241    ) -> Result<Buffer> {
242        let mut content_hash = None;
243        let mut proof_code = None;
244        let mut pre_hash = None;
245        if let Some(rapid_upload) = rapid_upload {
246            content_hash = rapid_upload.content_hash;
247            proof_code = rapid_upload.proof_code;
248            pre_hash = rapid_upload.pre_hash;
249        }
250
251        let (token, drive_id) = self.get_token_and_drive().await?;
252        let body = serde_json::to_vec(&CreateRequest {
253            drive_id: &drive_id,
254            parent_file_id: parent_file_id.unwrap_or("root"),
255            name,
256            typ,
257            check_name_mode,
258            size,
259            pre_hash: pre_hash.as_deref(),
260            content_hash: content_hash.as_deref(),
261            content_hash_name: content_hash.is_some().then_some("sha1"),
262            proof_code: proof_code.as_deref(),
263            proof_version: proof_code.is_some().then_some("v1"),
264        })
265        .map_err(new_json_serialize_error)?;
266        let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
267            // Inject operation to the request.
268            .extension(Operation::Write)
269            .body(Buffer::from(body))
270            .map_err(new_request_build_error)?;
271        self.send(req, token.as_deref()).await
272    }
273
274    pub async fn create(
275        &self,
276        parent_file_id: Option<&str>,
277        name: &str,
278        typ: CreateType,
279        check_name_mode: CheckNameMode,
280    ) -> Result<Buffer> {
281        self.create_with_rapid_upload(parent_file_id, name, typ, check_name_mode, None, None)
282            .await
283    }
284
285    pub async fn get_download_url(&self, file_id: &str) -> Result<String> {
286        let (token, drive_id) = self.get_token_and_drive().await?;
287        let body = serde_json::to_vec(&FileRequest {
288            drive_id: &drive_id,
289            file_id,
290        })
291        .map_err(new_json_serialize_error)?;
292        let req = Request::post(format!(
293            "{}/adrive/v1.0/openFile/getDownloadUrl",
294            self.endpoint
295        ))
296        // Inject operation to the request.
297        .extension(Operation::Read)
298        .body(Buffer::from(body))
299        .map_err(new_request_build_error)?;
300        let res = self.send(req, token.as_deref()).await?;
301        let output: GetDownloadUrlResponse =
302            serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
303        Ok(output.url)
304    }
305
306    pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
307        let (token, drive_id) = self.get_token_and_drive().await?;
308        let body = serde_json::to_vec(&MovePathRequest {
309            drive_id: &drive_id,
310            file_id,
311            to_parent_file_id,
312            check_name_mode: CheckNameMode::AutoRename,
313        })
314        .map_err(new_json_serialize_error)?;
315        let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
316            // Inject operation to the request.
317            .extension(Operation::Write)
318            .body(Buffer::from(body))
319            .map_err(new_request_build_error)?;
320        self.send(req, token.as_deref()).await?;
321        Ok(())
322    }
323
324    pub async fn update_path(&self, file_id: &str, name: &str) -> Result<()> {
325        let (token, drive_id) = self.get_token_and_drive().await?;
326        let body = serde_json::to_vec(&UpdatePathRequest {
327            drive_id: &drive_id,
328            file_id,
329            name,
330            check_name_mode: CheckNameMode::Refuse,
331        })
332        .map_err(new_json_serialize_error)?;
333        let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
334            // Inject operation to the request.
335            .extension(Operation::Write)
336            .body(Buffer::from(body))
337            .map_err(new_request_build_error)?;
338        self.send(req, token.as_deref()).await?;
339        Ok(())
340    }
341
342    pub async fn copy_path(
343        &self,
344        file_id: &str,
345        to_parent_file_id: &str,
346        auto_rename: bool,
347    ) -> Result<Buffer> {
348        let (token, drive_id) = self.get_token_and_drive().await?;
349        let body = serde_json::to_vec(&CopyPathRequest {
350            drive_id: &drive_id,
351            file_id,
352            to_parent_file_id,
353            auto_rename,
354        })
355        .map_err(new_json_serialize_error)?;
356        let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
357            // Inject operation to the request.
358            .extension(Operation::Copy)
359            .body(Buffer::from(body))
360            .map_err(new_request_build_error)?;
361        self.send(req, token.as_deref()).await
362    }
363
364    pub async fn delete_path(&self, file_id: &str) -> Result<()> {
365        let (token, drive_id) = self.get_token_and_drive().await?;
366        let body = serde_json::to_vec(&FileRequest {
367            drive_id: &drive_id,
368            file_id,
369        })
370        .map_err(new_json_serialize_error)?;
371        let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
372            // Inject operation to the request.
373            .extension(Operation::Delete)
374            .body(Buffer::from(body))
375            .map_err(new_request_build_error)?;
376        self.send(req, token.as_deref()).await?;
377        Ok(())
378    }
379
380    pub async fn list(
381        &self,
382        parent_file_id: &str,
383        limit: Option<usize>,
384        marker: Option<String>,
385    ) -> Result<Buffer> {
386        let (token, drive_id) = self.get_token_and_drive().await?;
387        let body = serde_json::to_vec(&ListRequest {
388            drive_id: &drive_id,
389            parent_file_id,
390            limit,
391            marker: marker.as_deref(),
392        })
393        .map_err(new_json_serialize_error)?;
394        let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
395            // Inject operation to the request.
396            .extension(Operation::List)
397            .body(Buffer::from(body))
398            .map_err(new_request_build_error)?;
399        self.send(req, token.as_deref()).await
400    }
401
402    pub async fn upload(&self, upload_url: &str, body: Buffer) -> Result<Buffer> {
403        let req = Request::put(upload_url)
404            // Inject operation to the request.
405            .extension(Operation::Write)
406            .body(body)
407            .map_err(new_request_build_error)?;
408        self.send(req, None).await
409    }
410
411    pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
412        let (token, drive_id) = self.get_token_and_drive().await?;
413        let body = serde_json::to_vec(&CompleteRequest {
414            drive_id: &drive_id,
415            file_id,
416            upload_id,
417        })
418        .map_err(new_json_serialize_error)?;
419        let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
420            // Inject operation to the request.
421            .extension(Operation::Write)
422            .body(Buffer::from(body))
423            .map_err(new_request_build_error)?;
424        self.send(req, token.as_deref()).await
425    }
426
427    pub async fn get_upload_url(
428        &self,
429        file_id: &str,
430        upload_id: &str,
431        part_number: Option<usize>,
432    ) -> Result<Buffer> {
433        let (token, drive_id) = self.get_token_and_drive().await?;
434        let part_info_list = part_number.map(|part_number| {
435            vec![PartInfoItem {
436                part_number: Some(part_number),
437            }]
438        });
439        let body = serde_json::to_vec(&GetUploadRequest {
440            drive_id: &drive_id,
441            file_id,
442            upload_id,
443            part_info_list,
444        })
445        .map_err(new_json_serialize_error)?;
446        let req = Request::post(format!(
447            "{}/adrive/v1.0/openFile/getUploadUrl",
448            self.endpoint
449        ))
450        // Inject operation to the request.
451        .extension(Operation::Write)
452        .body(Buffer::from(body))
453        .map_err(new_request_build_error)?;
454        self.send(req, token.as_deref()).await
455    }
456}
457
/// Optional hash information passed to `create_with_rapid_upload`.
///
/// Each field is forwarded to the field of the same name in `CreateRequest`.
pub struct RapidUpload {
    /// Forwarded as `pre_hash`.
    pub pre_hash: Option<String>,
    /// Forwarded as `content_hash`; when set, `content_hash_name` is sent as
    /// `"sha1"`.
    pub content_hash: Option<String>,
    /// Forwarded as `proof_code`; when set, `proof_version` is sent as `"v1"`.
    pub proof_code: Option<String>,
}
463
/// Response body of the `/oauth/access_token` endpoint.
#[derive(Debug, Deserialize)]
pub struct RefreshTokenResponse {
    /// The new access token.
    pub access_token: String,
    /// Lifetime of the access token; added to the current unix timestamp
    /// (seconds) to compute the expiry.
    pub expires_in: i64,
    /// The refresh token to use for the next refresh.
    pub refresh_token: String,
}
470
/// Response body of the `getDriveInfo` endpoint.
#[derive(Debug, Deserialize)]
pub struct DriveInfoResponse {
    /// Id of the default drive; also used as the fallback for the others.
    pub default_drive_id: String,
    /// Id of the resource drive, if the response contains one.
    pub resource_drive_id: Option<String>,
    /// Id of the backup drive, if the response contains one.
    pub backup_drive_id: Option<String>,
}
477
/// Kind of entry to create; serialized in snake case (`file` / `folder`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CreateType {
    /// Create a file.
    File,
    /// Create a folder.
    Folder,
}
484
/// Name check mode sent with create, move and update requests; serialized
/// in snake case (`refuse` / `auto_rename`).
// NOTE(review): the exact server-side behaviour of each mode is not visible
// in this file - confirm against the Aliyun Drive API documentation.
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CheckNameMode {
    /// Sent as `refuse`.
    Refuse,
    /// Sent as `auto_rename`.
    AutoRename,
}
491
/// Deserializable response carrying an optional list of upload parts.
// NOTE(review): presumably the response of the `getUploadUrl` endpoint; it
// is not decoded anywhere in this file - verify against the caller.
#[derive(Deserialize)]
pub struct UploadUrlResponse {
    /// Part descriptions, if the response contains any.
    pub part_info_list: Option<Vec<PartInfo>>,
}
496
/// Response body of the `create` endpoint.
#[derive(Deserialize)]
pub struct CreateResponse {
    /// Id of the file or folder the response refers to.
    pub file_id: String,
    /// Upload id, if the response contains one.
    pub upload_id: Option<String>,
    /// Optional flag from the response.
    // NOTE(review): presumably true when the entry already existed - confirm.
    pub exist: Option<bool>,
}
503
/// Description of a single upload part, as listed in `UploadUrlResponse`.
#[derive(Serialize, Deserialize)]
pub struct PartInfo {
    /// Entity tag of the part, if present.
    pub etag: Option<String>,
    /// Number of the part.
    pub part_number: usize,
    /// Size of the part, if present.
    pub part_size: Option<u64>,
    /// URL for this part.
    // NOTE(review): presumably the target passed to `upload` - verify
    // against the caller.
    pub upload_url: String,
    /// Content type of the part, if present.
    pub content_type: Option<String>,
}
512
/// Deserializable page of files.
// NOTE(review): presumably the response of the `list` endpoint; it is not
// decoded anywhere in this file - verify against the caller.
#[derive(Deserialize)]
pub struct AliyunDriveFileList {
    /// Entries of this page.
    pub items: Vec<AliyunDriveFile>,
    /// Marker for the next page, if present.
    pub next_marker: Option<String>,
}
518
/// Deserializable response carrying a single file id.
// NOTE(review): presumably the response of the `copy` endpoint; it is not
// decoded anywhere in this file - verify against the caller.
#[derive(Deserialize)]
pub struct CopyResponse {
    /// File id contained in the response.
    pub file_id: String,
}
523
/// Metadata of a single file or folder as returned by the service.
#[derive(Deserialize)]
pub struct AliyunDriveFile {
    /// Id of this entry.
    pub file_id: String,
    /// Id of the parent folder.
    pub parent_file_id: String,
    /// Name of this entry.
    pub name: String,
    /// Size, if present in the response.
    pub size: Option<u64>,
    /// Content type, if present in the response.
    pub content_type: Option<String>,
    /// Entry type; read from the JSON key `type`.
    #[serde(rename = "type")]
    pub path_type: String,
    /// Last update time, kept as the raw string from the response.
    // NOTE(review): the timestamp format is not visible here - confirm
    // before parsing.
    pub updated_at: String,
}
535
/// Response body of the `getDownloadUrl` endpoint.
#[derive(Deserialize)]
pub struct GetDownloadUrlResponse {
    /// The download URL returned by `get_download_url`.
    pub url: String,
}
540
/// Request body of the `/oauth/access_token` endpoint.
#[derive(Serialize)]
pub struct AccessTokenRequest<'a> {
    /// Refresh token to exchange.
    refresh_token: &'a str,
    /// Grant type; `get_access_token` always sends `"refresh_token"`.
    grant_type: &'a str,
    /// Client id of the application.
    client_id: &'a str,
    /// Client secret of the application.
    client_secret: &'a str,
}
548
/// Request body of the `get_by_path` endpoint.
#[derive(Serialize)]
pub struct GetByPathRequest<'a> {
    /// Drive to look in.
    drive_id: &'a str,
    /// Path to look up, as produced by `build_path` with `rooted = true`.
    file_path: &'a str,
}
554
/// Request body of the `create` endpoint.
#[derive(Serialize)]
pub struct CreateRequest<'a> {
    /// Drive to create the entry in.
    drive_id: &'a str,
    /// Id of the parent folder; `"root"` when the caller passed none.
    parent_file_id: &'a str,
    /// Name of the new entry.
    name: &'a str,
    /// Kind of entry; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    typ: CreateType,
    /// Name check mode for the request.
    check_name_mode: CheckNameMode,
    /// Optional size of the entry.
    size: Option<u64>,
    /// Optional pre hash taken from `RapidUpload`.
    pre_hash: Option<&'a str>,
    /// Optional content hash taken from `RapidUpload`.
    content_hash: Option<&'a str>,
    /// `"sha1"` when `content_hash` is set, otherwise `None`.
    content_hash_name: Option<&'a str>,
    /// Optional proof code taken from `RapidUpload`.
    proof_code: Option<&'a str>,
    /// `"v1"` when `proof_code` is set, otherwise `None`.
    proof_version: Option<&'a str>,
}
570
/// Request body identifying a single file; used for the `getDownloadUrl`
/// and `delete` endpoints.
#[derive(Serialize)]
pub struct FileRequest<'a> {
    /// Drive that contains the file.
    drive_id: &'a str,
    /// Id of the file.
    file_id: &'a str,
}
576
/// Request body of the `move` endpoint.
#[derive(Serialize)]
pub struct MovePathRequest<'a> {
    /// Drive that contains the file.
    drive_id: &'a str,
    /// Id of the file to move.
    file_id: &'a str,
    /// Id of the destination folder.
    to_parent_file_id: &'a str,
    /// Name check mode; `move_path` always sends `AutoRename`.
    check_name_mode: CheckNameMode,
}
584
/// Request body of the `update` endpoint.
#[derive(Serialize)]
pub struct UpdatePathRequest<'a> {
    /// Drive that contains the file.
    drive_id: &'a str,
    /// Id of the file to update.
    file_id: &'a str,
    /// New name of the file.
    name: &'a str,
    /// Name check mode; `update_path` always sends `Refuse`.
    check_name_mode: CheckNameMode,
}
592
/// Request body of the `copy` endpoint.
#[derive(Serialize)]
pub struct CopyPathRequest<'a> {
    /// Drive that contains the file.
    drive_id: &'a str,
    /// Id of the file to copy.
    file_id: &'a str,
    /// Id of the destination folder.
    to_parent_file_id: &'a str,
    /// Flag forwarded unchanged from the `copy_path` caller.
    auto_rename: bool,
}
600
/// Request body of the `list` endpoint.
#[derive(Serialize)]
pub struct ListRequest<'a> {
    /// Drive to list from.
    drive_id: &'a str,
    /// Id of the folder whose children are listed.
    parent_file_id: &'a str,
    /// Optional limit, forwarded unchanged from the `list` caller.
    limit: Option<usize>,
    /// Optional marker, forwarded unchanged from the `list` caller.
    marker: Option<&'a str>,
}
608
/// Request body of the `complete` endpoint.
#[derive(Serialize)]
pub struct CompleteRequest<'a> {
    /// Drive that contains the file.
    drive_id: &'a str,
    /// Id of the file being uploaded.
    file_id: &'a str,
    /// Id of the upload to complete.
    upload_id: &'a str,
}
615
/// Request body of the `getUploadUrl` endpoint.
#[derive(Serialize)]
pub struct GetUploadRequest<'a> {
    /// Drive that contains the file.
    drive_id: &'a str,
    /// Id of the file being uploaded.
    file_id: &'a str,
    /// Id of the upload.
    upload_id: &'a str,
    /// Parts to request URLs for; `None` when the caller gave no part number.
    part_info_list: Option<Vec<PartInfoItem>>,
}
623
/// Single entry of `GetUploadRequest::part_info_list`.
#[derive(Serialize)]
pub struct PartInfoItem {
    /// Number of the part; `get_upload_url` always sets it to `Some`.
    part_number: Option<usize>,
}