opendal_core/services/cos/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use http::Request;
23use http::Response;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_MODIFIED_SINCE;
30use http::header::IF_NONE_MATCH;
31use http::header::IF_UNMODIFIED_SINCE;
32use reqsign::TencentCosCredential;
33use reqsign::TencentCosCredentialLoader;
34use reqsign::TencentCosSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
/// String constants used by the COS service for object versioning.
pub mod constants {
    /// Query-string key carrying the version id of a specific object version.
    pub const COS_QUERY_VERSION_ID: &'static str = "versionId";

    /// Header name `x-cos-version-id` (presumably returned by COS for
    /// versioned objects — TODO confirm against callers).
    pub const X_COS_VERSION_ID: &'static str = "x-cos-version-id";
}
46
47pub struct CosCore {
48    pub info: Arc<AccessorInfo>,
49    pub bucket: String,
50    pub root: String,
51    pub endpoint: String,
52
53    pub signer: TencentCosSigner,
54    pub loader: TencentCosCredentialLoader,
55}
56
57impl Debug for CosCore {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("CosCore")
60            .field("root", &self.root)
61            .field("bucket", &self.bucket)
62            .field("endpoint", &self.endpoint)
63            .finish_non_exhaustive()
64    }
65}
66
67impl CosCore {
68    async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
69        let cred = self
70            .loader
71            .load()
72            .await
73            .map_err(new_request_credential_error)?;
74
75        if let Some(cred) = cred {
76            return Ok(Some(cred));
77        }
78
79        Err(Error::new(
80            ErrorKind::PermissionDenied,
81            "no valid credential found and anonymous access is not allowed",
82        ))
83    }
84
85    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
86        let cred = if let Some(cred) = self.load_credential().await? {
87            cred
88        } else {
89            return Ok(());
90        };
91
92        self.signer.sign(req, &cred).map_err(new_request_sign_error)
93    }
94
95    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
96        let cred = if let Some(cred) = self.load_credential().await? {
97            cred
98        } else {
99            return Ok(());
100        };
101
102        self.signer
103            .sign_query(req, duration, &cred)
104            .map_err(new_request_sign_error)
105    }
106
107    #[inline]
108    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
109        self.info.http_client().send(req).await
110    }
111}
112
113impl CosCore {
114    pub async fn cos_get_object(
115        &self,
116        path: &str,
117        range: BytesRange,
118        args: &OpRead,
119    ) -> Result<Response<HttpBody>> {
120        let mut req = self.cos_get_object_request(path, range, args)?;
121
122        self.sign(&mut req).await?;
123
124        self.info.http_client().fetch(req).await
125    }
126
127    pub fn cos_get_object_request(
128        &self,
129        path: &str,
130        range: BytesRange,
131        args: &OpRead,
132    ) -> Result<Request<Buffer>> {
133        let p = build_abs_path(&self.root, path);
134
135        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
136
137        let mut query_args = Vec::new();
138        if let Some(version) = args.version() {
139            query_args.push(format!(
140                "{}={}",
141                constants::COS_QUERY_VERSION_ID,
142                percent_decode_path(version)
143            ))
144        }
145        if !query_args.is_empty() {
146            url.push_str(&format!("?{}", query_args.join("&")));
147        }
148
149        let mut req = Request::get(&url);
150
151        if let Some(if_match) = args.if_match() {
152            req = req.header(IF_MATCH, if_match);
153        }
154
155        if !range.is_full() {
156            req = req.header(http::header::RANGE, range.to_header())
157        }
158
159        if let Some(if_none_match) = args.if_none_match() {
160            req = req.header(IF_NONE_MATCH, if_none_match);
161        }
162
163        if let Some(if_modified_since) = args.if_modified_since() {
164            req = req.header(IF_MODIFIED_SINCE, if_modified_since.format_http_date());
165        }
166
167        if let Some(if_unmodified_since) = args.if_unmodified_since() {
168            req = req.header(IF_UNMODIFIED_SINCE, if_unmodified_since.format_http_date());
169        }
170
171        let req = req.extension(Operation::Read);
172
173        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
174
175        Ok(req)
176    }
177
178    pub fn cos_put_object_request(
179        &self,
180        path: &str,
181        size: Option<u64>,
182        args: &OpWrite,
183        body: Buffer,
184    ) -> Result<Request<Buffer>> {
185        let p = build_abs_path(&self.root, path);
186
187        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
188
189        let mut req = Request::put(&url);
190
191        if let Some(size) = size {
192            req = req.header(CONTENT_LENGTH, size)
193        }
194        if let Some(cache_control) = args.cache_control() {
195            req = req.header(CACHE_CONTROL, cache_control)
196        }
197        if let Some(pos) = args.content_disposition() {
198            req = req.header(CONTENT_DISPOSITION, pos)
199        }
200        if let Some(mime) = args.content_type() {
201            req = req.header(CONTENT_TYPE, mime)
202        }
203
204        // For a bucket which has never enabled versioning, you may use it to
205        // specify whether to prohibit overwriting the object with the same name
206        // when uploading the object:
207        //
208        // When the x-cos-forbid-overwrite is specified as true, overwriting the object
209        // with the same name will be prohibited.
210        //
211        // ref: https://www.tencentcloud.com/document/product/436/7749
212        if args.if_not_exists() {
213            req = req.header("x-cos-forbid-overwrite", "true")
214        }
215
216        // Set user metadata headers.
217        if let Some(user_metadata) = args.user_metadata() {
218            for (key, value) in user_metadata {
219                req = req.header(format!("x-cos-meta-{key}"), value)
220            }
221        }
222
223        let req = req.extension(Operation::Write);
224
225        let req = req.body(body).map_err(new_request_build_error)?;
226
227        Ok(req)
228    }
229
230    pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
231        let mut req = self.cos_head_object_request(path, args)?;
232
233        self.sign(&mut req).await?;
234
235        self.send(req).await
236    }
237
238    pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
239        let p = build_abs_path(&self.root, path);
240
241        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
242
243        let mut query_args = Vec::new();
244        if let Some(version) = args.version() {
245            query_args.push(format!(
246                "{}={}",
247                constants::COS_QUERY_VERSION_ID,
248                percent_decode_path(version)
249            ))
250        }
251        if !query_args.is_empty() {
252            url.push_str(&format!("?{}", query_args.join("&")));
253        }
254
255        let mut req = Request::head(&url);
256
257        if let Some(if_match) = args.if_match() {
258            req = req.header(IF_MATCH, if_match);
259        }
260
261        if let Some(if_none_match) = args.if_none_match() {
262            req = req.header(IF_NONE_MATCH, if_none_match);
263        }
264
265        let req = req.extension(Operation::Stat);
266
267        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
268
269        Ok(req)
270    }
271
272    pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
273        let p = build_abs_path(&self.root, path);
274
275        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
276
277        let mut query_args = Vec::new();
278        if let Some(version) = args.version() {
279            query_args.push(format!(
280                "{}={}",
281                constants::COS_QUERY_VERSION_ID,
282                percent_decode_path(version)
283            ))
284        }
285        if !query_args.is_empty() {
286            url.push_str(&format!("?{}", query_args.join("&")));
287        }
288
289        let req = Request::delete(&url);
290
291        let req = req.extension(Operation::Delete);
292
293        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
294
295        self.sign(&mut req).await?;
296
297        self.send(req).await
298    }
299
300    pub fn cos_append_object_request(
301        &self,
302        path: &str,
303        position: u64,
304        size: u64,
305        args: &OpWrite,
306        body: Buffer,
307    ) -> Result<Request<Buffer>> {
308        let p = build_abs_path(&self.root, path);
309        let url = format!(
310            "{}/{}?append&position={}",
311            self.endpoint,
312            percent_encode_path(&p),
313            position
314        );
315
316        let mut req = Request::post(&url);
317
318        req = req.header(CONTENT_LENGTH, size);
319
320        if let Some(mime) = args.content_type() {
321            req = req.header(CONTENT_TYPE, mime);
322        }
323
324        if let Some(pos) = args.content_disposition() {
325            req = req.header(CONTENT_DISPOSITION, pos);
326        }
327
328        if let Some(cache_control) = args.cache_control() {
329            req = req.header(CACHE_CONTROL, cache_control)
330        }
331
332        let req = req.extension(Operation::Write);
333
334        let req = req.body(body).map_err(new_request_build_error)?;
335        Ok(req)
336    }
337
338    pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
339        let source = build_abs_path(&self.root, from);
340        let target = build_abs_path(&self.root, to);
341
342        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
343        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
344
345        let mut req = Request::put(&url)
346            .extension(Operation::Copy)
347            .header("x-cos-copy-source", &source)
348            .body(Buffer::new())
349            .map_err(new_request_build_error)?;
350
351        self.sign(&mut req).await?;
352
353        self.send(req).await
354    }
355
356    pub async fn cos_list_objects(
357        &self,
358        path: &str,
359        next_marker: &str,
360        delimiter: &str,
361        limit: Option<usize>,
362    ) -> Result<Response<Buffer>> {
363        let p = build_abs_path(&self.root, path);
364
365        let mut url = QueryPairsWriter::new(&self.endpoint);
366
367        if !p.is_empty() {
368            url = url.push("prefix", &percent_encode_path(&p));
369        }
370        if !delimiter.is_empty() {
371            url = url.push("delimiter", delimiter);
372        }
373        if let Some(limit) = limit {
374            url = url.push("max-keys", &limit.to_string());
375        }
376        if !next_marker.is_empty() {
377            url = url.push("marker", next_marker);
378        }
379
380        let mut req = Request::get(url.finish())
381            .extension(Operation::List)
382            .body(Buffer::new())
383            .map_err(new_request_build_error)?;
384
385        self.sign(&mut req).await?;
386
387        self.send(req).await
388    }
389
390    pub async fn cos_initiate_multipart_upload(
391        &self,
392        path: &str,
393        args: &OpWrite,
394    ) -> Result<Response<Buffer>> {
395        let p = build_abs_path(&self.root, path);
396
397        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
398
399        let mut req = Request::post(&url);
400
401        if let Some(mime) = args.content_type() {
402            req = req.header(CONTENT_TYPE, mime)
403        }
404
405        if let Some(content_disposition) = args.content_disposition() {
406            req = req.header(CONTENT_DISPOSITION, content_disposition)
407        }
408
409        if let Some(cache_control) = args.cache_control() {
410            req = req.header(CACHE_CONTROL, cache_control)
411        }
412
413        // Set user metadata headers.
414        if let Some(user_metadata) = args.user_metadata() {
415            for (key, value) in user_metadata {
416                req = req.header(format!("x-cos-meta-{key}"), value)
417            }
418        }
419
420        let req = req.extension(Operation::Write);
421
422        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
423
424        self.sign(&mut req).await?;
425
426        self.send(req).await
427    }
428
429    pub async fn cos_upload_part_request(
430        &self,
431        path: &str,
432        upload_id: &str,
433        part_number: usize,
434        size: u64,
435        body: Buffer,
436    ) -> Result<Response<Buffer>> {
437        let p = build_abs_path(&self.root, path);
438
439        let url = format!(
440            "{}/{}?partNumber={}&uploadId={}",
441            self.endpoint,
442            percent_encode_path(&p),
443            part_number,
444            percent_encode_path(upload_id)
445        );
446
447        let mut req = Request::put(&url);
448        req = req.header(CONTENT_LENGTH, size);
449
450        let req = req.extension(Operation::Write);
451
452        // Set body
453        let mut req = req.body(body).map_err(new_request_build_error)?;
454
455        self.sign(&mut req).await?;
456
457        self.send(req).await
458    }
459
460    pub async fn cos_complete_multipart_upload(
461        &self,
462        path: &str,
463        upload_id: &str,
464        parts: Vec<CompleteMultipartUploadRequestPart>,
465    ) -> Result<Response<Buffer>> {
466        let p = build_abs_path(&self.root, path);
467
468        let url = format!(
469            "{}/{}?uploadId={}",
470            self.endpoint,
471            percent_encode_path(&p),
472            percent_encode_path(upload_id)
473        );
474
475        let req = Request::post(&url);
476
477        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
478            .map_err(new_xml_serialize_error)?;
479        // Make sure content length has been set to avoid post with chunked encoding.
480        let req = req.header(CONTENT_LENGTH, content.len());
481        // Set content-type to `application/xml` to avoid mixed with form post.
482        let req = req.header(CONTENT_TYPE, "application/xml");
483
484        let req = req.extension(Operation::Write);
485
486        let mut req = req
487            .body(Buffer::from(Bytes::from(content)))
488            .map_err(new_request_build_error)?;
489
490        self.sign(&mut req).await?;
491
492        self.send(req).await
493    }
494
495    /// Abort an on-going multipart upload.
496    pub async fn cos_abort_multipart_upload(
497        &self,
498        path: &str,
499        upload_id: &str,
500    ) -> Result<Response<Buffer>> {
501        let p = build_abs_path(&self.root, path);
502
503        let url = format!(
504            "{}/{}?uploadId={}",
505            self.endpoint,
506            percent_encode_path(&p),
507            percent_encode_path(upload_id)
508        );
509
510        let mut req = Request::delete(&url)
511            .extension(Operation::Delete)
512            .body(Buffer::new())
513            .map_err(new_request_build_error)?;
514        self.sign(&mut req).await?;
515        self.send(req).await
516    }
517
518    pub async fn cos_list_object_versions(
519        &self,
520        prefix: &str,
521        delimiter: &str,
522        limit: Option<usize>,
523        key_marker: &str,
524        version_id_marker: &str,
525    ) -> Result<Response<Buffer>> {
526        let p = build_abs_path(&self.root, prefix);
527
528        let mut url = QueryPairsWriter::new(&self.endpoint);
529        url = url.push("versions", "");
530        if !p.is_empty() {
531            url = url.push("prefix", &percent_encode_path(p.as_str()));
532        }
533        if !delimiter.is_empty() {
534            url = url.push("delimiter", delimiter);
535        }
536
537        if let Some(limit) = limit {
538            url = url.push("max-keys", &limit.to_string());
539        }
540        if !key_marker.is_empty() {
541            url = url.push("key-marker", &percent_encode_path(key_marker));
542        }
543        if !version_id_marker.is_empty() {
544            url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
545        }
546
547        let mut req = Request::get(url.finish())
548            .extension(Operation::List)
549            .body(Buffer::new())
550            .map_err(new_request_build_error)?;
551
552        self.sign(&mut req).await?;
553
554        self.send(req).await
555    }
556}
557
558/// Result of CreateMultipartUpload
559#[derive(Default, Debug, Deserialize)]
560#[serde(default, rename_all = "PascalCase")]
561pub struct InitiateMultipartUploadResult {
562    pub upload_id: String,
563}
564
565/// Request of CompleteMultipartUploadRequest
566#[derive(Default, Debug, Serialize)]
567#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
568pub struct CompleteMultipartUploadRequest {
569    pub part: Vec<CompleteMultipartUploadRequestPart>,
570}
571
572#[derive(Clone, Default, Debug, Serialize)]
573#[serde(default, rename_all = "PascalCase")]
574pub struct CompleteMultipartUploadRequestPart {
575    #[serde(rename = "PartNumber")]
576    pub part_number: usize,
577    /// # TODO
578    ///
579    /// quick-xml will do escape on `"` which leads to our serialized output is
580    /// not the same as aws s3's example.
581    ///
582    /// Ideally, we could use `serialize_with` to address this (buf failed)
583    ///
584    /// ```ignore
585    /// #[derive(Default, Debug, Serialize)]
586    /// #[serde(default, rename_all = "PascalCase")]
587    /// struct CompleteMultipartUploadRequestPart {
588    ///     #[serde(rename = "PartNumber")]
589    ///     part_number: usize,
590    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
591    ///     etag: String,
592    /// }
593    ///
594    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
595    /// where
596    ///     S: serde::Serializer,
597    /// {
598    ///     ser.serialize_str(&String::from_utf8_lossy(
599    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
600    ///     ))
601    /// }
602    /// ```
603    ///
604    /// ref: <https://github.com/tafia/quick-xml/issues/362>
605    #[serde(rename = "ETag")]
606    pub etag: String,
607}
608
609/// Output of `CompleteMultipartUpload` operation
610#[derive(Debug, Default, Deserialize)]
611#[serde[default, rename_all = "PascalCase"]]
612pub struct CompleteMultipartUploadResult {
613    pub location: String,
614    pub bucket: String,
615    pub key: String,
616    #[serde(rename = "ETag")]
617    pub etag: String,
618}
619
620#[derive(Default, Debug, Deserialize)]
621#[serde(default, rename_all = "PascalCase")]
622pub struct ListObjectsOutput {
623    pub name: String,
624    pub prefix: String,
625    pub contents: Vec<ListObjectsOutputContent>,
626    pub common_prefixes: Vec<CommonPrefix>,
627    pub marker: String,
628    pub next_marker: Option<String>,
629}
630
631#[derive(Default, Debug, Deserialize)]
632#[serde(default, rename_all = "PascalCase")]
633pub struct CommonPrefix {
634    pub prefix: String,
635}
636
637#[derive(Default, Debug, Deserialize)]
638#[serde(default, rename_all = "PascalCase")]
639pub struct ListObjectsOutputContent {
640    pub key: String,
641    pub size: u64,
642}
643
644#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
645#[serde(rename_all = "PascalCase")]
646pub struct OutputCommonPrefix {
647    pub prefix: String,
648}
649
650/// Output of ListObjectVersions
651#[derive(Default, Debug, Deserialize)]
652#[serde(default, rename_all = "PascalCase")]
653pub struct ListObjectVersionsOutput {
654    pub is_truncated: Option<bool>,
655    pub next_key_marker: Option<String>,
656    pub next_version_id_marker: Option<String>,
657    pub common_prefixes: Vec<OutputCommonPrefix>,
658    pub version: Vec<ListObjectVersionsOutputVersion>,
659    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
660}
661
662#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
663#[serde(rename_all = "PascalCase")]
664pub struct ListObjectVersionsOutputVersion {
665    pub key: String,
666    pub version_id: String,
667    pub is_latest: bool,
668    pub size: u64,
669    pub last_modified: String,
670    #[serde(rename = "ETag")]
671    pub etag: Option<String>,
672}
673
674#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
675#[serde(rename_all = "PascalCase")]
676pub struct ListObjectVersionsOutputDeleteMarker {
677    pub key: String,
678    pub version_id: String,
679    pub is_latest: bool,
680    pub last_modified: String,
681}
682
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Sample list-objects response used by `test_parse_xml`.
    const LIST_BUCKET_RESULT: &str = r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#;

    #[test]
    fn test_parse_xml() {
        let payload = bytes::Bytes::from(LIST_BUCKET_RESULT);
        let out: ListObjectsOutput =
            quick_xml::de::from_reader(payload.reader()).expect("must success");

        assert_eq!(out.name, "examplebucket");
        assert_eq!(out.prefix, "obj");
        assert_eq!(out.marker, "obj002");
        assert_eq!(out.next_marker.as_deref(), Some("obj004"));

        let keys: Vec<&str> = out.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = out.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        let prefixes: Vec<&str> = out
            .common_prefixes
            .iter()
            .map(|cp| cp.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}