opendal/services/gdrive/
core.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes;
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::Request;
29use http::Response;
30use http::StatusCode;
31use serde::Deserialize;
32use serde_json::json;
33use tokio::sync::Mutex;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
39pub struct GdriveCore {
40    pub info: Arc<AccessorInfo>,
41
42    pub root: String,
43
44    pub signer: Arc<Mutex<GdriveSigner>>,
45
46    /// Cache the mapping from path to file id
47    pub path_cache: PathCacher<GdrivePathQuery>,
48}
49
50impl Debug for GdriveCore {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        let mut de = f.debug_struct("GdriveCore");
53        de.field("root", &self.root);
54        de.finish()
55    }
56}
57
58impl GdriveCore {
59    pub async fn gdrive_stat(&self, path: &str) -> Result<Response<Buffer>> {
60        let path = build_abs_path(&self.root, path);
61        let file_id = self.path_cache.get(&path).await?.ok_or(Error::new(
62            ErrorKind::NotFound,
63            format!("path not found: {}", path),
64        ))?;
65
66        // The file metadata in the Google Drive API is very complex.
67        // For now, we only need the file id, name, mime type and modified time.
68        let mut req = Request::get(format!(
69            "https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime",
70            file_id
71        ))
72            .body(Buffer::new())
73            .map_err(new_request_build_error)?;
74        self.sign(&mut req).await?;
75
76        self.info.http_client().send(req).await
77    }
78
79    pub async fn gdrive_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
80        let path = build_abs_path(&self.root, path);
81        let path_id = self.path_cache.get(&path).await?.ok_or(Error::new(
82            ErrorKind::NotFound,
83            format!("path not found: {}", path),
84        ))?;
85
86        let url: String = format!(
87            "https://www.googleapis.com/drive/v3/files/{}?alt=media",
88            path_id
89        );
90
91        let mut req = Request::get(&url)
92            .header(header::RANGE, range.to_header())
93            .body(Buffer::new())
94            .map_err(new_request_build_error)?;
95        self.sign(&mut req).await?;
96
97        self.info.http_client().fetch(req).await
98    }
99
100    pub async fn gdrive_list(
101        &self,
102        file_id: &str,
103        page_size: i32,
104        next_page_token: &str,
105    ) -> Result<Response<Buffer>> {
106        let q = format!("'{}' in parents and trashed = false", file_id);
107        let url = "https://www.googleapis.com/drive/v3/files";
108        let mut url = QueryPairsWriter::new(url);
109        url = url.push("pageSize", &page_size.to_string());
110        url = url.push("q", &percent_encode_path(&q));
111        if !next_page_token.is_empty() {
112            url = url.push("pageToken", next_page_token);
113        };
114
115        let mut req = Request::get(url.finish())
116            .body(Buffer::new())
117            .map_err(new_request_build_error)?;
118        self.sign(&mut req).await?;
119
120        self.info.http_client().send(req).await
121    }
122
123    // Update with content and metadata
124    pub async fn gdrive_patch_metadata_request(
125        &self,
126        source: &str,
127        target: &str,
128    ) -> Result<Response<Buffer>> {
129        let source_file_id = self.path_cache.get(source).await?.ok_or(Error::new(
130            ErrorKind::NotFound,
131            format!("source path not found: {}", source),
132        ))?;
133        let source_parent = get_parent(source);
134        let source_parent_id = self
135            .path_cache
136            .get(source_parent)
137            .await?
138            .expect("old parent must exist");
139
140        let target_parent_id = self.path_cache.ensure_dir(get_parent(target)).await?;
141        let target_file_name = get_basename(target);
142
143        let metadata = &json!({
144            "name": target_file_name,
145            "removeParents": [source_parent_id],
146            "addParents": [target_parent_id],
147        });
148
149        let url = format!(
150            "https://www.googleapis.com/drive/v3/files/{}",
151            source_file_id
152        );
153        let mut req = Request::patch(url)
154            .body(Buffer::from(Bytes::from(metadata.to_string())))
155            .map_err(new_request_build_error)?;
156
157        self.sign(&mut req).await?;
158
159        self.info.http_client().send(req).await
160    }
161
162    pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<Buffer>> {
163        let url = format!("https://www.googleapis.com/drive/v3/files/{}", file_id);
164
165        let body = serde_json::to_vec(&json!({
166            "trashed": true
167        }))
168        .map_err(new_json_serialize_error)?;
169
170        let mut req = Request::patch(&url)
171            .body(Buffer::from(Bytes::from(body)))
172            .map_err(new_request_build_error)?;
173
174        self.sign(&mut req).await?;
175
176        self.info.http_client().send(req).await
177    }
178
179    /// Create a file with the content.
180    pub async fn gdrive_upload_simple_request(
181        &self,
182        path: &str,
183        size: u64,
184        body: Buffer,
185    ) -> Result<Response<Buffer>> {
186        let parent = self.path_cache.ensure_dir(get_parent(path)).await?;
187
188        let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
189
190        let file_name = get_basename(path);
191
192        let metadata = serde_json::to_vec(&json!({
193            "name": file_name,
194            "parents": [parent],
195        }))
196        .map_err(new_json_serialize_error)?;
197
198        let req = Request::post(url).header("X-Upload-Content-Length", size);
199
200        let multipart = Multipart::new()
201            .part(
202                FormDataPart::new("metadata")
203                    .header(
204                        header::CONTENT_TYPE,
205                        "application/json; charset=UTF-8".parse().unwrap(),
206                    )
207                    .content(metadata),
208            )
209            .part(
210                FormDataPart::new("file")
211                    .header(
212                        header::CONTENT_TYPE,
213                        "application/octet-stream".parse().unwrap(),
214                    )
215                    .content(body),
216            );
217
218        let mut req = multipart.apply(req)?;
219
220        self.sign(&mut req).await?;
221
222        self.info.http_client().send(req).await
223    }
224
225    /// Overwrite the file with the content.
226    ///
227    /// # Notes
228    ///
229    /// - The file id is required. Do not use this method to create a file.
230    pub async fn gdrive_upload_overwrite_simple_request(
231        &self,
232        file_id: &str,
233        size: u64,
234        body: Buffer,
235    ) -> Result<Response<Buffer>> {
236        let url = format!(
237            "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media",
238            file_id
239        );
240
241        let mut req = Request::patch(url)
242            .header(header::CONTENT_TYPE, "application/octet-stream")
243            .header(header::CONTENT_LENGTH, size)
244            .header("X-Upload-Content-Length", size)
245            .body(body)
246            .map_err(new_request_build_error)?;
247
248        self.sign(&mut req).await?;
249
250        self.info.http_client().send(req).await
251    }
252
253    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
254        let mut signer = self.signer.lock().await;
255        signer.sign(req).await
256    }
257}
258
259#[derive(Clone)]
260pub struct GdriveSigner {
261    pub info: Arc<AccessorInfo>,
262
263    pub client_id: String,
264    pub client_secret: String,
265    pub refresh_token: String,
266
267    pub access_token: String,
268    pub expires_in: DateTime<Utc>,
269}
270
271impl GdriveSigner {
272    /// Create a new signer.
273    pub fn new(info: Arc<AccessorInfo>) -> Self {
274        GdriveSigner {
275            info,
276
277            client_id: "".to_string(),
278            client_secret: "".to_string(),
279            refresh_token: "".to_string(),
280            access_token: "".to_string(),
281            expires_in: DateTime::<Utc>::MIN_UTC,
282        }
283    }
284
285    /// Sign a request.
286    pub async fn sign<T>(&mut self, req: &mut Request<T>) -> Result<()> {
287        if !self.access_token.is_empty() && self.expires_in > Utc::now() {
288            let value = format!("Bearer {}", self.access_token)
289                .parse()
290                .expect("access token must be valid header value");
291
292            req.headers_mut().insert(header::AUTHORIZATION, value);
293            return Ok(());
294        }
295
296        let url = format!(
297            "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token",
298            self.refresh_token, self.client_id, self.client_secret
299        );
300
301        {
302            let req = Request::post(url)
303                .header(header::CONTENT_LENGTH, 0)
304                .body(Buffer::new())
305                .map_err(new_request_build_error)?;
306
307            let resp = self.info.http_client().send(req).await?;
308            let status = resp.status();
309
310            match status {
311                StatusCode::OK => {
312                    let resp_body = resp.into_body();
313                    let token: GdriveTokenResponse = serde_json::from_reader(resp_body.reader())
314                        .map_err(new_json_deserialize_error)?;
315                    self.access_token.clone_from(&token.access_token);
316                    self.expires_in = Utc::now()
317                        + chrono::TimeDelta::try_seconds(token.expires_in)
318                            .expect("expires_in must be valid seconds")
319                        - chrono::TimeDelta::try_seconds(120).expect("120 must be valid seconds");
320                }
321                _ => {
322                    return Err(parse_error(resp));
323                }
324            }
325        }
326
327        let auth_header_content = format!("Bearer {}", self.access_token);
328        req.headers_mut()
329            .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap());
330
331        Ok(())
332    }
333}
334
335pub struct GdrivePathQuery {
336    pub info: Arc<AccessorInfo>,
337    pub signer: Arc<Mutex<GdriveSigner>>,
338}
339
340impl GdrivePathQuery {
341    pub fn new(info: Arc<AccessorInfo>, signer: Arc<Mutex<GdriveSigner>>) -> Self {
342        GdrivePathQuery { info, signer }
343    }
344}
345
346impl PathQuery for GdrivePathQuery {
347    async fn root(&self) -> Result<String> {
348        Ok("root".to_string())
349    }
350
351    async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
352        let mut queries = vec![
353            // Make sure name has been replaced with escaped name.
354            //
355            // ref: <https://developers.google.com/drive/api/guides/ref-search-terms>
356            format!(
357                "name = '{}'",
358                name.replace('\'', "\\'").trim_end_matches('/')
359            ),
360            format!("'{}' in parents", parent_id),
361            "trashed = false".to_string(),
362        ];
363        if name.ends_with('/') {
364            queries.push("mimeType = 'application/vnd.google-apps.folder'".to_string());
365        }
366        let query = queries.join(" and ");
367
368        let url = format!(
369            "https://www.googleapis.com/drive/v3/files?q={}",
370            percent_encode_path(query.as_str())
371        );
372
373        let mut req = Request::get(&url)
374            .body(Buffer::new())
375            .map_err(new_request_build_error)?;
376
377        self.signer.lock().await.sign(&mut req).await?;
378
379        let resp = self.info.http_client().send(req).await?;
380        let status = resp.status();
381
382        match status {
383            StatusCode::OK => {
384                let body = resp.into_body();
385                let meta: GdriveFileList =
386                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
387
388                if let Some(f) = meta.files.first() {
389                    Ok(Some(f.id.clone()))
390                } else {
391                    Ok(None)
392                }
393            }
394            _ => Err(parse_error(resp)),
395        }
396    }
397
398    async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
399        let url = "https://www.googleapis.com/drive/v3/files";
400
401        let content = serde_json::to_vec(&json!({
402            "name": name.trim_end_matches('/'),
403            "mimeType": "application/vnd.google-apps.folder",
404            // If the parent is not provided, the folder will be created in the root folder.
405            "parents": [parent_id],
406        }))
407        .map_err(new_json_serialize_error)?;
408
409        let mut req = Request::post(url)
410            .header(header::CONTENT_TYPE, "application/json")
411            .body(Buffer::from(Bytes::from(content)))
412            .map_err(new_request_build_error)?;
413
414        self.signer.lock().await.sign(&mut req).await?;
415
416        let resp = self.info.http_client().send(req).await?;
417        if !resp.status().is_success() {
418            return Err(parse_error(resp));
419        }
420
421        let body = resp.into_body();
422        let file: GdriveFile =
423            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
424        Ok(file.id)
425    }
426}
427
428#[derive(Deserialize)]
429pub struct GdriveTokenResponse {
430    access_token: String,
431    expires_in: i64,
432}
433
434/// This is the file struct returned by the Google Drive API.
435/// This is a complex struct, but we only add the fields we need.
436/// refer to https://developers.google.com/drive/api/reference/rest/v3/files#File
437#[derive(Deserialize, Debug)]
438#[serde(rename_all = "camelCase")]
439pub struct GdriveFile {
440    pub mime_type: String,
441    pub id: String,
442    pub name: String,
443    pub size: Option<String>,
444    // The modified time is not returned unless the `fields`
445    // query parameter contains `modifiedTime`.
446    // As we only need the modified time when we do `stat` operation,
447    // if other operations(such as search) do not specify the `fields` query parameter,
448    // try to access this field, it will be `None`.
449    pub modified_time: Option<String>,
450}
451
452/// refer to https://developers.google.com/drive/api/reference/rest/v3/files/list
453#[derive(Deserialize)]
454#[serde(rename_all = "camelCase")]
455pub(crate) struct GdriveFileList {
456    pub(crate) files: Vec<GdriveFile>,
457    pub(crate) next_page_token: Option<String>,
458}