opendal/services/gcs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use reqsign::GoogleCredentialLoader;
25use reqsign::GoogleSigner;
26use reqsign::GoogleTokenLoad;
27use reqsign::GoogleTokenLoader;
28
29use super::GCS_SCHEME;
30use super::config::GcsConfig;
31use super::core::*;
32use super::deleter::GcsDeleter;
33use super::error::parse_error;
34use super::lister::GcsLister;
35use super::writer::GcsWriter;
36use super::writer::GcsWriters;
37use crate::raw::*;
38use crate::*;
39
/// Endpoint used by `GcsBuilder::build` when no explicit endpoint is configured.
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
/// OAuth2 scope used by `GcsBuilder::build` when no explicit scope is configured.
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
42
/// [Google Cloud Storage](https://cloud.google.com/storage) services support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct GcsBuilder {
    /// Service configuration that the builder's setter methods fill in.
    pub(super) config: GcsConfig,

    /// Optional HTTP client override, set through the deprecated `http_client` method.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
    /// Optional user-provided token loader, set through `customized_token_loader`
    /// and handed to the token loader during `build`.
    pub(super) customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
}
53
54impl Debug for GcsBuilder {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("GcsBuilder")
57            .field("config", &self.config)
58            .finish_non_exhaustive()
59    }
60}
61
62impl GcsBuilder {
63    /// set the working directory root of backend
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// set the container's name
75    pub fn bucket(mut self, bucket: &str) -> Self {
76        self.config.bucket = bucket.to_string();
77        self
78    }
79
80    /// set the GCS service scope
81    ///
82    /// If not set, we will use `https://www.googleapis.com/auth/devstorage.read_write`.
83    ///
84    /// # Valid scope examples
85    ///
86    /// - read-only: `https://www.googleapis.com/auth/devstorage.read_only`
87    /// - read-write: `https://www.googleapis.com/auth/devstorage.read_write`
88    /// - full-control: `https://www.googleapis.com/auth/devstorage.full_control`
89    ///
90    /// Reference: [Cloud Storage authentication](https://cloud.google.com/storage/docs/authentication)
91    pub fn scope(mut self, scope: &str) -> Self {
92        if !scope.is_empty() {
93            self.config.scope = Some(scope.to_string())
94        };
95        self
96    }
97
98    /// Set the GCS service account.
99    ///
100    /// service account will be used for fetch token from vm metadata.
101    /// If not set, we will try to fetch with `default` service account.
102    pub fn service_account(mut self, service_account: &str) -> Self {
103        if !service_account.is_empty() {
104            self.config.service_account = Some(service_account.to_string())
105        };
106        self
107    }
108
109    /// set the endpoint GCS service uses
110    pub fn endpoint(mut self, endpoint: &str) -> Self {
111        if !endpoint.is_empty() {
112            self.config.endpoint = Some(endpoint.to_string())
113        };
114        self
115    }
116
117    /// set the base64 hashed credentials string used for OAuth2 authentication.
118    ///
119    /// this method allows to specify the credentials directly as a base64 hashed string.
120    /// alternatively, you can use `credential_path()` to provide the local path to a credentials file.
121    /// we will use one of `credential` and `credential_path` to complete the OAuth2 authentication.
122    ///
123    /// Reference: [Google Cloud Storage Authentication](https://cloud.google.com/docs/authentication).
124    pub fn credential(mut self, credential: &str) -> Self {
125        if !credential.is_empty() {
126            self.config.credential = Some(credential.to_string())
127        };
128        self
129    }
130
131    /// set the local path to credentials file which is used for OAuth2 authentication.
132    ///
133    /// credentials file contains the original credentials that have not been base64 hashed.
134    /// we will use one of `credential` and `credential_path` to complete the OAuth2 authentication.
135    ///
136    /// Reference: [Google Cloud Storage Authentication](https://cloud.google.com/docs/authentication).
137    pub fn credential_path(mut self, path: &str) -> Self {
138        if !path.is_empty() {
139            self.config.credential_path = Some(path.to_string())
140        };
141        self
142    }
143
144    /// Specify the http client that used by this service.
145    ///
146    /// # Notes
147    ///
148    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
149    /// during minor updates.
150    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
151    #[allow(deprecated)]
152    pub fn http_client(mut self, client: HttpClient) -> Self {
153        self.http_client = Some(client);
154        self
155    }
156
157    /// Specify the customized token loader used by this service.
158    pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
159        self.customized_token_loader = Some(token_load);
160        self
161    }
162
163    /// Provide the OAuth2 token to use.
164    pub fn token(mut self, token: String) -> Self {
165        self.config.token = Some(token);
166        self
167    }
168
169    /// Disable attempting to load credentials from the GCE metadata server.
170    pub fn disable_vm_metadata(mut self) -> Self {
171        self.config.disable_vm_metadata = true;
172        self
173    }
174
175    /// Disable loading configuration from the environment.
176    pub fn disable_config_load(mut self) -> Self {
177        self.config.disable_config_load = true;
178        self
179    }
180
181    /// Set the predefined acl for GCS.
182    ///
183    /// Available values are:
184    /// - `authenticatedRead`
185    /// - `bucketOwnerFullControl`
186    /// - `bucketOwnerRead`
187    /// - `private`
188    /// - `projectPrivate`
189    /// - `publicRead`
190    pub fn predefined_acl(mut self, acl: &str) -> Self {
191        if !acl.is_empty() {
192            self.config.predefined_acl = Some(acl.to_string())
193        };
194        self
195    }
196
197    /// Set the default storage class for GCS.
198    ///
199    /// Available values are:
200    /// - `STANDARD`
201    /// - `NEARLINE`
202    /// - `COLDLINE`
203    /// - `ARCHIVE`
204    pub fn default_storage_class(mut self, class: &str) -> Self {
205        if !class.is_empty() {
206            self.config.default_storage_class = Some(class.to_string())
207        };
208        self
209    }
210
211    /// Allow anonymous requests.
212    ///
213    /// This is typically used for buckets which are open to the public or GCS
214    /// storage emulators.
215    pub fn allow_anonymous(mut self) -> Self {
216        self.config.allow_anonymous = true;
217        self
218    }
219}
220
221impl Builder for GcsBuilder {
222    type Config = GcsConfig;
223
224    fn build(self) -> Result<impl Access> {
225        debug!("backend build started: {self:?}");
226
227        let root = normalize_root(&self.config.root.unwrap_or_default());
228        debug!("backend use root {root}");
229
230        // Handle endpoint and bucket name
231        let bucket = match self.config.bucket.is_empty() {
232            false => Ok(&self.config.bucket),
233            true => Err(
234                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
235                    .with_operation("Builder::build")
236                    .with_context("service", GCS_SCHEME),
237            ),
238        }?;
239
240        // TODO: server side encryption
241
242        let endpoint = self
243            .config
244            .endpoint
245            .clone()
246            .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
247        debug!("backend use endpoint: {endpoint}");
248
249        let mut cred_loader = GoogleCredentialLoader::default();
250        if let Some(cred) = &self.config.credential {
251            cred_loader = cred_loader.with_content(cred);
252        }
253        if let Some(cred) = &self.config.credential_path {
254            cred_loader = cred_loader.with_path(cred);
255        }
256        #[cfg(target_arch = "wasm32")]
257        {
258            cred_loader = cred_loader.with_disable_env();
259            cred_loader = cred_loader.with_disable_well_known_location();
260        }
261
262        if self.config.disable_config_load {
263            cred_loader = cred_loader
264                .with_disable_env()
265                .with_disable_well_known_location();
266        }
267
268        let scope = if let Some(scope) = &self.config.scope {
269            scope
270        } else {
271            DEFAULT_GCS_SCOPE
272        };
273
274        let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
275        if let Some(account) = &self.config.service_account {
276            token_loader = token_loader.with_service_account(account);
277        }
278        if let Ok(Some(cred)) = cred_loader.load() {
279            token_loader = token_loader.with_credentials(cred)
280        }
281        if let Some(loader) = self.customized_token_loader {
282            token_loader = token_loader.with_customized_token_loader(loader)
283        }
284
285        if self.config.disable_vm_metadata {
286            token_loader = token_loader.with_disable_vm_metadata(true);
287        }
288
289        let signer = GoogleSigner::new("storage");
290
291        let backend = GcsBackend {
292            core: Arc::new(GcsCore {
293                info: {
294                    let am = AccessorInfo::default();
295                    am.set_scheme(GCS_SCHEME)
296                        .set_root(&root)
297                        .set_name(bucket)
298                        .set_native_capability(Capability {
299                            stat: true,
300                            stat_with_if_match: true,
301                            stat_with_if_none_match: true,
302
303                            read: true,
304
305                            read_with_if_match: true,
306                            read_with_if_none_match: true,
307
308                            write: true,
309                            write_can_empty: true,
310                            write_can_multi: true,
311                            write_with_cache_control: true,
312                            write_with_content_type: true,
313                            write_with_content_encoding: true,
314                            write_with_user_metadata: true,
315                            write_with_if_not_exists: true,
316
317                            // The min multipart size of Gcs is 5 MiB.
318                            //
319                            // ref: <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
320                            write_multi_min_size: Some(5 * 1024 * 1024),
321                            // The max multipart size of Gcs is 5 GiB.
322                            //
323                            // ref: <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
324                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
325                                Some(5 * 1024 * 1024 * 1024)
326                            } else {
327                                Some(usize::MAX)
328                            },
329
330                            delete: true,
331                            delete_max_size: Some(100),
332                            copy: true,
333
334                            list: true,
335                            list_with_limit: true,
336                            list_with_start_after: true,
337                            list_with_recursive: true,
338
339                            presign: true,
340                            presign_stat: true,
341                            presign_read: true,
342                            presign_write: true,
343
344                            shared: true,
345
346                            ..Default::default()
347                        });
348
349                    // allow deprecated api here for compatibility
350                    #[allow(deprecated)]
351                    if let Some(client) = self.http_client {
352                        am.update_http_client(|_| client);
353                    }
354
355                    am.into()
356                },
357                endpoint,
358                bucket: bucket.to_string(),
359                root,
360                signer,
361                token_loader,
362                token: self.config.token,
363                scope: scope.to_string(),
364                credential_loader: cred_loader,
365                predefined_acl: self.config.predefined_acl.clone(),
366                default_storage_class: self.config.default_storage_class.clone(),
367                allow_anonymous: self.config.allow_anonymous,
368            }),
369        };
370
371        Ok(backend)
372    }
373}
374
/// GCS storage backend
#[derive(Clone, Debug)]
pub struct GcsBackend {
    /// Shared handle to the GCS core that the `Access` methods delegate to.
    core: Arc<GcsCore>,
}
380
381impl Access for GcsBackend {
382    type Reader = HttpBody;
383    type Writer = GcsWriters;
384    type Lister = oio::PageLister<GcsLister>;
385    type Deleter = oio::BatchDeleter<GcsDeleter>;
386
387    fn info(&self) -> Arc<AccessorInfo> {
388        self.core.info.clone()
389    }
390
391    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
392        let resp = self.core.gcs_get_object_metadata(path, &args).await?;
393
394        if !resp.status().is_success() {
395            return Err(parse_error(resp));
396        }
397
398        let slc = resp.into_body();
399        let m = GcsCore::build_metadata_from_object_response(path, slc)?;
400
401        Ok(RpStat::new(m))
402    }
403
404    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
405        let resp = self.core.gcs_get_object(path, args.range(), &args).await?;
406
407        let status = resp.status();
408
409        match status {
410            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
411                Ok((RpRead::default(), resp.into_body()))
412            }
413            _ => {
414                let (part, mut body) = resp.into_parts();
415                let buf = body.to_buffer().await?;
416                Err(parse_error(Response::from_parts(part, buf)))
417            }
418        }
419    }
420
421    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
422        let concurrent = args.concurrent();
423        let w = GcsWriter::new(self.core.clone(), path, args);
424        let w = oio::MultipartWriter::new(self.core.info.clone(), w, concurrent);
425
426        Ok((RpWrite::default(), w))
427    }
428
429    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
430        Ok((
431            RpDelete::default(),
432            oio::BatchDeleter::new(GcsDeleter::new(self.core.clone())),
433        ))
434    }
435
436    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
437        let l = GcsLister::new(
438            self.core.clone(),
439            path,
440            args.recursive(),
441            args.limit(),
442            args.start_after(),
443        );
444
445        Ok((RpList::default(), oio::PageLister::new(l)))
446    }
447
448    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
449        let resp = self.core.gcs_copy_object(from, to).await?;
450
451        if resp.status().is_success() {
452            Ok(RpCopy::default())
453        } else {
454            Err(parse_error(resp))
455        }
456    }
457
458    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
459        // We will not send this request out, just for signing.
460        let req = match args.operation() {
461            PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v),
462            PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v),
463            PresignOperation::Write(v) => {
464                self.core
465                    .gcs_insert_object_xml_request(path, v, Buffer::new())
466            }
467            PresignOperation::Delete(_) => Err(Error::new(
468                ErrorKind::Unsupported,
469                "operation is not supported",
470            )),
471        };
472        let mut req = req?;
473        self.core.sign_query(&mut req, args.expire())?;
474
475        // We don't need this request anymore, consume it directly.
476        let (parts, _) = req.into_parts();
477
478        Ok(RpPresign::new(PresignedRequest::new(
479            parts.method,
480            parts.uri,
481            parts.headers,
482        )))
483    }
484}