opendal/services/gcs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use bytes::Buf;
24use http::Response;
25use http::StatusCode;
26use log::debug;
27use reqsign::GoogleCredentialLoader;
28use reqsign::GoogleSigner;
29use reqsign::GoogleTokenLoad;
30use reqsign::GoogleTokenLoader;
31use serde::Deserialize;
32use serde_json;
33
34use super::core::*;
35use super::delete::GcsDeleter;
36use super::error::parse_error;
37use super::lister::GcsLister;
38use super::writer::GcsWriter;
39use super::writer::GcsWriters;
40use crate::raw::oio::BatchDeleter;
41use crate::raw::*;
42use crate::services::GcsConfig;
43use crate::*;
44
/// Endpoint used by `GcsBuilder::build` when `GcsConfig::endpoint` is not set.
const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
/// OAuth2 scope requested by `GcsBuilder::build` when `GcsConfig::scope` is not set.
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
47
48impl Configurator for GcsConfig {
49    type Builder = GcsBuilder;
50
51    #[allow(deprecated)]
52    fn into_builder(self) -> Self::Builder {
53        GcsBuilder {
54            config: self,
55            http_client: None,
56            customized_token_loader: None,
57        }
58    }
59}
60
/// [Google Cloud Storage](https://cloud.google.com/storage) services support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct GcsBuilder {
    /// Service configuration filled in by the builder methods.
    config: GcsConfig,

    /// Legacy per-builder HTTP client; when set, `build` installs it on the
    /// accessor info through `update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
    /// Optional user-supplied token loader, forwarded to `GoogleTokenLoader`
    /// by `build`.
    customized_token_loader: Option<Box<dyn GoogleTokenLoad>>,
}
71
72impl Debug for GcsBuilder {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        let mut ds = f.debug_struct("GcsBuilder");
75
76        ds.field("config", &self.config);
77        ds.finish_non_exhaustive()
78    }
79}
80
81impl GcsBuilder {
82    /// set the working directory root of backend
83    pub fn root(mut self, root: &str) -> Self {
84        self.config.root = if root.is_empty() {
85            None
86        } else {
87            Some(root.to_string())
88        };
89
90        self
91    }
92
93    /// set the container's name
94    pub fn bucket(mut self, bucket: &str) -> Self {
95        self.config.bucket = bucket.to_string();
96        self
97    }
98
99    /// set the GCS service scope
100    ///
101    /// If not set, we will use `https://www.googleapis.com/auth/devstorage.read_write`.
102    ///
103    /// # Valid scope examples
104    ///
105    /// - read-only: `https://www.googleapis.com/auth/devstorage.read_only`
106    /// - read-write: `https://www.googleapis.com/auth/devstorage.read_write`
107    /// - full-control: `https://www.googleapis.com/auth/devstorage.full_control`
108    ///
109    /// Reference: [Cloud Storage authentication](https://cloud.google.com/storage/docs/authentication)
110    pub fn scope(mut self, scope: &str) -> Self {
111        if !scope.is_empty() {
112            self.config.scope = Some(scope.to_string())
113        };
114        self
115    }
116
117    /// Set the GCS service account.
118    ///
119    /// service account will be used for fetch token from vm metadata.
120    /// If not set, we will try to fetch with `default` service account.
121    pub fn service_account(mut self, service_account: &str) -> Self {
122        if !service_account.is_empty() {
123            self.config.service_account = Some(service_account.to_string())
124        };
125        self
126    }
127
128    /// set the endpoint GCS service uses
129    pub fn endpoint(mut self, endpoint: &str) -> Self {
130        if !endpoint.is_empty() {
131            self.config.endpoint = Some(endpoint.to_string())
132        };
133        self
134    }
135
136    /// set the base64 hashed credentials string used for OAuth2 authentication.
137    ///
138    /// this method allows to specify the credentials directly as a base64 hashed string.
139    /// alternatively, you can use `credential_path()` to provide the local path to a credentials file.
140    /// we will use one of `credential` and `credential_path` to complete the OAuth2 authentication.
141    ///
142    /// Reference: [Google Cloud Storage Authentication](https://cloud.google.com/docs/authentication).
143    pub fn credential(mut self, credential: &str) -> Self {
144        if !credential.is_empty() {
145            self.config.credential = Some(credential.to_string())
146        };
147        self
148    }
149
150    /// set the local path to credentials file which is used for OAuth2 authentication.
151    ///
152    /// credentials file contains the original credentials that have not been base64 hashed.
153    /// we will use one of `credential` and `credential_path` to complete the OAuth2 authentication.
154    ///
155    /// Reference: [Google Cloud Storage Authentication](https://cloud.google.com/docs/authentication).
156    pub fn credential_path(mut self, path: &str) -> Self {
157        if !path.is_empty() {
158            self.config.credential_path = Some(path.to_string())
159        };
160        self
161    }
162
163    /// Specify the http client that used by this service.
164    ///
165    /// # Notes
166    ///
167    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
168    /// during minor updates.
169    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
170    #[allow(deprecated)]
171    pub fn http_client(mut self, client: HttpClient) -> Self {
172        self.http_client = Some(client);
173        self
174    }
175
176    /// Specify the customized token loader used by this service.
177    pub fn customized_token_loader(mut self, token_load: Box<dyn GoogleTokenLoad>) -> Self {
178        self.customized_token_loader = Some(token_load);
179        self
180    }
181
182    /// Provide the OAuth2 token to use.
183    pub fn token(mut self, token: String) -> Self {
184        self.config.token = Some(token);
185        self
186    }
187
188    /// Disable attempting to load credentials from the GCE metadata server.
189    pub fn disable_vm_metadata(mut self) -> Self {
190        self.config.disable_vm_metadata = true;
191        self
192    }
193
194    /// Disable loading configuration from the environment.
195    pub fn disable_config_load(mut self) -> Self {
196        self.config.disable_config_load = true;
197        self
198    }
199
200    /// Set the predefined acl for GCS.
201    ///
202    /// Available values are:
203    /// - `authenticatedRead`
204    /// - `bucketOwnerFullControl`
205    /// - `bucketOwnerRead`
206    /// - `private`
207    /// - `projectPrivate`
208    /// - `publicRead`
209    pub fn predefined_acl(mut self, acl: &str) -> Self {
210        if !acl.is_empty() {
211            self.config.predefined_acl = Some(acl.to_string())
212        };
213        self
214    }
215
216    /// Set the default storage class for GCS.
217    ///
218    /// Available values are:
219    /// - `STANDARD`
220    /// - `NEARLINE`
221    /// - `COLDLINE`
222    /// - `ARCHIVE`
223    pub fn default_storage_class(mut self, class: &str) -> Self {
224        if !class.is_empty() {
225            self.config.default_storage_class = Some(class.to_string())
226        };
227        self
228    }
229
230    /// Allow anonymous requests.
231    ///
232    /// This is typically used for buckets which are open to the public or GCS
233    /// storage emulators.
234    pub fn allow_anonymous(mut self) -> Self {
235        self.config.allow_anonymous = true;
236        self
237    }
238}
239
240impl Builder for GcsBuilder {
241    const SCHEME: Scheme = Scheme::Gcs;
242    type Config = GcsConfig;
243
244    fn build(self) -> Result<impl Access> {
245        debug!("backend build started: {:?}", self);
246
247        let root = normalize_root(&self.config.root.unwrap_or_default());
248        debug!("backend use root {}", root);
249
250        // Handle endpoint and bucket name
251        let bucket = match self.config.bucket.is_empty() {
252            false => Ok(&self.config.bucket),
253            true => Err(
254                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
255                    .with_operation("Builder::build")
256                    .with_context("service", Scheme::Gcs),
257            ),
258        }?;
259
260        // TODO: server side encryption
261
262        let endpoint = self
263            .config
264            .endpoint
265            .clone()
266            .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
267        debug!("backend use endpoint: {endpoint}");
268
269        let mut cred_loader = GoogleCredentialLoader::default();
270        if let Some(cred) = &self.config.credential {
271            cred_loader = cred_loader.with_content(cred);
272        }
273        if let Some(cred) = &self.config.credential_path {
274            cred_loader = cred_loader.with_path(cred);
275        }
276        #[cfg(target_arch = "wasm32")]
277        {
278            cred_loader = cred_loader.with_disable_env();
279            cred_loader = cred_loader.with_disable_well_known_location();
280        }
281
282        if self.config.disable_config_load {
283            cred_loader = cred_loader
284                .with_disable_env()
285                .with_disable_well_known_location();
286        }
287
288        let scope = if let Some(scope) = &self.config.scope {
289            scope
290        } else {
291            DEFAULT_GCS_SCOPE
292        };
293
294        let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
295        if let Some(account) = &self.config.service_account {
296            token_loader = token_loader.with_service_account(account);
297        }
298        if let Ok(Some(cred)) = cred_loader.load() {
299            token_loader = token_loader.with_credentials(cred)
300        }
301        if let Some(loader) = self.customized_token_loader {
302            token_loader = token_loader.with_customized_token_loader(loader)
303        }
304
305        if self.config.disable_vm_metadata {
306            token_loader = token_loader.with_disable_vm_metadata(true);
307        }
308
309        let signer = GoogleSigner::new("storage");
310
311        let backend = GcsBackend {
312            core: Arc::new(GcsCore {
313                info: {
314                    let am = AccessorInfo::default();
315                    am.set_scheme(Scheme::Gcs)
316                        .set_root(&root)
317                        .set_name(bucket)
318                        .set_native_capability(Capability {
319                            stat: true,
320                            stat_with_if_match: true,
321                            stat_with_if_none_match: true,
322                            stat_has_etag: true,
323                            stat_has_content_md5: true,
324                            stat_has_content_length: true,
325                            stat_has_content_type: true,
326                            stat_has_content_encoding: true,
327                            stat_has_last_modified: true,
328                            stat_has_user_metadata: true,
329                            stat_has_cache_control: true,
330
331                            read: true,
332
333                            read_with_if_match: true,
334                            read_with_if_none_match: true,
335
336                            write: true,
337                            write_can_empty: true,
338                            write_can_multi: true,
339                            write_with_cache_control: true,
340                            write_with_content_type: true,
341                            write_with_content_encoding: true,
342                            write_with_user_metadata: true,
343                            write_with_if_not_exists: true,
344
345                            // The min multipart size of Gcs is 5 MiB.
346                            //
347                            // ref: <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
348                            write_multi_min_size: Some(5 * 1024 * 1024),
349                            // The max multipart size of Gcs is 5 GiB.
350                            //
351                            // ref: <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
352                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
353                                Some(5 * 1024 * 1024 * 1024)
354                            } else {
355                                Some(usize::MAX)
356                            },
357
358                            delete: true,
359                            delete_max_size: Some(100),
360                            copy: true,
361
362                            list: true,
363                            list_with_limit: true,
364                            list_with_start_after: true,
365                            list_with_recursive: true,
366                            list_has_etag: true,
367                            list_has_content_md5: true,
368                            list_has_content_length: true,
369                            list_has_content_type: true,
370                            list_has_last_modified: true,
371
372                            presign: true,
373                            presign_stat: true,
374                            presign_read: true,
375                            presign_write: true,
376
377                            shared: true,
378
379                            ..Default::default()
380                        });
381
382                    // allow deprecated api here for compatibility
383                    #[allow(deprecated)]
384                    if let Some(client) = self.http_client {
385                        am.update_http_client(|_| client);
386                    }
387
388                    am.into()
389                },
390                endpoint,
391                bucket: bucket.to_string(),
392                root,
393                signer,
394                token_loader,
395                token: self.config.token,
396                scope: scope.to_string(),
397                credential_loader: cred_loader,
398                predefined_acl: self.config.predefined_acl.clone(),
399                default_storage_class: self.config.default_storage_class.clone(),
400                allow_anonymous: self.config.allow_anonymous,
401            }),
402        };
403
404        Ok(backend)
405    }
406}
407
/// GCS storage backend
///
/// A thin handle around the shared `GcsCore`; cloning it only bumps the
/// `Arc` reference count.
#[derive(Clone, Debug)]
pub struct GcsBackend {
    core: Arc<GcsCore>,
}
413
414impl Access for GcsBackend {
415    type Reader = HttpBody;
416    type Writer = GcsWriters;
417    type Lister = oio::PageLister<GcsLister>;
418    type Deleter = oio::BatchDeleter<GcsDeleter>;
419    type BlockingReader = ();
420    type BlockingWriter = ();
421    type BlockingLister = ();
422    type BlockingDeleter = ();
423
424    fn info(&self) -> Arc<AccessorInfo> {
425        self.core.info.clone()
426    }
427
428    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
429        let resp = self.core.gcs_get_object_metadata(path, &args).await?;
430
431        if !resp.status().is_success() {
432            return Err(parse_error(resp));
433        }
434
435        let slc = resp.into_body();
436
437        let meta: GetObjectJsonResponse =
438            serde_json::from_reader(slc.reader()).map_err(new_json_deserialize_error)?;
439
440        let mut m = Metadata::new(EntryMode::FILE);
441
442        m.set_etag(&meta.etag);
443        m.set_content_md5(&meta.md5_hash);
444
445        let size = meta
446            .size
447            .parse::<u64>()
448            .map_err(|e| Error::new(ErrorKind::Unexpected, "parse u64").set_source(e))?;
449        m.set_content_length(size);
450        if !meta.content_type.is_empty() {
451            m.set_content_type(&meta.content_type);
452        }
453
454        if !meta.content_encoding.is_empty() {
455            m.set_content_encoding(&meta.content_encoding);
456        }
457
458        if !meta.cache_control.is_empty() {
459            m.set_cache_control(&meta.cache_control);
460        }
461
462        m.set_last_modified(parse_datetime_from_rfc3339(&meta.updated)?);
463
464        if !meta.metadata.is_empty() {
465            m.with_user_metadata(meta.metadata);
466        }
467
468        Ok(RpStat::new(m))
469    }
470
471    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
472        let resp = self.core.gcs_get_object(path, args.range(), &args).await?;
473
474        let status = resp.status();
475
476        match status {
477            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
478                Ok((RpRead::default(), resp.into_body()))
479            }
480            _ => {
481                let (part, mut body) = resp.into_parts();
482                let buf = body.to_buffer().await?;
483                Err(parse_error(Response::from_parts(part, buf)))
484            }
485        }
486    }
487
488    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
489        let concurrent = args.concurrent();
490        let w = GcsWriter::new(self.core.clone(), path, args);
491        let w = oio::MultipartWriter::new(self.core.info.clone(), w, concurrent);
492
493        Ok((RpWrite::default(), w))
494    }
495
496    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
497        Ok((
498            RpDelete::default(),
499            BatchDeleter::new(GcsDeleter::new(self.core.clone())),
500        ))
501    }
502
503    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
504        let l = GcsLister::new(
505            self.core.clone(),
506            path,
507            args.recursive(),
508            args.limit(),
509            args.start_after(),
510        );
511
512        Ok((RpList::default(), oio::PageLister::new(l)))
513    }
514
515    async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
516        let resp = self.core.gcs_copy_object(from, to).await?;
517
518        if resp.status().is_success() {
519            Ok(RpCopy::default())
520        } else {
521            Err(parse_error(resp))
522        }
523    }
524
525    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
526        // We will not send this request out, just for signing.
527        let req = match args.operation() {
528            PresignOperation::Stat(v) => self.core.gcs_head_object_xml_request(path, v),
529            PresignOperation::Read(v) => self.core.gcs_get_object_xml_request(path, v),
530            PresignOperation::Write(v) => {
531                self.core
532                    .gcs_insert_object_xml_request(path, v, Buffer::new())
533            }
534            PresignOperation::Delete(_) => Err(Error::new(
535                ErrorKind::Unsupported,
536                "operation is not supported",
537            )),
538        };
539        let mut req = req?;
540        self.core.sign_query(&mut req, args.expire())?;
541
542        // We don't need this request anymore, consume it directly.
543        let (parts, _) = req.into_parts();
544
545        Ok(RpPresign::new(PresignedRequest::new(
546            parts.method,
547            parts.uri,
548            parts.headers,
549        )))
550    }
551}
552
/// The raw json response returned by [`get`](https://cloud.google.com/storage/docs/json_api/v1/objects/get)
///
/// Only the fields read by `stat` are kept; any other JSON fields are ignored.
/// Because of `#[serde(default)]`, a field missing from the response
/// deserializes to an empty string or an empty map rather than failing.
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase")]
struct GetObjectJsonResponse {
    /// GCS will return size in string.
    ///
    /// For example: `"size": "56535"`
    size: String,
    /// etag is not quoted.
    ///
    /// For example: `"etag": "CKWasoTgyPkCEAE="`
    etag: String,
    /// RFC3339 styled datetime string.
    ///
    /// For example: `"updated": "2022-08-15T11:33:34.866Z"`
    updated: String,
    /// Base64-encoded MD5 hash of the content; empty when the response omits it.
    ///
    /// For example: `"md5Hash": "fHcEH1vPwA6eTPqxuasXcg=="`
    md5_hash: String,
    /// Content type of this object.
    ///
    /// For example: `"contentType": "image/png"`
    content_type: String,
    /// Content encoding of this object.
    ///
    /// For example: `"contentEncoding": "br"`
    content_encoding: String,
    /// Cache-Control directive for the object data.
    ///
    /// For example: `"cacheControl": "public, max-age=3600"`
    cache_control: String,
    /// Custom metadata of this object.
    ///
    /// For example: `"metadata" : { "my-key": "my-value" }`
    metadata: HashMap<String, String>,
}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// A realistic `objects.get` payload, including fields the struct does not
    /// model, must populate every field that `stat` reads.
    #[test]
    fn test_deserialize_get_object_json_response() {
        let content = r#"{
  "kind": "storage#object",
  "id": "example/1.png/1660563214863653",
  "selfLink": "https://www.googleapis.com/storage/v1/b/example/o/1.png",
  "mediaLink": "https://content-storage.googleapis.com/download/storage/v1/b/example/o/1.png?generation=1660563214863653&alt=media",
  "name": "1.png",
  "bucket": "example",
  "generation": "1660563214863653",
  "metageneration": "1",
  "contentType": "image/png",
  "contentEncoding": "br",
  "cacheControl": "public, max-age=3600",
  "storageClass": "STANDARD",
  "size": "56535",
  "md5Hash": "fHcEH1vPwA6eTPqxuasXcg==",
  "crc32c": "j/un9g==",
  "etag": "CKWasoTgyPkCEAE=",
  "timeCreated": "2022-08-15T11:33:34.866Z",
  "updated": "2022-08-15T11:33:34.866Z",
  "timeStorageClassUpdated": "2022-08-15T11:33:34.866Z",
  "metadata" : {
    "location" : "everywhere"
  }
}"#;

        let meta = serde_json::from_str::<GetObjectJsonResponse>(content)
            .expect("json Deserialize must succeed");

        let expected_metadata: HashMap<String, String> =
            HashMap::from([("location".to_string(), "everywhere".to_string())]);

        // Scalar fields arrive verbatim as strings.
        assert_eq!(meta.size, "56535");
        assert_eq!(meta.updated, "2022-08-15T11:33:34.866Z");
        assert_eq!(meta.md5_hash, "fHcEH1vPwA6eTPqxuasXcg==");
        assert_eq!(meta.etag, "CKWasoTgyPkCEAE=");

        // Optional HTTP-style headers.
        assert_eq!(meta.content_type, "image/png");
        assert_eq!(meta.content_encoding, "br");
        assert_eq!(meta.cache_control, "public, max-age=3600");

        // User-defined metadata map.
        assert_eq!(meta.metadata, expected_metadata);
    }
}