opendal_core/services/gdrive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use http::header;
27use mea::mutex::Mutex;
28use serde::Deserialize;
29use serde_json::json;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
35pub struct GdriveCore {
36    pub info: Arc<AccessorInfo>,
37
38    pub root: String,
39
40    pub signer: Arc<Mutex<GdriveSigner>>,
41
42    /// Cache the mapping from path to file id
43    pub path_cache: PathCacher<GdrivePathQuery>,
44}
45
46impl Debug for GdriveCore {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("GdriveCore")
49            .field("root", &self.root)
50            .finish()
51    }
52}
53
54impl GdriveCore {
55    pub async fn gdrive_stat(&self, path: &str) -> Result<Response<Buffer>> {
56        let path = build_abs_path(&self.root, path);
57        let file_id = self.path_cache.get(&path).await?.ok_or(Error::new(
58            ErrorKind::NotFound,
59            format!("path not found: {path}"),
60        ))?;
61
62        // The file metadata in the Google Drive API is very complex.
63        // For now, we only need the file id, name, mime type and modified time.
64        let mut req = Request::get(format!(
65            "https://www.googleapis.com/drive/v3/files/{file_id}?fields=id,name,mimeType,size,modifiedTime"
66        ))
67            .extension(Operation::Stat)
68            .body(Buffer::new())
69            .map_err(new_request_build_error)?;
70        self.sign(&mut req).await?;
71
72        self.info.http_client().send(req).await
73    }
74
75    pub async fn gdrive_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
76        let path = build_abs_path(&self.root, path);
77        let path_id = self.path_cache.get(&path).await?.ok_or(Error::new(
78            ErrorKind::NotFound,
79            format!("path not found: {path}"),
80        ))?;
81
82        let url: String = format!("https://www.googleapis.com/drive/v3/files/{path_id}?alt=media");
83
84        let mut req = Request::get(&url)
85            .extension(Operation::Read)
86            .header(header::RANGE, range.to_header())
87            .body(Buffer::new())
88            .map_err(new_request_build_error)?;
89        self.sign(&mut req).await?;
90
91        self.info.http_client().fetch(req).await
92    }
93
94    pub async fn gdrive_list(
95        &self,
96        file_id: &str,
97        page_size: i32,
98        next_page_token: &str,
99    ) -> Result<Response<Buffer>> {
100        let q = format!("'{file_id}' in parents and trashed = false");
101        let url = "https://www.googleapis.com/drive/v3/files";
102        let mut url = QueryPairsWriter::new(url);
103        url = url.push("pageSize", &page_size.to_string());
104        url = url.push("q", &percent_encode_path(&q));
105        if !next_page_token.is_empty() {
106            url = url.push("pageToken", next_page_token);
107        };
108
109        let mut req = Request::get(url.finish())
110            .extension(Operation::List)
111            .body(Buffer::new())
112            .map_err(new_request_build_error)?;
113        self.sign(&mut req).await?;
114
115        self.info.http_client().send(req).await
116    }
117
118    // Update with content and metadata
119    pub async fn gdrive_patch_metadata_request(
120        &self,
121        source: &str,
122        target: &str,
123    ) -> Result<Response<Buffer>> {
124        let source_file_id = self.path_cache.get(source).await?.ok_or(Error::new(
125            ErrorKind::NotFound,
126            format!("source path not found: {source}"),
127        ))?;
128        let source_parent = get_parent(source);
129        let source_parent_id = self
130            .path_cache
131            .get(source_parent)
132            .await?
133            .expect("old parent must exist");
134
135        let target_parent_id = self.path_cache.ensure_dir(get_parent(target)).await?;
136        let target_file_name = get_basename(target);
137
138        let metadata = &json!({
139            "name": target_file_name,
140            "removeParents": [source_parent_id],
141            "addParents": [target_parent_id],
142        });
143
144        let url = format!("https://www.googleapis.com/drive/v3/files/{source_file_id}");
145        let mut req = Request::patch(url)
146            .extension(Operation::Rename)
147            .body(Buffer::from(Bytes::from(metadata.to_string())))
148            .map_err(new_request_build_error)?;
149
150        self.sign(&mut req).await?;
151
152        self.info.http_client().send(req).await
153    }
154
155    pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<Buffer>> {
156        let url = format!("https://www.googleapis.com/drive/v3/files/{file_id}");
157
158        let body = serde_json::to_vec(&json!({
159            "trashed": true
160        }))
161        .map_err(new_json_serialize_error)?;
162
163        let mut req = Request::patch(&url)
164            .extension(Operation::Delete)
165            .body(Buffer::from(Bytes::from(body)))
166            .map_err(new_request_build_error)?;
167
168        self.sign(&mut req).await?;
169
170        self.info.http_client().send(req).await
171    }
172
173    /// Create a file with the content.
174    pub async fn gdrive_upload_simple_request(
175        &self,
176        path: &str,
177        size: u64,
178        body: Buffer,
179    ) -> Result<Response<Buffer>> {
180        let parent = self.path_cache.ensure_dir(get_parent(path)).await?;
181
182        let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
183
184        let file_name = get_basename(path);
185
186        let metadata = serde_json::to_vec(&json!({
187            "name": file_name,
188            "parents": [parent],
189        }))
190        .map_err(new_json_serialize_error)?;
191
192        let req = Request::post(url)
193            .header("X-Upload-Content-Length", size)
194            .extension(Operation::Write);
195
196        let multipart = Multipart::new()
197            .part(
198                FormDataPart::new("metadata")
199                    .header(
200                        header::CONTENT_TYPE,
201                        "application/json; charset=UTF-8".parse().unwrap(),
202                    )
203                    .content(metadata),
204            )
205            .part(
206                FormDataPart::new("file")
207                    .header(
208                        header::CONTENT_TYPE,
209                        "application/octet-stream".parse().unwrap(),
210                    )
211                    .content(body),
212            );
213
214        let mut req = multipart.apply(req)?;
215
216        self.sign(&mut req).await?;
217
218        self.info.http_client().send(req).await
219    }
220
221    /// Overwrite the file with the content.
222    ///
223    /// # Notes
224    ///
225    /// - The file id is required. Do not use this method to create a file.
226    pub async fn gdrive_upload_overwrite_simple_request(
227        &self,
228        file_id: &str,
229        size: u64,
230        body: Buffer,
231    ) -> Result<Response<Buffer>> {
232        let url =
233            format!("https://www.googleapis.com/upload/drive/v3/files/{file_id}?uploadType=media");
234
235        let mut req = Request::patch(url)
236            .header(header::CONTENT_TYPE, "application/octet-stream")
237            .header(header::CONTENT_LENGTH, size)
238            .header("X-Upload-Content-Length", size)
239            .extension(Operation::Write)
240            .body(body)
241            .map_err(new_request_build_error)?;
242
243        self.sign(&mut req).await?;
244
245        self.info.http_client().send(req).await
246    }
247
248    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
249        let mut signer = self.signer.lock().await;
250        signer.sign(req).await
251    }
252
253    pub async fn gdrive_copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
254        let from = build_abs_path(&self.root, from);
255
256        let from_file_id = self.path_cache.get(&from).await?.ok_or(Error::new(
257            ErrorKind::NotFound,
258            "the file to copy does not exist",
259        ))?;
260
261        let to_name = get_basename(to);
262        let to_path = build_abs_path(&self.root, to);
263        let to_parent_id = self.path_cache.ensure_dir(get_parent(&to_path)).await?;
264
265        // copy will overwrite `to`, delete it if exist
266        if let Some(id) = self.path_cache.get(&to_path).await? {
267            let resp = self.gdrive_trash(&id).await?;
268            let status = resp.status();
269            if status != StatusCode::OK {
270                return Err(parse_error(resp));
271            }
272
273            self.path_cache.remove(&to_path).await;
274        }
275
276        let url = format!("https://www.googleapis.com/drive/v3/files/{from_file_id}/copy");
277
278        let request_body = &json!({
279            "name": to_name,
280            "parents": [to_parent_id],
281        });
282        let body = Buffer::from(Bytes::from(request_body.to_string()));
283
284        let mut req = Request::post(&url)
285            .extension(Operation::Copy)
286            .body(body)
287            .map_err(new_request_build_error)?;
288        self.sign(&mut req).await?;
289
290        self.info.http_client().send(req).await
291    }
292}
293
294#[derive(Clone)]
295pub struct GdriveSigner {
296    pub info: Arc<AccessorInfo>,
297
298    pub client_id: String,
299    pub client_secret: String,
300    pub refresh_token: String,
301
302    pub access_token: String,
303    pub expires_in: Timestamp,
304}
305
306impl GdriveSigner {
307    /// Create a new signer.
308    pub fn new(info: Arc<AccessorInfo>) -> Self {
309        GdriveSigner {
310            info,
311
312            client_id: "".to_string(),
313            client_secret: "".to_string(),
314            refresh_token: "".to_string(),
315            access_token: "".to_string(),
316            expires_in: Timestamp::MIN,
317        }
318    }
319
320    /// Sign a request.
321    pub async fn sign<T>(&mut self, req: &mut Request<T>) -> Result<()> {
322        if !self.access_token.is_empty() && self.expires_in > Timestamp::now() {
323            let value = format!("Bearer {}", self.access_token)
324                .parse()
325                .expect("access token must be valid header value");
326
327            req.headers_mut().insert(header::AUTHORIZATION, value);
328            return Ok(());
329        }
330
331        let url = format!(
332            "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token",
333            self.refresh_token, self.client_id, self.client_secret
334        );
335
336        {
337            let req = Request::post(url)
338                .header(header::CONTENT_LENGTH, 0)
339                .body(Buffer::new())
340                .map_err(new_request_build_error)?;
341
342            let resp = self.info.http_client().send(req).await?;
343            let status = resp.status();
344
345            match status {
346                StatusCode::OK => {
347                    let resp_body = resp.into_body();
348                    let token: GdriveTokenResponse = serde_json::from_reader(resp_body.reader())
349                        .map_err(new_json_deserialize_error)?;
350                    self.access_token.clone_from(&token.access_token);
351                    self.expires_in = Timestamp::now() + Duration::from_secs(token.expires_in)
352                        - Duration::from_secs(120);
353                }
354                _ => {
355                    return Err(parse_error(resp));
356                }
357            }
358        }
359
360        let auth_header_content = format!("Bearer {}", self.access_token);
361        req.headers_mut()
362            .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap());
363
364        Ok(())
365    }
366}
367
368pub struct GdrivePathQuery {
369    pub info: Arc<AccessorInfo>,
370    pub signer: Arc<Mutex<GdriveSigner>>,
371}
372
373impl GdrivePathQuery {
374    pub fn new(info: Arc<AccessorInfo>, signer: Arc<Mutex<GdriveSigner>>) -> Self {
375        GdrivePathQuery { info, signer }
376    }
377}
378
379impl PathQuery for GdrivePathQuery {
380    async fn root(&self) -> Result<String> {
381        Ok("root".to_string())
382    }
383
384    async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
385        let mut queries = vec![
386            // Make sure name has been replaced with escaped name.
387            //
388            // ref: <https://developers.google.com/drive/api/guides/ref-search-terms>
389            format!(
390                "name = '{}'",
391                name.replace('\'', "\\'").trim_end_matches('/')
392            ),
393            format!("'{}' in parents", parent_id),
394            "trashed = false".to_string(),
395        ];
396        if name.ends_with('/') {
397            queries.push("mimeType = 'application/vnd.google-apps.folder'".to_string());
398        }
399        let query = queries.join(" and ");
400
401        let url = format!(
402            "https://www.googleapis.com/drive/v3/files?q={}",
403            percent_encode_path(query.as_str())
404        );
405
406        let mut req = Request::get(&url)
407            .extension(Operation::Stat)
408            .body(Buffer::new())
409            .map_err(new_request_build_error)?;
410
411        self.signer.lock().await.sign(&mut req).await?;
412
413        let resp = self.info.http_client().send(req).await?;
414        let status = resp.status();
415
416        match status {
417            StatusCode::OK => {
418                let body = resp.into_body();
419                let meta: GdriveFileList =
420                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
421
422                if let Some(f) = meta.files.first() {
423                    Ok(Some(f.id.clone()))
424                } else {
425                    Ok(None)
426                }
427            }
428            _ => Err(parse_error(resp)),
429        }
430    }
431
432    async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
433        let url = "https://www.googleapis.com/drive/v3/files";
434
435        let content = serde_json::to_vec(&json!({
436            "name": name.trim_end_matches('/'),
437            "mimeType": "application/vnd.google-apps.folder",
438            // If the parent is not provided, the folder will be created in the root folder.
439            "parents": [parent_id],
440        }))
441        .map_err(new_json_serialize_error)?;
442
443        let mut req = Request::post(url)
444            .extension(Operation::CreateDir)
445            .header(header::CONTENT_TYPE, "application/json")
446            .body(Buffer::from(Bytes::from(content)))
447            .map_err(new_request_build_error)?;
448
449        self.signer.lock().await.sign(&mut req).await?;
450
451        let resp = self.info.http_client().send(req).await?;
452        if !resp.status().is_success() {
453            return Err(parse_error(resp));
454        }
455
456        let body = resp.into_body();
457        let file: GdriveFile =
458            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
459        Ok(file.id)
460    }
461}
462
463#[derive(Deserialize)]
464pub struct GdriveTokenResponse {
465    access_token: String,
466    expires_in: u64,
467}
468
469/// This is the file struct returned by the Google Drive API.
470/// This is a complex struct, but we only add the fields we need.
471/// refer to https://developers.google.com/drive/api/reference/rest/v3/files#File
472#[derive(Deserialize, Debug)]
473#[serde(rename_all = "camelCase")]
474pub struct GdriveFile {
475    pub mime_type: String,
476    pub id: String,
477    pub name: String,
478    pub size: Option<String>,
479    // The modified time is not returned unless the `fields`
480    // query parameter contains `modifiedTime`.
481    // As we only need the modified time when we do `stat` operation,
482    // if other operations(such as search) do not specify the `fields` query parameter,
483    // try to access this field, it will be `None`.
484    pub modified_time: Option<String>,
485}
486
487/// refer to https://developers.google.com/drive/api/reference/rest/v3/files/list
488#[derive(Deserialize)]
489#[serde(rename_all = "camelCase")]
490pub(crate) struct GdriveFileList {
491    pub(crate) files: Vec<GdriveFile>,
492    pub(crate) next_page_token: Option<String>,
493}