opendal/services/cos/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use http::Uri;
24use log::debug;
25use reqsign::TencentCosConfig;
26use reqsign::TencentCosCredentialLoader;
27use reqsign::TencentCosSigner;
28
29use super::core::*;
30use super::delete::CosDeleter;
31use super::error::parse_error;
32use super::lister::{CosLister, CosListers, CosObjectVersionsLister};
33use super::writer::CosWriter;
34use super::writer::CosWriters;
35use crate::raw::oio::PageLister;
36use crate::raw::*;
37use crate::services::CosConfig;
38use crate::*;
39
40impl Configurator for CosConfig {
41    type Builder = CosBuilder;
42
43    #[allow(deprecated)]
44    fn into_builder(self) -> Self::Builder {
45        CosBuilder {
46            config: self,
47
48            http_client: None,
49        }
50    }
51}
52
53/// Tencent-Cloud COS services support.
54#[doc = include_str!("docs.md")]
55#[derive(Default, Clone)]
56pub struct CosBuilder {
57    config: CosConfig,
58
59    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
60    http_client: Option<HttpClient>,
61}
62
63impl Debug for CosBuilder {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("CosBuilder")
66            .field("config", &self.config)
67            .finish()
68    }
69}
70
71impl CosBuilder {
72    /// Set root of this backend.
73    ///
74    /// All operations will happen under this root.
75    pub fn root(mut self, root: &str) -> Self {
76        self.config.root = if root.is_empty() {
77            None
78        } else {
79            Some(root.to_string())
80        };
81
82        self
83    }
84
85    /// Set endpoint of this backend.
86    ///
87    /// NOTE: no bucket or account id in endpoint, we will trim them if exists.
88    ///
89    /// # Examples
90    ///
91    /// - `https://cos.ap-singapore.myqcloud.com`
92    pub fn endpoint(mut self, endpoint: &str) -> Self {
93        if !endpoint.is_empty() {
94            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
95        }
96
97        self
98    }
99
100    /// Set secret_id of this backend.
101    /// - If it is set, we will take user's input first.
102    /// - If not, we will try to load it from environment.
103    pub fn secret_id(mut self, secret_id: &str) -> Self {
104        if !secret_id.is_empty() {
105            self.config.secret_id = Some(secret_id.to_string());
106        }
107
108        self
109    }
110
111    /// Set secret_key of this backend.
112    /// - If it is set, we will take user's input first.
113    /// - If not, we will try to load it from environment.
114    pub fn secret_key(mut self, secret_key: &str) -> Self {
115        if !secret_key.is_empty() {
116            self.config.secret_key = Some(secret_key.to_string());
117        }
118
119        self
120    }
121
122    /// Set bucket of this backend.
123    /// The param is required.
124    pub fn bucket(mut self, bucket: &str) -> Self {
125        if !bucket.is_empty() {
126            self.config.bucket = Some(bucket.to_string());
127        }
128
129        self
130    }
131
132    /// Set bucket versioning status for this backend
133    pub fn enable_versioning(mut self, enabled: bool) -> Self {
134        self.config.enable_versioning = enabled;
135
136        self
137    }
138
139    /// Disable config load so that opendal will not load config from
140    /// environment.
141    ///
142    /// For examples:
143    ///
144    /// - envs like `TENCENTCLOUD_SECRET_ID`
145    pub fn disable_config_load(mut self) -> Self {
146        self.config.disable_config_load = true;
147        self
148    }
149
150    /// Specify the http client that used by this service.
151    ///
152    /// # Notes
153    ///
154    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
155    /// during minor updates.
156    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
157    #[allow(deprecated)]
158    pub fn http_client(mut self, client: HttpClient) -> Self {
159        self.http_client = Some(client);
160        self
161    }
162}
163
164impl Builder for CosBuilder {
165    const SCHEME: Scheme = Scheme::Cos;
166    type Config = CosConfig;
167
168    fn build(self) -> Result<impl Access> {
169        debug!("backend build started: {:?}", &self);
170
171        let root = normalize_root(&self.config.root.unwrap_or_default());
172        debug!("backend use root {}", root);
173
174        let bucket = match &self.config.bucket {
175            Some(bucket) => Ok(bucket.to_string()),
176            None => Err(
177                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
178                    .with_context("service", Scheme::Cos),
179            ),
180        }?;
181        debug!("backend use bucket {}", &bucket);
182
183        let uri = match &self.config.endpoint {
184            Some(endpoint) => endpoint.parse::<Uri>().map_err(|err| {
185                Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
186                    .with_context("service", Scheme::Cos)
187                    .with_context("endpoint", endpoint)
188                    .set_source(err)
189            }),
190            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
191                .with_context("service", Scheme::Cos)),
192        }?;
193
194        let scheme = match uri.scheme_str() {
195            Some(scheme) => scheme.to_string(),
196            None => "https".to_string(),
197        };
198
199        // If endpoint contains bucket name, we should trim them.
200        let endpoint = uri.host().unwrap().replace(&format!("//{bucket}."), "//");
201        debug!("backend use endpoint {}", &endpoint);
202
203        let mut cfg = TencentCosConfig::default();
204        if !self.config.disable_config_load {
205            cfg = cfg.from_env();
206        }
207
208        if let Some(v) = self.config.secret_id {
209            cfg.secret_id = Some(v);
210        }
211        if let Some(v) = self.config.secret_key {
212            cfg.secret_key = Some(v);
213        }
214
215        let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
216
217        let signer = TencentCosSigner::new();
218
219        Ok(CosBackend {
220            core: Arc::new(CosCore {
221                info: {
222                    let am = AccessorInfo::default();
223                    am.set_scheme(Scheme::Cos)
224                        .set_root(&root)
225                        .set_name(&bucket)
226                        .set_native_capability(Capability {
227                            stat: true,
228                            stat_with_if_match: true,
229                            stat_with_if_none_match: true,
230                            stat_has_cache_control: true,
231                            stat_has_content_length: true,
232                            stat_has_content_type: true,
233                            stat_has_content_encoding: true,
234                            stat_has_content_range: true,
235                            stat_with_version: self.config.enable_versioning,
236                            stat_has_etag: true,
237                            stat_has_content_md5: true,
238                            stat_has_last_modified: true,
239                            stat_has_content_disposition: true,
240                            stat_has_version: true,
241                            stat_has_user_metadata: true,
242
243                            read: true,
244
245                            read_with_if_match: true,
246                            read_with_if_none_match: true,
247                            read_with_version: self.config.enable_versioning,
248
249                            write: true,
250                            write_can_empty: true,
251                            write_can_append: true,
252                            write_can_multi: true,
253                            write_with_content_type: true,
254                            write_with_cache_control: true,
255                            write_with_content_disposition: true,
256                            // Cos doesn't support forbid overwrite while version has been enabled.
257                            write_with_if_not_exists: !self.config.enable_versioning,
258                            // The min multipart size of COS is 1 MiB.
259                            //
260                            // ref: <https://www.tencentcloud.com/document/product/436/14112>
261                            write_multi_min_size: Some(1024 * 1024),
262                            // The max multipart size of COS is 5 GiB.
263                            //
264                            // ref: <https://www.tencentcloud.com/document/product/436/14112>
265                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
266                                Some(5 * 1024 * 1024 * 1024)
267                            } else {
268                                Some(usize::MAX)
269                            },
270                            write_with_user_metadata: true,
271
272                            delete: true,
273                            delete_with_version: self.config.enable_versioning,
274                            copy: true,
275
276                            list: true,
277                            list_with_recursive: true,
278                            list_with_versions: self.config.enable_versioning,
279                            list_with_deleted: self.config.enable_versioning,
280                            list_has_content_length: true,
281
282                            presign: true,
283                            presign_stat: true,
284                            presign_read: true,
285                            presign_write: true,
286
287                            shared: true,
288
289                            ..Default::default()
290                        });
291
292                    // allow deprecated api here for compatibility
293                    #[allow(deprecated)]
294                    if let Some(client) = self.http_client {
295                        am.update_http_client(|_| client);
296                    }
297
298                    am.into()
299                },
300                bucket: bucket.clone(),
301                root,
302                endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
303                signer,
304                loader: cred_loader,
305            }),
306        })
307    }
308}
309
310/// Backend for Tencent-Cloud COS services.
311#[derive(Debug, Clone)]
312pub struct CosBackend {
313    core: Arc<CosCore>,
314}
315
316impl Access for CosBackend {
317    type Reader = HttpBody;
318    type Writer = CosWriters;
319    type Lister = CosListers;
320    type Deleter = oio::OneShotDeleter<CosDeleter>;
321    type BlockingReader = ();
322    type BlockingWriter = ();
323    type BlockingLister = ();
324    type BlockingDeleter = ();
325
326    fn info(&self) -> Arc<AccessorInfo> {
327        self.core.info.clone()
328    }
329
330    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
331        let resp = self.core.cos_head_object(path, &args).await?;
332
333        let status = resp.status();
334
335        match status {
336            StatusCode::OK => {
337                let headers = resp.headers();
338                let mut meta = parse_into_metadata(path, headers)?;
339
340                let user_meta = parse_prefixed_headers(headers, "x-cos-meta-");
341                if !user_meta.is_empty() {
342                    meta.with_user_metadata(user_meta);
343                }
344
345                if let Some(v) = parse_header_to_str(headers, constants::X_COS_VERSION_ID)? {
346                    if v != "null" {
347                        meta.set_version(v);
348                    }
349                }
350
351                Ok(RpStat::new(meta))
352            }
353            _ => Err(parse_error(resp)),
354        }
355    }
356
357    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
358        let resp = self.core.cos_get_object(path, args.range(), &args).await?;
359
360        let status = resp.status();
361
362        match status {
363            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
364                Ok((RpRead::default(), resp.into_body()))
365            }
366            _ => {
367                let (part, mut body) = resp.into_parts();
368                let buf = body.to_buffer().await?;
369                Err(parse_error(Response::from_parts(part, buf)))
370            }
371        }
372    }
373
374    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
375        let writer = CosWriter::new(self.core.clone(), path, args.clone());
376
377        let w = if args.append() {
378            CosWriters::Two(oio::AppendWriter::new(writer))
379        } else {
380            CosWriters::One(oio::MultipartWriter::new(
381                self.core.info.clone(),
382                writer,
383                args.concurrent(),
384            ))
385        };
386
387        Ok((RpWrite::default(), w))
388    }
389
390    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
391        Ok((
392            RpDelete::default(),
393            oio::OneShotDeleter::new(CosDeleter::new(self.core.clone())),
394        ))
395    }
396
397    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
398        let l = if args.versions() || args.deleted() {
399            TwoWays::Two(PageLister::new(CosObjectVersionsLister::new(
400                self.core.clone(),
401                path,
402                args,
403            )))
404        } else {
405            TwoWays::One(PageLister::new(CosLister::new(
406                self.core.clone(),
407                path,
408                args.recursive(),
409                args.limit(),
410            )))
411        };
412
413        Ok((RpList::default(), l))
414    }
415
416    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
417        let resp = self.core.cos_copy_object(from, to).await?;
418
419        let status = resp.status();
420
421        match status {
422            StatusCode::OK => Ok(RpCopy::default()),
423            _ => Err(parse_error(resp)),
424        }
425    }
426
427    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
428        let req = match args.operation() {
429            PresignOperation::Stat(v) => self.core.cos_head_object_request(path, v),
430            PresignOperation::Read(v) => {
431                self.core
432                    .cos_get_object_request(path, BytesRange::default(), v)
433            }
434            PresignOperation::Write(v) => {
435                self.core
436                    .cos_put_object_request(path, None, v, Buffer::new())
437            }
438            PresignOperation::Delete(_) => Err(Error::new(
439                ErrorKind::Unsupported,
440                "operation is not supported",
441            )),
442        };
443        let mut req = req?;
444        self.core.sign_query(&mut req, args.expire()).await?;
445
446        // We don't need this request anymore, consume it directly.
447        let (parts, _) = req.into_parts();
448
449        Ok(RpPresign::new(PresignedRequest::new(
450            parts.method,
451            parts.uri,
452            parts.headers,
453        )))
454    }
455}