opendal/services/aliyun_drive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use chrono::Utc;
23use http::header::HeaderValue;
24use http::header::{self};
25use http::Request;
26use http::{Method, Response};
27use serde::Deserialize;
28use serde::Serialize;
29use tokio::sync::Mutex;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
35/// Available Aliyun Drive Type.
36#[derive(Debug, Deserialize, Default, Clone)]
37pub enum DriveType {
38    /// Use the default type of Aliyun Drive.
39    #[default]
40    Default,
41    /// Use the backup type of Aliyun Drive.
42    ///
43    /// Fallback to the default type if no backup drive is found.
44    Backup,
45    /// Use the resource type of Aliyun Drive.
46    ///
47    /// Fallback to the default type if no resource drive is found.
48    Resource,
49}
50
51/// Available Aliyun Drive Signer Set
52pub enum AliyunDriveSign {
53    Refresh(String, String, String, Option<String>, i64),
54    Access(String),
55}
56
57pub struct AliyunDriveSigner {
58    pub drive_id: Option<String>,
59    pub sign: AliyunDriveSign,
60}
61
62pub struct AliyunDriveCore {
63    pub info: Arc<AccessorInfo>,
64
65    pub endpoint: String,
66    pub root: String,
67    pub drive_type: DriveType,
68
69    pub signer: Arc<Mutex<AliyunDriveSigner>>,
70    pub dir_lock: Arc<Mutex<()>>,
71}
72
73impl Debug for AliyunDriveCore {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("AliyunDriveCore")
76            .field("root", &self.root)
77            .field("drive_type", &self.drive_type)
78            .finish_non_exhaustive()
79    }
80}
81
82impl AliyunDriveCore {
83    async fn send(&self, mut req: Request<Buffer>, token: Option<&str>) -> Result<Buffer> {
84        // AliyunDrive raise NullPointerException if you haven't set a user-agent.
85        req.headers_mut().insert(
86            header::USER_AGENT,
87            HeaderValue::from_str(&format!("opendal/{}", VERSION))
88                .expect("user agent must be valid header value"),
89        );
90        if req.method() == Method::POST {
91            req.headers_mut().insert(
92                header::CONTENT_TYPE,
93                HeaderValue::from_static("application/json;charset=UTF-8"),
94            );
95        }
96        if let Some(token) = token {
97            req.headers_mut().insert(
98                header::AUTHORIZATION,
99                HeaderValue::from_str(&format_authorization_by_bearer(token)?)
100                    .expect("access token must be valid header value"),
101            );
102        }
103        let res = self.info.http_client().send(req).await?;
104        if !res.status().is_success() {
105            return Err(parse_error(res));
106        }
107        Ok(res.into_body())
108    }
109
110    async fn get_access_token(
111        &self,
112        client_id: &str,
113        client_secret: &str,
114        refresh_token: &str,
115    ) -> Result<Buffer> {
116        let body = serde_json::to_vec(&AccessTokenRequest {
117            refresh_token,
118            grant_type: "refresh_token",
119            client_id,
120            client_secret,
121        })
122        .map_err(new_json_serialize_error)?;
123        let req = Request::post(format!("{}/oauth/access_token", self.endpoint))
124            .body(Buffer::from(body))
125            .map_err(new_request_build_error)?;
126        self.send(req, None).await
127    }
128
129    async fn get_drive_id(&self, token: Option<&str>) -> Result<Buffer> {
130        let req = Request::post(format!("{}/adrive/v1.0/user/getDriveInfo", self.endpoint))
131            .body(Buffer::new())
132            .map_err(new_request_build_error)?;
133        self.send(req, token).await
134    }
135
136    pub async fn get_token_and_drive(&self) -> Result<(Option<String>, String)> {
137        let mut signer = self.signer.lock().await;
138        let token = match &mut signer.sign {
139            AliyunDriveSign::Access(access_token) => Some(access_token.clone()),
140            AliyunDriveSign::Refresh(
141                client_id,
142                client_secret,
143                refresh_token,
144                access_token,
145                expire_at,
146            ) => {
147                if *expire_at < Utc::now().timestamp() || access_token.is_none() {
148                    let res = self
149                        .get_access_token(client_id, client_secret, refresh_token)
150                        .await?;
151                    let output: RefreshTokenResponse = serde_json::from_reader(res.reader())
152                        .map_err(new_json_deserialize_error)?;
153                    *access_token = Some(output.access_token);
154                    *expire_at = output.expires_in + Utc::now().timestamp();
155                    *refresh_token = output.refresh_token;
156                }
157                access_token.clone()
158            }
159        };
160        let Some(drive_id) = &signer.drive_id else {
161            let res = self.get_drive_id(token.as_deref()).await?;
162            let output: DriveInfoResponse =
163                serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
164            let drive_id = match self.drive_type {
165                DriveType::Default => output.default_drive_id,
166                DriveType::Backup => output.backup_drive_id.unwrap_or(output.default_drive_id),
167                DriveType::Resource => output.resource_drive_id.unwrap_or(output.default_drive_id),
168            };
169            signer.drive_id = Some(drive_id.clone());
170            return Ok((token, drive_id));
171        };
172        Ok((token, drive_id.clone()))
173    }
174
175    pub fn build_path(&self, path: &str, rooted: bool) -> String {
176        let file_path = if rooted {
177            build_rooted_abs_path(&self.root, path)
178        } else {
179            build_abs_path(&self.root, path)
180        };
181        let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
182        if file_path.is_empty() {
183            return "/".to_string();
184        }
185        file_path.to_string()
186    }
187
188    pub async fn get_by_path(&self, path: &str) -> Result<Buffer> {
189        let file_path = self.build_path(path, true);
190        let req = Request::post(format!(
191            "{}/adrive/v1.0/openFile/get_by_path",
192            self.endpoint
193        ));
194        let (token, drive_id) = self.get_token_and_drive().await?;
195        let body = serde_json::to_vec(&GetByPathRequest {
196            drive_id: &drive_id,
197            file_path: &file_path,
198        })
199        .map_err(new_json_serialize_error)?;
200        let req = req
201            .extension(Operation::Read)
202            .body(Buffer::from(body))
203            .map_err(new_request_build_error)?;
204        self.send(req, token.as_deref()).await
205    }
206
207    pub async fn ensure_dir_exists(&self, path: &str) -> Result<String> {
208        let file_path = self.build_path(path, false);
209        if file_path == "/" {
210            return Ok("root".to_string());
211        }
212        let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
213        let paths = file_path.split('/').collect::<Vec<&str>>();
214        let mut parent: Option<String> = None;
215        for path in paths {
216            let _guard = self.dir_lock.lock().await;
217            let res = self
218                .create(
219                    parent.as_deref(),
220                    path,
221                    CreateType::Folder,
222                    CheckNameMode::Refuse,
223                )
224                .await?;
225            let output: CreateResponse =
226                serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
227            parent = Some(output.file_id);
228        }
229        Ok(parent.expect("ensure_dir_exists must succeed"))
230    }
231
232    pub async fn create_with_rapid_upload(
233        &self,
234        parent_file_id: Option<&str>,
235        name: &str,
236        typ: CreateType,
237        check_name_mode: CheckNameMode,
238        size: Option<u64>,
239        rapid_upload: Option<RapidUpload>,
240    ) -> Result<Buffer> {
241        let mut content_hash = None;
242        let mut proof_code = None;
243        let mut pre_hash = None;
244        if let Some(rapid_upload) = rapid_upload {
245            content_hash = rapid_upload.content_hash;
246            proof_code = rapid_upload.proof_code;
247            pre_hash = rapid_upload.pre_hash;
248        }
249
250        let (token, drive_id) = self.get_token_and_drive().await?;
251        let body = serde_json::to_vec(&CreateRequest {
252            drive_id: &drive_id,
253            parent_file_id: parent_file_id.unwrap_or("root"),
254            name,
255            typ,
256            check_name_mode,
257            size,
258            pre_hash: pre_hash.as_deref(),
259            content_hash: content_hash.as_deref(),
260            content_hash_name: content_hash.is_some().then_some("sha1"),
261            proof_code: proof_code.as_deref(),
262            proof_version: proof_code.is_some().then_some("v1"),
263        })
264        .map_err(new_json_serialize_error)?;
265        let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
266            .extension(Operation::Write)
267            .body(Buffer::from(body))
268            .map_err(new_request_build_error)?;
269        self.send(req, token.as_deref()).await
270    }
271
272    pub async fn create(
273        &self,
274        parent_file_id: Option<&str>,
275        name: &str,
276        typ: CreateType,
277        check_name_mode: CheckNameMode,
278    ) -> Result<Buffer> {
279        self.create_with_rapid_upload(parent_file_id, name, typ, check_name_mode, None, None)
280            .await
281    }
282
283    async fn get_download_url(&self, file_id: &str) -> Result<String> {
284        let (token, drive_id) = self.get_token_and_drive().await?;
285        let body = serde_json::to_vec(&FileRequest {
286            drive_id: &drive_id,
287            file_id,
288        })
289        .map_err(new_json_serialize_error)?;
290
291        let req = Request::post(format!(
292            "{}/adrive/v1.0/openFile/getDownloadUrl",
293            self.endpoint
294        ))
295        .extension(Operation::Read)
296        .body(Buffer::from(body))
297        .map_err(new_request_build_error)?;
298
299        let res = self.send(req, token.as_deref()).await?;
300
301        let output: GetDownloadUrlResponse =
302            serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
303
304        Ok(output.url)
305    }
306
307    pub async fn download(&self, file_id: &str, range: BytesRange) -> Result<Response<HttpBody>> {
308        let download_url = self.get_download_url(file_id).await?;
309        let req = Request::get(download_url)
310            .extension(Operation::Read)
311            .header(header::RANGE, range.to_header())
312            .body(Buffer::new())
313            .map_err(new_request_build_error)?;
314        self.info.http_client().fetch(req).await
315    }
316
317    pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
318        let (token, drive_id) = self.get_token_and_drive().await?;
319        let body = serde_json::to_vec(&MovePathRequest {
320            drive_id: &drive_id,
321            file_id,
322            to_parent_file_id,
323            check_name_mode: CheckNameMode::AutoRename,
324        })
325        .map_err(new_json_serialize_error)?;
326        let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
327            .extension(Operation::Write)
328            .body(Buffer::from(body))
329            .map_err(new_request_build_error)?;
330        self.send(req, token.as_deref()).await?;
331        Ok(())
332    }
333
334    pub async fn update_path(&self, file_id: &str, name: &str) -> Result<()> {
335        let (token, drive_id) = self.get_token_and_drive().await?;
336        let body = serde_json::to_vec(&UpdatePathRequest {
337            drive_id: &drive_id,
338            file_id,
339            name,
340            check_name_mode: CheckNameMode::Refuse,
341        })
342        .map_err(new_json_serialize_error)?;
343        let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
344            .extension(Operation::Write)
345            .body(Buffer::from(body))
346            .map_err(new_request_build_error)?;
347        self.send(req, token.as_deref()).await?;
348        Ok(())
349    }
350
351    pub async fn copy_path(
352        &self,
353        file_id: &str,
354        to_parent_file_id: &str,
355        auto_rename: bool,
356    ) -> Result<Buffer> {
357        let (token, drive_id) = self.get_token_and_drive().await?;
358        let body = serde_json::to_vec(&CopyPathRequest {
359            drive_id: &drive_id,
360            file_id,
361            to_parent_file_id,
362            auto_rename,
363        })
364        .map_err(new_json_serialize_error)?;
365        let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
366            .extension(Operation::Copy)
367            .body(Buffer::from(body))
368            .map_err(new_request_build_error)?;
369        self.send(req, token.as_deref()).await
370    }
371
372    pub async fn delete_path(&self, file_id: &str) -> Result<()> {
373        let (token, drive_id) = self.get_token_and_drive().await?;
374        let body = serde_json::to_vec(&FileRequest {
375            drive_id: &drive_id,
376            file_id,
377        })
378        .map_err(new_json_serialize_error)?;
379        let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
380            .extension(Operation::Delete)
381            .body(Buffer::from(body))
382            .map_err(new_request_build_error)?;
383        self.send(req, token.as_deref()).await?;
384        Ok(())
385    }
386
387    pub async fn list(
388        &self,
389        parent_file_id: &str,
390        limit: Option<usize>,
391        marker: Option<String>,
392    ) -> Result<Buffer> {
393        let (token, drive_id) = self.get_token_and_drive().await?;
394        let body = serde_json::to_vec(&ListRequest {
395            drive_id: &drive_id,
396            parent_file_id,
397            limit,
398            marker: marker.as_deref(),
399        })
400        .map_err(new_json_serialize_error)?;
401        let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
402            .extension(Operation::List)
403            .body(Buffer::from(body))
404            .map_err(new_request_build_error)?;
405        self.send(req, token.as_deref()).await
406    }
407
408    pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
409        let (token, drive_id) = self.get_token_and_drive().await?;
410        let body = serde_json::to_vec(&CompleteRequest {
411            drive_id: &drive_id,
412            file_id,
413            upload_id,
414        })
415        .map_err(new_json_serialize_error)?;
416        let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
417            .extension(Operation::Write)
418            .body(Buffer::from(body))
419            .map_err(new_request_build_error)?;
420        self.send(req, token.as_deref()).await
421    }
422
423    async fn get_upload_url(
424        &self,
425        file_id: &str,
426        upload_id: &str,
427        part_number: usize,
428    ) -> Result<String> {
429        let (token, drive_id) = self.get_token_and_drive().await?;
430        let part_info_list = vec![PartInfoItem {
431            part_number: Some(part_number),
432        }];
433        let body = serde_json::to_vec(&GetUploadRequest {
434            drive_id: &drive_id,
435            file_id,
436            upload_id,
437            part_info_list: Some(part_info_list),
438        })
439        .map_err(new_json_serialize_error)?;
440
441        let req = Request::post(format!(
442            "{}/adrive/v1.0/openFile/getUploadUrl",
443            self.endpoint
444        ))
445        .extension(Operation::Write)
446        .body(Buffer::from(body))
447        .map_err(new_request_build_error)?;
448
449        let res = self.send(req, token.as_deref()).await?;
450
451        let mut output: UploadUrlResponse =
452            serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
453
454        let Some(upload_url) = output
455            .part_info_list
456            .take()
457            .map(|mut list| list.swap_remove(0))
458            .map(|part_info| part_info.upload_url)
459        else {
460            return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
461        };
462
463        Ok(upload_url)
464    }
465    pub async fn upload(
466        &self,
467        file_id: &str,
468        upload_id: &str,
469        part_number: usize,
470        body: Buffer,
471    ) -> Result<Buffer> {
472        let upload_url = self.get_upload_url(file_id, upload_id, part_number).await?;
473        let req = Request::put(upload_url)
474            .extension(Operation::Write)
475            .body(body)
476            .map_err(new_request_build_error)?;
477        self.send(req, None).await
478    }
479}
480
481pub struct RapidUpload {
482    pub pre_hash: Option<String>,
483    pub content_hash: Option<String>,
484    pub proof_code: Option<String>,
485}
486
487#[derive(Debug, Deserialize)]
488pub struct RefreshTokenResponse {
489    pub access_token: String,
490    pub expires_in: i64,
491    pub refresh_token: String,
492}
493
494#[derive(Debug, Deserialize)]
495pub struct DriveInfoResponse {
496    pub default_drive_id: String,
497    pub resource_drive_id: Option<String>,
498    pub backup_drive_id: Option<String>,
499}
500
501#[derive(Debug, Serialize)]
502#[serde(rename_all = "snake_case")]
503pub enum CreateType {
504    File,
505    Folder,
506}
507
508#[derive(Debug, Serialize)]
509#[serde(rename_all = "snake_case")]
510pub enum CheckNameMode {
511    Refuse,
512    AutoRename,
513}
514
515#[derive(Deserialize)]
516pub struct UploadUrlResponse {
517    pub part_info_list: Option<Vec<PartInfo>>,
518}
519
520#[derive(Deserialize)]
521pub struct CreateResponse {
522    pub file_id: String,
523    pub upload_id: Option<String>,
524    pub exist: Option<bool>,
525}
526
527#[derive(Serialize, Deserialize)]
528pub struct PartInfo {
529    pub etag: Option<String>,
530    pub part_number: usize,
531    pub part_size: Option<u64>,
532    pub upload_url: String,
533    pub content_type: Option<String>,
534}
535
536#[derive(Deserialize)]
537pub struct AliyunDriveFileList {
538    pub items: Vec<AliyunDriveFile>,
539    pub next_marker: Option<String>,
540}
541
542#[derive(Deserialize)]
543pub struct CopyResponse {
544    pub file_id: String,
545}
546
547#[derive(Deserialize)]
548pub struct AliyunDriveFile {
549    pub file_id: String,
550    pub parent_file_id: String,
551    pub name: String,
552    pub size: Option<u64>,
553    pub content_type: Option<String>,
554    #[serde(rename = "type")]
555    pub path_type: String,
556    pub updated_at: String,
557}
558
559#[derive(Deserialize)]
560pub struct GetDownloadUrlResponse {
561    pub url: String,
562}
563
564#[derive(Serialize)]
565pub struct AccessTokenRequest<'a> {
566    refresh_token: &'a str,
567    grant_type: &'a str,
568    client_id: &'a str,
569    client_secret: &'a str,
570}
571
572#[derive(Serialize)]
573pub struct GetByPathRequest<'a> {
574    drive_id: &'a str,
575    file_path: &'a str,
576}
577
578#[derive(Serialize)]
579pub struct CreateRequest<'a> {
580    drive_id: &'a str,
581    parent_file_id: &'a str,
582    name: &'a str,
583    #[serde(rename = "type")]
584    typ: CreateType,
585    check_name_mode: CheckNameMode,
586    size: Option<u64>,
587    pre_hash: Option<&'a str>,
588    content_hash: Option<&'a str>,
589    content_hash_name: Option<&'a str>,
590    proof_code: Option<&'a str>,
591    proof_version: Option<&'a str>,
592}
593
594#[derive(Serialize)]
595pub struct FileRequest<'a> {
596    drive_id: &'a str,
597    file_id: &'a str,
598}
599
600#[derive(Serialize)]
601pub struct MovePathRequest<'a> {
602    drive_id: &'a str,
603    file_id: &'a str,
604    to_parent_file_id: &'a str,
605    check_name_mode: CheckNameMode,
606}
607
608#[derive(Serialize)]
609pub struct UpdatePathRequest<'a> {
610    drive_id: &'a str,
611    file_id: &'a str,
612    name: &'a str,
613    check_name_mode: CheckNameMode,
614}
615
616#[derive(Serialize)]
617pub struct CopyPathRequest<'a> {
618    drive_id: &'a str,
619    file_id: &'a str,
620    to_parent_file_id: &'a str,
621    auto_rename: bool,
622}
623
624#[derive(Serialize)]
625pub struct ListRequest<'a> {
626    drive_id: &'a str,
627    parent_file_id: &'a str,
628    limit: Option<usize>,
629    marker: Option<&'a str>,
630}
631
632#[derive(Serialize)]
633pub struct CompleteRequest<'a> {
634    drive_id: &'a str,
635    file_id: &'a str,
636    upload_id: &'a str,
637}
638
639#[derive(Serialize)]
640pub struct GetUploadRequest<'a> {
641    drive_id: &'a str,
642    file_id: &'a str,
643    upload_id: &'a str,
644    part_info_list: Option<Vec<PartInfoItem>>,
645}
646
647#[derive(Serialize)]
648pub struct PartInfoItem {
649    part_number: Option<usize>,
650}