1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use reqsign::GoogleCredentialLoader;
25use reqsign::GoogleSigner;
26use reqsign::GoogleTokenLoad;
27use reqsign::GoogleTokenLoader;
28
29use super::GCS_SCHEME;
30use super::config::GcsConfig;
31use super::core::*;
32use super::deleter::GcsDeleter;
33use super::error::parse_error;
34use super::lister::GcsLister;
35use super::writer::GcsWriter;
36use super::writer::GcsWriters;
37use crate::raw::*;
38use crate::*;
39
40const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
41const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
42
43#[doc = include_str!("docs.md")]
45#[derive(Default)]
46pub struct GcsBuilder {
47 pub(super) config: GcsConfig,
48
49 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
50 pub(super) http_client: Option<HttpClient>,
51 pub(super) customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
52}
53
54impl Debug for GcsBuilder {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("GcsBuilder")
57 .field("config", &self.config)
58 .finish_non_exhaustive()
59 }
60}
61
62impl GcsBuilder {
63 pub fn root(mut self, root: &str) -> Self {
65 self.config.root = if root.is_empty() {
66 None
67 } else {
68 Some(root.to_string())
69 };
70
71 self
72 }
73
74 pub fn bucket(mut self, bucket: &str) -> Self {
76 self.config.bucket = bucket.to_string();
77 self
78 }
79
80 pub fn scope(mut self, scope: &str) -> Self {
92 if !scope.is_empty() {
93 self.config.scope = Some(scope.to_string())
94 };
95 self
96 }
97
98 pub fn service_account(mut self, service_account: &str) -> Self {
103 if !service_account.is_empty() {
104 self.config.service_account = Some(service_account.to_string())
105 };
106 self
107 }
108
109 pub fn endpoint(mut self, endpoint: &str) -> Self {
111 if !endpoint.is_empty() {
112 self.config.endpoint = Some(endpoint.to_string())
113 };
114 self
115 }
116
117 pub fn credential(mut self, credential: &str) -> Self {
125 if !credential.is_empty() {
126 self.config.credential = Some(credential.to_string())
127 };
128 self
129 }
130
131 pub fn credential_path(mut self, path: &str) -> Self {
138 if !path.is_empty() {
139 self.config.credential_path = Some(path.to_string())
140 };
141 self
142 }
143
144 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
151 #[allow(deprecated)]
152 pub fn http_client(mut self, client: HttpClient) -> Self {
153 self.http_client = Some(client);
154 self
155 }
156
157 pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
159 self.customized_token_loader = Some(token_load);
160 self
161 }
162
163 pub fn token(mut self, token: String) -> Self {
165 self.config.token = Some(token);
166 self
167 }
168
169 pub fn disable_vm_metadata(mut self) -> Self {
171 self.config.disable_vm_metadata = true;
172 self
173 }
174
175 pub fn disable_config_load(mut self) -> Self {
177 self.config.disable_config_load = true;
178 self
179 }
180
181 pub fn predefined_acl(mut self, acl: &str) -> Self {
191 if !acl.is_empty() {
192 self.config.predefined_acl = Some(acl.to_string())
193 };
194 self
195 }
196
197 pub fn default_storage_class(mut self, class: &str) -> Self {
205 if !class.is_empty() {
206 self.config.default_storage_class = Some(class.to_string())
207 };
208 self
209 }
210
211 pub fn allow_anonymous(mut self) -> Self {
216 self.config.allow_anonymous = true;
217 self
218 }
219}
220
221impl Builder for GcsBuilder {
222 type Config = GcsConfig;
223
224 fn build(self) -> Result<impl Access> {
225 debug!("backend build started: {self:?}");
226
227 let root = normalize_root(&self.config.root.unwrap_or_default());
228 debug!("backend use root {root}");
229
230 let bucket = match self.config.bucket.is_empty() {
232 false => Ok(&self.config.bucket),
233 true => Err(
234 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
235 .with_operation("Builder::build")
236 .with_context("service", GCS_SCHEME),
237 ),
238 }?;
239
240 let endpoint = self
243 .config
244 .endpoint
245 .clone()
246 .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
247 debug!("backend use endpoint: {endpoint}");
248
249 let mut cred_loader = GoogleCredentialLoader::default();
250 if let Some(cred) = &self.config.credential {
251 cred_loader = cred_loader.with_content(cred);
252 }
253 if let Some(cred) = &self.config.credential_path {
254 cred_loader = cred_loader.with_path(cred);
255 }
256 #[cfg(target_arch = "wasm32")]
257 {
258 cred_loader = cred_loader.with_disable_env();
259 cred_loader = cred_loader.with_disable_well_known_location();
260 }
261
262 if self.config.disable_config_load {
263 cred_loader = cred_loader
264 .with_disable_env()
265 .with_disable_well_known_location();
266 }
267
268 let scope = if let Some(scope) = &self.config.scope {
269 scope
270 } else {
271 DEFAULT_GCS_SCOPE
272 };
273
274 let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
275 if let Some(account) = &self.config.service_account {
276 token_loader = token_loader.with_service_account(account);
277 }
278 if let Ok(Some(cred)) = cred_loader.load() {
279 token_loader = token_loader.with_credentials(cred)
280 }
281 if let Some(loader) = self.customized_token_loader {
282 token_loader = token_loader.with_customized_token_loader(loader)
283 }
284
285 if self.config.disable_vm_metadata {
286 token_loader = token_loader.with_disable_vm_metadata(true);
287 }
288
289 let signer = GoogleSigner::new("storage");
290
291 let backend = GcsBackend {
292 core: Arc::new(GcsCore {
293 info: {
294 let am = AccessorInfo::default();
295 am.set_scheme(GCS_SCHEME)
296 .set_root(&root)
297 .set_name(bucket)
298 .set_native_capability(Capability {
299 stat: true,
300 stat_with_if_match: true,
301 stat_with_if_none_match: true,
302
303 read: true,
304
305 read_with_if_match: true,
306 read_with_if_none_match: true,
307
308 write: true,
309 write_can_empty: true,
310 write_can_multi: true,
311 write_with_cache_control: true,
312 write_with_content_type: true,
313 write_with_content_encoding: true,
314 write_with_user_metadata: true,
315 write_with_if_not_exists: true,
316
317 write_multi_min_size: Some(5 * 1024 * 1024),
321 write_multi_max_size: if cfg!(target_pointer_width = "64") {
325 Some(5 * 1024 * 1024 * 1024)
326 } else {
327 Some(usize::MAX)
328 },
329
330 delete: true,
331 delete_max_size: Some(100),
332 copy: true,
333
334 list: true,
335 list_with_limit: true,
336 list_with_start_after: true,
337 list_with_recursive: true,
338
339 presign: true,
340 presign_stat: true,
341 presign_read: true,
342 presign_write: true,
343
344 shared: true,
345
346 ..Default::default()
347 });
348
349 #[allow(deprecated)]
351 if let Some(client) = self.http_client {
352 am.update_http_client(|_| client);
353 }
354
355 am.into()
356 },
357 endpoint,
358 bucket: bucket.to_string(),
359 root,
360 signer,
361 token_loader,
362 token: self.config.token,
363 scope: scope.to_string(),
364 credential_loader: cred_loader,
365 predefined_acl: self.config.predefined_acl.clone(),
366 default_storage_class: self.config.default_storage_class.clone(),
367 allow_anonymous: self.config.allow_anonymous,
368 }),
369 };
370
371 Ok(backend)
372 }
373}
374
375#[derive(Clone, Debug)]
377pub struct GcsBackend {
378 core: Arc<GcsCore>,
379}
380
381impl Access for GcsBackend {
382 type Reader = HttpBody;
383 type Writer = GcsWriters;
384 type Lister = oio::PageLister<GcsLister>;
385 type Deleter = oio::BatchDeleter<GcsDeleter>;
386
387 fn info(&self) -> Arc<AccessorInfo> {
388 self.core.info.clone()
389 }
390
391 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
392 let resp = self.core.gcs_get_object_metadata(path, &args).await?;
393
394 if !resp.status().is_success() {
395 return Err(parse_error(resp));
396 }
397
398 let slc = resp.into_body();
399 let m = GcsCore::build_metadata_from_object_response(path, slc)?;
400
401 Ok(RpStat::new(m))
402 }
403
404 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
405 let resp = self.core.gcs_get_object(path, args.range(), &args).await?;
406
407 let status = resp.status();
408
409 match status {
410 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
411 Ok((RpRead::default(), resp.into_body()))
412 }
413 _ => {
414 let (part, mut body) = resp.into_parts();
415 let buf = body.to_buffer().await?;
416 Err(parse_error(Response::from_parts(part, buf)))
417 }
418 }
419 }
420
421 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
422 let concurrent = args.concurrent();
423 let w = GcsWriter::new(self.core.clone(), path, args);
424 let w = oio::MultipartWriter::new(self.core.info.clone(), w, concurrent);
425
426 Ok((RpWrite::default(), w))
427 }
428
429 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
430 Ok((
431 RpDelete::default(),
432 oio::BatchDeleter::new(GcsDeleter::new(self.core.clone())),
433 ))
434 }
435
436 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
437 let l = GcsLister::new(
438 self.core.clone(),
439 path,
440 args.recursive(),
441 args.limit(),
442 args.start_after(),
443 );
444
445 Ok((RpList::default(), oio::PageLister::new(l)))
446 }
447
448 async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
449 let resp = self.core.gcs_copy_object(from, to).await?;
450
451 if resp.status().is_success() {
452 Ok(RpCopy::default())
453 } else {
454 Err(parse_error(resp))
455 }
456 }
457
458 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
459 let req = match args.operation() {
461 PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v),
462 PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v),
463 PresignOperation::Write(v) => {
464 self.core
465 .gcs_insert_object_xml_request(path, v, Buffer::new())
466 }
467 PresignOperation::Delete(_) => Err(Error::new(
468 ErrorKind::Unsupported,
469 "operation is not supported",
470 )),
471 };
472 let mut req = req?;
473 self.core.sign_query(&mut req, args.expire())?;
474
475 let (parts, _) = req.into_parts();
477
478 Ok(RpPresign::new(PresignedRequest::new(
479 parts.method,
480 parts.uri,
481 parts.headers,
482 )))
483 }
484}