opendal_core/services/gdrive/
core.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use http::header;
27use mea::mutex::Mutex;
28use serde::Deserialize;
29use serde_json::json;
30
31use super::error::parse_error;
32use crate::raw::*;
33use crate::*;
34
35pub struct GdriveCore {
36 pub info: Arc<AccessorInfo>,
37
38 pub root: String,
39
40 pub signer: Arc<Mutex<GdriveSigner>>,
41
42 pub path_cache: PathCacher<GdrivePathQuery>,
44}
45
46impl Debug for GdriveCore {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("GdriveCore")
49 .field("root", &self.root)
50 .finish()
51 }
52}
53
54impl GdriveCore {
55 pub async fn gdrive_stat(&self, path: &str) -> Result<Response<Buffer>> {
56 let path = build_abs_path(&self.root, path);
57 let file_id = self.path_cache.get(&path).await?.ok_or(Error::new(
58 ErrorKind::NotFound,
59 format!("path not found: {path}"),
60 ))?;
61
62 let mut req = Request::get(format!(
65 "https://www.googleapis.com/drive/v3/files/{file_id}?fields=id,name,mimeType,size,modifiedTime"
66 ))
67 .extension(Operation::Stat)
68 .body(Buffer::new())
69 .map_err(new_request_build_error)?;
70 self.sign(&mut req).await?;
71
72 self.info.http_client().send(req).await
73 }
74
75 pub async fn gdrive_get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
76 let path = build_abs_path(&self.root, path);
77 let path_id = self.path_cache.get(&path).await?.ok_or(Error::new(
78 ErrorKind::NotFound,
79 format!("path not found: {path}"),
80 ))?;
81
82 let url: String = format!("https://www.googleapis.com/drive/v3/files/{path_id}?alt=media");
83
84 let mut req = Request::get(&url)
85 .extension(Operation::Read)
86 .header(header::RANGE, range.to_header())
87 .body(Buffer::new())
88 .map_err(new_request_build_error)?;
89 self.sign(&mut req).await?;
90
91 self.info.http_client().fetch(req).await
92 }
93
94 pub async fn gdrive_list(
95 &self,
96 file_id: &str,
97 page_size: i32,
98 next_page_token: &str,
99 ) -> Result<Response<Buffer>> {
100 let q = format!("'{file_id}' in parents and trashed = false");
101 let url = "https://www.googleapis.com/drive/v3/files";
102 let mut url = QueryPairsWriter::new(url);
103 url = url.push("pageSize", &page_size.to_string());
104 url = url.push("q", &percent_encode_path(&q));
105 if !next_page_token.is_empty() {
106 url = url.push("pageToken", next_page_token);
107 };
108
109 let mut req = Request::get(url.finish())
110 .extension(Operation::List)
111 .body(Buffer::new())
112 .map_err(new_request_build_error)?;
113 self.sign(&mut req).await?;
114
115 self.info.http_client().send(req).await
116 }
117
118 pub async fn gdrive_patch_metadata_request(
120 &self,
121 source: &str,
122 target: &str,
123 ) -> Result<Response<Buffer>> {
124 let source_file_id = self.path_cache.get(source).await?.ok_or(Error::new(
125 ErrorKind::NotFound,
126 format!("source path not found: {source}"),
127 ))?;
128 let source_parent = get_parent(source);
129 let source_parent_id = self
130 .path_cache
131 .get(source_parent)
132 .await?
133 .expect("old parent must exist");
134
135 let target_parent_id = self.path_cache.ensure_dir(get_parent(target)).await?;
136 let target_file_name = get_basename(target);
137
138 let metadata = &json!({
139 "name": target_file_name,
140 "removeParents": [source_parent_id],
141 "addParents": [target_parent_id],
142 });
143
144 let url = format!("https://www.googleapis.com/drive/v3/files/{source_file_id}");
145 let mut req = Request::patch(url)
146 .extension(Operation::Rename)
147 .body(Buffer::from(Bytes::from(metadata.to_string())))
148 .map_err(new_request_build_error)?;
149
150 self.sign(&mut req).await?;
151
152 self.info.http_client().send(req).await
153 }
154
155 pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<Buffer>> {
156 let url = format!("https://www.googleapis.com/drive/v3/files/{file_id}");
157
158 let body = serde_json::to_vec(&json!({
159 "trashed": true
160 }))
161 .map_err(new_json_serialize_error)?;
162
163 let mut req = Request::patch(&url)
164 .extension(Operation::Delete)
165 .body(Buffer::from(Bytes::from(body)))
166 .map_err(new_request_build_error)?;
167
168 self.sign(&mut req).await?;
169
170 self.info.http_client().send(req).await
171 }
172
173 pub async fn gdrive_upload_simple_request(
175 &self,
176 path: &str,
177 size: u64,
178 body: Buffer,
179 ) -> Result<Response<Buffer>> {
180 let parent = self.path_cache.ensure_dir(get_parent(path)).await?;
181
182 let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
183
184 let file_name = get_basename(path);
185
186 let metadata = serde_json::to_vec(&json!({
187 "name": file_name,
188 "parents": [parent],
189 }))
190 .map_err(new_json_serialize_error)?;
191
192 let req = Request::post(url)
193 .header("X-Upload-Content-Length", size)
194 .extension(Operation::Write);
195
196 let multipart = Multipart::new()
197 .part(
198 FormDataPart::new("metadata")
199 .header(
200 header::CONTENT_TYPE,
201 "application/json; charset=UTF-8".parse().unwrap(),
202 )
203 .content(metadata),
204 )
205 .part(
206 FormDataPart::new("file")
207 .header(
208 header::CONTENT_TYPE,
209 "application/octet-stream".parse().unwrap(),
210 )
211 .content(body),
212 );
213
214 let mut req = multipart.apply(req)?;
215
216 self.sign(&mut req).await?;
217
218 self.info.http_client().send(req).await
219 }
220
221 pub async fn gdrive_upload_overwrite_simple_request(
227 &self,
228 file_id: &str,
229 size: u64,
230 body: Buffer,
231 ) -> Result<Response<Buffer>> {
232 let url =
233 format!("https://www.googleapis.com/upload/drive/v3/files/{file_id}?uploadType=media");
234
235 let mut req = Request::patch(url)
236 .header(header::CONTENT_TYPE, "application/octet-stream")
237 .header(header::CONTENT_LENGTH, size)
238 .header("X-Upload-Content-Length", size)
239 .extension(Operation::Write)
240 .body(body)
241 .map_err(new_request_build_error)?;
242
243 self.sign(&mut req).await?;
244
245 self.info.http_client().send(req).await
246 }
247
248 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
249 let mut signer = self.signer.lock().await;
250 signer.sign(req).await
251 }
252
253 pub async fn gdrive_copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
254 let from = build_abs_path(&self.root, from);
255
256 let from_file_id = self.path_cache.get(&from).await?.ok_or(Error::new(
257 ErrorKind::NotFound,
258 "the file to copy does not exist",
259 ))?;
260
261 let to_name = get_basename(to);
262 let to_path = build_abs_path(&self.root, to);
263 let to_parent_id = self.path_cache.ensure_dir(get_parent(&to_path)).await?;
264
265 if let Some(id) = self.path_cache.get(&to_path).await? {
267 let resp = self.gdrive_trash(&id).await?;
268 let status = resp.status();
269 if status != StatusCode::OK {
270 return Err(parse_error(resp));
271 }
272
273 self.path_cache.remove(&to_path).await;
274 }
275
276 let url = format!("https://www.googleapis.com/drive/v3/files/{from_file_id}/copy");
277
278 let request_body = &json!({
279 "name": to_name,
280 "parents": [to_parent_id],
281 });
282 let body = Buffer::from(Bytes::from(request_body.to_string()));
283
284 let mut req = Request::post(&url)
285 .extension(Operation::Copy)
286 .body(body)
287 .map_err(new_request_build_error)?;
288 self.sign(&mut req).await?;
289
290 self.info.http_client().send(req).await
291 }
292}
293
294#[derive(Clone)]
295pub struct GdriveSigner {
296 pub info: Arc<AccessorInfo>,
297
298 pub client_id: String,
299 pub client_secret: String,
300 pub refresh_token: String,
301
302 pub access_token: String,
303 pub expires_in: Timestamp,
304}
305
306impl GdriveSigner {
307 pub fn new(info: Arc<AccessorInfo>) -> Self {
309 GdriveSigner {
310 info,
311
312 client_id: "".to_string(),
313 client_secret: "".to_string(),
314 refresh_token: "".to_string(),
315 access_token: "".to_string(),
316 expires_in: Timestamp::MIN,
317 }
318 }
319
320 pub async fn sign<T>(&mut self, req: &mut Request<T>) -> Result<()> {
322 if !self.access_token.is_empty() && self.expires_in > Timestamp::now() {
323 let value = format!("Bearer {}", self.access_token)
324 .parse()
325 .expect("access token must be valid header value");
326
327 req.headers_mut().insert(header::AUTHORIZATION, value);
328 return Ok(());
329 }
330
331 let url = format!(
332 "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token",
333 self.refresh_token, self.client_id, self.client_secret
334 );
335
336 {
337 let req = Request::post(url)
338 .header(header::CONTENT_LENGTH, 0)
339 .body(Buffer::new())
340 .map_err(new_request_build_error)?;
341
342 let resp = self.info.http_client().send(req).await?;
343 let status = resp.status();
344
345 match status {
346 StatusCode::OK => {
347 let resp_body = resp.into_body();
348 let token: GdriveTokenResponse = serde_json::from_reader(resp_body.reader())
349 .map_err(new_json_deserialize_error)?;
350 self.access_token.clone_from(&token.access_token);
351 self.expires_in = Timestamp::now() + Duration::from_secs(token.expires_in)
352 - Duration::from_secs(120);
353 }
354 _ => {
355 return Err(parse_error(resp));
356 }
357 }
358 }
359
360 let auth_header_content = format!("Bearer {}", self.access_token);
361 req.headers_mut()
362 .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap());
363
364 Ok(())
365 }
366}
367
368pub struct GdrivePathQuery {
369 pub info: Arc<AccessorInfo>,
370 pub signer: Arc<Mutex<GdriveSigner>>,
371}
372
373impl GdrivePathQuery {
374 pub fn new(info: Arc<AccessorInfo>, signer: Arc<Mutex<GdriveSigner>>) -> Self {
375 GdrivePathQuery { info, signer }
376 }
377}
378
379impl PathQuery for GdrivePathQuery {
380 async fn root(&self) -> Result<String> {
381 Ok("root".to_string())
382 }
383
384 async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
385 let mut queries = vec![
386 format!(
390 "name = '{}'",
391 name.replace('\'', "\\'").trim_end_matches('/')
392 ),
393 format!("'{}' in parents", parent_id),
394 "trashed = false".to_string(),
395 ];
396 if name.ends_with('/') {
397 queries.push("mimeType = 'application/vnd.google-apps.folder'".to_string());
398 }
399 let query = queries.join(" and ");
400
401 let url = format!(
402 "https://www.googleapis.com/drive/v3/files?q={}",
403 percent_encode_path(query.as_str())
404 );
405
406 let mut req = Request::get(&url)
407 .extension(Operation::Stat)
408 .body(Buffer::new())
409 .map_err(new_request_build_error)?;
410
411 self.signer.lock().await.sign(&mut req).await?;
412
413 let resp = self.info.http_client().send(req).await?;
414 let status = resp.status();
415
416 match status {
417 StatusCode::OK => {
418 let body = resp.into_body();
419 let meta: GdriveFileList =
420 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
421
422 if let Some(f) = meta.files.first() {
423 Ok(Some(f.id.clone()))
424 } else {
425 Ok(None)
426 }
427 }
428 _ => Err(parse_error(resp)),
429 }
430 }
431
432 async fn create_dir(&self, parent_id: &str, name: &str) -> Result<String> {
433 let url = "https://www.googleapis.com/drive/v3/files";
434
435 let content = serde_json::to_vec(&json!({
436 "name": name.trim_end_matches('/'),
437 "mimeType": "application/vnd.google-apps.folder",
438 "parents": [parent_id],
440 }))
441 .map_err(new_json_serialize_error)?;
442
443 let mut req = Request::post(url)
444 .extension(Operation::CreateDir)
445 .header(header::CONTENT_TYPE, "application/json")
446 .body(Buffer::from(Bytes::from(content)))
447 .map_err(new_request_build_error)?;
448
449 self.signer.lock().await.sign(&mut req).await?;
450
451 let resp = self.info.http_client().send(req).await?;
452 if !resp.status().is_success() {
453 return Err(parse_error(resp));
454 }
455
456 let body = resp.into_body();
457 let file: GdriveFile =
458 serde_json::from_reader(body.reader()).map_err(new_json_deserialize_error)?;
459 Ok(file.id)
460 }
461}
462
463#[derive(Deserialize)]
464pub struct GdriveTokenResponse {
465 access_token: String,
466 expires_in: u64,
467}
468
469#[derive(Deserialize, Debug)]
473#[serde(rename_all = "camelCase")]
474pub struct GdriveFile {
475 pub mime_type: String,
476 pub id: String,
477 pub name: String,
478 pub size: Option<String>,
479 pub modified_time: Option<String>,
485}
486
487#[derive(Deserialize)]
489#[serde(rename_all = "camelCase")]
490pub(crate) struct GdriveFileList {
491 pub(crate) files: Vec<GdriveFile>,
492 pub(crate) next_page_token: Option<String>,
493}