opendal/services/gdrive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes;
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::Request;
29use http::Response;
30use http::StatusCode;
31use serde::Deserialize;
32use serde_json::json;
33use tokio::sync::Mutex;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
39pub struct GdriveCore {
40    pub info: Arc<AccessorInfo>,
41
42    pub root: String,
43
44    pub signer: Arc<Mutex<GdriveSigner>>,
45
46    /// Cache the mapping from path to file id
47    pub path_cache: PathCacher<GdrivePathQuery>,
48}
49
50impl Debug for GdriveCore {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        let mut de = f.debug_struct("GdriveCore");
53        de.field("root", &self.root);
54        de.finish()
55    }
56}
57
58impl GdriveCore {
59    pub async fn gdrive_stat(&self, path: &str) -> Result<Response<Buffer>> {
60        let path = build_abs_path(&self.root, path);
61        let file_id = self.path_cache.get(&path).await?.ok_or(Error::new(
62            ErrorKind::NotFound,
63            format!("path not found: {}", path),
64        ))?;
65
66        // The file metadata in the Google Drive API is very complex.
67        // For now, we only need the file id, name, mime type and modified time.
68        let mut req = Request::get(format!(
69            "https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime",
70            file_id
71        ))
72            .extension(Operation::Stat)
73            .body(Buffer::new())
74            .map_err(new_request_build_error)?;
75        self.sign(&mut req).await?;
76
77        self.info.http_client().send(req).await
78    }
79
80    pub async fn gdrive_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
81        let path = build_abs_path(&self.root, path);
82        let path_id = self.path_cache.get(&path).await?.ok_or(Error::new(
83            ErrorKind::NotFound,
84            format!("path not found: {}", path),
85        ))?;
86
87        let url: String = format!(
88            "https://www.googleapis.com/drive/v3/files/{}?alt=media",
89            path_id
90        );
91
92        let mut req = Request::get(&url)
93            .extension(Operation::Read)
94            .header(header::RANGE, range.to_header())
95            .body(Buffer::new())
96            .map_err(new_request_build_error)?;
97        self.sign(&mut req).await?;
98
99        self.info.http_client().fetch(req).await
100    }
101
102    pub async fn gdrive_list(
103        &self,
104        file_id: &str,
105        page_size: i32,
106        next_page_token: &str,
107    ) -> Result<Response<Buffer>> {
108        let q = format!("'{}' in parents and trashed = false", file_id);
109        let url = "https://www.googleapis.com/drive/v3/files";
110        let mut url = QueryPairsWriter::new(url);
111        url = url.push("pageSize", &page_size.to_string());
112        url = url.push("q", &percent_encode_path(&q));
113        if !next_page_token.is_empty() {
114            url = url.push("pageToken", next_page_token);
115        };
116
117        let mut req = Request::get(url.finish())
118            .extension(Operation::List)
119            .body(Buffer::new())
120            .map_err(new_request_build_error)?;
121        self.sign(&mut req).await?;
122
123        self.info.http_client().send(req).await
124    }
125
126    // Update with content and metadata
127    pub async fn gdrive_patch_metadata_request(
128        &self,
129        source: &str,
130        target: &str,
131    ) -> Result<Response<Buffer>> {
132        let source_file_id = self.path_cache.get(source).await?.ok_or(Error::new(
133            ErrorKind::NotFound,
134            format!("source path not found: {}", source),
135        ))?;
136        let source_parent = get_parent(source);
137        let source_parent_id = self
138            .path_cache
139            .get(source_parent)
140            .await?
141            .expect("old parent must exist");
142
143        let target_parent_id = self.path_cache.ensure_dir(get_parent(target)).await?;
144        let target_file_name = get_basename(target);
145
146        let metadata = &json!({
147            "name": target_file_name,
148            "removeParents": [source_parent_id],
149            "addParents": [target_parent_id],
150        });
151
152        let url = format!(
153            "https://www.googleapis.com/drive/v3/files/{}",
154            source_file_id
155        );
156        let mut req = Request::patch(url)
157            .extension(Operation::Rename)
158            .body(Buffer::from(Bytes::from(metadata.to_string())))
159            .map_err(new_request_build_error)?;
160
161        self.sign(&mut req).await?;
162
163        self.info.http_client().send(req).await
164    }
165
166    pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<Buffer>> {
167        let url = format!("https://www.googleapis.com/drive/v3/files/{}", file_id);
168
169        let body = serde_json::to_vec(&json!({
170            "trashed": true
171        }))
172        .map_err(new_json_serialize_error)?;
173
174        let mut req = Request::patch(&url)
175            .extension(Operation::Delete)
176            .body(Buffer::from(Bytes::from(body)))
177            .map_err(new_request_build_error)?;
178
179        self.sign(&mut req).await?;
180
181        self.info.http_client().send(req).await
182    }
183
184    /// Create a file with the content.
185    pub async fn gdrive_upload_simple_request(
186        &self,
187        path: &str,
188        size: u64,
189        body: Buffer,
190    ) -> Result<Response<Buffer>> {
191        let parent = self.path_cache.ensure_dir(get_parent(path)).await?;
192
193        let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
194
195        let file_name = get_basename(path);
196
197        let metadata = serde_json::to_vec(&json!({
198            "name": file_name,
199            "parents": [parent],
200        }))
201        .map_err(new_json_serialize_error)?;
202
203        let req = Request::post(url)
204            .header("X-Upload-Content-Length", size)
205            .extension(Operation::Write);
206
207        let multipart = Multipart::new()
208            .part(
209                FormDataPart::new("metadata")
210                    .header(
211                        header::CONTENT_TYPE,
212                        "application/json; charset=UTF-8".parse().unwrap(),
213                    )
214                    .content(metadata),
215            )
216            .part(
217                FormDataPart::new("file")
218                    .header(
219                        header::CONTENT_TYPE,
220                        "application/octet-stream".parse().unwrap(),
221                    )
222                    .content(body),
223            );
224
225        let mut req = multipart.apply(req)?;
226
227        self.sign(&mut req).await?;
228
229        self.info.http_client().send(req).await
230    }
231
232    /// Overwrite the file with the content.
233    ///
234    /// # Notes
235    ///
236    /// - The file id is required. Do not use this method to create a file.
237    pub async fn gdrive_upload_overwrite_simple_request(
238        &self,
239        file_id: &str,
240        size: u64,
241        body: Buffer,
242    ) -> Result<Response<Buffer>> {
243        let url = format!(
244            "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media",
245            file_id
246        );
247
248        let mut req = Request::patch(url)
249            .header(header::CONTENT_TYPE, "application/octet-stream")
250            .header(header::CONTENT_LENGTH, size)
251            .header("X-Upload-Content-Length", size)
252            .extension(Operation::Write)
253            .body(body)
254            .map_err(new_request_build_error)?;
255
256        self.sign(&mut req).await?;
257
258        self.info.http_client().send(req).await
259    }
260
261    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
262        let mut signer = self.signer.lock().await;
263        signer.sign(req).await
264    }
265
266    pub async fn gdrive_copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
267        let from = build_abs_path(&self.root, from);
268
269        let from_file_id = self.path_cache.get(&from).await?.ok_or(Error::new(
270            ErrorKind::NotFound,
271            "the file to copy does not exist",
272        ))?;
273
274        let to_name = get_basename(to);
275        let to_path = build_abs_path(&self.root, to);
276        let to_parent_id = self.path_cache.ensure_dir(get_parent(&to_path)).await?;
277
278        // copy will overwrite `to`, delete it if exist
279        if let Some(id) = self.path_cache.get(&to_path).await? {
280            let resp = self.gdrive_trash(&id).await?;
281            let status = resp.status();
282            if status != StatusCode::OK {
283                return Err(parse_error(resp));
284            }
285
286            self.path_cache.remove(&to_path).await;
287        }
288
289        let url = format!(
290            "https://www.googleapis.com/drive/v3/files/{}/copy",
291            from_file_id
292        );
293
294        let request_body = &json!({
295            "name": to_name,
296            "parents": [to_parent_id],
297        });
298        let body = Buffer::from(Bytes::from(request_body.to_string()));
299
300        let mut req = Request::post(&url)
301            .extension(Operation::Copy)
302            .body(body)
303            .map_err(new_request_build_error)?;
304        self.sign(&mut req).await?;
305
306        self.info.http_client().send(req).await
307    }
308}
309
310#[derive(Clone)]
311pub struct GdriveSigner {
312    pub info: Arc<AccessorInfo>,
313
314    pub client_id: String,
315    pub client_secret: String,
316    pub refresh_token: String,
317
318    pub access_token: String,
319    pub expires_in: DateTime<Utc>,
320}
321
322impl GdriveSigner {
323    /// Create a new signer.
324    pub fn new(info: Arc<AccessorInfo>) -> Self {
325        GdriveSigner {
326            info,
327
328            client_id: "".to_string(),
329            client_secret: "".to_string(),
330            refresh_token: "".to_string(),
331            access_token: "".to_string(),
332            expires_in: DateTime::<Utc>::MIN_UTC,
333        }
334    }
335
336    /// Sign a request.
337    pub async fn sign<T>(&mut self, req: &mut Request<T>) -> Result<()> {
338        if !self.access_token.is_empty() && self.expires_in > Utc::now() {
339            let value = format!("Bearer {}", self.access_token)
340                .parse()
341                .expect("access token must be valid header value");
342
343            req.headers_mut().insert(header::AUTHORIZATION, value);
344            return Ok(());
345        }
346
347        let url = format!(
348            "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token",
349            self.refresh_token, self.client_id, self.client_secret
350        );
351
352        {
353            let req = Request::post(url)
354                .header(header::CONTENT_LENGTH, 0)
355                .body(Buffer::new())
356                .map_err(new_request_build_error)?;
357
358            let resp = self.info.http_client().send(req).await?;
359            let status = resp.status();
360
361            match status {
362                StatusCode::OK => {
363                    let resp_body = resp.into_body();
364                    let token: GdriveTokenResponse = serde_json::from_reader(resp_body.reader())
365                        .map_err(new_json_deserialize_error)?;
366                    self.access_token.clone_from(&token.access_token);
367                    self.expires_in = Utc::now()
368                        + chrono::TimeDelta::try_seconds(token.expires_in)
369                            .expect("expires_in must be valid seconds")
370                        - chrono::TimeDelta::try_seconds(120).expect("120 must be valid seconds");
371                }
372                _ => {
373                    return Err(parse_error(resp));
374                }
375            }
376        }
377
378        let auth_header_content = format!("Bearer {}", self.access_token);
379        req.headers_mut()
380            .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap());
381
382        Ok(())
383    }
384}
385
386pub struct GdrivePathQuery {
387    pub info: Arc<AccessorInfo>,
388    pub signer: Arc<Mutex<GdriveSigner>>,
389}
390
391impl GdrivePathQuery {
392    pub fn new(info: Arc<AccessorInfo>, signer: Arc<Mutex<GdriveSigner>>) -> Self {
393        GdrivePathQuery { info, signer }
394    }
395}
396
397impl PathQuery for GdrivePathQuery {
398    async fn root(&self) -> Result<String> {
399        Ok("root".to_string())
400    }
401
402    async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
403        let mut queries = vec![
404            // Make sure name has been replaced with escaped name.
405            //
406            // ref: <https://developers.google.com/drive/api/guides/ref-search-terms>
407            format!(
408                "name = '{}'",
409                name.replace('\'', "\\'").trim_end_matches('/')
410            ),
411            format!("'{}' in parents", parent_id),
412            "trashed = false".to_string(),
413        ];
414        if name.ends_with('/') {
415            queries.push("mimeType = 'application/vnd.google-apps.folder'".to_string());
416        }
417        let query = queries.join(" and ");
418
419        let url = format!(
420            "https://www.googleapis.com/drive/v3/files?q={}",
421            percent_encode_path(query.as_str())
422        );
423
424        let mut req = Request::get(&url)
425            .extension(Operation::Stat)
426            .body(Buffer::new())
427            .map_err(new_request_build_error)?;
428
429        self.signer.lock().await.sign(&mut req).await?;
430
431        let resp = self.info.http_client().send(req).await?;
432        let status = resp.status();
433
434        match status {
435            StatusCode::OK => {
436                let body = resp.into_body();
437                let meta: GdriveFileList =
438                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
439
440                if let Some(f) = meta.files.first() {
441                    Ok(Some(f.id.clone()))
442                } else {
443                    Ok(None)
444                }
445            }
446            _ => Err(parse_error(resp)),
447        }
448    }
449
450    async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
451        let url = "https://www.googleapis.com/drive/v3/files";
452
453        let content = serde_json::to_vec(&json!({
454            "name": name.trim_end_matches('/'),
455            "mimeType": "application/vnd.google-apps.folder",
456            // If the parent is not provided, the folder will be created in the root folder.
457            "parents": [parent_id],
458        }))
459        .map_err(new_json_serialize_error)?;
460
461        let mut req = Request::post(url)
462            .extension(Operation::CreateDir)
463            .header(header::CONTENT_TYPE, "application/json")
464            .body(Buffer::from(Bytes::from(content)))
465            .map_err(new_request_build_error)?;
466
467        self.signer.lock().await.sign(&mut req).await?;
468
469        let resp = self.info.http_client().send(req).await?;
470        if !resp.status().is_success() {
471            return Err(parse_error(resp));
472        }
473
474        let body = resp.into_body();
475        let file: GdriveFile =
476            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
477        Ok(file.id)
478    }
479}
480
481#[derive(Deserialize)]
482pub struct GdriveTokenResponse {
483    access_token: String,
484    expires_in: i64,
485}
486
487/// This is the file struct returned by the Google Drive API.
488/// This is a complex struct, but we only add the fields we need.
489/// refer to https://developers.google.com/drive/api/reference/rest/v3/files#File
490#[derive(Deserialize, Debug)]
491#[serde(rename_all = "camelCase")]
492pub struct GdriveFile {
493    pub mime_type: String,
494    pub id: String,
495    pub name: String,
496    pub size: Option<String>,
497    // The modified time is not returned unless the `fields`
498    // query parameter contains `modifiedTime`.
499    // As we only need the modified time when we do `stat` operation,
500    // if other operations(such as search) do not specify the `fields` query parameter,
501    // try to access this field, it will be `None`.
502    pub modified_time: Option<String>,
503}
504
505/// refer to https://developers.google.com/drive/api/reference/rest/v3/files/list
506#[derive(Deserialize)]
507#[serde(rename_all = "camelCase")]
508pub(crate) struct GdriveFileList {
509    pub(crate) files: Vec<GdriveFile>,
510    pub(crate) next_page_token: Option<String>,
511}