opendal/services/cos/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::fmt::Write;
21use std::sync::Arc;
22use std::time::Duration;
23
24use bytes::Bytes;
25use http::header::CACHE_CONTROL;
26use http::header::CONTENT_DISPOSITION;
27use http::header::CONTENT_LENGTH;
28use http::header::CONTENT_TYPE;
29use http::header::IF_MATCH;
30use http::header::IF_NONE_MATCH;
31use http::Request;
32use http::Response;
33use reqsign::TencentCosCredential;
34use reqsign::TencentCosCredentialLoader;
35use reqsign::TencentCosSigner;
36use serde::Deserialize;
37use serde::Serialize;
38
39use crate::raw::*;
40use crate::*;
41
/// String constants shared by the COS service implementation.
pub mod constants {
    /// Header name carrying an object's version id.
    pub const X_COS_VERSION_ID: &str = "x-cos-version-id";

    /// Query parameter used to address one specific version of an object.
    pub const COS_QUERY_VERSION_ID: &str = "versionId";
}
47
48pub struct CosCore {
49    pub info: Arc<AccessorInfo>,
50    pub bucket: String,
51    pub root: String,
52    pub endpoint: String,
53
54    pub signer: TencentCosSigner,
55    pub loader: TencentCosCredentialLoader,
56}
57
58impl Debug for CosCore {
59    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("Backend")
61            .field("root", &self.root)
62            .field("bucket", &self.bucket)
63            .field("endpoint", &self.endpoint)
64            .finish_non_exhaustive()
65    }
66}
67
68impl CosCore {
69    async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
70        let cred = self
71            .loader
72            .load()
73            .await
74            .map_err(new_request_credential_error)?;
75
76        if let Some(cred) = cred {
77            return Ok(Some(cred));
78        }
79
80        Err(Error::new(
81            ErrorKind::PermissionDenied,
82            "no valid credential found and anonymous access is not allowed",
83        ))
84    }
85
86    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
87        let cred = if let Some(cred) = self.load_credential().await? {
88            cred
89        } else {
90            return Ok(());
91        };
92
93        self.signer.sign(req, &cred).map_err(new_request_sign_error)
94    }
95
96    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
97        let cred = if let Some(cred) = self.load_credential().await? {
98            cred
99        } else {
100            return Ok(());
101        };
102
103        self.signer
104            .sign_query(req, duration, &cred)
105            .map_err(new_request_sign_error)
106    }
107
108    #[inline]
109    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
110        self.info.http_client().send(req).await
111    }
112}
113
114impl CosCore {
115    pub async fn cos_get_object(
116        &self,
117        path: &str,
118        range: BytesRange,
119        args: &OpRead,
120    ) -> Result<Response<HttpBody>> {
121        let mut req = self.cos_get_object_request(path, range, args)?;
122
123        self.sign(&mut req).await?;
124
125        self.info.http_client().fetch(req).await
126    }
127
128    pub fn cos_get_object_request(
129        &self,
130        path: &str,
131        range: BytesRange,
132        args: &OpRead,
133    ) -> Result<Request<Buffer>> {
134        let p = build_abs_path(&self.root, path);
135
136        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
137
138        let mut query_args = Vec::new();
139        if let Some(version) = args.version() {
140            query_args.push(format!(
141                "{}={}",
142                constants::COS_QUERY_VERSION_ID,
143                percent_decode_path(version)
144            ))
145        }
146        if !query_args.is_empty() {
147            url.push_str(&format!("?{}", query_args.join("&")));
148        }
149
150        let mut req = Request::get(&url);
151
152        if let Some(if_match) = args.if_match() {
153            req = req.header(IF_MATCH, if_match);
154        }
155
156        if !range.is_full() {
157            req = req.header(http::header::RANGE, range.to_header())
158        }
159
160        if let Some(if_none_match) = args.if_none_match() {
161            req = req.header(IF_NONE_MATCH, if_none_match);
162        }
163
164        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
165
166        Ok(req)
167    }
168
169    pub fn cos_put_object_request(
170        &self,
171        path: &str,
172        size: Option<u64>,
173        args: &OpWrite,
174        body: Buffer,
175    ) -> Result<Request<Buffer>> {
176        let p = build_abs_path(&self.root, path);
177
178        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
179
180        let mut req = Request::put(&url);
181
182        if let Some(size) = size {
183            req = req.header(CONTENT_LENGTH, size)
184        }
185        if let Some(cache_control) = args.cache_control() {
186            req = req.header(CACHE_CONTROL, cache_control)
187        }
188        if let Some(pos) = args.content_disposition() {
189            req = req.header(CONTENT_DISPOSITION, pos)
190        }
191        if let Some(mime) = args.content_type() {
192            req = req.header(CONTENT_TYPE, mime)
193        }
194
195        // For a bucket which has never enabled versioning, you may use it to
196        // specify whether to prohibit overwriting the object with the same name
197        // when uploading the object:
198        //
199        // When the x-cos-forbid-overwrite is specified as true, overwriting the object
200        // with the same name will be prohibited.
201        //
202        // ref: https://www.tencentcloud.com/document/product/436/7749
203        if args.if_not_exists() {
204            req = req.header("x-cos-forbid-overwrite", "true")
205        }
206
207        // Set user metadata headers.
208        if let Some(user_metadata) = args.user_metadata() {
209            for (key, value) in user_metadata {
210                req = req.header(format!("x-cos-meta-{key}"), value)
211            }
212        }
213
214        let req = req.body(body).map_err(new_request_build_error)?;
215
216        Ok(req)
217    }
218
219    pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
220        let mut req = self.cos_head_object_request(path, args)?;
221
222        self.sign(&mut req).await?;
223
224        self.send(req).await
225    }
226
227    pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
228        let p = build_abs_path(&self.root, path);
229
230        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
231
232        let mut query_args = Vec::new();
233        if let Some(version) = args.version() {
234            query_args.push(format!(
235                "{}={}",
236                constants::COS_QUERY_VERSION_ID,
237                percent_decode_path(version)
238            ))
239        }
240        if !query_args.is_empty() {
241            url.push_str(&format!("?{}", query_args.join("&")));
242        }
243
244        let mut req = Request::head(&url);
245
246        if let Some(if_match) = args.if_match() {
247            req = req.header(IF_MATCH, if_match);
248        }
249
250        if let Some(if_none_match) = args.if_none_match() {
251            req = req.header(IF_NONE_MATCH, if_none_match);
252        }
253
254        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
255
256        Ok(req)
257    }
258
259    pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
260        let p = build_abs_path(&self.root, path);
261
262        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
263
264        let mut query_args = Vec::new();
265        if let Some(version) = args.version() {
266            query_args.push(format!(
267                "{}={}",
268                constants::COS_QUERY_VERSION_ID,
269                percent_decode_path(version)
270            ))
271        }
272        if !query_args.is_empty() {
273            url.push_str(&format!("?{}", query_args.join("&")));
274        }
275
276        let req = Request::delete(&url);
277
278        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
279
280        self.sign(&mut req).await?;
281
282        self.send(req).await
283    }
284
285    pub fn cos_append_object_request(
286        &self,
287        path: &str,
288        position: u64,
289        size: u64,
290        args: &OpWrite,
291        body: Buffer,
292    ) -> Result<Request<Buffer>> {
293        let p = build_abs_path(&self.root, path);
294        let url = format!(
295            "{}/{}?append&position={}",
296            self.endpoint,
297            percent_encode_path(&p),
298            position
299        );
300
301        let mut req = Request::post(&url);
302
303        req = req.header(CONTENT_LENGTH, size);
304
305        if let Some(mime) = args.content_type() {
306            req = req.header(CONTENT_TYPE, mime);
307        }
308
309        if let Some(pos) = args.content_disposition() {
310            req = req.header(CONTENT_DISPOSITION, pos);
311        }
312
313        if let Some(cache_control) = args.cache_control() {
314            req = req.header(CACHE_CONTROL, cache_control)
315        }
316
317        let req = req.body(body).map_err(new_request_build_error)?;
318        Ok(req)
319    }
320
321    pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
322        let source = build_abs_path(&self.root, from);
323        let target = build_abs_path(&self.root, to);
324
325        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
326        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
327
328        let mut req = Request::put(&url)
329            .header("x-cos-copy-source", &source)
330            .body(Buffer::new())
331            .map_err(new_request_build_error)?;
332
333        self.sign(&mut req).await?;
334
335        self.send(req).await
336    }
337
338    pub async fn cos_list_objects(
339        &self,
340        path: &str,
341        next_marker: &str,
342        delimiter: &str,
343        limit: Option<usize>,
344    ) -> Result<Response<Buffer>> {
345        let p = build_abs_path(&self.root, path);
346
347        let mut queries = vec![];
348        if !p.is_empty() {
349            queries.push(format!("prefix={}", percent_encode_path(&p)));
350        }
351        if !delimiter.is_empty() {
352            queries.push(format!("delimiter={delimiter}"));
353        }
354        if let Some(limit) = limit {
355            queries.push(format!("max-keys={limit}"));
356        }
357        if !next_marker.is_empty() {
358            queries.push(format!("marker={next_marker}"));
359        }
360
361        let url = if queries.is_empty() {
362            self.endpoint.to_string()
363        } else {
364            format!("{}?{}", self.endpoint, queries.join("&"))
365        };
366
367        let mut req = Request::get(&url)
368            .body(Buffer::new())
369            .map_err(new_request_build_error)?;
370
371        self.sign(&mut req).await?;
372
373        self.send(req).await
374    }
375
376    pub async fn cos_initiate_multipart_upload(
377        &self,
378        path: &str,
379        args: &OpWrite,
380    ) -> Result<Response<Buffer>> {
381        let p = build_abs_path(&self.root, path);
382
383        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
384
385        let mut req = Request::post(&url);
386
387        if let Some(mime) = args.content_type() {
388            req = req.header(CONTENT_TYPE, mime)
389        }
390
391        if let Some(content_disposition) = args.content_disposition() {
392            req = req.header(CONTENT_DISPOSITION, content_disposition)
393        }
394
395        if let Some(cache_control) = args.cache_control() {
396            req = req.header(CACHE_CONTROL, cache_control)
397        }
398
399        // Set user metadata headers.
400        if let Some(user_metadata) = args.user_metadata() {
401            for (key, value) in user_metadata {
402                req = req.header(format!("x-cos-meta-{key}"), value)
403            }
404        }
405
406        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
407
408        self.sign(&mut req).await?;
409
410        self.send(req).await
411    }
412
413    pub async fn cos_upload_part_request(
414        &self,
415        path: &str,
416        upload_id: &str,
417        part_number: usize,
418        size: u64,
419        body: Buffer,
420    ) -> Result<Response<Buffer>> {
421        let p = build_abs_path(&self.root, path);
422
423        let url = format!(
424            "{}/{}?partNumber={}&uploadId={}",
425            self.endpoint,
426            percent_encode_path(&p),
427            part_number,
428            percent_encode_path(upload_id)
429        );
430
431        let mut req = Request::put(&url);
432        req = req.header(CONTENT_LENGTH, size);
433        // Set body
434        let mut req = req.body(body).map_err(new_request_build_error)?;
435
436        self.sign(&mut req).await?;
437
438        self.send(req).await
439    }
440
441    pub async fn cos_complete_multipart_upload(
442        &self,
443        path: &str,
444        upload_id: &str,
445        parts: Vec<CompleteMultipartUploadRequestPart>,
446    ) -> Result<Response<Buffer>> {
447        let p = build_abs_path(&self.root, path);
448
449        let url = format!(
450            "{}/{}?uploadId={}",
451            self.endpoint,
452            percent_encode_path(&p),
453            percent_encode_path(upload_id)
454        );
455
456        let req = Request::post(&url);
457
458        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
459            .map_err(new_xml_deserialize_error)?;
460        // Make sure content length has been set to avoid post with chunked encoding.
461        let req = req.header(CONTENT_LENGTH, content.len());
462        // Set content-type to `application/xml` to avoid mixed with form post.
463        let req = req.header(CONTENT_TYPE, "application/xml");
464
465        let mut req = req
466            .body(Buffer::from(Bytes::from(content)))
467            .map_err(new_request_build_error)?;
468
469        self.sign(&mut req).await?;
470
471        self.send(req).await
472    }
473
474    /// Abort an on-going multipart upload.
475    pub async fn cos_abort_multipart_upload(
476        &self,
477        path: &str,
478        upload_id: &str,
479    ) -> Result<Response<Buffer>> {
480        let p = build_abs_path(&self.root, path);
481
482        let url = format!(
483            "{}/{}?uploadId={}",
484            self.endpoint,
485            percent_encode_path(&p),
486            percent_encode_path(upload_id)
487        );
488
489        let mut req = Request::delete(&url)
490            .body(Buffer::new())
491            .map_err(new_request_build_error)?;
492        self.sign(&mut req).await?;
493        self.send(req).await
494    }
495
496    pub async fn cos_list_object_versions(
497        &self,
498        prefix: &str,
499        delimiter: &str,
500        limit: Option<usize>,
501        key_marker: &str,
502        version_id_marker: &str,
503    ) -> Result<Response<Buffer>> {
504        let p = build_abs_path(&self.root, prefix);
505
506        let mut url = format!("{}?versions", self.endpoint);
507        if !p.is_empty() {
508            write!(url, "&prefix={}", percent_encode_path(p.as_str()))
509                .expect("write into string must succeed");
510        }
511        if !delimiter.is_empty() {
512            write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
513        }
514
515        if let Some(limit) = limit {
516            write!(url, "&max-keys={}", limit).expect("write into string must succeed");
517        }
518        if !key_marker.is_empty() {
519            write!(url, "&key-marker={}", percent_encode_path(key_marker))
520                .expect("write into string must succeed");
521        }
522        if !version_id_marker.is_empty() {
523            write!(
524                url,
525                "&version-id-marker={}",
526                percent_encode_path(version_id_marker)
527            )
528            .expect("write into string must succeed");
529        }
530
531        let mut req = Request::get(&url)
532            .body(Buffer::new())
533            .map_err(new_request_build_error)?;
534
535        self.sign(&mut req).await?;
536
537        self.send(req).await
538    }
539}
540
541/// Result of CreateMultipartUpload
542#[derive(Default, Debug, Deserialize)]
543#[serde(default, rename_all = "PascalCase")]
544pub struct InitiateMultipartUploadResult {
545    pub upload_id: String,
546}
547
548/// Request of CompleteMultipartUploadRequest
549#[derive(Default, Debug, Serialize)]
550#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
551pub struct CompleteMultipartUploadRequest {
552    pub part: Vec<CompleteMultipartUploadRequestPart>,
553}
554
555#[derive(Clone, Default, Debug, Serialize)]
556#[serde(default, rename_all = "PascalCase")]
557pub struct CompleteMultipartUploadRequestPart {
558    #[serde(rename = "PartNumber")]
559    pub part_number: usize,
560    /// # TODO
561    ///
562    /// quick-xml will do escape on `"` which leads to our serialized output is
563    /// not the same as aws s3's example.
564    ///
565    /// Ideally, we could use `serialize_with` to address this (buf failed)
566    ///
567    /// ```ignore
568    /// #[derive(Default, Debug, Serialize)]
569    /// #[serde(default, rename_all = "PascalCase")]
570    /// struct CompleteMultipartUploadRequestPart {
571    ///     #[serde(rename = "PartNumber")]
572    ///     part_number: usize,
573    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
574    ///     etag: String,
575    /// }
576    ///
577    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
578    /// where
579    ///     S: serde::Serializer,
580    /// {
581    ///     ser.serialize_str(&String::from_utf8_lossy(
582    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
583    ///     ))
584    /// }
585    /// ```
586    ///
587    /// ref: <https://github.com/tafia/quick-xml/issues/362>
588    #[serde(rename = "ETag")]
589    pub etag: String,
590}
591
592/// Output of `CompleteMultipartUpload` operation
593#[derive(Debug, Default, Deserialize)]
594#[serde[default, rename_all = "PascalCase"]]
595pub struct CompleteMultipartUploadResult {
596    pub location: String,
597    pub bucket: String,
598    pub key: String,
599    #[serde(rename = "ETag")]
600    pub etag: String,
601}
602
603#[derive(Default, Debug, Deserialize)]
604#[serde(default, rename_all = "PascalCase")]
605pub struct ListObjectsOutput {
606    pub name: String,
607    pub prefix: String,
608    pub contents: Vec<ListObjectsOutputContent>,
609    pub common_prefixes: Vec<CommonPrefix>,
610    pub marker: String,
611    pub next_marker: Option<String>,
612}
613
614#[derive(Default, Debug, Deserialize)]
615#[serde(default, rename_all = "PascalCase")]
616pub struct CommonPrefix {
617    pub prefix: String,
618}
619
620#[derive(Default, Debug, Deserialize)]
621#[serde(default, rename_all = "PascalCase")]
622pub struct ListObjectsOutputContent {
623    pub key: String,
624    pub size: u64,
625}
626
627#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
628#[serde(rename_all = "PascalCase")]
629pub struct OutputCommonPrefix {
630    pub prefix: String,
631}
632
633/// Output of ListObjectVersions
634#[derive(Default, Debug, Deserialize)]
635#[serde(default, rename_all = "PascalCase")]
636pub struct ListObjectVersionsOutput {
637    pub is_truncated: Option<bool>,
638    pub next_key_marker: Option<String>,
639    pub next_version_id_marker: Option<String>,
640    pub common_prefixes: Vec<OutputCommonPrefix>,
641    pub version: Vec<ListObjectVersionsOutputVersion>,
642    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
643}
644
645#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
646#[serde(rename_all = "PascalCase")]
647pub struct ListObjectVersionsOutputVersion {
648    pub key: String,
649    pub version_id: String,
650    pub is_latest: bool,
651    pub size: u64,
652    pub last_modified: String,
653    #[serde(rename = "ETag")]
654    pub etag: Option<String>,
655}
656
657#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
658#[serde(rename_all = "PascalCase")]
659pub struct ListObjectVersionsOutputDeleteMarker {
660    pub key: String,
661    pub version_id: String,
662    pub is_latest: bool,
663    pub last_modified: String,
664}
665
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// `ListObjectsOutput` must pick up the bucket name, markers, object entries
    /// and common prefixes from a `ListBucketResult` document, while skipping
    /// elements it does not model (`Owner`, `StorageClass`, ...).
    #[test]
    fn test_parse_xml() {
        let bs = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );
        let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");

        assert_eq!(out.name, "examplebucket");
        assert_eq!(out.prefix, "obj");
        assert_eq!(out.marker, "obj002");
        assert_eq!(out.next_marker.as_deref(), Some("obj004"));

        let keys: Vec<&str> = out.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = out.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        let prefixes: Vec<&str> = out
            .common_prefixes
            .iter()
            .map(|p| p.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}