opendal/services/cos/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_MODIFIED_SINCE;
30use http::header::IF_NONE_MATCH;
31use http::header::IF_UNMODIFIED_SINCE;
32use http::Request;
33use http::Response;
34use reqsign::TencentCosCredential;
35use reqsign::TencentCosCredentialLoader;
36use reqsign::TencentCosSigner;
37use serde::Deserialize;
38use serde::Serialize;
39
40use crate::raw::*;
41use crate::*;
42
/// String constants used when building COS requests and reading COS responses.
pub mod constants {
    /// Response/request header name carrying an object's version id.
    pub const X_COS_VERSION_ID: &str = "x-cos-version-id";

    /// Query parameter name used to address a specific object version.
    pub const COS_QUERY_VERSION_ID: &str = "versionId";
}
48
49pub struct CosCore {
50    pub info: Arc<AccessorInfo>,
51    pub bucket: String,
52    pub root: String,
53    pub endpoint: String,
54
55    pub signer: TencentCosSigner,
56    pub loader: TencentCosCredentialLoader,
57}
58
59impl Debug for CosCore {
60    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("Backend")
62            .field("root", &self.root)
63            .field("bucket", &self.bucket)
64            .field("endpoint", &self.endpoint)
65            .finish_non_exhaustive()
66    }
67}
68
69impl CosCore {
70    async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
71        let cred = self
72            .loader
73            .load()
74            .await
75            .map_err(new_request_credential_error)?;
76
77        if let Some(cred) = cred {
78            return Ok(Some(cred));
79        }
80
81        Err(Error::new(
82            ErrorKind::PermissionDenied,
83            "no valid credential found and anonymous access is not allowed",
84        ))
85    }
86
87    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
88        let cred = if let Some(cred) = self.load_credential().await? {
89            cred
90        } else {
91            return Ok(());
92        };
93
94        self.signer.sign(req, &cred).map_err(new_request_sign_error)
95    }
96
97    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
98        let cred = if let Some(cred) = self.load_credential().await? {
99            cred
100        } else {
101            return Ok(());
102        };
103
104        self.signer
105            .sign_query(req, duration, &cred)
106            .map_err(new_request_sign_error)
107    }
108
109    #[inline]
110    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
111        self.info.http_client().send(req).await
112    }
113}
114
115impl CosCore {
116    pub async fn cos_get_object(
117        &self,
118        path: &str,
119        range: BytesRange,
120        args: &OpRead,
121    ) -> Result<Response<HttpBody>> {
122        let mut req = self.cos_get_object_request(path, range, args)?;
123
124        self.sign(&mut req).await?;
125
126        self.info.http_client().fetch(req).await
127    }
128
129    pub fn cos_get_object_request(
130        &self,
131        path: &str,
132        range: BytesRange,
133        args: &OpRead,
134    ) -> Result<Request<Buffer>> {
135        let p = build_abs_path(&self.root, path);
136
137        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
138
139        let mut query_args = Vec::new();
140        if let Some(version) = args.version() {
141            query_args.push(format!(
142                "{}={}",
143                constants::COS_QUERY_VERSION_ID,
144                percent_decode_path(version)
145            ))
146        }
147        if !query_args.is_empty() {
148            url.push_str(&format!("?{}", query_args.join("&")));
149        }
150
151        let mut req = Request::get(&url);
152
153        if let Some(if_match) = args.if_match() {
154            req = req.header(IF_MATCH, if_match);
155        }
156
157        if !range.is_full() {
158            req = req.header(http::header::RANGE, range.to_header())
159        }
160
161        if let Some(if_none_match) = args.if_none_match() {
162            req = req.header(IF_NONE_MATCH, if_none_match);
163        }
164
165        if let Some(if_modified_since) = args.if_modified_since() {
166            req = req.header(
167                IF_MODIFIED_SINCE,
168                format_datetime_into_http_date(if_modified_since),
169            );
170        }
171
172        if let Some(if_unmodified_since) = args.if_unmodified_since() {
173            req = req.header(
174                IF_UNMODIFIED_SINCE,
175                format_datetime_into_http_date(if_unmodified_since),
176            );
177        }
178
179        let req = req.extension(Operation::Read);
180
181        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
182
183        Ok(req)
184    }
185
186    pub fn cos_put_object_request(
187        &self,
188        path: &str,
189        size: Option<u64>,
190        args: &OpWrite,
191        body: Buffer,
192    ) -> Result<Request<Buffer>> {
193        let p = build_abs_path(&self.root, path);
194
195        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
196
197        let mut req = Request::put(&url);
198
199        if let Some(size) = size {
200            req = req.header(CONTENT_LENGTH, size)
201        }
202        if let Some(cache_control) = args.cache_control() {
203            req = req.header(CACHE_CONTROL, cache_control)
204        }
205        if let Some(pos) = args.content_disposition() {
206            req = req.header(CONTENT_DISPOSITION, pos)
207        }
208        if let Some(mime) = args.content_type() {
209            req = req.header(CONTENT_TYPE, mime)
210        }
211
212        // For a bucket which has never enabled versioning, you may use it to
213        // specify whether to prohibit overwriting the object with the same name
214        // when uploading the object:
215        //
216        // When the x-cos-forbid-overwrite is specified as true, overwriting the object
217        // with the same name will be prohibited.
218        //
219        // ref: https://www.tencentcloud.com/document/product/436/7749
220        if args.if_not_exists() {
221            req = req.header("x-cos-forbid-overwrite", "true")
222        }
223
224        // Set user metadata headers.
225        if let Some(user_metadata) = args.user_metadata() {
226            for (key, value) in user_metadata {
227                req = req.header(format!("x-cos-meta-{key}"), value)
228            }
229        }
230
231        let req = req.extension(Operation::Write);
232
233        let req = req.body(body).map_err(new_request_build_error)?;
234
235        Ok(req)
236    }
237
238    pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
239        let mut req = self.cos_head_object_request(path, args)?;
240
241        self.sign(&mut req).await?;
242
243        self.send(req).await
244    }
245
246    pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
247        let p = build_abs_path(&self.root, path);
248
249        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
250
251        let mut query_args = Vec::new();
252        if let Some(version) = args.version() {
253            query_args.push(format!(
254                "{}={}",
255                constants::COS_QUERY_VERSION_ID,
256                percent_decode_path(version)
257            ))
258        }
259        if !query_args.is_empty() {
260            url.push_str(&format!("?{}", query_args.join("&")));
261        }
262
263        let mut req = Request::head(&url);
264
265        if let Some(if_match) = args.if_match() {
266            req = req.header(IF_MATCH, if_match);
267        }
268
269        if let Some(if_none_match) = args.if_none_match() {
270            req = req.header(IF_NONE_MATCH, if_none_match);
271        }
272
273        let req = req.extension(Operation::Stat);
274
275        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
276
277        Ok(req)
278    }
279
280    pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
281        let p = build_abs_path(&self.root, path);
282
283        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
284
285        let mut query_args = Vec::new();
286        if let Some(version) = args.version() {
287            query_args.push(format!(
288                "{}={}",
289                constants::COS_QUERY_VERSION_ID,
290                percent_decode_path(version)
291            ))
292        }
293        if !query_args.is_empty() {
294            url.push_str(&format!("?{}", query_args.join("&")));
295        }
296
297        let req = Request::delete(&url);
298
299        let req = req.extension(Operation::Delete);
300
301        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
302
303        self.sign(&mut req).await?;
304
305        self.send(req).await
306    }
307
308    pub fn cos_append_object_request(
309        &self,
310        path: &str,
311        position: u64,
312        size: u64,
313        args: &OpWrite,
314        body: Buffer,
315    ) -> Result<Request<Buffer>> {
316        let p = build_abs_path(&self.root, path);
317        let url = format!(
318            "{}/{}?append&position={}",
319            self.endpoint,
320            percent_encode_path(&p),
321            position
322        );
323
324        let mut req = Request::post(&url);
325
326        req = req.header(CONTENT_LENGTH, size);
327
328        if let Some(mime) = args.content_type() {
329            req = req.header(CONTENT_TYPE, mime);
330        }
331
332        if let Some(pos) = args.content_disposition() {
333            req = req.header(CONTENT_DISPOSITION, pos);
334        }
335
336        if let Some(cache_control) = args.cache_control() {
337            req = req.header(CACHE_CONTROL, cache_control)
338        }
339
340        let req = req.extension(Operation::Write);
341
342        let req = req.body(body).map_err(new_request_build_error)?;
343        Ok(req)
344    }
345
346    pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
347        let source = build_abs_path(&self.root, from);
348        let target = build_abs_path(&self.root, to);
349
350        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
351        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
352
353        let mut req = Request::put(&url)
354            .extension(Operation::Copy)
355            .header("x-cos-copy-source", &source)
356            .body(Buffer::new())
357            .map_err(new_request_build_error)?;
358
359        self.sign(&mut req).await?;
360
361        self.send(req).await
362    }
363
364    pub async fn cos_list_objects(
365        &self,
366        path: &str,
367        next_marker: &str,
368        delimiter: &str,
369        limit: Option<usize>,
370    ) -> Result<Response<Buffer>> {
371        let p = build_abs_path(&self.root, path);
372
373        let mut url = QueryPairsWriter::new(&self.endpoint);
374
375        if !p.is_empty() {
376            url = url.push("prefix", &percent_encode_path(&p));
377        }
378        if !delimiter.is_empty() {
379            url = url.push("delimiter", delimiter);
380        }
381        if let Some(limit) = limit {
382            url = url.push("max-keys", &limit.to_string());
383        }
384        if !next_marker.is_empty() {
385            url = url.push("marker", next_marker);
386        }
387
388        let mut req = Request::get(url.finish())
389            .extension(Operation::List)
390            .body(Buffer::new())
391            .map_err(new_request_build_error)?;
392
393        self.sign(&mut req).await?;
394
395        self.send(req).await
396    }
397
398    pub async fn cos_initiate_multipart_upload(
399        &self,
400        path: &str,
401        args: &OpWrite,
402    ) -> Result<Response<Buffer>> {
403        let p = build_abs_path(&self.root, path);
404
405        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
406
407        let mut req = Request::post(&url);
408
409        if let Some(mime) = args.content_type() {
410            req = req.header(CONTENT_TYPE, mime)
411        }
412
413        if let Some(content_disposition) = args.content_disposition() {
414            req = req.header(CONTENT_DISPOSITION, content_disposition)
415        }
416
417        if let Some(cache_control) = args.cache_control() {
418            req = req.header(CACHE_CONTROL, cache_control)
419        }
420
421        // Set user metadata headers.
422        if let Some(user_metadata) = args.user_metadata() {
423            for (key, value) in user_metadata {
424                req = req.header(format!("x-cos-meta-{key}"), value)
425            }
426        }
427
428        let req = req.extension(Operation::Write);
429
430        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
431
432        self.sign(&mut req).await?;
433
434        self.send(req).await
435    }
436
437    pub async fn cos_upload_part_request(
438        &self,
439        path: &str,
440        upload_id: &str,
441        part_number: usize,
442        size: u64,
443        body: Buffer,
444    ) -> Result<Response<Buffer>> {
445        let p = build_abs_path(&self.root, path);
446
447        let url = format!(
448            "{}/{}?partNumber={}&uploadId={}",
449            self.endpoint,
450            percent_encode_path(&p),
451            part_number,
452            percent_encode_path(upload_id)
453        );
454
455        let mut req = Request::put(&url);
456        req = req.header(CONTENT_LENGTH, size);
457
458        let req = req.extension(Operation::Write);
459
460        // Set body
461        let mut req = req.body(body).map_err(new_request_build_error)?;
462
463        self.sign(&mut req).await?;
464
465        self.send(req).await
466    }
467
468    pub async fn cos_complete_multipart_upload(
469        &self,
470        path: &str,
471        upload_id: &str,
472        parts: Vec<CompleteMultipartUploadRequestPart>,
473    ) -> Result<Response<Buffer>> {
474        let p = build_abs_path(&self.root, path);
475
476        let url = format!(
477            "{}/{}?uploadId={}",
478            self.endpoint,
479            percent_encode_path(&p),
480            percent_encode_path(upload_id)
481        );
482
483        let req = Request::post(&url);
484
485        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
486            .map_err(new_xml_serialize_error)?;
487        // Make sure content length has been set to avoid post with chunked encoding.
488        let req = req.header(CONTENT_LENGTH, content.len());
489        // Set content-type to `application/xml` to avoid mixed with form post.
490        let req = req.header(CONTENT_TYPE, "application/xml");
491
492        let req = req.extension(Operation::Write);
493
494        let mut req = req
495            .body(Buffer::from(Bytes::from(content)))
496            .map_err(new_request_build_error)?;
497
498        self.sign(&mut req).await?;
499
500        self.send(req).await
501    }
502
503    /// Abort an on-going multipart upload.
504    pub async fn cos_abort_multipart_upload(
505        &self,
506        path: &str,
507        upload_id: &str,
508    ) -> Result<Response<Buffer>> {
509        let p = build_abs_path(&self.root, path);
510
511        let url = format!(
512            "{}/{}?uploadId={}",
513            self.endpoint,
514            percent_encode_path(&p),
515            percent_encode_path(upload_id)
516        );
517
518        let mut req = Request::delete(&url)
519            .extension(Operation::Delete)
520            .body(Buffer::new())
521            .map_err(new_request_build_error)?;
522        self.sign(&mut req).await?;
523        self.send(req).await
524    }
525
526    pub async fn cos_list_object_versions(
527        &self,
528        prefix: &str,
529        delimiter: &str,
530        limit: Option<usize>,
531        key_marker: &str,
532        version_id_marker: &str,
533    ) -> Result<Response<Buffer>> {
534        let p = build_abs_path(&self.root, prefix);
535
536        let mut url = QueryPairsWriter::new(&self.endpoint);
537        url = url.push("versions", "");
538        if !p.is_empty() {
539            url = url.push("prefix", &percent_encode_path(p.as_str()));
540        }
541        if !delimiter.is_empty() {
542            url = url.push("delimiter", delimiter);
543        }
544
545        if let Some(limit) = limit {
546            url = url.push("max-keys", &limit.to_string());
547        }
548        if !key_marker.is_empty() {
549            url = url.push("key-marker", &percent_encode_path(key_marker));
550        }
551        if !version_id_marker.is_empty() {
552            url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
553        }
554
555        let mut req = Request::get(url.finish())
556            .extension(Operation::List)
557            .body(Buffer::new())
558            .map_err(new_request_build_error)?;
559
560        self.sign(&mut req).await?;
561
562        self.send(req).await
563    }
564}
565
566/// Result of CreateMultipartUpload
567#[derive(Default, Debug, Deserialize)]
568#[serde(default, rename_all = "PascalCase")]
569pub struct InitiateMultipartUploadResult {
570    pub upload_id: String,
571}
572
573/// Request of CompleteMultipartUploadRequest
574#[derive(Default, Debug, Serialize)]
575#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
576pub struct CompleteMultipartUploadRequest {
577    pub part: Vec<CompleteMultipartUploadRequestPart>,
578}
579
580#[derive(Clone, Default, Debug, Serialize)]
581#[serde(default, rename_all = "PascalCase")]
582pub struct CompleteMultipartUploadRequestPart {
583    #[serde(rename = "PartNumber")]
584    pub part_number: usize,
585    /// # TODO
586    ///
587    /// quick-xml will do escape on `"` which leads to our serialized output is
588    /// not the same as aws s3's example.
589    ///
590    /// Ideally, we could use `serialize_with` to address this (buf failed)
591    ///
592    /// ```ignore
593    /// #[derive(Default, Debug, Serialize)]
594    /// #[serde(default, rename_all = "PascalCase")]
595    /// struct CompleteMultipartUploadRequestPart {
596    ///     #[serde(rename = "PartNumber")]
597    ///     part_number: usize,
598    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
599    ///     etag: String,
600    /// }
601    ///
602    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
603    /// where
604    ///     S: serde::Serializer,
605    /// {
606    ///     ser.serialize_str(&String::from_utf8_lossy(
607    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
608    ///     ))
609    /// }
610    /// ```
611    ///
612    /// ref: <https://github.com/tafia/quick-xml/issues/362>
613    #[serde(rename = "ETag")]
614    pub etag: String,
615}
616
617/// Output of `CompleteMultipartUpload` operation
618#[derive(Debug, Default, Deserialize)]
619#[serde[default, rename_all = "PascalCase"]]
620pub struct CompleteMultipartUploadResult {
621    pub location: String,
622    pub bucket: String,
623    pub key: String,
624    #[serde(rename = "ETag")]
625    pub etag: String,
626}
627
628#[derive(Default, Debug, Deserialize)]
629#[serde(default, rename_all = "PascalCase")]
630pub struct ListObjectsOutput {
631    pub name: String,
632    pub prefix: String,
633    pub contents: Vec<ListObjectsOutputContent>,
634    pub common_prefixes: Vec<CommonPrefix>,
635    pub marker: String,
636    pub next_marker: Option<String>,
637}
638
639#[derive(Default, Debug, Deserialize)]
640#[serde(default, rename_all = "PascalCase")]
641pub struct CommonPrefix {
642    pub prefix: String,
643}
644
645#[derive(Default, Debug, Deserialize)]
646#[serde(default, rename_all = "PascalCase")]
647pub struct ListObjectsOutputContent {
648    pub key: String,
649    pub size: u64,
650}
651
652#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
653#[serde(rename_all = "PascalCase")]
654pub struct OutputCommonPrefix {
655    pub prefix: String,
656}
657
658/// Output of ListObjectVersions
659#[derive(Default, Debug, Deserialize)]
660#[serde(default, rename_all = "PascalCase")]
661pub struct ListObjectVersionsOutput {
662    pub is_truncated: Option<bool>,
663    pub next_key_marker: Option<String>,
664    pub next_version_id_marker: Option<String>,
665    pub common_prefixes: Vec<OutputCommonPrefix>,
666    pub version: Vec<ListObjectVersionsOutputVersion>,
667    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
668}
669
670#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
671#[serde(rename_all = "PascalCase")]
672pub struct ListObjectVersionsOutputVersion {
673    pub key: String,
674    pub version_id: String,
675    pub is_latest: bool,
676    pub size: u64,
677    pub last_modified: String,
678    #[serde(rename = "ETag")]
679    pub etag: Option<String>,
680}
681
682#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
683#[serde(rename_all = "PascalCase")]
684pub struct ListObjectVersionsOutputDeleteMarker {
685    pub key: String,
686    pub version_id: String,
687    pub is_latest: bool,
688    pub last_modified: String,
689}
690
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Parses a sample `ListBucketResult` document and checks every field of
    /// `ListObjectsOutput` that the struct keeps.
    #[test]
    fn test_parse_xml() {
        let payload = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );
        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(payload.reader()).expect("must success");

        // Scalar fields.
        assert_eq!(parsed.name, "examplebucket".to_string());
        assert_eq!(parsed.prefix, "obj".to_string());
        assert_eq!(parsed.marker, "obj002".to_string());
        assert_eq!(parsed.next_marker, Some("obj004".to_string()));

        // Repeated `Contents` elements.
        let keys: Vec<String> = parsed.contents.iter().map(|c| c.key.clone()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` elements.
        let prefixes: Vec<String> = parsed
            .common_prefixes
            .iter()
            .map(|c| c.prefix.clone())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}
761}