opendal/services/oss/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use http::Uri;
24use log::debug;
25use reqsign::AliyunConfig;
26use reqsign::AliyunLoader;
27use reqsign::AliyunOssSigner;
28
29use super::OSS_SCHEME;
30use super::config::OssConfig;
31use super::core::*;
32use super::deleter::OssDeleter;
33use super::error::parse_error;
34use super::lister::OssLister;
35use super::lister::OssListers;
36use super::lister::OssObjectVersionsLister;
37use super::writer::OssWriter;
38use super::writer::OssWriters;
39use crate::raw::*;
40use crate::*;
41
/// Fallback batch size for delete requests when `delete_max_size` is not configured.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
43
44/// Aliyun Object Storage Service (OSS) support
45#[doc = include_str!("docs.md")]
46#[derive(Default)]
47pub struct OssBuilder {
48    pub(super) config: OssConfig,
49
50    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
51    pub(super) http_client: Option<HttpClient>,
52}
53
54impl Debug for OssBuilder {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("OssBuilder")
57            .field("config", &self.config)
58            .finish_non_exhaustive()
59    }
60}
61
62impl OssBuilder {
63    /// Set root of this backend.
64    ///
65    /// All operations will happen under this root.
66    pub fn root(mut self, root: &str) -> Self {
67        self.config.root = if root.is_empty() {
68            None
69        } else {
70            Some(root.to_string())
71        };
72
73        self
74    }
75
76    /// Set bucket name of this backend.
77    pub fn bucket(mut self, bucket: &str) -> Self {
78        self.config.bucket = bucket.to_string();
79
80        self
81    }
82
83    /// Set endpoint of this backend.
84    pub fn endpoint(mut self, endpoint: &str) -> Self {
85        if !endpoint.is_empty() {
86            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
87            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
88        }
89
90        self
91    }
92
93    /// Set addressing style for the endpoint.
94    ///
95    /// Available values: `virtual`, `cname`, `path`.
96    ///
97    /// - `virtual`: Use virtual addressing style, i.e. `http://bucket.oss-<region>.aliyuncs.com/object`
98    /// - `cname`: Use cname addressing style, i.e. `http://mydomain.com/object` with mydomain.com bound to your bucket.
99    /// - `path`: Use path addressing style. i.e. `http://oss-<region>.aliyuncs.com/bucket/object`
100    ///
101    /// - If not set, default value is `virtual`.
102    pub fn addressing_style(mut self, addressing_style: &str) -> Self {
103        self.config.addressing_style = Some(addressing_style.to_string());
104
105        self
106    }
107
108    /// Set bucket versioning status for this backend
109    pub fn enable_versioning(mut self, enabled: bool) -> Self {
110        self.config.enable_versioning = enabled;
111
112        self
113    }
114
115    /// Set an endpoint for generating presigned urls.
116    ///
117    /// You can offer a public endpoint like <https://oss-cn-beijing.aliyuncs.com> to return a presinged url for
118    /// public accessors, along with an internal endpoint like <https://oss-cn-beijing-internal.aliyuncs.com>
119    /// to access objects in a faster path.
120    ///
121    /// - If presign_endpoint is set, we will use presign_endpoint on generating presigned urls.
122    /// - if not, we will use endpoint as default.
123    pub fn presign_endpoint(mut self, endpoint: &str) -> Self {
124        if !endpoint.is_empty() {
125            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
126            self.config.presign_endpoint = Some(endpoint.trim_end_matches('/').to_string())
127        }
128
129        self
130    }
131
132    /// Set addressing style for presign endpoint.
133    ///
134    /// Similar to setting addressing style for endpoint.
135    ///
136    /// - If both presign_endpoint and presign_addressing_style are not set, they are the same as endpoint's configurations.
137    ///
138    /// - If presign_endpoint is set, but presign_addressing_style is not set, default value is `virtual`.
139    pub fn presign_addressing_style(mut self, addressing_style: &str) -> Self {
140        self.config.presign_addressing_style = Some(addressing_style.to_string());
141
142        self
143    }
144
145    /// Set access_key_id of this backend.
146    ///
147    /// - If access_key_id is set, we will take user's input first.
148    /// - If not, we will try to load it from environment.
149    pub fn access_key_id(mut self, v: &str) -> Self {
150        if !v.is_empty() {
151            self.config.access_key_id = Some(v.to_string())
152        }
153
154        self
155    }
156
157    /// Set access_key_secret of this backend.
158    ///
159    /// - If access_key_secret is set, we will take user's input first.
160    /// - If not, we will try to load it from environment.
161    pub fn access_key_secret(mut self, v: &str) -> Self {
162        if !v.is_empty() {
163            self.config.access_key_secret = Some(v.to_string())
164        }
165
166        self
167    }
168
169    /// Set security_token for this backend.
170    ///
171    /// - If security_token is set, we will take user's input first.
172    /// - If not, we will try to load it from environment.
173    pub fn security_token(mut self, security_token: &str) -> Self {
174        if !security_token.is_empty() {
175            self.config.security_token = Some(security_token.to_string())
176        }
177
178        self
179    }
180
181    /// Specify the http client that used by this service.
182    ///
183    /// # Notes
184    ///
185    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
186    /// during minor updates.
187    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
188    #[allow(deprecated)]
189    pub fn http_client(mut self, client: HttpClient) -> Self {
190        self.http_client = Some(client);
191        self
192    }
193
194    /// preprocess the endpoint option
195    fn parse_endpoint(
196        &self,
197        endpoint: &Option<String>,
198        bucket: &str,
199        addressing_style: AddressingStyle,
200    ) -> Result<(String, String)> {
201        let (endpoint, host) = match endpoint.clone() {
202            Some(ep) => {
203                let uri = ep.parse::<Uri>().map_err(|err| {
204                    Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
205                        .with_context("service", OSS_SCHEME)
206                        .with_context("endpoint", &ep)
207                        .set_source(err)
208                })?;
209                let host = uri.host().ok_or_else(|| {
210                    Error::new(ErrorKind::ConfigInvalid, "endpoint host is empty")
211                        .with_context("service", OSS_SCHEME)
212                        .with_context("endpoint", &ep)
213                })?;
214                let full_host = match addressing_style {
215                    AddressingStyle::Virtual => {
216                        if let Some(port) = uri.port_u16() {
217                            format!("{bucket}.{host}:{port}")
218                        } else {
219                            format!("{bucket}.{host}")
220                        }
221                    }
222                    AddressingStyle::Cname | AddressingStyle::Path => {
223                        if let Some(port) = uri.port_u16() {
224                            format!("{host}:{port}")
225                        } else {
226                            host.to_string()
227                        }
228                    }
229                };
230                if let Some(port) = uri.port_u16() {
231                    format!("{bucket}.{host}:{port}")
232                } else {
233                    format!("{bucket}.{host}")
234                };
235                let endpoint = match uri.scheme_str() {
236                    Some(scheme_str) => match scheme_str {
237                        "http" | "https" => format!("{scheme_str}://{full_host}"),
238                        _ => {
239                            return Err(Error::new(
240                                ErrorKind::ConfigInvalid,
241                                "endpoint protocol is invalid",
242                            )
243                            .with_context("service", OSS_SCHEME));
244                        }
245                    },
246                    None => format!("https://{full_host}"),
247                };
248                let endpoint = match addressing_style {
249                    AddressingStyle::Path => format!("{}/{}", endpoint, bucket),
250                    AddressingStyle::Cname | AddressingStyle::Virtual => endpoint,
251                };
252                (endpoint, full_host)
253            }
254            None => {
255                return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
256                    .with_context("service", OSS_SCHEME));
257            }
258        };
259        Ok((endpoint, host))
260    }
261
262    /// Set server_side_encryption for this backend.
263    ///
264    /// Available values: `AES256`, `KMS`.
265    ///
266    /// Reference: <https://www.alibabacloud.com/help/en/object-storage-service/latest/server-side-encryption-5>
267    /// Brief explanation:
268    /// There are two server-side encryption methods available:
269    /// SSE-AES256:
270    ///     1. Configure the bucket encryption mode as OSS-managed and specify the encryption algorithm as AES256.
271    ///     2. Include the `x-oss-server-side-encryption` parameter in the request and set its value to AES256.
272    /// SSE-KMS:
273    ///     1. To use this service, you need to first enable KMS.
274    ///     2. Configure the bucket encryption mode as KMS, and specify the specific CMK ID for BYOK (Bring Your Own Key)
275    ///        or not specify the specific CMK ID for OSS-managed KMS key.
276    ///     3. Include the `x-oss-server-side-encryption` parameter in the request and set its value to KMS.
277    ///     4. If a specific CMK ID is specified, include the `x-oss-server-side-encryption-key-id` parameter in the request, and set its value to the specified CMK ID.
278    pub fn server_side_encryption(mut self, v: &str) -> Self {
279        if !v.is_empty() {
280            self.config.server_side_encryption = Some(v.to_string())
281        }
282        self
283    }
284
285    /// Set server_side_encryption_key_id for this backend.
286    ///
287    /// # Notes
288    ///
289    /// This option only takes effect when server_side_encryption equals to KMS.
290    pub fn server_side_encryption_key_id(mut self, v: &str) -> Self {
291        if !v.is_empty() {
292            self.config.server_side_encryption_key_id = Some(v.to_string())
293        }
294        self
295    }
296
297    /// Set maximum batch operations of this backend.
298    #[deprecated(
299        since = "0.52.0",
300        note = "Please use `delete_max_size` instead of `batch_max_operations`"
301    )]
302    pub fn batch_max_operations(mut self, delete_max_size: usize) -> Self {
303        self.config.delete_max_size = Some(delete_max_size);
304
305        self
306    }
307
308    /// Set maximum delete operations of this backend.
309    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
310        self.config.delete_max_size = Some(delete_max_size);
311
312        self
313    }
314
315    /// Allow anonymous will allow opendal to send request without signing
316    /// when credential is not loaded.
317    pub fn allow_anonymous(mut self) -> Self {
318        self.config.allow_anonymous = true;
319        self
320    }
321
322    /// Set role_arn for this backend.
323    ///
324    /// If `role_arn` is set, we will use already known config as source
325    /// credential to assume role with `role_arn`.
326    pub fn role_arn(mut self, role_arn: &str) -> Self {
327        if !role_arn.is_empty() {
328            self.config.role_arn = Some(role_arn.to_string())
329        }
330
331        self
332    }
333
334    /// Set role_session_name for this backend.
335    pub fn role_session_name(mut self, role_session_name: &str) -> Self {
336        if !role_session_name.is_empty() {
337            self.config.role_session_name = Some(role_session_name.to_string())
338        }
339
340        self
341    }
342
343    /// Set oidc_provider_arn for this backend.
344    pub fn oidc_provider_arn(mut self, oidc_provider_arn: &str) -> Self {
345        if !oidc_provider_arn.is_empty() {
346            self.config.oidc_provider_arn = Some(oidc_provider_arn.to_string())
347        }
348
349        self
350    }
351
352    /// Set oidc_token_file for this backend.
353    pub fn oidc_token_file(mut self, oidc_token_file: &str) -> Self {
354        if !oidc_token_file.is_empty() {
355            self.config.oidc_token_file = Some(oidc_token_file.to_string())
356        }
357
358        self
359    }
360
361    /// Set sts_endpoint for this backend.
362    pub fn sts_endpoint(mut self, sts_endpoint: &str) -> Self {
363        if !sts_endpoint.is_empty() {
364            self.config.sts_endpoint = Some(sts_endpoint.to_string())
365        }
366
367        self
368    }
369}
370
/// How the bucket name is placed in request URLs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressingStyle {
    /// Bucket appended to the path: `http://<host>/<bucket>/object`.
    Path,
    /// Host used as-is (a custom domain bound to the bucket).
    Cname,
    /// Bucket prefixed to the host: `http://<bucket>.<host>/object`.
    Virtual,
}
376
377impl TryFrom<&Option<String>> for AddressingStyle {
378    type Error = Error;
379
380    fn try_from(value: &Option<String>) -> Result<Self> {
381        match value.as_deref() {
382            None | Some("virtual") => Ok(AddressingStyle::Virtual),
383            Some("path") => Ok(AddressingStyle::Path),
384            Some("cname") => Ok(AddressingStyle::Cname),
385            Some(v) => Err(Error::new(
386                ErrorKind::ConfigInvalid,
387                "Invalid addressing style, available: `virtual`, `path`, `cname`",
388            )
389            .with_context("service", OSS_SCHEME)
390            .with_context("addressing_style", v)),
391        }
392    }
393}
394
395impl Builder for OssBuilder {
396    type Config = OssConfig;
397
398    fn build(self) -> Result<impl Access> {
399        debug!("backend build started: {:?}", &self);
400
401        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
402        debug!("backend use root {}", &root);
403
404        // Handle endpoint, region and bucket name.
405        let bucket = match self.config.bucket.is_empty() {
406            false => Ok(&self.config.bucket),
407            true => Err(
408                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
409                    .with_context("service", OSS_SCHEME),
410            ),
411        }?;
412
413        // Retrieve endpoint and host by parsing the endpoint option and bucket. If presign_endpoint is not
414        // set, take endpoint as default presign_endpoint.
415        let (endpoint, host) = self.parse_endpoint(
416            &self.config.endpoint,
417            bucket,
418            (&self.config.addressing_style).try_into()?,
419        )?;
420        debug!("backend use bucket {}, endpoint: {}", &bucket, &endpoint);
421
422        let presign_endpoint = if self.config.presign_endpoint.is_some() {
423            self.parse_endpoint(
424                &self.config.presign_endpoint,
425                bucket,
426                (&self.config.presign_addressing_style).try_into()?,
427            )?
428            .0
429        } else {
430            endpoint.clone()
431        };
432        debug!("backend use presign_endpoint: {}", &presign_endpoint);
433
434        let server_side_encryption = match &self.config.server_side_encryption {
435            None => None,
436            Some(v) => Some(
437                build_header_value(v)
438                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
439            ),
440        };
441
442        let server_side_encryption_key_id = match &self.config.server_side_encryption_key_id {
443            None => None,
444            Some(v) => Some(
445                build_header_value(v)
446                    .map_err(|err| err.with_context("key", "server_side_encryption_key_id"))?,
447            ),
448        };
449
450        let mut cfg = AliyunConfig::default();
451        // Load cfg from env first.
452        cfg = cfg.from_env();
453
454        if let Some(v) = self.config.access_key_id {
455            cfg.access_key_id = Some(v);
456        }
457
458        if let Some(v) = self.config.access_key_secret {
459            cfg.access_key_secret = Some(v);
460        }
461
462        if let Some(v) = self.config.security_token {
463            cfg.security_token = Some(v);
464        }
465
466        if let Some(v) = self.config.role_arn {
467            cfg.role_arn = Some(v);
468        }
469
470        // override default role_session_name if set
471        if let Some(v) = self.config.role_session_name {
472            cfg.role_session_name = v;
473        }
474
475        if let Some(v) = self.config.oidc_provider_arn {
476            cfg.oidc_provider_arn = Some(v);
477        }
478
479        if let Some(v) = self.config.oidc_token_file {
480            cfg.oidc_token_file = Some(v);
481        }
482
483        if let Some(v) = self.config.sts_endpoint {
484            cfg.sts_endpoint = Some(v);
485        }
486
487        let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
488
489        let signer = AliyunOssSigner::new(bucket);
490
491        let delete_max_size = self
492            .config
493            .delete_max_size
494            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
495
496        Ok(OssBackend {
497            core: Arc::new(OssCore {
498                info: {
499                    let am = AccessorInfo::default();
500                    am.set_scheme(OSS_SCHEME)
501                        .set_root(&root)
502                        .set_name(bucket)
503                        .set_native_capability(Capability {
504                            stat: true,
505                            stat_with_if_match: true,
506                            stat_with_if_none_match: true,
507                            stat_with_version: self.config.enable_versioning,
508
509                            read: true,
510
511                            read_with_if_match: true,
512                            read_with_if_none_match: true,
513                            read_with_version: self.config.enable_versioning,
514                            read_with_if_modified_since: true,
515                            read_with_if_unmodified_since: true,
516
517                            write: true,
518                            write_can_empty: true,
519                            write_can_append: true,
520                            write_can_multi: true,
521                            write_with_cache_control: true,
522                            write_with_content_type: true,
523                            write_with_content_disposition: true,
524                            // TODO: set this to false while version has been enabled.
525                            write_with_if_not_exists: !self.config.enable_versioning,
526
527                            // The min multipart size of OSS is 100 KiB.
528                            //
529                            // ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
530                            write_multi_min_size: Some(100 * 1024),
531                            // The max multipart size of OSS is 5 GiB.
532                            //
533                            // ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
534                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
535                                Some(5 * 1024 * 1024 * 1024)
536                            } else {
537                                Some(usize::MAX)
538                            },
539                            write_with_user_metadata: true,
540
541                            delete: true,
542                            delete_with_version: self.config.enable_versioning,
543                            delete_max_size: Some(delete_max_size),
544
545                            copy: true,
546
547                            list: true,
548                            list_with_limit: true,
549                            list_with_start_after: true,
550                            list_with_recursive: true,
551                            list_with_versions: self.config.enable_versioning,
552                            list_with_deleted: self.config.enable_versioning,
553
554                            presign: true,
555                            presign_stat: true,
556                            presign_read: true,
557                            presign_write: true,
558
559                            shared: true,
560
561                            ..Default::default()
562                        });
563
564                    // allow deprecated api here for compatibility
565                    #[allow(deprecated)]
566                    if let Some(client) = self.http_client {
567                        am.update_http_client(|_| client);
568                    }
569
570                    am.into()
571                },
572                root,
573                bucket: bucket.to_owned(),
574                endpoint,
575                host,
576                presign_endpoint,
577                allow_anonymous: self.config.allow_anonymous,
578                signer,
579                loader,
580                server_side_encryption,
581                server_side_encryption_key_id,
582            }),
583        })
584    }
585}
586
587#[derive(Debug, Clone)]
588/// Aliyun Object Storage Service backend
589pub struct OssBackend {
590    core: Arc<OssCore>,
591}
592
593impl Access for OssBackend {
594    type Reader = HttpBody;
595    type Writer = OssWriters;
596    type Lister = OssListers;
597    type Deleter = oio::BatchDeleter<OssDeleter>;
598
599    fn info(&self) -> Arc<AccessorInfo> {
600        self.core.info.clone()
601    }
602
603    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
604        let resp = self.core.oss_head_object(path, &args).await?;
605
606        let status = resp.status();
607
608        match status {
609            StatusCode::OK => {
610                let headers = resp.headers();
611                let mut meta = self.core.parse_metadata(path, resp.headers())?;
612
613                if let Some(v) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
614                    meta.set_version(v);
615                }
616
617                Ok(RpStat::new(meta))
618            }
619            _ => Err(parse_error(resp)),
620        }
621    }
622
623    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
624        let resp = self.core.oss_get_object(path, &args).await?;
625
626        let status = resp.status();
627
628        match status {
629            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
630                Ok((RpRead::default(), resp.into_body()))
631            }
632            _ => {
633                let (part, mut body) = resp.into_parts();
634                let buf = body.to_buffer().await?;
635                Err(parse_error(Response::from_parts(part, buf)))
636            }
637        }
638    }
639
640    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
641        let writer = OssWriter::new(self.core.clone(), path, args.clone());
642
643        let w = if args.append() {
644            OssWriters::Two(oio::AppendWriter::new(writer))
645        } else {
646            OssWriters::One(oio::MultipartWriter::new(
647                self.core.info.clone(),
648                writer,
649                args.concurrent(),
650            ))
651        };
652
653        Ok((RpWrite::default(), w))
654    }
655
656    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
657        Ok((
658            RpDelete::default(),
659            oio::BatchDeleter::new(
660                OssDeleter::new(self.core.clone()),
661                self.core.info.full_capability().delete_max_size,
662            ),
663        ))
664    }
665
666    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
667        let l = if args.versions() || args.deleted() {
668            TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
669                self.core.clone(),
670                path,
671                args,
672            )))
673        } else {
674            TwoWays::One(oio::PageLister::new(OssLister::new(
675                self.core.clone(),
676                path,
677                args.recursive(),
678                args.limit(),
679                args.start_after(),
680            )))
681        };
682
683        Ok((RpList::default(), l))
684    }
685
686    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
687        let resp = self.core.oss_copy_object(from, to).await?;
688        let status = resp.status();
689
690        match status {
691            StatusCode::OK => Ok(RpCopy::default()),
692            _ => Err(parse_error(resp)),
693        }
694    }
695
696    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
697        // We will not send this request out, just for signing.
698        let req = match args.operation() {
699            PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v),
700            PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v),
701            PresignOperation::Write(v) => {
702                self.core
703                    .oss_put_object_request(path, None, v, Buffer::new(), true)
704            }
705            PresignOperation::Delete(_) => Err(Error::new(
706                ErrorKind::Unsupported,
707                "operation is not supported",
708            )),
709        };
710        let mut req = req?;
711
712        self.core.sign_query(&mut req, args.expire()).await?;
713
714        // We don't need this request anymore, consume it directly.
715        let (parts, _) = req.into_parts();
716
717        Ok(RpPresign::new(PresignedRequest::new(
718            parts.method,
719            parts.uri,
720            parts.headers,
721        )))
722    }
723}