opendal/services/cos/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use http::Request;
31use http::Response;
32use reqsign::TencentCosCredential;
33use reqsign::TencentCosCredentialLoader;
34use reqsign::TencentCosSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
pub mod constants {
    /// Name of the `x-cos-version-id` header.
    ///
    /// NOTE(review): not referenced in this file; presumably read from
    /// responses elsewhere in the COS service to expose object versions.
    pub const X_COS_VERSION_ID: &str = "x-cos-version-id";

    /// Query parameter that selects a specific object version (`?versionId=...`).
    pub const COS_QUERY_VERSION_ID: &str = "versionId";
}
46
47pub struct CosCore {
48    pub info: Arc<AccessorInfo>,
49    pub bucket: String,
50    pub root: String,
51    pub endpoint: String,
52
53    pub signer: TencentCosSigner,
54    pub loader: TencentCosCredentialLoader,
55}
56
57impl Debug for CosCore {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("Backend")
60            .field("root", &self.root)
61            .field("bucket", &self.bucket)
62            .field("endpoint", &self.endpoint)
63            .finish_non_exhaustive()
64    }
65}
66
67impl CosCore {
68    async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
69        let cred = self
70            .loader
71            .load()
72            .await
73            .map_err(new_request_credential_error)?;
74
75        if let Some(cred) = cred {
76            return Ok(Some(cred));
77        }
78
79        Err(Error::new(
80            ErrorKind::PermissionDenied,
81            "no valid credential found and anonymous access is not allowed",
82        ))
83    }
84
85    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
86        let cred = if let Some(cred) = self.load_credential().await? {
87            cred
88        } else {
89            return Ok(());
90        };
91
92        self.signer.sign(req, &cred).map_err(new_request_sign_error)
93    }
94
95    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
96        let cred = if let Some(cred) = self.load_credential().await? {
97            cred
98        } else {
99            return Ok(());
100        };
101
102        self.signer
103            .sign_query(req, duration, &cred)
104            .map_err(new_request_sign_error)
105    }
106
107    #[inline]
108    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
109        self.info.http_client().send(req).await
110    }
111}
112
113impl CosCore {
114    pub async fn cos_get_object(
115        &self,
116        path: &str,
117        range: BytesRange,
118        args: &OpRead,
119    ) -> Result<Response<HttpBody>> {
120        let mut req = self.cos_get_object_request(path, range, args)?;
121
122        self.sign(&mut req).await?;
123
124        self.info.http_client().fetch(req).await
125    }
126
127    pub fn cos_get_object_request(
128        &self,
129        path: &str,
130        range: BytesRange,
131        args: &OpRead,
132    ) -> Result<Request<Buffer>> {
133        let p = build_abs_path(&self.root, path);
134
135        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
136
137        let mut query_args = Vec::new();
138        if let Some(version) = args.version() {
139            query_args.push(format!(
140                "{}={}",
141                constants::COS_QUERY_VERSION_ID,
142                percent_decode_path(version)
143            ))
144        }
145        if !query_args.is_empty() {
146            url.push_str(&format!("?{}", query_args.join("&")));
147        }
148
149        let mut req = Request::get(&url);
150
151        if let Some(if_match) = args.if_match() {
152            req = req.header(IF_MATCH, if_match);
153        }
154
155        if !range.is_full() {
156            req = req.header(http::header::RANGE, range.to_header())
157        }
158
159        if let Some(if_none_match) = args.if_none_match() {
160            req = req.header(IF_NONE_MATCH, if_none_match);
161        }
162
163        let req = req.extension(Operation::Read);
164
165        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
166
167        Ok(req)
168    }
169
170    pub fn cos_put_object_request(
171        &self,
172        path: &str,
173        size: Option<u64>,
174        args: &OpWrite,
175        body: Buffer,
176    ) -> Result<Request<Buffer>> {
177        let p = build_abs_path(&self.root, path);
178
179        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
180
181        let mut req = Request::put(&url);
182
183        if let Some(size) = size {
184            req = req.header(CONTENT_LENGTH, size)
185        }
186        if let Some(cache_control) = args.cache_control() {
187            req = req.header(CACHE_CONTROL, cache_control)
188        }
189        if let Some(pos) = args.content_disposition() {
190            req = req.header(CONTENT_DISPOSITION, pos)
191        }
192        if let Some(mime) = args.content_type() {
193            req = req.header(CONTENT_TYPE, mime)
194        }
195
196        // For a bucket which has never enabled versioning, you may use it to
197        // specify whether to prohibit overwriting the object with the same name
198        // when uploading the object:
199        //
200        // When the x-cos-forbid-overwrite is specified as true, overwriting the object
201        // with the same name will be prohibited.
202        //
203        // ref: https://www.tencentcloud.com/document/product/436/7749
204        if args.if_not_exists() {
205            req = req.header("x-cos-forbid-overwrite", "true")
206        }
207
208        // Set user metadata headers.
209        if let Some(user_metadata) = args.user_metadata() {
210            for (key, value) in user_metadata {
211                req = req.header(format!("x-cos-meta-{key}"), value)
212            }
213        }
214
215        let req = req.extension(Operation::Write);
216
217        let req = req.body(body).map_err(new_request_build_error)?;
218
219        Ok(req)
220    }
221
222    pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
223        let mut req = self.cos_head_object_request(path, args)?;
224
225        self.sign(&mut req).await?;
226
227        self.send(req).await
228    }
229
230    pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
231        let p = build_abs_path(&self.root, path);
232
233        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
234
235        let mut query_args = Vec::new();
236        if let Some(version) = args.version() {
237            query_args.push(format!(
238                "{}={}",
239                constants::COS_QUERY_VERSION_ID,
240                percent_decode_path(version)
241            ))
242        }
243        if !query_args.is_empty() {
244            url.push_str(&format!("?{}", query_args.join("&")));
245        }
246
247        let mut req = Request::head(&url);
248
249        if let Some(if_match) = args.if_match() {
250            req = req.header(IF_MATCH, if_match);
251        }
252
253        if let Some(if_none_match) = args.if_none_match() {
254            req = req.header(IF_NONE_MATCH, if_none_match);
255        }
256
257        let req = req.extension(Operation::Stat);
258
259        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
260
261        Ok(req)
262    }
263
264    pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
265        let p = build_abs_path(&self.root, path);
266
267        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
268
269        let mut query_args = Vec::new();
270        if let Some(version) = args.version() {
271            query_args.push(format!(
272                "{}={}",
273                constants::COS_QUERY_VERSION_ID,
274                percent_decode_path(version)
275            ))
276        }
277        if !query_args.is_empty() {
278            url.push_str(&format!("?{}", query_args.join("&")));
279        }
280
281        let req = Request::delete(&url);
282
283        let req = req.extension(Operation::Delete);
284
285        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
286
287        self.sign(&mut req).await?;
288
289        self.send(req).await
290    }
291
292    pub fn cos_append_object_request(
293        &self,
294        path: &str,
295        position: u64,
296        size: u64,
297        args: &OpWrite,
298        body: Buffer,
299    ) -> Result<Request<Buffer>> {
300        let p = build_abs_path(&self.root, path);
301        let url = format!(
302            "{}/{}?append&position={}",
303            self.endpoint,
304            percent_encode_path(&p),
305            position
306        );
307
308        let mut req = Request::post(&url);
309
310        req = req.header(CONTENT_LENGTH, size);
311
312        if let Some(mime) = args.content_type() {
313            req = req.header(CONTENT_TYPE, mime);
314        }
315
316        if let Some(pos) = args.content_disposition() {
317            req = req.header(CONTENT_DISPOSITION, pos);
318        }
319
320        if let Some(cache_control) = args.cache_control() {
321            req = req.header(CACHE_CONTROL, cache_control)
322        }
323
324        let req = req.extension(Operation::Write);
325
326        let req = req.body(body).map_err(new_request_build_error)?;
327        Ok(req)
328    }
329
330    pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
331        let source = build_abs_path(&self.root, from);
332        let target = build_abs_path(&self.root, to);
333
334        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
335        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
336
337        let mut req = Request::put(&url)
338            .extension(Operation::Copy)
339            .header("x-cos-copy-source", &source)
340            .body(Buffer::new())
341            .map_err(new_request_build_error)?;
342
343        self.sign(&mut req).await?;
344
345        self.send(req).await
346    }
347
348    pub async fn cos_list_objects(
349        &self,
350        path: &str,
351        next_marker: &str,
352        delimiter: &str,
353        limit: Option<usize>,
354    ) -> Result<Response<Buffer>> {
355        let p = build_abs_path(&self.root, path);
356
357        let mut url = QueryPairsWriter::new(&self.endpoint);
358
359        if !p.is_empty() {
360            url = url.push("prefix", &percent_encode_path(&p));
361        }
362        if !delimiter.is_empty() {
363            url = url.push("delimiter", delimiter);
364        }
365        if let Some(limit) = limit {
366            url = url.push("max-keys", &limit.to_string());
367        }
368        if !next_marker.is_empty() {
369            url = url.push("marker", next_marker);
370        }
371
372        let mut req = Request::get(url.finish())
373            .extension(Operation::List)
374            .body(Buffer::new())
375            .map_err(new_request_build_error)?;
376
377        self.sign(&mut req).await?;
378
379        self.send(req).await
380    }
381
382    pub async fn cos_initiate_multipart_upload(
383        &self,
384        path: &str,
385        args: &OpWrite,
386    ) -> Result<Response<Buffer>> {
387        let p = build_abs_path(&self.root, path);
388
389        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
390
391        let mut req = Request::post(&url);
392
393        if let Some(mime) = args.content_type() {
394            req = req.header(CONTENT_TYPE, mime)
395        }
396
397        if let Some(content_disposition) = args.content_disposition() {
398            req = req.header(CONTENT_DISPOSITION, content_disposition)
399        }
400
401        if let Some(cache_control) = args.cache_control() {
402            req = req.header(CACHE_CONTROL, cache_control)
403        }
404
405        // Set user metadata headers.
406        if let Some(user_metadata) = args.user_metadata() {
407            for (key, value) in user_metadata {
408                req = req.header(format!("x-cos-meta-{key}"), value)
409            }
410        }
411
412        let req = req.extension(Operation::Write);
413
414        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
415
416        self.sign(&mut req).await?;
417
418        self.send(req).await
419    }
420
421    pub async fn cos_upload_part_request(
422        &self,
423        path: &str,
424        upload_id: &str,
425        part_number: usize,
426        size: u64,
427        body: Buffer,
428    ) -> Result<Response<Buffer>> {
429        let p = build_abs_path(&self.root, path);
430
431        let url = format!(
432            "{}/{}?partNumber={}&uploadId={}",
433            self.endpoint,
434            percent_encode_path(&p),
435            part_number,
436            percent_encode_path(upload_id)
437        );
438
439        let mut req = Request::put(&url);
440        req = req.header(CONTENT_LENGTH, size);
441
442        let req = req.extension(Operation::Write);
443
444        // Set body
445        let mut req = req.body(body).map_err(new_request_build_error)?;
446
447        self.sign(&mut req).await?;
448
449        self.send(req).await
450    }
451
452    pub async fn cos_complete_multipart_upload(
453        &self,
454        path: &str,
455        upload_id: &str,
456        parts: Vec<CompleteMultipartUploadRequestPart>,
457    ) -> Result<Response<Buffer>> {
458        let p = build_abs_path(&self.root, path);
459
460        let url = format!(
461            "{}/{}?uploadId={}",
462            self.endpoint,
463            percent_encode_path(&p),
464            percent_encode_path(upload_id)
465        );
466
467        let req = Request::post(&url);
468
469        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
470            .map_err(new_xml_serialize_error)?;
471        // Make sure content length has been set to avoid post with chunked encoding.
472        let req = req.header(CONTENT_LENGTH, content.len());
473        // Set content-type to `application/xml` to avoid mixed with form post.
474        let req = req.header(CONTENT_TYPE, "application/xml");
475
476        let req = req.extension(Operation::Write);
477
478        let mut req = req
479            .body(Buffer::from(Bytes::from(content)))
480            .map_err(new_request_build_error)?;
481
482        self.sign(&mut req).await?;
483
484        self.send(req).await
485    }
486
487    /// Abort an on-going multipart upload.
488    pub async fn cos_abort_multipart_upload(
489        &self,
490        path: &str,
491        upload_id: &str,
492    ) -> Result<Response<Buffer>> {
493        let p = build_abs_path(&self.root, path);
494
495        let url = format!(
496            "{}/{}?uploadId={}",
497            self.endpoint,
498            percent_encode_path(&p),
499            percent_encode_path(upload_id)
500        );
501
502        let mut req = Request::delete(&url)
503            .extension(Operation::Delete)
504            .body(Buffer::new())
505            .map_err(new_request_build_error)?;
506        self.sign(&mut req).await?;
507        self.send(req).await
508    }
509
510    pub async fn cos_list_object_versions(
511        &self,
512        prefix: &str,
513        delimiter: &str,
514        limit: Option<usize>,
515        key_marker: &str,
516        version_id_marker: &str,
517    ) -> Result<Response<Buffer>> {
518        let p = build_abs_path(&self.root, prefix);
519
520        let mut url = QueryPairsWriter::new(&self.endpoint);
521        url = url.push("versions", "");
522        if !p.is_empty() {
523            url = url.push("prefix", &percent_encode_path(p.as_str()));
524        }
525        if !delimiter.is_empty() {
526            url = url.push("delimiter", delimiter);
527        }
528
529        if let Some(limit) = limit {
530            url = url.push("max-keys", &limit.to_string());
531        }
532        if !key_marker.is_empty() {
533            url = url.push("key-marker", &percent_encode_path(key_marker));
534        }
535        if !version_id_marker.is_empty() {
536            url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
537        }
538
539        let mut req = Request::get(url.finish())
540            .extension(Operation::List)
541            .body(Buffer::new())
542            .map_err(new_request_build_error)?;
543
544        self.sign(&mut req).await?;
545
546        self.send(req).await
547    }
548}
549
550/// Result of CreateMultipartUpload
551#[derive(Default, Debug, Deserialize)]
552#[serde(default, rename_all = "PascalCase")]
553pub struct InitiateMultipartUploadResult {
554    pub upload_id: String,
555}
556
557/// Request of CompleteMultipartUploadRequest
558#[derive(Default, Debug, Serialize)]
559#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
560pub struct CompleteMultipartUploadRequest {
561    pub part: Vec<CompleteMultipartUploadRequestPart>,
562}
563
564#[derive(Clone, Default, Debug, Serialize)]
565#[serde(default, rename_all = "PascalCase")]
566pub struct CompleteMultipartUploadRequestPart {
567    #[serde(rename = "PartNumber")]
568    pub part_number: usize,
569    /// # TODO
570    ///
571    /// quick-xml will do escape on `"` which leads to our serialized output is
572    /// not the same as aws s3's example.
573    ///
574    /// Ideally, we could use `serialize_with` to address this (buf failed)
575    ///
576    /// ```ignore
577    /// #[derive(Default, Debug, Serialize)]
578    /// #[serde(default, rename_all = "PascalCase")]
579    /// struct CompleteMultipartUploadRequestPart {
580    ///     #[serde(rename = "PartNumber")]
581    ///     part_number: usize,
582    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
583    ///     etag: String,
584    /// }
585    ///
586    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
587    /// where
588    ///     S: serde::Serializer,
589    /// {
590    ///     ser.serialize_str(&String::from_utf8_lossy(
591    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
592    ///     ))
593    /// }
594    /// ```
595    ///
596    /// ref: <https://github.com/tafia/quick-xml/issues/362>
597    #[serde(rename = "ETag")]
598    pub etag: String,
599}
600
601/// Output of `CompleteMultipartUpload` operation
602#[derive(Debug, Default, Deserialize)]
603#[serde[default, rename_all = "PascalCase"]]
604pub struct CompleteMultipartUploadResult {
605    pub location: String,
606    pub bucket: String,
607    pub key: String,
608    #[serde(rename = "ETag")]
609    pub etag: String,
610}
611
612#[derive(Default, Debug, Deserialize)]
613#[serde(default, rename_all = "PascalCase")]
614pub struct ListObjectsOutput {
615    pub name: String,
616    pub prefix: String,
617    pub contents: Vec<ListObjectsOutputContent>,
618    pub common_prefixes: Vec<CommonPrefix>,
619    pub marker: String,
620    pub next_marker: Option<String>,
621}
622
623#[derive(Default, Debug, Deserialize)]
624#[serde(default, rename_all = "PascalCase")]
625pub struct CommonPrefix {
626    pub prefix: String,
627}
628
629#[derive(Default, Debug, Deserialize)]
630#[serde(default, rename_all = "PascalCase")]
631pub struct ListObjectsOutputContent {
632    pub key: String,
633    pub size: u64,
634}
635
636#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
637#[serde(rename_all = "PascalCase")]
638pub struct OutputCommonPrefix {
639    pub prefix: String,
640}
641
642/// Output of ListObjectVersions
643#[derive(Default, Debug, Deserialize)]
644#[serde(default, rename_all = "PascalCase")]
645pub struct ListObjectVersionsOutput {
646    pub is_truncated: Option<bool>,
647    pub next_key_marker: Option<String>,
648    pub next_version_id_marker: Option<String>,
649    pub common_prefixes: Vec<OutputCommonPrefix>,
650    pub version: Vec<ListObjectVersionsOutputVersion>,
651    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
652}
653
654#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
655#[serde(rename_all = "PascalCase")]
656pub struct ListObjectVersionsOutputVersion {
657    pub key: String,
658    pub version_id: String,
659    pub is_latest: bool,
660    pub size: u64,
661    pub last_modified: String,
662    #[serde(rename = "ETag")]
663    pub etag: Option<String>,
664}
665
666#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
667#[serde(rename_all = "PascalCase")]
668pub struct ListObjectVersionsOutputDeleteMarker {
669    pub key: String,
670    pub version_id: String,
671    pub is_latest: bool,
672    pub last_modified: String,
673}
674
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// `ListObjectsOutput` must capture the bucket name, prefix, both markers,
    /// every `<Contents>` entry and every `<CommonPrefixes>` entry.
    #[test]
    fn test_parse_xml() {
        let body = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );
        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(body.reader()).expect("must success");

        assert_eq!(parsed.name, "examplebucket");
        assert_eq!(parsed.prefix, "obj");
        assert_eq!(parsed.marker, "obj002");
        assert_eq!(parsed.next_marker.as_deref(), Some("obj004"));

        let keys: Vec<&str> = parsed.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        let prefixes: Vec<&str> = parsed
            .common_prefixes
            .iter()
            .map(|cp| cp.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}