opendal/services/aliyun_drive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use chrono::Utc;
23use http::header::HeaderValue;
24use http::header::{self};
25use http::Method;
26use http::Request;
27use http::Response;
28use serde::Deserialize;
29use serde::Serialize;
30use tokio::sync::Mutex;
31
32use super::error::parse_error;
33use crate::raw::*;
34use crate::*;
35
36/// Available Aliyun Drive Type.
37#[derive(Debug, Deserialize, Default, Clone)]
38pub enum DriveType {
39    /// Use the default type of Aliyun Drive.
40    #[default]
41    Default,
42    /// Use the backup type of Aliyun Drive.
43    ///
44    /// Fallback to the default type if no backup drive is found.
45    Backup,
46    /// Use the resource type of Aliyun Drive.
47    ///
48    /// Fallback to the default type if no resource drive is found.
49    Resource,
50}
51
/// Credentials used to authorize requests to Aliyun Drive.
pub enum AliyunDriveSign {
    /// Refresh-token based credentials.
    ///
    /// Fields, in order: client id, client secret, refresh token, the cached access
    /// token (if one has been fetched), and the Unix timestamp in seconds at which the
    /// cached access token expires.
    Refresh(String, String, String, Option<String>, i64),
    /// A fixed access token that is used as-is and never refreshed.
    Access(String),
}
57
58pub struct AliyunDriveSigner {
59    pub drive_id: Option<String>,
60    pub sign: AliyunDriveSign,
61}
62
63pub struct AliyunDriveCore {
64    pub info: Arc<AccessorInfo>,
65
66    pub endpoint: String,
67    pub root: String,
68    pub drive_type: DriveType,
69
70    pub signer: Arc<Mutex<AliyunDriveSigner>>,
71    pub dir_lock: Arc<Mutex<()>>,
72}
73
74impl Debug for AliyunDriveCore {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("AliyunDriveCore")
77            .field("root", &self.root)
78            .field("drive_type", &self.drive_type)
79            .finish_non_exhaustive()
80    }
81}
82
83impl AliyunDriveCore {
84    async fn send(&self, mut req: Request<Buffer>, token: Option<&str>) -> Result<Buffer> {
85        // AliyunDrive raise NullPointerException if you haven't set a user-agent.
86        req.headers_mut().insert(
87            header::USER_AGENT,
88            HeaderValue::from_str(&format!("opendal/{}", VERSION))
89                .expect("user agent must be valid header value"),
90        );
91        if req.method() == Method::POST {
92            req.headers_mut().insert(
93                header::CONTENT_TYPE,
94                HeaderValue::from_static("application/json;charset=UTF-8"),
95            );
96        }
97        if let Some(token) = token {
98            req.headers_mut().insert(
99                header::AUTHORIZATION,
100                HeaderValue::from_str(&format_authorization_by_bearer(token)?)
101                    .expect("access token must be valid header value"),
102            );
103        }
104        let res = self.info.http_client().send(req).await?;
105        if !res.status().is_success() {
106            return Err(parse_error(res));
107        }
108        Ok(res.into_body())
109    }
110
111    async fn get_access_token(
112        &self,
113        client_id: &str,
114        client_secret: &str,
115        refresh_token: &str,
116    ) -> Result<Buffer> {
117        let body = serde_json::to_vec(&AccessTokenRequest {
118            refresh_token,
119            grant_type: "refresh_token",
120            client_id,
121            client_secret,
122        })
123        .map_err(new_json_serialize_error)?;
124        let req = Request::post(format!("{}/oauth/access_token", self.endpoint))
125            .body(Buffer::from(body))
126            .map_err(new_request_build_error)?;
127        self.send(req, None).await
128    }
129
130    async fn get_drive_id(&self, token: Option<&str>) -> Result<Buffer> {
131        let req = Request::post(format!("{}/adrive/v1.0/user/getDriveInfo", self.endpoint))
132            .body(Buffer::new())
133            .map_err(new_request_build_error)?;
134        self.send(req, token).await
135    }
136
137    pub async fn get_token_and_drive(&self) -> Result<(Option<String>, String)> {
138        let mut signer = self.signer.lock().await;
139        let token = match &mut signer.sign {
140            AliyunDriveSign::Access(access_token) => Some(access_token.clone()),
141            AliyunDriveSign::Refresh(
142                client_id,
143                client_secret,
144                refresh_token,
145                access_token,
146                expire_at,
147            ) => {
148                if *expire_at < Utc::now().timestamp() || access_token.is_none() {
149                    let res = self
150                        .get_access_token(client_id, client_secret, refresh_token)
151                        .await?;
152                    let output: RefreshTokenResponse = serde_json::from_reader(res.reader())
153                        .map_err(new_json_deserialize_error)?;
154                    *access_token = Some(output.access_token);
155                    *expire_at = output.expires_in + Utc::now().timestamp();
156                    *refresh_token = output.refresh_token;
157                }
158                access_token.clone()
159            }
160        };
161        let Some(drive_id) = &signer.drive_id else {
162            let res = self.get_drive_id(token.as_deref()).await?;
163            let output: DriveInfoResponse =
164                serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
165            let drive_id = match self.drive_type {
166                DriveType::Default => output.default_drive_id,
167                DriveType::Backup => output.backup_drive_id.unwrap_or(output.default_drive_id),
168                DriveType::Resource => output.resource_drive_id.unwrap_or(output.default_drive_id),
169            };
170            signer.drive_id = Some(drive_id.clone());
171            return Ok((token, drive_id));
172        };
173        Ok((token, drive_id.clone()))
174    }
175
176    pub fn build_path(&self, path: &str, rooted: bool) -> String {
177        let file_path = if rooted {
178            build_rooted_abs_path(&self.root, path)
179        } else {
180            build_abs_path(&self.root, path)
181        };
182        let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
183        if file_path.is_empty() {
184            return "/".to_string();
185        }
186        file_path.to_string()
187    }
188
189    pub async fn get_by_path(&self, path: &str) -> Result<Buffer> {
190        let file_path = self.build_path(path, true);
191        let req = Request::post(format!(
192            "{}/adrive/v1.0/openFile/get_by_path",
193            self.endpoint
194        ));
195        let (token, drive_id) = self.get_token_and_drive().await?;
196        let body = serde_json::to_vec(&GetByPathRequest {
197            drive_id: &drive_id,
198            file_path: &file_path,
199        })
200        .map_err(new_json_serialize_error)?;
201        let req = req
202            .extension(Operation::Read)
203            .body(Buffer::from(body))
204            .map_err(new_request_build_error)?;
205        self.send(req, token.as_deref()).await
206    }
207
208    pub async fn ensure_dir_exists(&self, path: &str) -> Result<String> {
209        let file_path = self.build_path(path, false);
210        if file_path == "/" {
211            return Ok("root".to_string());
212        }
213        let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
214        let paths = file_path.split('/').collect::<Vec<&str>>();
215        let mut parent: Option<String> = None;
216        for path in paths {
217            let _guard = self.dir_lock.lock().await;
218            let res = self
219                .create(
220                    parent.as_deref(),
221                    path,
222                    CreateType::Folder,
223                    CheckNameMode::Refuse,
224                )
225                .await?;
226            let output: CreateResponse =
227                serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
228            parent = Some(output.file_id);
229        }
230        Ok(parent.expect("ensure_dir_exists must succeed"))
231    }
232
233    pub async fn create_with_rapid_upload(
234        &self,
235        parent_file_id: Option<&str>,
236        name: &str,
237        typ: CreateType,
238        check_name_mode: CheckNameMode,
239        size: Option<u64>,
240        rapid_upload: Option<RapidUpload>,
241    ) -> Result<Buffer> {
242        let mut content_hash = None;
243        let mut proof_code = None;
244        let mut pre_hash = None;
245        if let Some(rapid_upload) = rapid_upload {
246            content_hash = rapid_upload.content_hash;
247            proof_code = rapid_upload.proof_code;
248            pre_hash = rapid_upload.pre_hash;
249        }
250
251        let (token, drive_id) = self.get_token_and_drive().await?;
252        let body = serde_json::to_vec(&CreateRequest {
253            drive_id: &drive_id,
254            parent_file_id: parent_file_id.unwrap_or("root"),
255            name,
256            typ,
257            check_name_mode,
258            size,
259            pre_hash: pre_hash.as_deref(),
260            content_hash: content_hash.as_deref(),
261            content_hash_name: content_hash.is_some().then_some("sha1"),
262            proof_code: proof_code.as_deref(),
263            proof_version: proof_code.is_some().then_some("v1"),
264        })
265        .map_err(new_json_serialize_error)?;
266        let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
267            .extension(Operation::Write)
268            .body(Buffer::from(body))
269            .map_err(new_request_build_error)?;
270        self.send(req, token.as_deref()).await
271    }
272
273    pub async fn create(
274        &self,
275        parent_file_id: Option<&str>,
276        name: &str,
277        typ: CreateType,
278        check_name_mode: CheckNameMode,
279    ) -> Result<Buffer> {
280        self.create_with_rapid_upload(parent_file_id, name, typ, check_name_mode, None, None)
281            .await
282    }
283
284    async fn get_download_url(&self, file_id: &str) -> Result<String> {
285        let (token, drive_id) = self.get_token_and_drive().await?;
286        let body = serde_json::to_vec(&FileRequest {
287            drive_id: &drive_id,
288            file_id,
289        })
290        .map_err(new_json_serialize_error)?;
291
292        let req = Request::post(format!(
293            "{}/adrive/v1.0/openFile/getDownloadUrl",
294            self.endpoint
295        ))
296        .extension(Operation::Read)
297        .body(Buffer::from(body))
298        .map_err(new_request_build_error)?;
299
300        let res = self.send(req, token.as_deref()).await?;
301
302        let output: GetDownloadUrlResponse =
303            serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
304
305        Ok(output.url)
306    }
307
308    pub async fn download(&self, file_id: &str, range: BytesRange) -> Result<Response<HttpBody>> {
309        let download_url = self.get_download_url(file_id).await?;
310        let req = Request::get(download_url)
311            .extension(Operation::Read)
312            .header(header::RANGE, range.to_header())
313            .body(Buffer::new())
314            .map_err(new_request_build_error)?;
315        self.info.http_client().fetch(req).await
316    }
317
318    pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
319        let (token, drive_id) = self.get_token_and_drive().await?;
320        let body = serde_json::to_vec(&MovePathRequest {
321            drive_id: &drive_id,
322            file_id,
323            to_parent_file_id,
324            check_name_mode: CheckNameMode::AutoRename,
325        })
326        .map_err(new_json_serialize_error)?;
327        let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
328            .extension(Operation::Write)
329            .body(Buffer::from(body))
330            .map_err(new_request_build_error)?;
331        self.send(req, token.as_deref()).await?;
332        Ok(())
333    }
334
335    pub async fn update_path(&self, file_id: &str, name: &str) -> Result<()> {
336        let (token, drive_id) = self.get_token_and_drive().await?;
337        let body = serde_json::to_vec(&UpdatePathRequest {
338            drive_id: &drive_id,
339            file_id,
340            name,
341            check_name_mode: CheckNameMode::Refuse,
342        })
343        .map_err(new_json_serialize_error)?;
344        let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
345            .extension(Operation::Write)
346            .body(Buffer::from(body))
347            .map_err(new_request_build_error)?;
348        self.send(req, token.as_deref()).await?;
349        Ok(())
350    }
351
352    pub async fn copy_path(
353        &self,
354        file_id: &str,
355        to_parent_file_id: &str,
356        auto_rename: bool,
357    ) -> Result<Buffer> {
358        let (token, drive_id) = self.get_token_and_drive().await?;
359        let body = serde_json::to_vec(&CopyPathRequest {
360            drive_id: &drive_id,
361            file_id,
362            to_parent_file_id,
363            auto_rename,
364        })
365        .map_err(new_json_serialize_error)?;
366        let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
367            .extension(Operation::Copy)
368            .body(Buffer::from(body))
369            .map_err(new_request_build_error)?;
370        self.send(req, token.as_deref()).await
371    }
372
373    pub async fn delete_path(&self, file_id: &str) -> Result<()> {
374        let (token, drive_id) = self.get_token_and_drive().await?;
375        let body = serde_json::to_vec(&FileRequest {
376            drive_id: &drive_id,
377            file_id,
378        })
379        .map_err(new_json_serialize_error)?;
380        let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
381            .extension(Operation::Delete)
382            .body(Buffer::from(body))
383            .map_err(new_request_build_error)?;
384        self.send(req, token.as_deref()).await?;
385        Ok(())
386    }
387
388    pub async fn list(
389        &self,
390        parent_file_id: &str,
391        limit: Option<usize>,
392        marker: Option<String>,
393    ) -> Result<Buffer> {
394        let (token, drive_id) = self.get_token_and_drive().await?;
395        let body = serde_json::to_vec(&ListRequest {
396            drive_id: &drive_id,
397            parent_file_id,
398            limit,
399            marker: marker.as_deref(),
400        })
401        .map_err(new_json_serialize_error)?;
402        let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
403            .extension(Operation::List)
404            .body(Buffer::from(body))
405            .map_err(new_request_build_error)?;
406        self.send(req, token.as_deref()).await
407    }
408
409    pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
410        let (token, drive_id) = self.get_token_and_drive().await?;
411        let body = serde_json::to_vec(&CompleteRequest {
412            drive_id: &drive_id,
413            file_id,
414            upload_id,
415        })
416        .map_err(new_json_serialize_error)?;
417        let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
418            .extension(Operation::Write)
419            .body(Buffer::from(body))
420            .map_err(new_request_build_error)?;
421        self.send(req, token.as_deref()).await
422    }
423
424    async fn get_upload_url(
425        &self,
426        file_id: &str,
427        upload_id: &str,
428        part_number: usize,
429    ) -> Result<String> {
430        let (token, drive_id) = self.get_token_and_drive().await?;
431        let part_info_list = vec![PartInfoItem {
432            part_number: Some(part_number),
433        }];
434        let body = serde_json::to_vec(&GetUploadRequest {
435            drive_id: &drive_id,
436            file_id,
437            upload_id,
438            part_info_list: Some(part_info_list),
439        })
440        .map_err(new_json_serialize_error)?;
441
442        let req = Request::post(format!(
443            "{}/adrive/v1.0/openFile/getUploadUrl",
444            self.endpoint
445        ))
446        .extension(Operation::Write)
447        .body(Buffer::from(body))
448        .map_err(new_request_build_error)?;
449
450        let res = self.send(req, token.as_deref()).await?;
451
452        let mut output: UploadUrlResponse =
453            serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
454
455        let Some(upload_url) = output
456            .part_info_list
457            .take()
458            .map(|mut list| list.swap_remove(0))
459            .map(|part_info| part_info.upload_url)
460        else {
461            return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
462        };
463
464        Ok(upload_url)
465    }
466    pub async fn upload(
467        &self,
468        file_id: &str,
469        upload_id: &str,
470        part_number: usize,
471        body: Buffer,
472    ) -> Result<Buffer> {
473        let upload_url = self.get_upload_url(file_id, upload_id, part_number).await?;
474        let req = Request::put(upload_url)
475            .extension(Operation::Write)
476            .body(body)
477            .map_err(new_request_build_error)?;
478        self.send(req, None).await
479    }
480}
481
/// Optional hash data forwarded by `create_with_rapid_upload` in the create request.
pub struct RapidUpload {
    /// Sent as `pre_hash`.
    pub pre_hash: Option<String>,
    /// Sent as `content_hash`; when present the hash name `"sha1"` is sent as well.
    pub content_hash: Option<String>,
    /// Sent as `proof_code`; when present the proof version `"v1"` is sent as well.
    pub proof_code: Option<String>,
}
487
488#[derive(Debug, Deserialize)]
489pub struct RefreshTokenResponse {
490    pub access_token: String,
491    pub expires_in: i64,
492    pub refresh_token: String,
493}
494
495#[derive(Debug, Deserialize)]
496pub struct DriveInfoResponse {
497    pub default_drive_id: String,
498    pub resource_drive_id: Option<String>,
499    pub backup_drive_id: Option<String>,
500}
501
502#[derive(Debug, Serialize)]
503#[serde(rename_all = "snake_case")]
504pub enum CreateType {
505    File,
506    Folder,
507}
508
509#[derive(Debug, Serialize)]
510#[serde(rename_all = "snake_case")]
511pub enum CheckNameMode {
512    Refuse,
513    AutoRename,
514}
515
516#[derive(Deserialize)]
517pub struct UploadUrlResponse {
518    pub part_info_list: Option<Vec<PartInfo>>,
519}
520
521#[derive(Deserialize)]
522pub struct CreateResponse {
523    pub file_id: String,
524    pub upload_id: Option<String>,
525    pub exist: Option<bool>,
526}
527
528#[derive(Serialize, Deserialize)]
529pub struct PartInfo {
530    pub etag: Option<String>,
531    pub part_number: usize,
532    pub part_size: Option<u64>,
533    pub upload_url: String,
534    pub content_type: Option<String>,
535}
536
537#[derive(Deserialize)]
538pub struct AliyunDriveFileList {
539    pub items: Vec<AliyunDriveFile>,
540    pub next_marker: Option<String>,
541}
542
543#[derive(Deserialize)]
544pub struct CopyResponse {
545    pub file_id: String,
546}
547
548#[derive(Deserialize)]
549pub struct AliyunDriveFile {
550    pub file_id: String,
551    pub parent_file_id: String,
552    pub name: String,
553    pub size: Option<u64>,
554    pub content_type: Option<String>,
555    #[serde(rename = "type")]
556    pub path_type: String,
557    pub updated_at: String,
558}
559
560#[derive(Deserialize)]
561pub struct GetDownloadUrlResponse {
562    pub url: String,
563}
564
565#[derive(Serialize)]
566pub struct AccessTokenRequest<'a> {
567    refresh_token: &'a str,
568    grant_type: &'a str,
569    client_id: &'a str,
570    client_secret: &'a str,
571}
572
573#[derive(Serialize)]
574pub struct GetByPathRequest<'a> {
575    drive_id: &'a str,
576    file_path: &'a str,
577}
578
579#[derive(Serialize)]
580pub struct CreateRequest<'a> {
581    drive_id: &'a str,
582    parent_file_id: &'a str,
583    name: &'a str,
584    #[serde(rename = "type")]
585    typ: CreateType,
586    check_name_mode: CheckNameMode,
587    size: Option<u64>,
588    pre_hash: Option<&'a str>,
589    content_hash: Option<&'a str>,
590    content_hash_name: Option<&'a str>,
591    proof_code: Option<&'a str>,
592    proof_version: Option<&'a str>,
593}
594
595#[derive(Serialize)]
596pub struct FileRequest<'a> {
597    drive_id: &'a str,
598    file_id: &'a str,
599}
600
601#[derive(Serialize)]
602pub struct MovePathRequest<'a> {
603    drive_id: &'a str,
604    file_id: &'a str,
605    to_parent_file_id: &'a str,
606    check_name_mode: CheckNameMode,
607}
608
609#[derive(Serialize)]
610pub struct UpdatePathRequest<'a> {
611    drive_id: &'a str,
612    file_id: &'a str,
613    name: &'a str,
614    check_name_mode: CheckNameMode,
615}
616
617#[derive(Serialize)]
618pub struct CopyPathRequest<'a> {
619    drive_id: &'a str,
620    file_id: &'a str,
621    to_parent_file_id: &'a str,
622    auto_rename: bool,
623}
624
625#[derive(Serialize)]
626pub struct ListRequest<'a> {
627    drive_id: &'a str,
628    parent_file_id: &'a str,
629    limit: Option<usize>,
630    marker: Option<&'a str>,
631}
632
633#[derive(Serialize)]
634pub struct CompleteRequest<'a> {
635    drive_id: &'a str,
636    file_id: &'a str,
637    upload_id: &'a str,
638}
639
640#[derive(Serialize)]
641pub struct GetUploadRequest<'a> {
642    drive_id: &'a str,
643    file_id: &'a str,
644    upload_id: &'a str,
645    part_info_list: Option<Vec<PartInfoItem>>,
646}
647
648#[derive(Serialize)]
649pub struct PartInfoItem {
650    part_number: Option<usize>,
651}