opendal/services/gcs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::GoogleCredentialLoader;
26use reqsign::GoogleSigner;
27use reqsign::GoogleTokenLoad;
28use reqsign::GoogleTokenLoader;
29
30use super::core::*;
31use super::delete::GcsDeleter;
32use super::error::parse_error;
33use super::lister::GcsLister;
34use super::writer::GcsWriter;
35use super::writer::GcsWriters;
36use crate::raw::oio::BatchDeleter;
37use crate::raw::*;
38use crate::services::GcsConfig;
39use crate::*;
40
/// Endpoint used by `GcsBuilder::build` when the configuration does not set one.
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
/// OAuth2 scope used by `GcsBuilder::build` when the configuration does not set one.
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
43
44impl Configurator for GcsConfig {
45    type Builder = GcsBuilder;
46
47    #[allow(deprecated)]
48    fn into_builder(self) -> Self::Builder {
49        GcsBuilder {
50            config: self,
51            http_client: None,
52            customized_token_loader: None,
53        }
54    }
55}
56
/// [Google Cloud Storage](https://cloud.google.com/storage) services support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct GcsBuilder {
    /// Service configuration that the builder's setter methods write into.
    config: GcsConfig,

    /// Optional HTTP client override; when set, `build` installs it on the
    /// accessor info. Kept only for compatibility with the deprecated setter.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
    /// Optional user-supplied token loader handed to the token loader in `build`.
    customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
}
67
68impl Debug for GcsBuilder {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        let mut ds = f.debug_struct("GcsBuilder");
71
72        ds.field("config", &self.config);
73        ds.finish_non_exhaustive()
74    }
75}
76
77impl GcsBuilder {
78    /// set the working directory root of backend
79    pub fn root(mut self, root: &str) -> Self {
80        self.config.root = if root.is_empty() {
81            None
82        } else {
83            Some(root.to_string())
84        };
85
86        self
87    }
88
89    /// set the container's name
90    pub fn bucket(mut self, bucket: &str) -> Self {
91        self.config.bucket = bucket.to_string();
92        self
93    }
94
95    /// set the GCS service scope
96    ///
97    /// If not set, we will use `https://www.googleapis.com/auth/devstorage.read_write`.
98    ///
99    /// # Valid scope examples
100    ///
101    /// - read-only: `https://www.googleapis.com/auth/devstorage.read_only`
102    /// - read-write: `https://www.googleapis.com/auth/devstorage.read_write`
103    /// - full-control: `https://www.googleapis.com/auth/devstorage.full_control`
104    ///
105    /// Reference: [Cloud Storage authentication](https://cloud.google.com/storage/docs/authentication)
106    pub fn scope(mut self, scope: &str) -> Self {
107        if !scope.is_empty() {
108            self.config.scope = Some(scope.to_string())
109        };
110        self
111    }
112
113    /// Set the GCS service account.
114    ///
115    /// service account will be used for fetch token from vm metadata.
116    /// If not set, we will try to fetch with `default` service account.
117    pub fn service_account(mut self, service_account: &str) -> Self {
118        if !service_account.is_empty() {
119            self.config.service_account = Some(service_account.to_string())
120        };
121        self
122    }
123
124    /// set the endpoint GCS service uses
125    pub fn endpoint(mut self, endpoint: &str) -> Self {
126        if !endpoint.is_empty() {
127            self.config.endpoint = Some(endpoint.to_string())
128        };
129        self
130    }
131
132    /// set the base64 hashed credentials string used for OAuth2 authentication.
133    ///
134    /// this method allows to specify the credentials directly as a base64 hashed string.
135    /// alternatively, you can use `credential_path()` to provide the local path to a credentials file.
136    /// we will use one of `credential` and `credential_path` to complete the OAuth2 authentication.
137    ///
138    /// Reference: [Google Cloud Storage Authentication](https://cloud.google.com/docs/authentication).
139    pub fn credential(mut self, credential: &str) -> Self {
140        if !credential.is_empty() {
141            self.config.credential = Some(credential.to_string())
142        };
143        self
144    }
145
146    /// set the local path to credentials file which is used for OAuth2 authentication.
147    ///
148    /// credentials file contains the original credentials that have not been base64 hashed.
149    /// we will use one of `credential` and `credential_path` to complete the OAuth2 authentication.
150    ///
151    /// Reference: [Google Cloud Storage Authentication](https://cloud.google.com/docs/authentication).
152    pub fn credential_path(mut self, path: &str) -> Self {
153        if !path.is_empty() {
154            self.config.credential_path = Some(path.to_string())
155        };
156        self
157    }
158
159    /// Specify the http client that used by this service.
160    ///
161    /// # Notes
162    ///
163    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
164    /// during minor updates.
165    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
166    #[allow(deprecated)]
167    pub fn http_client(mut self, client: HttpClient) -> Self {
168        self.http_client = Some(client);
169        self
170    }
171
172    /// Specify the customized token loader used by this service.
173    pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
174        self.customized_token_loader = Some(token_load);
175        self
176    }
177
178    /// Provide the OAuth2 token to use.
179    pub fn token(mut self, token: String) -> Self {
180        self.config.token = Some(token);
181        self
182    }
183
184    /// Disable attempting to load credentials from the GCE metadata server.
185    pub fn disable_vm_metadata(mut self) -> Self {
186        self.config.disable_vm_metadata = true;
187        self
188    }
189
190    /// Disable loading configuration from the environment.
191    pub fn disable_config_load(mut self) -> Self {
192        self.config.disable_config_load = true;
193        self
194    }
195
196    /// Set the predefined acl for GCS.
197    ///
198    /// Available values are:
199    /// - `authenticatedRead`
200    /// - `bucketOwnerFullControl`
201    /// - `bucketOwnerRead`
202    /// - `private`
203    /// - `projectPrivate`
204    /// - `publicRead`
205    pub fn predefined_acl(mut self, acl: &str) -> Self {
206        if !acl.is_empty() {
207            self.config.predefined_acl = Some(acl.to_string())
208        };
209        self
210    }
211
212    /// Set the default storage class for GCS.
213    ///
214    /// Available values are:
215    /// - `STANDARD`
216    /// - `NEARLINE`
217    /// - `COLDLINE`
218    /// - `ARCHIVE`
219    pub fn default_storage_class(mut self, class: &str) -> Self {
220        if !class.is_empty() {
221            self.config.default_storage_class = Some(class.to_string())
222        };
223        self
224    }
225
226    /// Allow anonymous requests.
227    ///
228    /// This is typically used for buckets which are open to the public or GCS
229    /// storage emulators.
230    pub fn allow_anonymous(mut self) -> Self {
231        self.config.allow_anonymous = true;
232        self
233    }
234}
235
236impl Builder for GcsBuilder {
237    const SCHEME: Scheme = Scheme::Gcs;
238    type Config = GcsConfig;
239
240    fn build(self) -> Result<impl Access> {
241        debug!("backend build started: {:?}", self);
242
243        let root = normalize_root(&self.config.root.unwrap_or_default());
244        debug!("backend use root {}", root);
245
246        // Handle endpoint and bucket name
247        let bucket = match self.config.bucket.is_empty() {
248            false => Ok(&self.config.bucket),
249            true => Err(
250                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
251                    .with_operation("Builder::build")
252                    .with_context("service", Scheme::Gcs),
253            ),
254        }?;
255
256        // TODO: server side encryption
257
258        let endpoint = self
259            .config
260            .endpoint
261            .clone()
262            .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
263        debug!("backend use endpoint: {endpoint}");
264
265        let mut cred_loader = GoogleCredentialLoader::default();
266        if let Some(cred) = &self.config.credential {
267            cred_loader = cred_loader.with_content(cred);
268        }
269        if let Some(cred) = &self.config.credential_path {
270            cred_loader = cred_loader.with_path(cred);
271        }
272        #[cfg(target_arch = "wasm32")]
273        {
274            cred_loader = cred_loader.with_disable_env();
275            cred_loader = cred_loader.with_disable_well_known_location();
276        }
277
278        if self.config.disable_config_load {
279            cred_loader = cred_loader
280                .with_disable_env()
281                .with_disable_well_known_location();
282        }
283
284        let scope = if let Some(scope) = &self.config.scope {
285            scope
286        } else {
287            DEFAULT_GCS_SCOPE
288        };
289
290        let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
291        if let Some(account) = &self.config.service_account {
292            token_loader = token_loader.with_service_account(account);
293        }
294        if let Ok(Some(cred)) = cred_loader.load() {
295            token_loader = token_loader.with_credentials(cred)
296        }
297        if let Some(loader) = self.customized_token_loader {
298            token_loader = token_loader.with_customized_token_loader(loader)
299        }
300
301        if self.config.disable_vm_metadata {
302            token_loader = token_loader.with_disable_vm_metadata(true);
303        }
304
305        let signer = GoogleSigner::new("storage");
306
307        let backend = GcsBackend {
308            core: Arc::new(GcsCore {
309                info: {
310                    let am = AccessorInfo::default();
311                    am.set_scheme(Scheme::Gcs)
312                        .set_root(&root)
313                        .set_name(bucket)
314                        .set_native_capability(Capability {
315                            stat: true,
316                            stat_with_if_match: true,
317                            stat_with_if_none_match: true,
318                            stat_has_etag: true,
319                            stat_has_content_md5: true,
320                            stat_has_content_length: true,
321                            stat_has_content_type: true,
322                            stat_has_content_encoding: true,
323                            stat_has_last_modified: true,
324                            stat_has_user_metadata: true,
325                            stat_has_cache_control: true,
326
327                            read: true,
328
329                            read_with_if_match: true,
330                            read_with_if_none_match: true,
331
332                            write: true,
333                            write_can_empty: true,
334                            write_can_multi: true,
335                            write_with_cache_control: true,
336                            write_with_content_type: true,
337                            write_with_content_encoding: true,
338                            write_with_user_metadata: true,
339                            write_with_if_not_exists: true,
340
341                            // The min multipart size of Gcs is 5 MiB.
342                            //
343                            // ref: <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
344                            write_multi_min_size: Some(5 * 1024 * 1024),
345                            // The max multipart size of Gcs is 5 GiB.
346                            //
347                            // ref: <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
348                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
349                                Some(5 * 1024 * 1024 * 1024)
350                            } else {
351                                Some(usize::MAX)
352                            },
353
354                            delete: true,
355                            delete_max_size: Some(100),
356                            copy: true,
357
358                            list: true,
359                            list_with_limit: true,
360                            list_with_start_after: true,
361                            list_with_recursive: true,
362                            list_has_etag: true,
363                            list_has_content_md5: true,
364                            list_has_content_length: true,
365                            list_has_content_type: true,
366                            list_has_last_modified: true,
367
368                            presign: true,
369                            presign_stat: true,
370                            presign_read: true,
371                            presign_write: true,
372
373                            shared: true,
374
375                            ..Default::default()
376                        });
377
378                    // allow deprecated api here for compatibility
379                    #[allow(deprecated)]
380                    if let Some(client) = self.http_client {
381                        am.update_http_client(|_| client);
382                    }
383
384                    am.into()
385                },
386                endpoint,
387                bucket: bucket.to_string(),
388                root,
389                signer,
390                token_loader,
391                token: self.config.token,
392                scope: scope.to_string(),
393                credential_loader: cred_loader,
394                predefined_acl: self.config.predefined_acl.clone(),
395                default_storage_class: self.config.default_storage_class.clone(),
396                allow_anonymous: self.config.allow_anonymous,
397            }),
398        };
399
400        Ok(backend)
401    }
402}
403
/// GCS storage backend
///
/// Every operation is delegated to the shared [`GcsCore`].
#[derive(Clone, Debug)]
pub struct GcsBackend {
    /// Shared core; it is cloned into the writers, listers and deleters
    /// this backend creates.
    core: Arc<GcsCore>,
}
409
410impl Access for GcsBackend {
411    type Reader = HttpBody;
412    type Writer = GcsWriters;
413    type Lister = oio::PageLister<GcsLister>;
414    type Deleter = oio::BatchDeleter<GcsDeleter>;
415    type BlockingReader = ();
416    type BlockingWriter = ();
417    type BlockingLister = ();
418    type BlockingDeleter = ();
419
420    fn info(&self) -> Arc<AccessorInfo> {
421        self.core.info.clone()
422    }
423
424    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
425        let resp = self.core.gcs_get_object_metadata(path, &args).await?;
426
427        if !resp.status().is_success() {
428            return Err(parse_error(resp));
429        }
430
431        let slc = resp.into_body();
432        let m = GcsCore::build_metadata_from_object_response(path, slc)?;
433
434        Ok(RpStat::new(m))
435    }
436
437    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
438        let resp = self.core.gcs_get_object(path, args.range(), &args).await?;
439
440        let status = resp.status();
441
442        match status {
443            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
444                Ok((RpRead::default(), resp.into_body()))
445            }
446            _ => {
447                let (part, mut body) = resp.into_parts();
448                let buf = body.to_buffer().await?;
449                Err(parse_error(Response::from_parts(part, buf)))
450            }
451        }
452    }
453
454    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
455        let concurrent = args.concurrent();
456        let w = GcsWriter::new(self.core.clone(), path, args);
457        let w = oio::MultipartWriter::new(self.core.info.clone(), w, concurrent);
458
459        Ok((RpWrite::default(), w))
460    }
461
462    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
463        Ok((
464            RpDelete::default(),
465            BatchDeleter::new(GcsDeleter::new(self.core.clone())),
466        ))
467    }
468
469    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
470        let l = GcsLister::new(
471            self.core.clone(),
472            path,
473            args.recursive(),
474            args.limit(),
475            args.start_after(),
476        );
477
478        Ok((RpList::default(), oio::PageLister::new(l)))
479    }
480
481    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
482        let resp = self.core.gcs_copy_object(from, to).await?;
483
484        if resp.status().is_success() {
485            Ok(RpCopy::default())
486        } else {
487            Err(parse_error(resp))
488        }
489    }
490
491    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
492        // We will not send this request out, just for signing.
493        let req = match args.operation() {
494            PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v),
495            PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v),
496            PresignOperation::Write(v) => {
497                self.core
498                    .gcs_insert_object_xml_request(path, v, Buffer::new())
499            }
500            PresignOperation::Delete(_) => Err(Error::new(
501                ErrorKind::Unsupported,
502                "operation is not supported",
503            )),
504        };
505        let mut req = req?;
506        self.core.sign_query(&mut req, args.expire())?;
507
508        // We don't need this request anymore, consume it directly.
509        let (parts, _) = req.into_parts();
510
511        Ok(RpPresign::new(PresignedRequest::new(
512            parts.method,
513            parts.uri,
514            parts.headers,
515        )))
516    }
517}