opendal/services/dropbox/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::default::Default;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::header::CONTENT_LENGTH;
29use http::header::CONTENT_TYPE;
30use http::Request;
31use http::Response;
32use http::StatusCode;
33use serde::Deserialize;
34use serde::Serialize;
35use tokio::sync::Mutex;
36
37use super::error::parse_error;
38use crate::raw::*;
39use crate::*;
40
41pub struct DropboxCore {
42    pub info: Arc<AccessorInfo>,
43    pub root: String,
44    pub signer: Arc<Mutex<DropboxSigner>>,
45}
46
47impl Debug for DropboxCore {
48    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("DropboxCore")
50            .field("root", &self.root)
51            .finish()
52    }
53}
54
55impl DropboxCore {
56    fn build_path(&self, path: &str) -> String {
57        let path = build_rooted_abs_path(&self.root, path);
58        // For dropbox, even the path is a directory,
59        // we still need to remove the trailing slash.
60        path.trim_end_matches('/').to_string()
61    }
62
63    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
64        let mut signer = self.signer.lock().await;
65
66        // Access token is valid, use it directly.
67        if !signer.access_token.is_empty() && signer.expires_in > Utc::now() {
68            let value = format!("Bearer {}", signer.access_token)
69                .parse()
70                .expect("token must be valid header value");
71            req.headers_mut().insert(header::AUTHORIZATION, value);
72            return Ok(());
73        }
74
75        // Refresh invalid token.
76        let url = "https://api.dropboxapi.com/oauth2/token".to_string();
77
78        let content = format!(
79            "grant_type=refresh_token&refresh_token={}&client_id={}&client_secret={}",
80            signer.refresh_token, signer.client_id, signer.client_secret
81        );
82        let bs = Bytes::from(content);
83
84        let request = Request::post(&url)
85            .header(CONTENT_TYPE, "application/x-www-form-urlencoded")
86            .header(CONTENT_LENGTH, bs.len())
87            .body(Buffer::from(bs))
88            .map_err(new_request_build_error)?;
89
90        let resp = self.info.http_client().send(request).await?;
91        let body = resp.into_body();
92
93        let token: DropboxTokenResponse =
94            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
95
96        // Update signer after token refreshed.
97        signer.access_token.clone_from(&token.access_token);
98
99        // Refresh it 2 minutes earlier.
100        signer.expires_in = Utc::now()
101            + chrono::TimeDelta::try_seconds(token.expires_in as i64)
102                .expect("expires_in must be valid seconds")
103            - chrono::TimeDelta::try_seconds(120).expect("120 must be valid seconds");
104
105        let value = format!("Bearer {}", token.access_token)
106            .parse()
107            .expect("token must be valid header value");
108        req.headers_mut().insert(header::AUTHORIZATION, value);
109
110        Ok(())
111    }
112
113    pub async fn dropbox_get(
114        &self,
115        path: &str,
116        range: BytesRange,
117        _: &OpRead,
118    ) -> Result<Response<HttpBody>> {
119        let url: String = "https://content.dropboxapi.com/2/files/download".to_string();
120        let download_args = DropboxDownloadArgs {
121            path: build_rooted_abs_path(&self.root, path),
122        };
123        let request_payload =
124            serde_json::to_string(&download_args).map_err(new_json_serialize_error)?;
125
126        let mut req = Request::post(&url)
127            .header("Dropbox-API-Arg", request_payload)
128            .header(CONTENT_LENGTH, 0);
129
130        if !range.is_full() {
131            req = req.header(header::RANGE, range.to_header());
132        }
133
134        let req = req.extension(Operation::Read);
135
136        let mut request = req.body(Buffer::new()).map_err(new_request_build_error)?;
137
138        self.sign(&mut request).await?;
139        self.info.http_client().fetch(request).await
140    }
141
142    pub async fn dropbox_update(
143        &self,
144        path: &str,
145        size: Option<usize>,
146        args: &OpWrite,
147        body: Buffer,
148    ) -> Result<Response<Buffer>> {
149        let url = "https://content.dropboxapi.com/2/files/upload".to_string();
150        let dropbox_update_args = DropboxUploadArgs {
151            path: build_rooted_abs_path(&self.root, path),
152            ..Default::default()
153        };
154        let mut request_builder = Request::post(&url);
155        if let Some(size) = size {
156            request_builder = request_builder.header(CONTENT_LENGTH, size);
157        }
158        request_builder = request_builder.header(
159            CONTENT_TYPE,
160            args.content_type().unwrap_or("application/octet-stream"),
161        );
162
163        let request_builder = request_builder.extension(Operation::Write);
164
165        let mut request = request_builder
166            .header(
167                "Dropbox-API-Arg",
168                serde_json::to_string(&dropbox_update_args).map_err(new_json_serialize_error)?,
169            )
170            .body(body)
171            .map_err(new_request_build_error)?;
172
173        self.sign(&mut request).await?;
174        self.info.http_client().send(request).await
175    }
176
177    pub async fn dropbox_delete(&self, path: &str) -> Result<Response<Buffer>> {
178        let url = "https://api.dropboxapi.com/2/files/delete_v2".to_string();
179        let args = DropboxDeleteArgs {
180            path: self.build_path(path),
181        };
182
183        let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
184
185        let mut request = Request::post(&url)
186            .header(CONTENT_TYPE, "application/json")
187            .header(CONTENT_LENGTH, bs.len())
188            .extension(Operation::Delete)
189            .body(Buffer::from(bs))
190            .map_err(new_request_build_error)?;
191
192        self.sign(&mut request).await?;
193        self.info.http_client().send(request).await
194    }
195
196    pub async fn dropbox_create_folder(&self, path: &str) -> Result<RpCreateDir> {
197        let url = "https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
198        let args = DropboxCreateFolderArgs {
199            path: self.build_path(path),
200        };
201
202        let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
203
204        let mut request = Request::post(&url)
205            .header(CONTENT_TYPE, "application/json")
206            .header(CONTENT_LENGTH, bs.len())
207            .extension(Operation::CreateDir)
208            .body(Buffer::from(bs))
209            .map_err(new_request_build_error)?;
210
211        self.sign(&mut request).await?;
212        let resp = self.info.http_client().send(request).await?;
213        let status = resp.status();
214        match status {
215            StatusCode::OK => Ok(RpCreateDir::default()),
216            _ => {
217                let err = parse_error(resp);
218                match err.kind() {
219                    ErrorKind::AlreadyExists => Ok(RpCreateDir::default()),
220                    _ => Err(err),
221                }
222            }
223        }
224    }
225
226    pub async fn dropbox_list(
227        &self,
228        path: &str,
229        recursive: bool,
230        limit: Option<usize>,
231    ) -> Result<Response<Buffer>> {
232        let url = "https://api.dropboxapi.com/2/files/list_folder".to_string();
233
234        // The default settings here align with the DropboxAPI default settings.
235        // Refer: https://www.dropbox.com/developers/documentation/http/documentation#files-list_folder
236        let args = DropboxListArgs {
237            path: self.build_path(path),
238            recursive,
239            limit: limit.unwrap_or(1000),
240        };
241
242        let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
243
244        let mut request = Request::post(&url)
245            .header(CONTENT_TYPE, "application/json")
246            .header(CONTENT_LENGTH, bs.len())
247            .extension(Operation::List)
248            .body(Buffer::from(bs))
249            .map_err(new_request_build_error)?;
250
251        self.sign(&mut request).await?;
252        self.info.http_client().send(request).await
253    }
254
255    pub async fn dropbox_list_continue(&self, cursor: &str) -> Result<Response<Buffer>> {
256        let url = "https://api.dropboxapi.com/2/files/list_folder/continue".to_string();
257
258        let args = DropboxListContinueArgs {
259            cursor: cursor.to_string(),
260        };
261
262        let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
263
264        let mut request = Request::post(&url)
265            .header(CONTENT_TYPE, "application/json")
266            .header(CONTENT_LENGTH, bs.len())
267            .extension(Operation::List)
268            .body(Buffer::from(bs))
269            .map_err(new_request_build_error)?;
270
271        self.sign(&mut request).await?;
272        self.info.http_client().send(request).await
273    }
274
275    pub async fn dropbox_copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
276        let url = "https://api.dropboxapi.com/2/files/copy_v2".to_string();
277
278        let args = DropboxCopyArgs {
279            from_path: self.build_path(from),
280            to_path: self.build_path(to),
281        };
282
283        let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
284
285        let mut request = Request::post(&url)
286            .header(CONTENT_TYPE, "application/json")
287            .header(CONTENT_LENGTH, bs.len())
288            .extension(Operation::Copy)
289            .body(Buffer::from(bs))
290            .map_err(new_request_build_error)?;
291
292        self.sign(&mut request).await?;
293        self.info.http_client().send(request).await
294    }
295
296    pub async fn dropbox_move(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
297        let url = "https://api.dropboxapi.com/2/files/move_v2".to_string();
298
299        let args = DropboxMoveArgs {
300            from_path: self.build_path(from),
301            to_path: self.build_path(to),
302        };
303
304        let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
305
306        let mut request = Request::post(&url)
307            .header(CONTENT_TYPE, "application/json")
308            .header(CONTENT_LENGTH, bs.len())
309            .extension(Operation::Rename)
310            .body(Buffer::from(bs))
311            .map_err(new_request_build_error)?;
312
313        self.sign(&mut request).await?;
314        self.info.http_client().send(request).await
315    }
316
317    pub async fn dropbox_get_metadata(&self, path: &str) -> Result<Response<Buffer>> {
318        let url = "https://api.dropboxapi.com/2/files/get_metadata".to_string();
319        let args = DropboxMetadataArgs {
320            path: self.build_path(path),
321            ..Default::default()
322        };
323
324        let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
325
326        let mut request = Request::post(&url)
327            .header(CONTENT_TYPE, "application/json")
328            .header(CONTENT_LENGTH, bs.len())
329            .extension(Operation::Stat)
330            .body(Buffer::from(bs))
331            .map_err(new_request_build_error)?;
332
333        self.sign(&mut request).await?;
334
335        self.info.http_client().send(request).await
336    }
337}
338
339#[derive(Clone)]
340pub struct DropboxSigner {
341    pub client_id: String,
342    pub client_secret: String,
343    pub refresh_token: String,
344
345    pub access_token: String,
346    pub expires_in: DateTime<Utc>,
347}
348
349impl Default for DropboxSigner {
350    fn default() -> Self {
351        DropboxSigner {
352            refresh_token: String::new(),
353            client_id: String::new(),
354            client_secret: String::new(),
355
356            access_token: String::new(),
357            expires_in: DateTime::<Utc>::MIN_UTC,
358        }
359    }
360}
361
362#[derive(Clone, Debug, Deserialize, Serialize)]
363struct DropboxDownloadArgs {
364    path: String,
365}
366
367#[derive(Clone, Debug, Deserialize, Serialize)]
368struct DropboxUploadArgs {
369    path: String,
370    mode: String,
371    mute: bool,
372    autorename: bool,
373    strict_conflict: bool,
374}
375
376impl Default for DropboxUploadArgs {
377    fn default() -> Self {
378        DropboxUploadArgs {
379            mode: "overwrite".to_string(),
380            path: "".to_string(),
381            mute: true,
382            autorename: false,
383            strict_conflict: false,
384        }
385    }
386}
387
388#[derive(Clone, Debug, Deserialize, Serialize)]
389struct DropboxDeleteArgs {
390    path: String,
391}
392
393#[derive(Clone, Debug, Deserialize, Serialize)]
394struct DropboxDeleteBatchEntry {
395    path: String,
396}
397
398#[derive(Clone, Debug, Deserialize, Serialize)]
399struct DropboxDeleteBatchArgs {
400    entries: Vec<DropboxDeleteBatchEntry>,
401}
402
403#[derive(Clone, Debug, Deserialize, Serialize)]
404struct DropboxDeleteBatchCheckArgs {
405    async_job_id: String,
406}
407
408#[derive(Clone, Debug, Deserialize, Serialize)]
409struct DropboxCreateFolderArgs {
410    path: String,
411}
412
413#[derive(Clone, Debug, Deserialize, Serialize)]
414struct DropboxListArgs {
415    path: String,
416    recursive: bool,
417    limit: usize,
418}
419
420#[derive(Clone, Debug, Deserialize, Serialize)]
421struct DropboxListContinueArgs {
422    cursor: String,
423}
424
425#[derive(Clone, Debug, Deserialize, Serialize)]
426struct DropboxCopyArgs {
427    from_path: String,
428    to_path: String,
429}
430
431#[derive(Clone, Debug, Deserialize, Serialize)]
432struct DropboxMoveArgs {
433    from_path: String,
434    to_path: String,
435}
436
437#[derive(Default, Clone, Debug, Deserialize, Serialize)]
438struct DropboxMetadataArgs {
439    include_deleted: bool,
440    include_has_explicit_shared_members: bool,
441    include_media_info: bool,
442    path: String,
443}
444
445#[derive(Clone, Deserialize)]
446struct DropboxTokenResponse {
447    access_token: String,
448    expires_in: usize,
449}
450
451#[derive(Default, Debug, Deserialize)]
452#[serde(default)]
453pub struct DropboxMetadataResponse {
454    #[serde(rename(deserialize = ".tag"))]
455    pub tag: String,
456    pub client_modified: String,
457    pub content_hash: Option<String>,
458    pub file_lock_info: Option<DropboxMetadataFileLockInfo>,
459    pub has_explicit_shared_members: Option<bool>,
460    pub id: String,
461    pub is_downloadable: Option<bool>,
462    pub name: String,
463    pub path_display: String,
464    pub path_lower: String,
465    pub property_groups: Option<Vec<DropboxMetadataPropertyGroup>>,
466    pub rev: Option<String>,
467    pub server_modified: Option<String>,
468    pub sharing_info: Option<DropboxMetadataSharingInfo>,
469    pub size: Option<u64>,
470}
471
472#[derive(Default, Debug, Deserialize)]
473#[serde(default)]
474pub struct DropboxMetadataFileLockInfo {
475    pub created: Option<String>,
476    pub is_lockholder: bool,
477    pub lockholder_name: Option<String>,
478}
479
480#[derive(Default, Debug, Deserialize)]
481#[serde(default)]
482pub struct DropboxMetadataPropertyGroup {
483    pub fields: Vec<DropboxMetadataPropertyGroupField>,
484    pub template_id: String,
485}
486
487#[derive(Default, Debug, Deserialize)]
488#[serde(default)]
489pub struct DropboxMetadataPropertyGroupField {
490    pub name: String,
491    pub value: String,
492}
493
494#[derive(Default, Debug, Deserialize)]
495#[serde(default)]
496pub struct DropboxMetadataSharingInfo {
497    pub modified_by: Option<String>,
498    pub parent_shared_folder_id: Option<String>,
499    pub read_only: Option<bool>,
500    pub shared_folder_id: Option<String>,
501    pub traverse_only: Option<bool>,
502    pub no_access: Option<bool>,
503}
504
505#[derive(Default, Debug, Deserialize)]
506#[serde(default)]
507pub struct DropboxListResponse {
508    pub entries: Vec<DropboxMetadataResponse>,
509    pub cursor: String,
510    pub has_more: bool,
511}
512
513#[derive(Default, Debug, Deserialize)]
514#[serde(default)]
515pub struct DropboxDeleteBatchResponse {
516    #[serde(rename(deserialize = ".tag"))]
517    pub tag: String,
518    pub async_job_id: Option<String>,
519    pub entries: Option<Vec<DropboxDeleteBatchResponseEntry>>,
520}
521
522#[derive(Default, Debug, Deserialize)]
523#[serde(default)]
524pub struct DropboxDeleteBatchResponseEntry {
525    #[serde(rename(deserialize = ".tag"))]
526    pub tag: String,
527    pub metadata: Option<DropboxMetadataResponse>,
528}
529
530#[derive(Default, Debug, Deserialize)]
531#[serde(default)]
532pub struct DropboxDeleteBatchFailureResponseCause {
533    #[serde(rename(deserialize = ".tag"))]
534    pub tag: String,
535}