1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use bytes::Buf;
24use http::Response;
25use http::StatusCode;
26use log::debug;
27use reqsign::GoogleCredentialLoader;
28use reqsign::GoogleSigner;
29use reqsign::GoogleTokenLoad;
30use reqsign::GoogleTokenLoader;
31use serde::Deserialize;
32use serde_json;
33
34use super::core::*;
35use super::delete::GcsDeleter;
36use super::error::parse_error;
37use super::lister::GcsLister;
38use super::writer::GcsWriter;
39use super::writer::GcsWriters;
40use crate::raw::oio::BatchDeleter;
41use crate::raw::*;
42use crate::services::GcsConfig;
43use crate::*;
44
/// Endpoint used by the builder when the configuration does not provide one.
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
/// Token scope used by the builder when the configuration does not provide one.
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
47
48impl Configurator for GcsConfig {
49 type Builder = GcsBuilder;
50
51 #[allow(deprecated)]
52 fn into_builder(self) -> Self::Builder {
53 GcsBuilder {
54 config: self,
55 http_client: None,
56 customized_token_loader: None,
57 }
58 }
59}
60
/// Builder for the GCS backend.
///
/// Holds a `GcsConfig` together with optional overrides that cannot live in
/// the configuration itself, and is consumed by `Builder::build`.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct GcsBuilder {
    /// Service configuration that the setter methods write into.
    config: GcsConfig,

    /// Optional HTTP client override, applied to the accessor info during `build`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
    /// Optional user-supplied token loader, handed to the token loader during `build`.
    customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
}
71
72impl Debug for GcsBuilder {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 let mut ds = f.debug_struct("GcsBuilder");
75
76 ds.field("config", &self.config);
77 ds.finish_non_exhaustive()
78 }
79}
80
81impl GcsBuilder {
82 pub fn root(mut self, root: &str) -> Self {
84 self.config.root = if root.is_empty() {
85 None
86 } else {
87 Some(root.to_string())
88 };
89
90 self
91 }
92
93 pub fn bucket(mut self, bucket: &str) -> Self {
95 self.config.bucket = bucket.to_string();
96 self
97 }
98
99 pub fn scope(mut self, scope: &str) -> Self {
111 if !scope.is_empty() {
112 self.config.scope = Some(scope.to_string())
113 };
114 self
115 }
116
117 pub fn service_account(mut self, service_account: &str) -> Self {
122 if !service_account.is_empty() {
123 self.config.service_account = Some(service_account.to_string())
124 };
125 self
126 }
127
128 pub fn endpoint(mut self, endpoint: &str) -> Self {
130 if !endpoint.is_empty() {
131 self.config.endpoint = Some(endpoint.to_string())
132 };
133 self
134 }
135
136 pub fn credential(mut self, credential: &str) -> Self {
144 if !credential.is_empty() {
145 self.config.credential = Some(credential.to_string())
146 };
147 self
148 }
149
150 pub fn credential_path(mut self, path: &str) -> Self {
157 if !path.is_empty() {
158 self.config.credential_path = Some(path.to_string())
159 };
160 self
161 }
162
163 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
170 #[allow(deprecated)]
171 pub fn http_client(mut self, client: HttpClient) -> Self {
172 self.http_client = Some(client);
173 self
174 }
175
176 pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
178 self.customized_token_loader = Some(token_load);
179 self
180 }
181
182 pub fn token(mut self, token: String) -> Self {
184 self.config.token = Some(token);
185 self
186 }
187
188 pub fn disable_vm_metadata(mut self) -> Self {
190 self.config.disable_vm_metadata = true;
191 self
192 }
193
194 pub fn disable_config_load(mut self) -> Self {
196 self.config.disable_config_load = true;
197 self
198 }
199
200 pub fn predefined_acl(mut self, acl: &str) -> Self {
210 if !acl.is_empty() {
211 self.config.predefined_acl = Some(acl.to_string())
212 };
213 self
214 }
215
216 pub fn default_storage_class(mut self, class: &str) -> Self {
224 if !class.is_empty() {
225 self.config.default_storage_class = Some(class.to_string())
226 };
227 self
228 }
229
230 pub fn allow_anonymous(mut self) -> Self {
235 self.config.allow_anonymous = true;
236 self
237 }
238}
239
240impl Builder for GcsBuilder {
241 const SCHEME: Scheme = Scheme::Gcs;
242 type Config = GcsConfig;
243
244 fn build(self) -> Result<impl Access> {
245 debug!("backend build started: {:?}", self);
246
247 let root = normalize_root(&self.config.root.unwrap_or_default());
248 debug!("backend use root {}", root);
249
250 let bucket = match self.config.bucket.is_empty() {
252 false => Ok(&self.config.bucket),
253 true => Err(
254 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
255 .with_operation("Builder::build")
256 .with_context("service", Scheme::Gcs),
257 ),
258 }?;
259
260 let endpoint = self
263 .config
264 .endpoint
265 .clone()
266 .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
267 debug!("backend use endpoint: {endpoint}");
268
269 let mut cred_loader = GoogleCredentialLoader::default();
270 if let Some(cred) = &self.config.credential {
271 cred_loader = cred_loader.with_content(cred);
272 }
273 if let Some(cred) = &self.config.credential_path {
274 cred_loader = cred_loader.with_path(cred);
275 }
276 #[cfg(target_arch = "wasm32")]
277 {
278 cred_loader = cred_loader.with_disable_env();
279 cred_loader = cred_loader.with_disable_well_known_location();
280 }
281
282 if self.config.disable_config_load {
283 cred_loader = cred_loader
284 .with_disable_env()
285 .with_disable_well_known_location();
286 }
287
288 let scope = if let Some(scope) = &self.config.scope {
289 scope
290 } else {
291 DEFAULT_GCS_SCOPE
292 };
293
294 let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
295 if let Some(account) = &self.config.service_account {
296 token_loader = token_loader.with_service_account(account);
297 }
298 if let Ok(Some(cred)) = cred_loader.load() {
299 token_loader = token_loader.with_credentials(cred)
300 }
301 if let Some(loader) = self.customized_token_loader {
302 token_loader = token_loader.with_customized_token_loader(loader)
303 }
304
305 if self.config.disable_vm_metadata {
306 token_loader = token_loader.with_disable_vm_metadata(true);
307 }
308
309 let signer = GoogleSigner::new("storage");
310
311 let backend = GcsBackend {
312 core: Arc::new(GcsCore {
313 info: {
314 let am = AccessorInfo::default();
315 am.set_scheme(Scheme::Gcs)
316 .set_root(&root)
317 .set_name(bucket)
318 .set_native_capability(Capability {
319 stat: true,
320 stat_with_if_match: true,
321 stat_with_if_none_match: true,
322 stat_has_etag: true,
323 stat_has_content_md5: true,
324 stat_has_content_length: true,
325 stat_has_content_type: true,
326 stat_has_content_encoding: true,
327 stat_has_last_modified: true,
328 stat_has_user_metadata: true,
329 stat_has_cache_control: true,
330
331 read: true,
332
333 read_with_if_match: true,
334 read_with_if_none_match: true,
335
336 write: true,
337 write_can_empty: true,
338 write_can_multi: true,
339 write_with_cache_control: true,
340 write_with_content_type: true,
341 write_with_content_encoding: true,
342 write_with_user_metadata: true,
343 write_with_if_not_exists: true,
344
345 write_multi_min_size: Some(5 * 1024 * 1024),
349 write_multi_max_size: if cfg!(target_pointer_width = "64") {
353 Some(5 * 1024 * 1024 * 1024)
354 } else {
355 Some(usize::MAX)
356 },
357
358 delete: true,
359 delete_max_size: Some(100),
360 copy: true,
361
362 list: true,
363 list_with_limit: true,
364 list_with_start_after: true,
365 list_with_recursive: true,
366 list_has_etag: true,
367 list_has_content_md5: true,
368 list_has_content_length: true,
369 list_has_content_type: true,
370 list_has_last_modified: true,
371
372 presign: true,
373 presign_stat: true,
374 presign_read: true,
375 presign_write: true,
376
377 shared: true,
378
379 ..Default::default()
380 });
381
382 #[allow(deprecated)]
384 if let Some(client) = self.http_client {
385 am.update_http_client(|_| client);
386 }
387
388 am.into()
389 },
390 endpoint,
391 bucket: bucket.to_string(),
392 root,
393 signer,
394 token_loader,
395 token: self.config.token,
396 scope: scope.to_string(),
397 credential_loader: cred_loader,
398 predefined_acl: self.config.predefined_acl.clone(),
399 default_storage_class: self.config.default_storage_class.clone(),
400 allow_anonymous: self.config.allow_anonymous,
401 }),
402 };
403
404 Ok(backend)
405 }
406}
407
/// GCS storage backend.
///
/// A thin handle around a reference-counted `GcsCore`; every operation in the
/// `Access` implementation goes through `core`.
#[derive(Clone, Debug)]
pub struct GcsBackend {
    /// Shared core state (endpoint, bucket, loaders, accessor info, ...).
    core: Arc<GcsCore>,
}
413
414impl Access for GcsBackend {
415 type Reader = HttpBody;
416 type Writer = GcsWriters;
417 type Lister = oio::PageLister<GcsLister>;
418 type Deleter = oio::BatchDeleter<GcsDeleter>;
419 type BlockingReader = ();
420 type BlockingWriter = ();
421 type BlockingLister = ();
422 type BlockingDeleter = ();
423
424 fn info(&self) -> Arc<AccessorInfo> {
425 self.core.info.clone()
426 }
427
428 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
429 let resp = self.core.gcs_get_object_metadata(path, &args).await?;
430
431 if !resp.status().is_success() {
432 return Err(parse_error(resp));
433 }
434
435 let slc = resp.into_body();
436
437 let meta: GetObjectJsonResponse =
438 serde_json::from_reader(slc.reader()).map_err(new_json_deserialize_error)?;
439
440 let mut m = Metadata::new(EntryMode::FILE);
441
442 m.set_etag(&meta.etag);
443 m.set_content_md5(&meta.md5_hash);
444
445 let size = meta
446 .size
447 .parse::<u64>()
448 .map_err(|e| Error::new(ErrorKind::Unexpected, "parse u64").set_source(e))?;
449 m.set_content_length(size);
450 if !meta.content_type.is_empty() {
451 m.set_content_type(&meta.content_type);
452 }
453
454 if !meta.content_encoding.is_empty() {
455 m.set_content_encoding(&meta.content_encoding);
456 }
457
458 if !meta.cache_control.is_empty() {
459 m.set_cache_control(&meta.cache_control);
460 }
461
462 m.set_last_modified(parse_datetime_from_rfc3339(&meta.updated)?);
463
464 if !meta.metadata.is_empty() {
465 m.with_user_metadata(meta.metadata);
466 }
467
468 Ok(RpStat::new(m))
469 }
470
471 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
472 let resp = self.core.gcs_get_object(path, args.range(), &args).await?;
473
474 let status = resp.status();
475
476 match status {
477 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
478 Ok((RpRead::default(), resp.into_body()))
479 }
480 _ => {
481 let (part, mut body) = resp.into_parts();
482 let buf = body.to_buffer().await?;
483 Err(parse_error(Response::from_parts(part, buf)))
484 }
485 }
486 }
487
488 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
489 let concurrent = args.concurrent();
490 let w = GcsWriter::new(self.core.clone(), path, args);
491 let w = oio::MultipartWriter::new(self.core.info.clone(), w, concurrent);
492
493 Ok((RpWrite::default(), w))
494 }
495
496 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
497 Ok((
498 RpDelete::default(),
499 BatchDeleter::new(GcsDeleter::new(self.core.clone())),
500 ))
501 }
502
503 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
504 let l = GcsLister::new(
505 self.core.clone(),
506 path,
507 args.recursive(),
508 args.limit(),
509 args.start_after(),
510 );
511
512 Ok((RpList::default(), oio::PageLister::new(l)))
513 }
514
515 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
516 let resp = self.core.gcs_copy_object(from, to).await?;
517
518 if resp.status().is_success() {
519 Ok(RpCopy::default())
520 } else {
521 Err(parse_error(resp))
522 }
523 }
524
525 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
526 let req = match args.operation() {
528 PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v),
529 PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v),
530 PresignOperation::Write(v) => {
531 self.core
532 .gcs_insert_object_xml_request(path, v, Buffer::new())
533 }
534 PresignOperation::Delete(_) => Err(Error::new(
535 ErrorKind::Unsupported,
536 "operation is not supported",
537 )),
538 };
539 let mut req = req?;
540 self.core.sign_query(&mut req, args.expire())?;
541
542 let (parts, _) = req.into_parts();
544
545 Ok(RpPresign::new(PresignedRequest::new(
546 parts.method,
547 parts.uri,
548 parts.headers,
549 )))
550 }
551}
552
/// Subset of the JSON document returned for an object's metadata.
///
/// Keys are camelCase in the response; every field falls back to its default
/// value when the key is missing.
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase")]
struct GetObjectJsonResponse {
    /// Object size, delivered as a string.
    ///
    /// For example: `"size": "56535"`
    size: String,
    /// Entity tag of the object.
    ///
    /// For example: `"etag": "CKWasoTgyPkCEAE="`
    etag: String,
    /// Timestamp string, parsed as RFC 3339 by `stat`.
    ///
    /// For example: `"updated": "2022-08-15T11:33:34.866Z"`
    updated: String,
    /// MD5 hash of the content.
    ///
    /// For example: `"md5Hash": "fHcEH1vPwA6eTPqxuasXcg=="`
    md5_hash: String,
    /// Content type of the object.
    ///
    /// For example: `"contentType": "image/png"`
    content_type: String,
    /// Content encoding of the object.
    ///
    /// For example: `"contentEncoding": "br"`
    content_encoding: String,
    /// Cache-control value of the object.
    ///
    /// For example: `"cacheControl": "public, max-age=3600"`
    cache_control: String,
    /// User-defined key/value metadata.
    ///
    /// For example: `"metadata": { "location": "everywhere" }`
    metadata: HashMap<String, String>,
}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that a sample object-metadata document deserializes into
    /// `GetObjectJsonResponse` and that each camelCase key lands in the
    /// matching field.
    #[test]
    fn test_deserialize_get_object_json_response() {
        // The sample also carries keys that have no matching field (`kind`,
        // `id`, ...); deserialization must still succeed.
        let content = r#"{
 "kind": "storage#object",
 "id": "example/1.png/1660563214863653",
 "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/1.png",
 "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/1.png?generation=1660563214863653&alt=media",
 "name": "1.png",
 "bucket": "example",
 "generation": "1660563214863653",
 "metageneration": "1",
 "contentType": "image/png",
 "contentEncoding": "br",
 "cacheControl": "public, max-age=3600",
 "storageClass": "STANDARD",
 "size": "56535",
 "md5Hash": "fHcEH1vPwA6eTPqxuasXcg==",
 "crc32c": "j/un9g==",
 "etag": "CKWasoTgyPkCEAE=",
 "timeCreated": "2022-08-15T11:33:34.866Z",
 "updated": "2022-08-15T11:33:34.866Z",
 "timeStorageClassUpdated": "2022-08-15T11:33:34.866Z",
 "metadata" : {
 "location" : "everywhere"
 }
}"#;

        let meta: GetObjectJsonResponse =
            serde_json::from_str(content).expect("json Deserialize must succeed");

        // String-valued fields are kept verbatim, including the numeric size.
        assert_eq!(meta.size, "56535");
        assert_eq!(meta.updated, "2022-08-15T11:33:34.866Z");
        assert_eq!(meta.md5_hash, "fHcEH1vPwA6eTPqxuasXcg==");
        assert_eq!(meta.etag, "CKWasoTgyPkCEAE=");
        assert_eq!(meta.content_type, "image/png");
        assert_eq!(meta.content_encoding, "br".to_string());
        assert_eq!(meta.cache_control, "public, max-age=3600".to_string());
        // The nested `metadata` object becomes a string-to-string map.
        assert_eq!(
            meta.metadata,
            HashMap::from_iter([("location".to_string(), "everywhere".to_string())])
        );
    }
}