1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::GoogleCredentialLoader;
26use reqsign::GoogleSigner;
27use reqsign::GoogleTokenLoad;
28use reqsign::GoogleTokenLoader;
29
30use super::core::*;
31use super::delete::GcsDeleter;
32use super::error::parse_error;
33use super::lister::GcsLister;
34use super::writer::GcsWriter;
35use super::writer::GcsWriters;
36use crate::raw::oio::BatchDeleter;
37use crate::raw::*;
38use crate::services::GcsConfig;
39use crate::*;
40
41const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
42const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
43
44impl Configurator for GcsConfig {
45 type Builder = GcsBuilder;
46
47 #[allow(deprecated)]
48 fn into_builder(self) -> Self::Builder {
49 GcsBuilder {
50 config: self,
51 http_client: None,
52 customized_token_loader: None,
53 }
54 }
55}
56
57#[doc = include_str!("docs.md")]
59#[derive(Default)]
60pub struct GcsBuilder {
61 config: GcsConfig,
62
63 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
64 http_client: Option<HttpClient>,
65 customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
66}
67
68impl Debug for GcsBuilder {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 let mut ds = f.debug_struct("GcsBuilder");
71
72 ds.field("config", &self.config);
73 ds.finish_non_exhaustive()
74 }
75}
76
77impl GcsBuilder {
78 pub fn root(mut self, root: &str) -> Self {
80 self.config.root = if root.is_empty() {
81 None
82 } else {
83 Some(root.to_string())
84 };
85
86 self
87 }
88
89 pub fn bucket(mut self, bucket: &str) -> Self {
91 self.config.bucket = bucket.to_string();
92 self
93 }
94
95 pub fn scope(mut self, scope: &str) -> Self {
107 if !scope.is_empty() {
108 self.config.scope = Some(scope.to_string())
109 };
110 self
111 }
112
113 pub fn service_account(mut self, service_account: &str) -> Self {
118 if !service_account.is_empty() {
119 self.config.service_account = Some(service_account.to_string())
120 };
121 self
122 }
123
124 pub fn endpoint(mut self, endpoint: &str) -> Self {
126 if !endpoint.is_empty() {
127 self.config.endpoint = Some(endpoint.to_string())
128 };
129 self
130 }
131
132 pub fn credential(mut self, credential: &str) -> Self {
140 if !credential.is_empty() {
141 self.config.credential = Some(credential.to_string())
142 };
143 self
144 }
145
146 pub fn credential_path(mut self, path: &str) -> Self {
153 if !path.is_empty() {
154 self.config.credential_path = Some(path.to_string())
155 };
156 self
157 }
158
159 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
166 #[allow(deprecated)]
167 pub fn http_client(mut self, client: HttpClient) -> Self {
168 self.http_client = Some(client);
169 self
170 }
171
172 pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
174 self.customized_token_loader = Some(token_load);
175 self
176 }
177
178 pub fn token(mut self, token: String) -> Self {
180 self.config.token = Some(token);
181 self
182 }
183
184 pub fn disable_vm_metadata(mut self) -> Self {
186 self.config.disable_vm_metadata = true;
187 self
188 }
189
190 pub fn disable_config_load(mut self) -> Self {
192 self.config.disable_config_load = true;
193 self
194 }
195
196 pub fn predefined_acl(mut self, acl: &str) -> Self {
206 if !acl.is_empty() {
207 self.config.predefined_acl = Some(acl.to_string())
208 };
209 self
210 }
211
212 pub fn default_storage_class(mut self, class: &str) -> Self {
220 if !class.is_empty() {
221 self.config.default_storage_class = Some(class.to_string())
222 };
223 self
224 }
225
226 pub fn allow_anonymous(mut self) -> Self {
231 self.config.allow_anonymous = true;
232 self
233 }
234}
235
236impl Builder for GcsBuilder {
237 const SCHEME: Scheme = Scheme::Gcs;
238 type Config = GcsConfig;
239
240 fn build(self) -> Result<impl Access> {
241 debug!("backend build started: {:?}", self);
242
243 let root = normalize_root(&self.config.root.unwrap_or_default());
244 debug!("backend use root {}", root);
245
246 let bucket = match self.config.bucket.is_empty() {
248 false => Ok(&self.config.bucket),
249 true => Err(
250 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
251 .with_operation("Builder::build")
252 .with_context("service", Scheme::Gcs),
253 ),
254 }?;
255
256 let endpoint = self
259 .config
260 .endpoint
261 .clone()
262 .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
263 debug!("backend use endpoint: {endpoint}");
264
265 let mut cred_loader = GoogleCredentialLoader::default();
266 if let Some(cred) = &self.config.credential {
267 cred_loader = cred_loader.with_content(cred);
268 }
269 if let Some(cred) = &self.config.credential_path {
270 cred_loader = cred_loader.with_path(cred);
271 }
272 #[cfg(target_arch = "wasm32")]
273 {
274 cred_loader = cred_loader.with_disable_env();
275 cred_loader = cred_loader.with_disable_well_known_location();
276 }
277
278 if self.config.disable_config_load {
279 cred_loader = cred_loader
280 .with_disable_env()
281 .with_disable_well_known_location();
282 }
283
284 let scope = if let Some(scope) = &self.config.scope {
285 scope
286 } else {
287 DEFAULT_GCS_SCOPE
288 };
289
290 let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
291 if let Some(account) = &self.config.service_account {
292 token_loader = token_loader.with_service_account(account);
293 }
294 if let Ok(Some(cred)) = cred_loader.load() {
295 token_loader = token_loader.with_credentials(cred)
296 }
297 if let Some(loader) = self.customized_token_loader {
298 token_loader = token_loader.with_customized_token_loader(loader)
299 }
300
301 if self.config.disable_vm_metadata {
302 token_loader = token_loader.with_disable_vm_metadata(true);
303 }
304
305 let signer = GoogleSigner::new("storage");
306
307 let backend = GcsBackend {
308 core: Arc::new(GcsCore {
309 info: {
310 let am = AccessorInfo::default();
311 am.set_scheme(Scheme::Gcs)
312 .set_root(&root)
313 .set_name(bucket)
314 .set_native_capability(Capability {
315 stat: true,
316 stat_with_if_match: true,
317 stat_with_if_none_match: true,
318 stat_has_etag: true,
319 stat_has_content_md5: true,
320 stat_has_content_length: true,
321 stat_has_content_type: true,
322 stat_has_content_encoding: true,
323 stat_has_last_modified: true,
324 stat_has_user_metadata: true,
325 stat_has_cache_control: true,
326
327 read: true,
328
329 read_with_if_match: true,
330 read_with_if_none_match: true,
331
332 write: true,
333 write_can_empty: true,
334 write_can_multi: true,
335 write_with_cache_control: true,
336 write_with_content_type: true,
337 write_with_content_encoding: true,
338 write_with_user_metadata: true,
339 write_with_if_not_exists: true,
340
341 write_multi_min_size: Some(5 * 1024 * 1024),
345 write_multi_max_size: if cfg!(target_pointer_width = "64") {
349 Some(5 * 1024 * 1024 * 1024)
350 } else {
351 Some(usize::MAX)
352 },
353
354 delete: true,
355 delete_max_size: Some(100),
356 copy: true,
357
358 list: true,
359 list_with_limit: true,
360 list_with_start_after: true,
361 list_with_recursive: true,
362 list_has_etag: true,
363 list_has_content_md5: true,
364 list_has_content_length: true,
365 list_has_content_type: true,
366 list_has_last_modified: true,
367
368 presign: true,
369 presign_stat: true,
370 presign_read: true,
371 presign_write: true,
372
373 shared: true,
374
375 ..Default::default()
376 });
377
378 #[allow(deprecated)]
380 if let Some(client) = self.http_client {
381 am.update_http_client(|_| client);
382 }
383
384 am.into()
385 },
386 endpoint,
387 bucket: bucket.to_string(),
388 root,
389 signer,
390 token_loader,
391 token: self.config.token,
392 scope: scope.to_string(),
393 credential_loader: cred_loader,
394 predefined_acl: self.config.predefined_acl.clone(),
395 default_storage_class: self.config.default_storage_class.clone(),
396 allow_anonymous: self.config.allow_anonymous,
397 }),
398 };
399
400 Ok(backend)
401 }
402}
403
404#[derive(Clone, Debug)]
406pub struct GcsBackend {
407 core: Arc<GcsCore>,
408}
409
410impl Access for GcsBackend {
411 type Reader = HttpBody;
412 type Writer = GcsWriters;
413 type Lister = oio::PageLister<GcsLister>;
414 type Deleter = oio::BatchDeleter<GcsDeleter>;
415 type BlockingReader = ();
416 type BlockingWriter = ();
417 type BlockingLister = ();
418 type BlockingDeleter = ();
419
420 fn info(&self) -> Arc<AccessorInfo> {
421 self.core.info.clone()
422 }
423
424 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
425 let resp = self.core.gcs_get_object_metadata(path, &args).await?;
426
427 if !resp.status().is_success() {
428 return Err(parse_error(resp));
429 }
430
431 let slc = resp.into_body();
432 let m = GcsCore::build_metadata_from_object_response(path, slc)?;
433
434 Ok(RpStat::new(m))
435 }
436
437 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
438 let resp = self.core.gcs_get_object(path, args.range(), &args).await?;
439
440 let status = resp.status();
441
442 match status {
443 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
444 Ok((RpRead::default(), resp.into_body()))
445 }
446 _ => {
447 let (part, mut body) = resp.into_parts();
448 let buf = body.to_buffer().await?;
449 Err(parse_error(Response::from_parts(part, buf)))
450 }
451 }
452 }
453
454 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
455 let concurrent = args.concurrent();
456 let w = GcsWriter::new(self.core.clone(), path, args);
457 let w = oio::MultipartWriter::new(self.core.info.clone(), w, concurrent);
458
459 Ok((RpWrite::default(), w))
460 }
461
462 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
463 Ok((
464 RpDelete::default(),
465 BatchDeleter::new(GcsDeleter::new(self.core.clone())),
466 ))
467 }
468
469 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
470 let l = GcsLister::new(
471 self.core.clone(),
472 path,
473 args.recursive(),
474 args.limit(),
475 args.start_after(),
476 );
477
478 Ok((RpList::default(), oio::PageLister::new(l)))
479 }
480
481 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
482 let resp = self.core.gcs_copy_object(from, to).await?;
483
484 if resp.status().is_success() {
485 Ok(RpCopy::default())
486 } else {
487 Err(parse_error(resp))
488 }
489 }
490
491 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
492 let req = match args.operation() {
494 PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v),
495 PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v),
496 PresignOperation::Write(v) => {
497 self.core
498 .gcs_insert_object_xml_request(path, v, Buffer::new())
499 }
500 PresignOperation::Delete(_) => Err(Error::new(
501 ErrorKind::Unsupported,
502 "operation is not supported",
503 )),
504 };
505 let mut req = req?;
506 self.core.sign_query(&mut req, args.expire())?;
507
508 let (parts, _) = req.into_parts();
510
511 Ok(RpPresign::new(PresignedRequest::new(
512 parts.method,
513 parts.uri,
514 parts.headers,
515 )))
516 }
517}