opendal/services/gdrive/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes;
19use bytes::Buf;
20use bytes::Bytes;
21use http::Request;
22use http::Response;
23use http::StatusCode;
24use http::header;
25use serde::Deserialize;
26use serde_json::json;
27use std::fmt::Debug;
28use std::fmt::Formatter;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::sync::Mutex;
32
33use super::error::parse_error;
34use crate::raw::*;
35use crate::*;
36
37pub struct GdriveCore {
38    pub info: Arc<AccessorInfo>,
39
40    pub root: String,
41
42    pub signer: Arc<Mutex<GdriveSigner>>,
43
44    /// Cache the mapping from path to file id
45    pub path_cache: PathCacher<GdrivePathQuery>,
46}
47
48impl Debug for GdriveCore {
49    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50        let mut de = f.debug_struct("GdriveCore");
51        de.field("root", &self.root);
52        de.finish()
53    }
54}
55
56impl GdriveCore {
57    pub async fn gdrive_stat(&self, path: &str) -> Result<Response<Buffer>> {
58        let path = build_abs_path(&self.root, path);
59        let file_id = self.path_cache.get(&path).await?.ok_or(Error::new(
60            ErrorKind::NotFound,
61            format!("path not found: {path}"),
62        ))?;
63
64        // The file metadata in the Google Drive API is very complex.
65        // For now, we only need the file id, name, mime type and modified time.
66        let mut req = Request::get(format!(
67            "https://www.googleapis.com/drive/v3/files/{file_id}?fields=id,name,mimeType,size,modifiedTime"
68        ))
69            .extension(Operation::Stat)
70            .body(Buffer::new())
71            .map_err(new_request_build_error)?;
72        self.sign(&mut req).await?;
73
74        self.info.http_client().send(req).await
75    }
76
77    pub async fn gdrive_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
78        let path = build_abs_path(&self.root, path);
79        let path_id = self.path_cache.get(&path).await?.ok_or(Error::new(
80            ErrorKind::NotFound,
81            format!("path not found: {path}"),
82        ))?;
83
84        let url: String = format!("https://www.googleapis.com/drive/v3/files/{path_id}?alt=media");
85
86        let mut req = Request::get(&url)
87            .extension(Operation::Read)
88            .header(header::RANGE, range.to_header())
89            .body(Buffer::new())
90            .map_err(new_request_build_error)?;
91        self.sign(&mut req).await?;
92
93        self.info.http_client().fetch(req).await
94    }
95
96    pub async fn gdrive_list(
97        &self,
98        file_id: &str,
99        page_size: i32,
100        next_page_token: &str,
101    ) -> Result<Response<Buffer>> {
102        let q = format!("'{file_id}' in parents and trashed = false");
103        let url = "https://www.googleapis.com/drive/v3/files";
104        let mut url = QueryPairsWriter::new(url);
105        url = url.push("pageSize", &page_size.to_string());
106        url = url.push("q", &percent_encode_path(&q));
107        if !next_page_token.is_empty() {
108            url = url.push("pageToken", next_page_token);
109        };
110
111        let mut req = Request::get(url.finish())
112            .extension(Operation::List)
113            .body(Buffer::new())
114            .map_err(new_request_build_error)?;
115        self.sign(&mut req).await?;
116
117        self.info.http_client().send(req).await
118    }
119
120    // Update with content and metadata
121    pub async fn gdrive_patch_metadata_request(
122        &self,
123        source: &str,
124        target: &str,
125    ) -> Result<Response<Buffer>> {
126        let source_file_id = self.path_cache.get(source).await?.ok_or(Error::new(
127            ErrorKind::NotFound,
128            format!("source path not found: {source}"),
129        ))?;
130        let source_parent = get_parent(source);
131        let source_parent_id = self
132            .path_cache
133            .get(source_parent)
134            .await?
135            .expect("old parent must exist");
136
137        let target_parent_id = self.path_cache.ensure_dir(get_parent(target)).await?;
138        let target_file_name = get_basename(target);
139
140        let metadata = &json!({
141            "name": target_file_name,
142            "removeParents": [source_parent_id],
143            "addParents": [target_parent_id],
144        });
145
146        let url = format!("https://www.googleapis.com/drive/v3/files/{source_file_id}");
147        let mut req = Request::patch(url)
148            .extension(Operation::Rename)
149            .body(Buffer::from(Bytes::from(metadata.to_string())))
150            .map_err(new_request_build_error)?;
151
152        self.sign(&mut req).await?;
153
154        self.info.http_client().send(req).await
155    }
156
157    pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<Buffer>> {
158        let url = format!("https://www.googleapis.com/drive/v3/files/{file_id}");
159
160        let body = serde_json::to_vec(&json!({
161            "trashed": true
162        }))
163        .map_err(new_json_serialize_error)?;
164
165        let mut req = Request::patch(&url)
166            .extension(Operation::Delete)
167            .body(Buffer::from(Bytes::from(body)))
168            .map_err(new_request_build_error)?;
169
170        self.sign(&mut req).await?;
171
172        self.info.http_client().send(req).await
173    }
174
175    /// Create a file with the content.
176    pub async fn gdrive_upload_simple_request(
177        &self,
178        path: &str,
179        size: u64,
180        body: Buffer,
181    ) -> Result<Response<Buffer>> {
182        let parent = self.path_cache.ensure_dir(get_parent(path)).await?;
183
184        let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
185
186        let file_name = get_basename(path);
187
188        let metadata = serde_json::to_vec(&json!({
189            "name": file_name,
190            "parents": [parent],
191        }))
192        .map_err(new_json_serialize_error)?;
193
194        let req = Request::post(url)
195            .header("X-Upload-Content-Length", size)
196            .extension(Operation::Write);
197
198        let multipart = Multipart::new()
199            .part(
200                FormDataPart::new("metadata")
201                    .header(
202                        header::CONTENT_TYPE,
203                        "application/json; charset=UTF-8".parse().unwrap(),
204                    )
205                    .content(metadata),
206            )
207            .part(
208                FormDataPart::new("file")
209                    .header(
210                        header::CONTENT_TYPE,
211                        "application/octet-stream".parse().unwrap(),
212                    )
213                    .content(body),
214            );
215
216        let mut req = multipart.apply(req)?;
217
218        self.sign(&mut req).await?;
219
220        self.info.http_client().send(req).await
221    }
222
223    /// Overwrite the file with the content.
224    ///
225    /// # Notes
226    ///
227    /// - The file id is required. Do not use this method to create a file.
228    pub async fn gdrive_upload_overwrite_simple_request(
229        &self,
230        file_id: &str,
231        size: u64,
232        body: Buffer,
233    ) -> Result<Response<Buffer>> {
234        let url =
235            format!("https://www.googleapis.com/upload/drive/v3/files/{file_id}?uploadType=media");
236
237        let mut req = Request::patch(url)
238            .header(header::CONTENT_TYPE, "application/octet-stream")
239            .header(header::CONTENT_LENGTH, size)
240            .header("X-Upload-Content-Length", size)
241            .extension(Operation::Write)
242            .body(body)
243            .map_err(new_request_build_error)?;
244
245        self.sign(&mut req).await?;
246
247        self.info.http_client().send(req).await
248    }
249
250    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
251        let mut signer = self.signer.lock().await;
252        signer.sign(req).await
253    }
254
255    pub async fn gdrive_copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
256        let from = build_abs_path(&self.root, from);
257
258        let from_file_id = self.path_cache.get(&from).await?.ok_or(Error::new(
259            ErrorKind::NotFound,
260            "the file to copy does not exist",
261        ))?;
262
263        let to_name = get_basename(to);
264        let to_path = build_abs_path(&self.root, to);
265        let to_parent_id = self.path_cache.ensure_dir(get_parent(&to_path)).await?;
266
267        // copy will overwrite `to`, delete it if exist
268        if let Some(id) = self.path_cache.get(&to_path).await? {
269            let resp = self.gdrive_trash(&id).await?;
270            let status = resp.status();
271            if status != StatusCode::OK {
272                return Err(parse_error(resp));
273            }
274
275            self.path_cache.remove(&to_path).await;
276        }
277
278        let url = format!("https://www.googleapis.com/drive/v3/files/{from_file_id}/copy");
279
280        let request_body = &json!({
281            "name": to_name,
282            "parents": [to_parent_id],
283        });
284        let body = Buffer::from(Bytes::from(request_body.to_string()));
285
286        let mut req = Request::post(&url)
287            .extension(Operation::Copy)
288            .body(body)
289            .map_err(new_request_build_error)?;
290        self.sign(&mut req).await?;
291
292        self.info.http_client().send(req).await
293    }
294}
295
296#[derive(Clone)]
297pub struct GdriveSigner {
298    pub info: Arc<AccessorInfo>,
299
300    pub client_id: String,
301    pub client_secret: String,
302    pub refresh_token: String,
303
304    pub access_token: String,
305    pub expires_in: Timestamp,
306}
307
308impl GdriveSigner {
309    /// Create a new signer.
310    pub fn new(info: Arc<AccessorInfo>) -> Self {
311        GdriveSigner {
312            info,
313
314            client_id: "".to_string(),
315            client_secret: "".to_string(),
316            refresh_token: "".to_string(),
317            access_token: "".to_string(),
318            expires_in: Timestamp::MIN,
319        }
320    }
321
322    /// Sign a request.
323    pub async fn sign<T>(&mut self, req: &mut Request<T>) -> Result<()> {
324        if !self.access_token.is_empty() && self.expires_in > Timestamp::now() {
325            let value = format!("Bearer {}", self.access_token)
326                .parse()
327                .expect("access token must be valid header value");
328
329            req.headers_mut().insert(header::AUTHORIZATION, value);
330            return Ok(());
331        }
332
333        let url = format!(
334            "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token",
335            self.refresh_token, self.client_id, self.client_secret
336        );
337
338        {
339            let req = Request::post(url)
340                .header(header::CONTENT_LENGTH, 0)
341                .body(Buffer::new())
342                .map_err(new_request_build_error)?;
343
344            let resp = self.info.http_client().send(req).await?;
345            let status = resp.status();
346
347            match status {
348                StatusCode::OK => {
349                    let resp_body = resp.into_body();
350                    let token: GdriveTokenResponse = serde_json::from_reader(resp_body.reader())
351                        .map_err(new_json_deserialize_error)?;
352                    self.access_token.clone_from(&token.access_token);
353                    self.expires_in = Timestamp::now() + Duration::from_secs(token.expires_in)
354                        - Duration::from_secs(120);
355                }
356                _ => {
357                    return Err(parse_error(resp));
358                }
359            }
360        }
361
362        let auth_header_content = format!("Bearer {}", self.access_token);
363        req.headers_mut()
364            .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap());
365
366        Ok(())
367    }
368}
369
370pub struct GdrivePathQuery {
371    pub info: Arc<AccessorInfo>,
372    pub signer: Arc<Mutex<GdriveSigner>>,
373}
374
375impl GdrivePathQuery {
376    pub fn new(info: Arc<AccessorInfo>, signer: Arc<Mutex<GdriveSigner>>) -> Self {
377        GdrivePathQuery { info, signer }
378    }
379}
380
381impl PathQuery for GdrivePathQuery {
382    async fn root(&self) -> Result<String> {
383        Ok("root".to_string())
384    }
385
386    async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
387        let mut queries = vec![
388            // Make sure name has been replaced with escaped name.
389            //
390            // ref: <https://developers.google.com/drive/api/guides/ref-search-terms>
391            format!(
392                "name = '{}'",
393                name.replace('\'', "\\'").trim_end_matches('/')
394            ),
395            format!("'{}' in parents", parent_id),
396            "trashed = false".to_string(),
397        ];
398        if name.ends_with('/') {
399            queries.push("mimeType = 'application/vnd.google-apps.folder'".to_string());
400        }
401        let query = queries.join(" and ");
402
403        let url = format!(
404            "https://www.googleapis.com/drive/v3/files?q={}",
405            percent_encode_path(query.as_str())
406        );
407
408        let mut req = Request::get(&url)
409            .extension(Operation::Stat)
410            .body(Buffer::new())
411            .map_err(new_request_build_error)?;
412
413        self.signer.lock().await.sign(&mut req).await?;
414
415        let resp = self.info.http_client().send(req).await?;
416        let status = resp.status();
417
418        match status {
419            StatusCode::OK => {
420                let body = resp.into_body();
421                let meta: GdriveFileList =
422                    serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
423
424                if let Some(f) = meta.files.first() {
425                    Ok(Some(f.id.clone()))
426                } else {
427                    Ok(None)
428                }
429            }
430            _ => Err(parse_error(resp)),
431        }
432    }
433
434    async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
435        let url = "https://www.googleapis.com/drive/v3/files";
436
437        let content = serde_json::to_vec(&json!({
438            "name": name.trim_end_matches('/'),
439            "mimeType": "application/vnd.google-apps.folder",
440            // If the parent is not provided, the folder will be created in the root folder.
441            "parents": [parent_id],
442        }))
443        .map_err(new_json_serialize_error)?;
444
445        let mut req = Request::post(url)
446            .extension(Operation::CreateDir)
447            .header(header::CONTENT_TYPE, "application/json")
448            .body(Buffer::from(Bytes::from(content)))
449            .map_err(new_request_build_error)?;
450
451        self.signer.lock().await.sign(&mut req).await?;
452
453        let resp = self.info.http_client().send(req).await?;
454        if !resp.status().is_success() {
455            return Err(parse_error(resp));
456        }
457
458        let body = resp.into_body();
459        let file: GdriveFile =
460            serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
461        Ok(file.id)
462    }
463}
464
465#[derive(Deserialize)]
466pub struct GdriveTokenResponse {
467    access_token: String,
468    expires_in: u64,
469}
470
471/// This is the file struct returned by the Google Drive API.
472/// This is a complex struct, but we only add the fields we need.
473/// refer to https://developers.google.com/drive/api/reference/rest/v3/files#File
474#[derive(Deserialize, Debug)]
475#[serde(rename_all = "camelCase")]
476pub struct GdriveFile {
477    pub mime_type: String,
478    pub id: String,
479    pub name: String,
480    pub size: Option<String>,
481    // The modified time is not returned unless the `fields`
482    // query parameter contains `modifiedTime`.
483    // As we only need the modified time when we do `stat` operation,
484    // if other operations(such as search) do not specify the `fields` query parameter,
485    // try to access this field, it will be `None`.
486    pub modified_time: Option<String>,
487}
488
489/// refer to https://developers.google.com/drive/api/reference/rest/v3/files/list
490#[derive(Deserialize)]
491#[serde(rename_all = "camelCase")]
492pub(crate) struct GdriveFileList {
493    pub(crate) files: Vec<GdriveFile>,
494    pub(crate) next_page_token: Option<String>,
495}