opendal/services/cos/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20use std::time::Duration;
21
22use bytes::Bytes;
23use http::Request;
24use http::Response;
25use http::header::CACHE_CONTROL;
26use http::header::CONTENT_DISPOSITION;
27use http::header::CONTENT_LENGTH;
28use http::header::CONTENT_TYPE;
29use http::header::IF_MATCH;
30use http::header::IF_MODIFIED_SINCE;
31use http::header::IF_NONE_MATCH;
32use http::header::IF_UNMODIFIED_SINCE;
33use reqsign::TencentCosCredential;
34use reqsign::TencentCosCredentialLoader;
35use reqsign::TencentCosSigner;
36use serde::Deserialize;
37use serde::Serialize;
38
39use crate::raw::*;
40use crate::*;
41
/// String constants shared by the COS request builders.
pub mod constants {
    /// Name of the query parameter that carries an object version id.
    pub const COS_QUERY_VERSION_ID: &str = "versionId";

    /// Name of the `x-cos-version-id` header.
    pub const X_COS_VERSION_ID: &str = "x-cos-version-id";
}
47
48pub struct CosCore {
49    pub info: Arc<AccessorInfo>,
50    pub bucket: String,
51    pub root: String,
52    pub endpoint: String,
53
54    pub signer: TencentCosSigner,
55    pub loader: TencentCosCredentialLoader,
56}
57
58impl Debug for CosCore {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("CosCore")
61            .field("root", &self.root)
62            .field("bucket", &self.bucket)
63            .field("endpoint", &self.endpoint)
64            .finish_non_exhaustive()
65    }
66}
67
68impl CosCore {
69    async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
70        let cred = self
71            .loader
72            .load()
73            .await
74            .map_err(new_request_credential_error)?;
75
76        if let Some(cred) = cred {
77            return Ok(Some(cred));
78        }
79
80        Err(Error::new(
81            ErrorKind::PermissionDenied,
82            "no valid credential found and anonymous access is not allowed",
83        ))
84    }
85
86    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
87        let cred = if let Some(cred) = self.load_credential().await? {
88            cred
89        } else {
90            return Ok(());
91        };
92
93        self.signer.sign(req, &cred).map_err(new_request_sign_error)
94    }
95
96    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
97        let cred = if let Some(cred) = self.load_credential().await? {
98            cred
99        } else {
100            return Ok(());
101        };
102
103        self.signer
104            .sign_query(req, duration, &cred)
105            .map_err(new_request_sign_error)
106    }
107
108    #[inline]
109    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
110        self.info.http_client().send(req).await
111    }
112}
113
114impl CosCore {
115    pub async fn cos_get_object(
116        &self,
117        path: &str,
118        range: BytesRange,
119        args: &OpRead,
120    ) -> Result<Response<HttpBody>> {
121        let mut req = self.cos_get_object_request(path, range, args)?;
122
123        self.sign(&mut req).await?;
124
125        self.info.http_client().fetch(req).await
126    }
127
128    pub fn cos_get_object_request(
129        &self,
130        path: &str,
131        range: BytesRange,
132        args: &OpRead,
133    ) -> Result<Request<Buffer>> {
134        let p = build_abs_path(&self.root, path);
135
136        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
137
138        let mut query_args = Vec::new();
139        if let Some(version) = args.version() {
140            query_args.push(format!(
141                "{}={}",
142                constants::COS_QUERY_VERSION_ID,
143                percent_decode_path(version)
144            ))
145        }
146        if !query_args.is_empty() {
147            url.push_str(&format!("?{}", query_args.join("&")));
148        }
149
150        let mut req = Request::get(&url);
151
152        if let Some(if_match) = args.if_match() {
153            req = req.header(IF_MATCH, if_match);
154        }
155
156        if !range.is_full() {
157            req = req.header(http::header::RANGE, range.to_header())
158        }
159
160        if let Some(if_none_match) = args.if_none_match() {
161            req = req.header(IF_NONE_MATCH, if_none_match);
162        }
163
164        if let Some(if_modified_since) = args.if_modified_since() {
165            req = req.header(IF_MODIFIED_SINCE, if_modified_since.format_http_date());
166        }
167
168        if let Some(if_unmodified_since) = args.if_unmodified_since() {
169            req = req.header(IF_UNMODIFIED_SINCE, if_unmodified_since.format_http_date());
170        }
171
172        let req = req.extension(Operation::Read);
173
174        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
175
176        Ok(req)
177    }
178
179    pub fn cos_put_object_request(
180        &self,
181        path: &str,
182        size: Option<u64>,
183        args: &OpWrite,
184        body: Buffer,
185    ) -> Result<Request<Buffer>> {
186        let p = build_abs_path(&self.root, path);
187
188        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
189
190        let mut req = Request::put(&url);
191
192        if let Some(size) = size {
193            req = req.header(CONTENT_LENGTH, size)
194        }
195        if let Some(cache_control) = args.cache_control() {
196            req = req.header(CACHE_CONTROL, cache_control)
197        }
198        if let Some(pos) = args.content_disposition() {
199            req = req.header(CONTENT_DISPOSITION, pos)
200        }
201        if let Some(mime) = args.content_type() {
202            req = req.header(CONTENT_TYPE, mime)
203        }
204
205        // For a bucket which has never enabled versioning, you may use it to
206        // specify whether to prohibit overwriting the object with the same name
207        // when uploading the object:
208        //
209        // When the x-cos-forbid-overwrite is specified as true, overwriting the object
210        // with the same name will be prohibited.
211        //
212        // ref: https://www.tencentcloud.com/document/product/436/7749
213        if args.if_not_exists() {
214            req = req.header("x-cos-forbid-overwrite", "true")
215        }
216
217        // Set user metadata headers.
218        if let Some(user_metadata) = args.user_metadata() {
219            for (key, value) in user_metadata {
220                req = req.header(format!("x-cos-meta-{key}"), value)
221            }
222        }
223
224        let req = req.extension(Operation::Write);
225
226        let req = req.body(body).map_err(new_request_build_error)?;
227
228        Ok(req)
229    }
230
231    pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
232        let mut req = self.cos_head_object_request(path, args)?;
233
234        self.sign(&mut req).await?;
235
236        self.send(req).await
237    }
238
239    pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
240        let p = build_abs_path(&self.root, path);
241
242        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
243
244        let mut query_args = Vec::new();
245        if let Some(version) = args.version() {
246            query_args.push(format!(
247                "{}={}",
248                constants::COS_QUERY_VERSION_ID,
249                percent_decode_path(version)
250            ))
251        }
252        if !query_args.is_empty() {
253            url.push_str(&format!("?{}", query_args.join("&")));
254        }
255
256        let mut req = Request::head(&url);
257
258        if let Some(if_match) = args.if_match() {
259            req = req.header(IF_MATCH, if_match);
260        }
261
262        if let Some(if_none_match) = args.if_none_match() {
263            req = req.header(IF_NONE_MATCH, if_none_match);
264        }
265
266        let req = req.extension(Operation::Stat);
267
268        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
269
270        Ok(req)
271    }
272
273    pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
274        let p = build_abs_path(&self.root, path);
275
276        let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
277
278        let mut query_args = Vec::new();
279        if let Some(version) = args.version() {
280            query_args.push(format!(
281                "{}={}",
282                constants::COS_QUERY_VERSION_ID,
283                percent_decode_path(version)
284            ))
285        }
286        if !query_args.is_empty() {
287            url.push_str(&format!("?{}", query_args.join("&")));
288        }
289
290        let req = Request::delete(&url);
291
292        let req = req.extension(Operation::Delete);
293
294        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
295
296        self.sign(&mut req).await?;
297
298        self.send(req).await
299    }
300
301    pub fn cos_append_object_request(
302        &self,
303        path: &str,
304        position: u64,
305        size: u64,
306        args: &OpWrite,
307        body: Buffer,
308    ) -> Result<Request<Buffer>> {
309        let p = build_abs_path(&self.root, path);
310        let url = format!(
311            "{}/{}?append&position={}",
312            self.endpoint,
313            percent_encode_path(&p),
314            position
315        );
316
317        let mut req = Request::post(&url);
318
319        req = req.header(CONTENT_LENGTH, size);
320
321        if let Some(mime) = args.content_type() {
322            req = req.header(CONTENT_TYPE, mime);
323        }
324
325        if let Some(pos) = args.content_disposition() {
326            req = req.header(CONTENT_DISPOSITION, pos);
327        }
328
329        if let Some(cache_control) = args.cache_control() {
330            req = req.header(CACHE_CONTROL, cache_control)
331        }
332
333        let req = req.extension(Operation::Write);
334
335        let req = req.body(body).map_err(new_request_build_error)?;
336        Ok(req)
337    }
338
339    pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
340        let source = build_abs_path(&self.root, from);
341        let target = build_abs_path(&self.root, to);
342
343        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
344        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
345
346        let mut req = Request::put(&url)
347            .extension(Operation::Copy)
348            .header("x-cos-copy-source", &source)
349            .body(Buffer::new())
350            .map_err(new_request_build_error)?;
351
352        self.sign(&mut req).await?;
353
354        self.send(req).await
355    }
356
357    pub async fn cos_list_objects(
358        &self,
359        path: &str,
360        next_marker: &str,
361        delimiter: &str,
362        limit: Option<usize>,
363    ) -> Result<Response<Buffer>> {
364        let p = build_abs_path(&self.root, path);
365
366        let mut url = QueryPairsWriter::new(&self.endpoint);
367
368        if !p.is_empty() {
369            url = url.push("prefix", &percent_encode_path(&p));
370        }
371        if !delimiter.is_empty() {
372            url = url.push("delimiter", delimiter);
373        }
374        if let Some(limit) = limit {
375            url = url.push("max-keys", &limit.to_string());
376        }
377        if !next_marker.is_empty() {
378            url = url.push("marker", next_marker);
379        }
380
381        let mut req = Request::get(url.finish())
382            .extension(Operation::List)
383            .body(Buffer::new())
384            .map_err(new_request_build_error)?;
385
386        self.sign(&mut req).await?;
387
388        self.send(req).await
389    }
390
391    pub async fn cos_initiate_multipart_upload(
392        &self,
393        path: &str,
394        args: &OpWrite,
395    ) -> Result<Response<Buffer>> {
396        let p = build_abs_path(&self.root, path);
397
398        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
399
400        let mut req = Request::post(&url);
401
402        if let Some(mime) = args.content_type() {
403            req = req.header(CONTENT_TYPE, mime)
404        }
405
406        if let Some(content_disposition) = args.content_disposition() {
407            req = req.header(CONTENT_DISPOSITION, content_disposition)
408        }
409
410        if let Some(cache_control) = args.cache_control() {
411            req = req.header(CACHE_CONTROL, cache_control)
412        }
413
414        // Set user metadata headers.
415        if let Some(user_metadata) = args.user_metadata() {
416            for (key, value) in user_metadata {
417                req = req.header(format!("x-cos-meta-{key}"), value)
418            }
419        }
420
421        let req = req.extension(Operation::Write);
422
423        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
424
425        self.sign(&mut req).await?;
426
427        self.send(req).await
428    }
429
430    pub async fn cos_upload_part_request(
431        &self,
432        path: &str,
433        upload_id: &str,
434        part_number: usize,
435        size: u64,
436        body: Buffer,
437    ) -> Result<Response<Buffer>> {
438        let p = build_abs_path(&self.root, path);
439
440        let url = format!(
441            "{}/{}?partNumber={}&uploadId={}",
442            self.endpoint,
443            percent_encode_path(&p),
444            part_number,
445            percent_encode_path(upload_id)
446        );
447
448        let mut req = Request::put(&url);
449        req = req.header(CONTENT_LENGTH, size);
450
451        let req = req.extension(Operation::Write);
452
453        // Set body
454        let mut req = req.body(body).map_err(new_request_build_error)?;
455
456        self.sign(&mut req).await?;
457
458        self.send(req).await
459    }
460
461    pub async fn cos_complete_multipart_upload(
462        &self,
463        path: &str,
464        upload_id: &str,
465        parts: Vec<CompleteMultipartUploadRequestPart>,
466    ) -> Result<Response<Buffer>> {
467        let p = build_abs_path(&self.root, path);
468
469        let url = format!(
470            "{}/{}?uploadId={}",
471            self.endpoint,
472            percent_encode_path(&p),
473            percent_encode_path(upload_id)
474        );
475
476        let req = Request::post(&url);
477
478        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
479            .map_err(new_xml_serialize_error)?;
480        // Make sure content length has been set to avoid post with chunked encoding.
481        let req = req.header(CONTENT_LENGTH, content.len());
482        // Set content-type to `application/xml` to avoid mixed with form post.
483        let req = req.header(CONTENT_TYPE, "application/xml");
484
485        let req = req.extension(Operation::Write);
486
487        let mut req = req
488            .body(Buffer::from(Bytes::from(content)))
489            .map_err(new_request_build_error)?;
490
491        self.sign(&mut req).await?;
492
493        self.send(req).await
494    }
495
496    /// Abort an on-going multipart upload.
497    pub async fn cos_abort_multipart_upload(
498        &self,
499        path: &str,
500        upload_id: &str,
501    ) -> Result<Response<Buffer>> {
502        let p = build_abs_path(&self.root, path);
503
504        let url = format!(
505            "{}/{}?uploadId={}",
506            self.endpoint,
507            percent_encode_path(&p),
508            percent_encode_path(upload_id)
509        );
510
511        let mut req = Request::delete(&url)
512            .extension(Operation::Delete)
513            .body(Buffer::new())
514            .map_err(new_request_build_error)?;
515        self.sign(&mut req).await?;
516        self.send(req).await
517    }
518
519    pub async fn cos_list_object_versions(
520        &self,
521        prefix: &str,
522        delimiter: &str,
523        limit: Option<usize>,
524        key_marker: &str,
525        version_id_marker: &str,
526    ) -> Result<Response<Buffer>> {
527        let p = build_abs_path(&self.root, prefix);
528
529        let mut url = QueryPairsWriter::new(&self.endpoint);
530        url = url.push("versions", "");
531        if !p.is_empty() {
532            url = url.push("prefix", &percent_encode_path(p.as_str()));
533        }
534        if !delimiter.is_empty() {
535            url = url.push("delimiter", delimiter);
536        }
537
538        if let Some(limit) = limit {
539            url = url.push("max-keys", &limit.to_string());
540        }
541        if !key_marker.is_empty() {
542            url = url.push("key-marker", &percent_encode_path(key_marker));
543        }
544        if !version_id_marker.is_empty() {
545            url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
546        }
547
548        let mut req = Request::get(url.finish())
549            .extension(Operation::List)
550            .body(Buffer::new())
551            .map_err(new_request_build_error)?;
552
553        self.sign(&mut req).await?;
554
555        self.send(req).await
556    }
557}
558
559/// Result of CreateMultipartUpload
560#[derive(Default, Debug, Deserialize)]
561#[serde(default, rename_all = "PascalCase")]
562pub struct InitiateMultipartUploadResult {
563    pub upload_id: String,
564}
565
566/// Request of CompleteMultipartUploadRequest
567#[derive(Default, Debug, Serialize)]
568#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
569pub struct CompleteMultipartUploadRequest {
570    pub part: Vec<CompleteMultipartUploadRequestPart>,
571}
572
573#[derive(Clone, Default, Debug, Serialize)]
574#[serde(default, rename_all = "PascalCase")]
575pub struct CompleteMultipartUploadRequestPart {
576    #[serde(rename = "PartNumber")]
577    pub part_number: usize,
578    /// # TODO
579    ///
580    /// quick-xml will do escape on `"` which leads to our serialized output is
581    /// not the same as aws s3's example.
582    ///
583    /// Ideally, we could use `serialize_with` to address this (buf failed)
584    ///
585    /// ```ignore
586    /// #[derive(Default, Debug, Serialize)]
587    /// #[serde(default, rename_all = "PascalCase")]
588    /// struct CompleteMultipartUploadRequestPart {
589    ///     #[serde(rename = "PartNumber")]
590    ///     part_number: usize,
591    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
592    ///     etag: String,
593    /// }
594    ///
595    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
596    /// where
597    ///     S: serde::Serializer,
598    /// {
599    ///     ser.serialize_str(&String::from_utf8_lossy(
600    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
601    ///     ))
602    /// }
603    /// ```
604    ///
605    /// ref: <https://github.com/tafia/quick-xml/issues/362>
606    #[serde(rename = "ETag")]
607    pub etag: String,
608}
609
610/// Output of `CompleteMultipartUpload` operation
611#[derive(Debug, Default, Deserialize)]
612#[serde[default, rename_all = "PascalCase"]]
613pub struct CompleteMultipartUploadResult {
614    pub location: String,
615    pub bucket: String,
616    pub key: String,
617    #[serde(rename = "ETag")]
618    pub etag: String,
619}
620
621#[derive(Default, Debug, Deserialize)]
622#[serde(default, rename_all = "PascalCase")]
623pub struct ListObjectsOutput {
624    pub name: String,
625    pub prefix: String,
626    pub contents: Vec<ListObjectsOutputContent>,
627    pub common_prefixes: Vec<CommonPrefix>,
628    pub marker: String,
629    pub next_marker: Option<String>,
630}
631
632#[derive(Default, Debug, Deserialize)]
633#[serde(default, rename_all = "PascalCase")]
634pub struct CommonPrefix {
635    pub prefix: String,
636}
637
638#[derive(Default, Debug, Deserialize)]
639#[serde(default, rename_all = "PascalCase")]
640pub struct ListObjectsOutputContent {
641    pub key: String,
642    pub size: u64,
643}
644
645#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
646#[serde(rename_all = "PascalCase")]
647pub struct OutputCommonPrefix {
648    pub prefix: String,
649}
650
651/// Output of ListObjectVersions
652#[derive(Default, Debug, Deserialize)]
653#[serde(default, rename_all = "PascalCase")]
654pub struct ListObjectVersionsOutput {
655    pub is_truncated: Option<bool>,
656    pub next_key_marker: Option<String>,
657    pub next_version_id_marker: Option<String>,
658    pub common_prefixes: Vec<OutputCommonPrefix>,
659    pub version: Vec<ListObjectVersionsOutputVersion>,
660    pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
661}
662
663#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
664#[serde(rename_all = "PascalCase")]
665pub struct ListObjectVersionsOutputVersion {
666    pub key: String,
667    pub version_id: String,
668    pub is_latest: bool,
669    pub size: u64,
670    pub last_modified: String,
671    #[serde(rename = "ETag")]
672    pub etag: Option<String>,
673}
674
675#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
676#[serde(rename_all = "PascalCase")]
677pub struct ListObjectVersionsOutputDeleteMarker {
678    pub key: String,
679    pub version_id: String,
680    pub is_latest: bool,
681    pub last_modified: String,
682}
683
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Deserializes a sample `ListBucketResult` document and checks the
    /// fields captured by `ListObjectsOutput`.
    #[test]
    fn test_parse_xml() {
        let bs = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );
        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(bs.reader()).expect("must success");

        // Scalar fields.
        assert_eq!(parsed.name, "examplebucket");
        assert_eq!(parsed.prefix, "obj");
        assert_eq!(parsed.marker, "obj002");
        assert_eq!(parsed.next_marker.as_deref(), Some("obj004"));

        // Repeated `Contents` elements keep their document order.
        let keys: Vec<&str> = parsed.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` elements keep their document order.
        let prefixes: Vec<&str> = parsed
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}