opendal/services/gdrive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes;
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::Request;
29use http::Response;
30use http::StatusCode;
31use serde::Deserialize;
32use serde_json::json;
33use tokio::sync::Mutex;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
/// Shared state behind the low-level Google Drive request helpers in this file.
pub struct GdriveCore {
    /// Accessor info; the methods in this file use it to obtain the HTTP client.
    pub info: Arc<AccessorInfo>,

    /// Root path that relative paths are joined to (via `build_abs_path`).
    pub root: String,

    /// Signer that attaches the `Authorization` header to outgoing requests.
    /// It sits behind an async mutex because signing takes `&mut GdriveSigner`.
    pub signer: Arc<Mutex<GdriveSigner>>,

    /// Cache the mapping from path to file id
    pub path_cache: PathCacher<GdrivePathQuery>,
}
49
50impl Debug for GdriveCore {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        let mut de = f.debug_struct("GdriveCore");
53        de.field("root", &self.root);
54        de.finish()
55    }
56}
57
58impl GdriveCore {
59    pub async fn gdrive_stat(&self, path: &str) -> Result<Response<Buffer>> {
60        let path = build_abs_path(&self.root, path);
61        let file_id = self.path_cache.get(&path).await?.ok_or(Error::new(
62            ErrorKind::NotFound,
63            format!("path not found: {path}"),
64        ))?;
65
66        // The file metadata in the Google Drive API is very complex.
67        // For now, we only need the file id, name, mime type and modified time.
68        let mut req = Request::get(format!(
69            "https://www.googleapis.com/drive/v3/files/{file_id}?fields=id,name,mimeType,size,modifiedTime"
70        ))
71            .extension(Operation::Stat)
72            .body(Buffer::new())
73            .map_err(new_request_build_error)?;
74        self.sign(&mut req).await?;
75
76        self.info.http_client().send(req).await
77    }
78
79    pub async fn gdrive_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
80        let path = build_abs_path(&self.root, path);
81        let path_id = self.path_cache.get(&path).await?.ok_or(Error::new(
82            ErrorKind::NotFound,
83            format!("path not found: {path}"),
84        ))?;
85
86        let url: String = format!("https://www.googleapis.com/drive/v3/files/{path_id}?alt=media");
87
88        let mut req = Request::get(&url)
89            .extension(Operation::Read)
90            .header(header::RANGE, range.to_header())
91            .body(Buffer::new())
92            .map_err(new_request_build_error)?;
93        self.sign(&mut req).await?;
94
95        self.info.http_client().fetch(req).await
96    }
97
98    pub async fn gdrive_list(
99        &self,
100        file_id: &str,
101        page_size: i32,
102        next_page_token: &str,
103    ) -> Result<Response<Buffer>> {
104        let q = format!("'{file_id}' in parents and trashed = false");
105        let url = "https://www.googleapis.com/drive/v3/files";
106        let mut url = QueryPairsWriter::new(url);
107        url = url.push("pageSize", &page_size.to_string());
108        url = url.push("q", &percent_encode_path(&q));
109        if !next_page_token.is_empty() {
110            url = url.push("pageToken", next_page_token);
111        };
112
113        let mut req = Request::get(url.finish())
114            .extension(Operation::List)
115            .body(Buffer::new())
116            .map_err(new_request_build_error)?;
117        self.sign(&mut req).await?;
118
119        self.info.http_client().send(req).await
120    }
121
122    // Update with content and metadata
123    pub async fn gdrive_patch_metadata_request(
124        &self,
125        source: &str,
126        target: &str,
127    ) -> Result<Response<Buffer>> {
128        let source_file_id = self.path_cache.get(source).await?.ok_or(Error::new(
129            ErrorKind::NotFound,
130            format!("source path not found: {source}"),
131        ))?;
132        let source_parent = get_parent(source);
133        let source_parent_id = self
134            .path_cache
135            .get(source_parent)
136            .await?
137            .expect("old parent must exist");
138
139        let target_parent_id = self.path_cache.ensure_dir(get_parent(target)).await?;
140        let target_file_name = get_basename(target);
141
142        let metadata = &json!({
143            "name": target_file_name,
144            "removeParents": [source_parent_id],
145            "addParents": [target_parent_id],
146        });
147
148        let url = format!("https://www.googleapis.com/drive/v3/files/{source_file_id}");
149        let mut req = Request::patch(url)
150            .extension(Operation::Rename)
151            .body(Buffer::from(Bytes::from(metadata.to_string())))
152            .map_err(new_request_build_error)?;
153
154        self.sign(&mut req).await?;
155
156        self.info.http_client().send(req).await
157    }
158
159    pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<Buffer>> {
160        let url = format!("https://www.googleapis.com/drive/v3/files/{file_id}");
161
162        let body = serde_json::to_vec(&json!({
163            "trashed": true
164        }))
165        .map_err(new_json_serialize_error)?;
166
167        let mut req = Request::patch(&url)
168            .extension(Operation::Delete)
169            .body(Buffer::from(Bytes::from(body)))
170            .map_err(new_request_build_error)?;
171
172        self.sign(&mut req).await?;
173
174        self.info.http_client().send(req).await
175    }
176
177    /// Create a file with the content.
178    pub async fn gdrive_upload_simple_request(
179        &self,
180        path: &str,
181        size: u64,
182        body: Buffer,
183    ) -> Result<Response<Buffer>> {
184        let parent = self.path_cache.ensure_dir(get_parent(path)).await?;
185
186        let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
187
188        let file_name = get_basename(path);
189
190        let metadata = serde_json::to_vec(&json!({
191            "name": file_name,
192            "parents": [parent],
193        }))
194        .map_err(new_json_serialize_error)?;
195
196        let req = Request::post(url)
197            .header("X-Upload-Content-Length", size)
198            .extension(Operation::Write);
199
200        let multipart = Multipart::new()
201            .part(
202                FormDataPart::new("metadata")
203                    .header(
204                        header::CONTENT_TYPE,
205                        "application/json; charset=UTF-8".parse().unwrap(),
206                    )
207                    .content(metadata),
208            )
209            .part(
210                FormDataPart::new("file")
211                    .header(
212                        header::CONTENT_TYPE,
213                        "application/octet-stream".parse().unwrap(),
214                    )
215                    .content(body),
216            );
217
218        let mut req = multipart.apply(req)?;
219
220        self.sign(&mut req).await?;
221
222        self.info.http_client().send(req).await
223    }
224
225    /// Overwrite the file with the content.
226    ///
227    /// # Notes
228    ///
229    /// - The file id is required. Do not use this method to create a file.
230    pub async fn gdrive_upload_overwrite_simple_request(
231        &self,
232        file_id: &str,
233        size: u64,
234        body: Buffer,
235    ) -> Result<Response<Buffer>> {
236        let url =
237            format!("https://www.googleapis.com/upload/drive/v3/files/{file_id}?uploadType=media");
238
239        let mut req = Request::patch(url)
240            .header(header::CONTENT_TYPE, "application/octet-stream")
241            .header(header::CONTENT_LENGTH, size)
242            .header("X-Upload-Content-Length", size)
243            .extension(Operation::Write)
244            .body(body)
245            .map_err(new_request_build_error)?;
246
247        self.sign(&mut req).await?;
248
249        self.info.http_client().send(req).await
250    }
251
252    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
253        let mut signer = self.signer.lock().await;
254        signer.sign(req).await
255    }
256
257    pub async fn gdrive_copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
258        let from = build_abs_path(&self.root, from);
259
260        let from_file_id = self.path_cache.get(&from).await?.ok_or(Error::new(
261            ErrorKind::NotFound,
262            "the file to copy does not exist",
263        ))?;
264
265        let to_name = get_basename(to);
266        let to_path = build_abs_path(&self.root, to);
267        let to_parent_id = self.path_cache.ensure_dir(get_parent(&to_path)).await?;
268
269        // copy will overwrite `to`, delete it if exist
270        if let Some(id) = self.path_cache.get(&to_path).await? {
271            let resp = self.gdrive_trash(&id).await?;
272            let status = resp.status();
273            if status != StatusCode::OK {
274                return Err(parse_error(resp));
275            }
276
277            self.path_cache.remove(&to_path).await;
278        }
279
280        let url = format!("https://www.googleapis.com/drive/v3/files/{from_file_id}/copy");
281
282        let request_body = &json!({
283            "name": to_name,
284            "parents": [to_parent_id],
285        });
286        let body = Buffer::from(Bytes::from(request_body.to_string()));
287
288        let mut req = Request::post(&url)
289            .extension(Operation::Copy)
290            .body(body)
291            .map_err(new_request_build_error)?;
292        self.sign(&mut req).await?;
293
294        self.info.http_client().send(req).await
295    }
296}
297
/// Holds the OAuth credentials and the current access token used to sign
/// Google Drive requests.
#[derive(Clone)]
pub struct GdriveSigner {
    /// Accessor info; used to obtain the HTTP client for token refresh requests.
    pub info: Arc<AccessorInfo>,

    /// OAuth client id sent with the refresh request.
    pub client_id: String,
    /// OAuth client secret sent with the refresh request.
    pub client_secret: String,
    /// OAuth refresh token exchanged for a new access token.
    pub refresh_token: String,

    /// Current access token; empty until one has been set or fetched.
    pub access_token: String,
    /// Instant after which `access_token` is treated as expired.
    /// Despite the name this is a point in time, not a duration.
    pub expires_in: DateTime<Utc>,
}
309
310impl GdriveSigner {
311    /// Create a new signer.
312    pub fn new(info: Arc<AccessorInfo>) -> Self {
313        GdriveSigner {
314            info,
315
316            client_id: "".to_string(),
317            client_secret: "".to_string(),
318            refresh_token: "".to_string(),
319            access_token: "".to_string(),
320            expires_in: DateTime::<Utc>::MIN_UTC,
321        }
322    }
323
324    /// Sign a request.
325    pub async fn sign<T>(&mut self, req: &mut Request<T>) -> Result<()> {
326        if !self.access_token.is_empty() && self.expires_in > Utc::now() {
327            let value = format!("Bearer {}", self.access_token)
328                .parse()
329                .expect("access token must be valid header value");
330
331            req.headers_mut().insert(header::AUTHORIZATION, value);
332            return Ok(());
333        }
334
335        let url = format!(
336            "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token",
337            self.refresh_token, self.client_id, self.client_secret
338        );
339
340        {
341            let req = Request::post(url)
342                .header(header::CONTENT_LENGTH, 0)
343                .body(Buffer::new())
344                .map_err(new_request_build_error)?;
345
346            let resp = self.info.http_client().send(req).await?;
347            let status = resp.status();
348
349            match status {
350                StatusCode::OK => {
351                    let resp_body = resp.into_body();
352                    let token: GdriveTokenResponse = serde_json::from_reader(resp_body.reader())
353                        .map_err(new_json_deserialize_error)?;
354                    self.access_token.clone_from(&token.access_token);
355                    self.expires_in = Utc::now()
356                        + chrono::TimeDelta::try_seconds(token.expires_in)
357                            .expect("expires_in must be valid seconds")
358                        - chrono::TimeDelta::try_seconds(120).expect("120 must be valid seconds");
359                }
360                _ => {
361                    return Err(parse_error(resp));
362                }
363            }
364        }
365
366        let auth_header_content = format!("Bearer {}", self.access_token);
367        req.headers_mut()
368            .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap());
369
370        Ok(())
371    }
372}
373
/// Backend used to resolve path segments to Google Drive file ids and to
/// create folders; it implements `PathQuery` below.
pub struct GdrivePathQuery {
    /// Accessor info; used to obtain the HTTP client.
    pub info: Arc<AccessorInfo>,
    /// Shared signer used to authorize the lookup and create requests.
    pub signer: Arc<Mutex<GdriveSigner>>,
}
378
379impl GdrivePathQuery {
380    pub fn new(info: Arc<AccessorInfo>, signer: Arc<Mutex<GdriveSigner>>) -> Self {
381        GdrivePathQuery { info, signer }
382    }
383}
384
385impl PathQuery for GdrivePathQuery {
386    async fn root(&self) -> Result<String> {
387        Ok("root".to_string())
388    }
389
390    async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
391        let mut queries = vec![
392            // Make sure name has been replaced with escaped name.
393            //
394            // ref: <https://developers.google.com/drive/api/guides/ref-search-terms>
395            format!(
396                "name = '{}'",
397                name.replace('\'', "\\'").trim_end_matches('/')
398            ),
399            format!("'{}' in parents", parent_id),
400            "trashed = false".to_string(),
401        ];
402        if name.ends_with('/') {
403            queries.push("mimeType = 'application/vnd.google-apps.folder'".to_string());
404        }
405        let query = queries.join(" and ");
406
407        let url = format!(
408            "https://www.googleapis.com/drive/v3/files?q={}",
409            percent_encode_path(query.as_str())
410        );
411
412        let mut req = Request::get(&url)
413            .extension(Operation::Stat)
414            .body(Buffer::new())
415            .map_err(new_request_build_error)?;
416
417        self.signer.lock().await.sign(&mut req).await?;
418
419        let resp = self.info.http_client().send(req).await?;
420        let status = resp.status();
421
422        match status {
423            StatusCode::OK => {
424                let body = resp.into_body();
425                let meta: GdriveFileList =
426                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
427
428                if let Some(f) = meta.files.first() {
429                    Ok(Some(f.id.clone()))
430                } else {
431                    Ok(None)
432                }
433            }
434            _ => Err(parse_error(resp)),
435        }
436    }
437
438    async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
439        let url = "https://www.googleapis.com/drive/v3/files";
440
441        let content = serde_json::to_vec(&json!({
442            "name": name.trim_end_matches('/'),
443            "mimeType": "application/vnd.google-apps.folder",
444            // If the parent is not provided, the folder will be created in the root folder.
445            "parents": [parent_id],
446        }))
447        .map_err(new_json_serialize_error)?;
448
449        let mut req = Request::post(url)
450            .extension(Operation::CreateDir)
451            .header(header::CONTENT_TYPE, "application/json")
452            .body(Buffer::from(Bytes::from(content)))
453            .map_err(new_request_build_error)?;
454
455        self.signer.lock().await.sign(&mut req).await?;
456
457        let resp = self.info.http_client().send(req).await?;
458        if !resp.status().is_success() {
459            return Err(parse_error(resp));
460        }
461
462        let body = resp.into_body();
463        let file: GdriveFile =
464            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
465        Ok(file.id)
466    }
467}
468
/// The fields read from the OAuth token endpoint response.
#[derive(Deserialize)]
pub struct GdriveTokenResponse {
    /// The new access token, used as the `Bearer` credential.
    access_token: String,
    /// Token lifetime; the signer interprets it as a number of seconds.
    expires_in: i64,
}
474
/// The file resource returned by the Google Drive API.
///
/// The full resource is large; only the fields this service needs are
/// declared here. JSON keys are mapped from camelCase.
///
/// Refer to <https://developers.google.com/drive/api/reference/rest/v3/files#File>.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GdriveFile {
    /// MIME type of the file (JSON key `mimeType`).
    pub mime_type: String,
    /// File id.
    pub id: String,
    /// File name.
    pub name: String,
    /// File size, carried as a string; `None` when absent from the response.
    pub size: Option<String>,
    // The modified time is not returned unless the `fields`
    // query parameter contains `modifiedTime`.
    // It is only needed for the `stat` operation; for other operations
    // (such as search) that do not specify the `fields` query parameter,
    // this field will be `None`.
    pub modified_time: Option<String>,
}
492
/// One page of results from the Google Drive `files.list` endpoint.
///
/// Refer to <https://developers.google.com/drive/api/reference/rest/v3/files/list>.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct GdriveFileList {
    /// The files contained in this page.
    pub(crate) files: Vec<GdriveFile>,
    /// Token for the next page (JSON key `nextPageToken`), if the response has one.
    pub(crate) next_page_token: Option<String>,
}