1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::fmt::Write;
21use std::sync::Arc;
22use std::time::Duration;
23
24use bytes::Bytes;
25use http::header::CACHE_CONTROL;
26use http::header::CONTENT_DISPOSITION;
27use http::header::CONTENT_LENGTH;
28use http::header::CONTENT_TYPE;
29use http::header::IF_MATCH;
30use http::header::IF_NONE_MATCH;
31use http::Request;
32use http::Response;
33use reqsign::TencentCosCredential;
34use reqsign::TencentCosCredentialLoader;
35use reqsign::TencentCosSigner;
36use serde::Deserialize;
37use serde::Serialize;
38
39use crate::raw::*;
40use crate::*;
41
42pub mod constants {
43 pub const COS_QUERY_VERSION_ID: &str = "versionId";
44
45 pub const X_COS_VERSION_ID: &str = "x-cos-version-id";
46}
47
48pub struct CosCore {
49 pub info: Arc<AccessorInfo>,
50 pub bucket: String,
51 pub root: String,
52 pub endpoint: String,
53
54 pub signer: TencentCosSigner,
55 pub loader: TencentCosCredentialLoader,
56}
57
58impl Debug for CosCore {
59 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("Backend")
61 .field("root", &self.root)
62 .field("bucket", &self.bucket)
63 .field("endpoint", &self.endpoint)
64 .finish_non_exhaustive()
65 }
66}
67
68impl CosCore {
69 async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
70 let cred = self
71 .loader
72 .load()
73 .await
74 .map_err(new_request_credential_error)?;
75
76 if let Some(cred) = cred {
77 return Ok(Some(cred));
78 }
79
80 Err(Error::new(
81 ErrorKind::PermissionDenied,
82 "no valid credential found and anonymous access is not allowed",
83 ))
84 }
85
86 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
87 let cred = if let Some(cred) = self.load_credential().await? {
88 cred
89 } else {
90 return Ok(());
91 };
92
93 self.signer.sign(req, &cred).map_err(new_request_sign_error)
94 }
95
96 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
97 let cred = if let Some(cred) = self.load_credential().await? {
98 cred
99 } else {
100 return Ok(());
101 };
102
103 self.signer
104 .sign_query(req, duration, &cred)
105 .map_err(new_request_sign_error)
106 }
107
108 #[inline]
109 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
110 self.info.http_client().send(req).await
111 }
112}
113
114impl CosCore {
115 pub async fn cos_get_object(
116 &self,
117 path: &str,
118 range: BytesRange,
119 args: &OpRead,
120 ) -> Result<Response<HttpBody>> {
121 let mut req = self.cos_get_object_request(path, range, args)?;
122
123 self.sign(&mut req).await?;
124
125 self.info.http_client().fetch(req).await
126 }
127
128 pub fn cos_get_object_request(
129 &self,
130 path: &str,
131 range: BytesRange,
132 args: &OpRead,
133 ) -> Result<Request<Buffer>> {
134 let p = build_abs_path(&self.root, path);
135
136 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
137
138 let mut query_args = Vec::new();
139 if let Some(version) = args.version() {
140 query_args.push(format!(
141 "{}={}",
142 constants::COS_QUERY_VERSION_ID,
143 percent_decode_path(version)
144 ))
145 }
146 if !query_args.is_empty() {
147 url.push_str(&format!("?{}", query_args.join("&")));
148 }
149
150 let mut req = Request::get(&url);
151
152 if let Some(if_match) = args.if_match() {
153 req = req.header(IF_MATCH, if_match);
154 }
155
156 if !range.is_full() {
157 req = req.header(http::header::RANGE, range.to_header())
158 }
159
160 if let Some(if_none_match) = args.if_none_match() {
161 req = req.header(IF_NONE_MATCH, if_none_match);
162 }
163
164 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
165
166 Ok(req)
167 }
168
169 pub fn cos_put_object_request(
170 &self,
171 path: &str,
172 size: Option<u64>,
173 args: &OpWrite,
174 body: Buffer,
175 ) -> Result<Request<Buffer>> {
176 let p = build_abs_path(&self.root, path);
177
178 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
179
180 let mut req = Request::put(&url);
181
182 if let Some(size) = size {
183 req = req.header(CONTENT_LENGTH, size)
184 }
185 if let Some(cache_control) = args.cache_control() {
186 req = req.header(CACHE_CONTROL, cache_control)
187 }
188 if let Some(pos) = args.content_disposition() {
189 req = req.header(CONTENT_DISPOSITION, pos)
190 }
191 if let Some(mime) = args.content_type() {
192 req = req.header(CONTENT_TYPE, mime)
193 }
194
195 if args.if_not_exists() {
204 req = req.header("x-cos-forbid-overwrite", "true")
205 }
206
207 if let Some(user_metadata) = args.user_metadata() {
209 for (key, value) in user_metadata {
210 req = req.header(format!("x-cos-meta-{key}"), value)
211 }
212 }
213
214 let req = req.body(body).map_err(new_request_build_error)?;
215
216 Ok(req)
217 }
218
219 pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
220 let mut req = self.cos_head_object_request(path, args)?;
221
222 self.sign(&mut req).await?;
223
224 self.send(req).await
225 }
226
227 pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
228 let p = build_abs_path(&self.root, path);
229
230 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
231
232 let mut query_args = Vec::new();
233 if let Some(version) = args.version() {
234 query_args.push(format!(
235 "{}={}",
236 constants::COS_QUERY_VERSION_ID,
237 percent_decode_path(version)
238 ))
239 }
240 if !query_args.is_empty() {
241 url.push_str(&format!("?{}", query_args.join("&")));
242 }
243
244 let mut req = Request::head(&url);
245
246 if let Some(if_match) = args.if_match() {
247 req = req.header(IF_MATCH, if_match);
248 }
249
250 if let Some(if_none_match) = args.if_none_match() {
251 req = req.header(IF_NONE_MATCH, if_none_match);
252 }
253
254 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
255
256 Ok(req)
257 }
258
259 pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
260 let p = build_abs_path(&self.root, path);
261
262 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
263
264 let mut query_args = Vec::new();
265 if let Some(version) = args.version() {
266 query_args.push(format!(
267 "{}={}",
268 constants::COS_QUERY_VERSION_ID,
269 percent_decode_path(version)
270 ))
271 }
272 if !query_args.is_empty() {
273 url.push_str(&format!("?{}", query_args.join("&")));
274 }
275
276 let req = Request::delete(&url);
277
278 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
279
280 self.sign(&mut req).await?;
281
282 self.send(req).await
283 }
284
285 pub fn cos_append_object_request(
286 &self,
287 path: &str,
288 position: u64,
289 size: u64,
290 args: &OpWrite,
291 body: Buffer,
292 ) -> Result<Request<Buffer>> {
293 let p = build_abs_path(&self.root, path);
294 let url = format!(
295 "{}/{}?append&position={}",
296 self.endpoint,
297 percent_encode_path(&p),
298 position
299 );
300
301 let mut req = Request::post(&url);
302
303 req = req.header(CONTENT_LENGTH, size);
304
305 if let Some(mime) = args.content_type() {
306 req = req.header(CONTENT_TYPE, mime);
307 }
308
309 if let Some(pos) = args.content_disposition() {
310 req = req.header(CONTENT_DISPOSITION, pos);
311 }
312
313 if let Some(cache_control) = args.cache_control() {
314 req = req.header(CACHE_CONTROL, cache_control)
315 }
316
317 let req = req.body(body).map_err(new_request_build_error)?;
318 Ok(req)
319 }
320
321 pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
322 let source = build_abs_path(&self.root, from);
323 let target = build_abs_path(&self.root, to);
324
325 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
326 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
327
328 let mut req = Request::put(&url)
329 .header("x-cos-copy-source", &source)
330 .body(Buffer::new())
331 .map_err(new_request_build_error)?;
332
333 self.sign(&mut req).await?;
334
335 self.send(req).await
336 }
337
338 pub async fn cos_list_objects(
339 &self,
340 path: &str,
341 next_marker: &str,
342 delimiter: &str,
343 limit: Option<usize>,
344 ) -> Result<Response<Buffer>> {
345 let p = build_abs_path(&self.root, path);
346
347 let mut queries = vec![];
348 if !p.is_empty() {
349 queries.push(format!("prefix={}", percent_encode_path(&p)));
350 }
351 if !delimiter.is_empty() {
352 queries.push(format!("delimiter={delimiter}"));
353 }
354 if let Some(limit) = limit {
355 queries.push(format!("max-keys={limit}"));
356 }
357 if !next_marker.is_empty() {
358 queries.push(format!("marker={next_marker}"));
359 }
360
361 let url = if queries.is_empty() {
362 self.endpoint.to_string()
363 } else {
364 format!("{}?{}", self.endpoint, queries.join("&"))
365 };
366
367 let mut req = Request::get(&url)
368 .body(Buffer::new())
369 .map_err(new_request_build_error)?;
370
371 self.sign(&mut req).await?;
372
373 self.send(req).await
374 }
375
376 pub async fn cos_initiate_multipart_upload(
377 &self,
378 path: &str,
379 args: &OpWrite,
380 ) -> Result<Response<Buffer>> {
381 let p = build_abs_path(&self.root, path);
382
383 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
384
385 let mut req = Request::post(&url);
386
387 if let Some(mime) = args.content_type() {
388 req = req.header(CONTENT_TYPE, mime)
389 }
390
391 if let Some(content_disposition) = args.content_disposition() {
392 req = req.header(CONTENT_DISPOSITION, content_disposition)
393 }
394
395 if let Some(cache_control) = args.cache_control() {
396 req = req.header(CACHE_CONTROL, cache_control)
397 }
398
399 if let Some(user_metadata) = args.user_metadata() {
401 for (key, value) in user_metadata {
402 req = req.header(format!("x-cos-meta-{key}"), value)
403 }
404 }
405
406 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
407
408 self.sign(&mut req).await?;
409
410 self.send(req).await
411 }
412
413 pub async fn cos_upload_part_request(
414 &self,
415 path: &str,
416 upload_id: &str,
417 part_number: usize,
418 size: u64,
419 body: Buffer,
420 ) -> Result<Response<Buffer>> {
421 let p = build_abs_path(&self.root, path);
422
423 let url = format!(
424 "{}/{}?partNumber={}&uploadId={}",
425 self.endpoint,
426 percent_encode_path(&p),
427 part_number,
428 percent_encode_path(upload_id)
429 );
430
431 let mut req = Request::put(&url);
432 req = req.header(CONTENT_LENGTH, size);
433 let mut req = req.body(body).map_err(new_request_build_error)?;
435
436 self.sign(&mut req).await?;
437
438 self.send(req).await
439 }
440
441 pub async fn cos_complete_multipart_upload(
442 &self,
443 path: &str,
444 upload_id: &str,
445 parts: Vec<CompleteMultipartUploadRequestPart>,
446 ) -> Result<Response<Buffer>> {
447 let p = build_abs_path(&self.root, path);
448
449 let url = format!(
450 "{}/{}?uploadId={}",
451 self.endpoint,
452 percent_encode_path(&p),
453 percent_encode_path(upload_id)
454 );
455
456 let req = Request::post(&url);
457
458 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
459 .map_err(new_xml_deserialize_error)?;
460 let req = req.header(CONTENT_LENGTH, content.len());
462 let req = req.header(CONTENT_TYPE, "application/xml");
464
465 let mut req = req
466 .body(Buffer::from(Bytes::from(content)))
467 .map_err(new_request_build_error)?;
468
469 self.sign(&mut req).await?;
470
471 self.send(req).await
472 }
473
474 pub async fn cos_abort_multipart_upload(
476 &self,
477 path: &str,
478 upload_id: &str,
479 ) -> Result<Response<Buffer>> {
480 let p = build_abs_path(&self.root, path);
481
482 let url = format!(
483 "{}/{}?uploadId={}",
484 self.endpoint,
485 percent_encode_path(&p),
486 percent_encode_path(upload_id)
487 );
488
489 let mut req = Request::delete(&url)
490 .body(Buffer::new())
491 .map_err(new_request_build_error)?;
492 self.sign(&mut req).await?;
493 self.send(req).await
494 }
495
496 pub async fn cos_list_object_versions(
497 &self,
498 prefix: &str,
499 delimiter: &str,
500 limit: Option<usize>,
501 key_marker: &str,
502 version_id_marker: &str,
503 ) -> Result<Response<Buffer>> {
504 let p = build_abs_path(&self.root, prefix);
505
506 let mut url = format!("{}?versions", self.endpoint);
507 if !p.is_empty() {
508 write!(url, "&prefix={}", percent_encode_path(p.as_str()))
509 .expect("write into string must succeed");
510 }
511 if !delimiter.is_empty() {
512 write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
513 }
514
515 if let Some(limit) = limit {
516 write!(url, "&max-keys={}", limit).expect("write into string must succeed");
517 }
518 if !key_marker.is_empty() {
519 write!(url, "&key-marker={}", percent_encode_path(key_marker))
520 .expect("write into string must succeed");
521 }
522 if !version_id_marker.is_empty() {
523 write!(
524 url,
525 "&version-id-marker={}",
526 percent_encode_path(version_id_marker)
527 )
528 .expect("write into string must succeed");
529 }
530
531 let mut req = Request::get(&url)
532 .body(Buffer::new())
533 .map_err(new_request_build_error)?;
534
535 self.sign(&mut req).await?;
536
537 self.send(req).await
538 }
539}
540
541#[derive(Default, Debug, Deserialize)]
543#[serde(default, rename_all = "PascalCase")]
544pub struct InitiateMultipartUploadResult {
545 pub upload_id: String,
546}
547
548#[derive(Default, Debug, Serialize)]
550#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
551pub struct CompleteMultipartUploadRequest {
552 pub part: Vec<CompleteMultipartUploadRequestPart>,
553}
554
555#[derive(Clone, Default, Debug, Serialize)]
556#[serde(default, rename_all = "PascalCase")]
557pub struct CompleteMultipartUploadRequestPart {
558 #[serde(rename = "PartNumber")]
559 pub part_number: usize,
560 #[serde(rename = "ETag")]
589 pub etag: String,
590}
591
592#[derive(Debug, Default, Deserialize)]
594#[serde[default, rename_all = "PascalCase"]]
595pub struct CompleteMultipartUploadResult {
596 pub location: String,
597 pub bucket: String,
598 pub key: String,
599 #[serde(rename = "ETag")]
600 pub etag: String,
601}
602
603#[derive(Default, Debug, Deserialize)]
604#[serde(default, rename_all = "PascalCase")]
605pub struct ListObjectsOutput {
606 pub name: String,
607 pub prefix: String,
608 pub contents: Vec<ListObjectsOutputContent>,
609 pub common_prefixes: Vec<CommonPrefix>,
610 pub marker: String,
611 pub next_marker: Option<String>,
612}
613
614#[derive(Default, Debug, Deserialize)]
615#[serde(default, rename_all = "PascalCase")]
616pub struct CommonPrefix {
617 pub prefix: String,
618}
619
620#[derive(Default, Debug, Deserialize)]
621#[serde(default, rename_all = "PascalCase")]
622pub struct ListObjectsOutputContent {
623 pub key: String,
624 pub size: u64,
625}
626
627#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
628#[serde(rename_all = "PascalCase")]
629pub struct OutputCommonPrefix {
630 pub prefix: String,
631}
632
633#[derive(Default, Debug, Deserialize)]
635#[serde(default, rename_all = "PascalCase")]
636pub struct ListObjectVersionsOutput {
637 pub is_truncated: Option<bool>,
638 pub next_key_marker: Option<String>,
639 pub next_version_id_marker: Option<String>,
640 pub common_prefixes: Vec<OutputCommonPrefix>,
641 pub version: Vec<ListObjectVersionsOutputVersion>,
642 pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
643}
644
645#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
646#[serde(rename_all = "PascalCase")]
647pub struct ListObjectVersionsOutputVersion {
648 pub key: String,
649 pub version_id: String,
650 pub is_latest: bool,
651 pub size: u64,
652 pub last_modified: String,
653 #[serde(rename = "ETag")]
654 pub etag: Option<String>,
655}
656
657#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
658#[serde(rename_all = "PascalCase")]
659pub struct ListObjectVersionsOutputDeleteMarker {
660 pub key: String,
661 pub version_id: String,
662 pub is_latest: bool,
663 pub last_modified: String,
664}
665
666#[cfg(test)]
667mod tests {
668 use bytes::Buf;
669
670 use super::*;
671
672 #[test]
673 fn test_parse_xml() {
674 let bs = bytes::Bytes::from(
675 r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
676<ListBucketResult>
677 <Name>examplebucket</Name>
678 <Prefix>obj</Prefix>
679 <Marker>obj002</Marker>
680 <NextMarker>obj004</NextMarker>
681 <MaxKeys>1000</MaxKeys>
682 <IsTruncated>false</IsTruncated>
683 <Contents>
684 <Key>obj002</Key>
685 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
686 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
687 <Size>9</Size>
688 <Owner>
689 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
690 </Owner>
691 <StorageClass>STANDARD</StorageClass>
692 </Contents>
693 <Contents>
694 <Key>obj003</Key>
695 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
696 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
697 <Size>10</Size>
698 <Owner>
699 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
700 </Owner>
701 <StorageClass>STANDARD</StorageClass>
702 </Contents>
703 <CommonPrefixes>
704 <Prefix>hello</Prefix>
705 </CommonPrefixes>
706 <CommonPrefixes>
707 <Prefix>world</Prefix>
708 </CommonPrefixes>
709</ListBucketResult>"#,
710 );
711 let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");
712
713 assert_eq!(out.name, "examplebucket".to_string());
714 assert_eq!(out.prefix, "obj".to_string());
715 assert_eq!(out.marker, "obj002".to_string());
716 assert_eq!(out.next_marker, Some("obj004".to_string()),);
717 assert_eq!(
718 out.contents
719 .iter()
720 .map(|v| v.key.clone())
721 .collect::<Vec<String>>(),
722 ["obj002", "obj003"],
723 );
724 assert_eq!(
725 out.contents.iter().map(|v| v.size).collect::<Vec<u64>>(),
726 [9, 10],
727 );
728 assert_eq!(
729 out.common_prefixes
730 .iter()
731 .map(|v| v.prefix.clone())
732 .collect::<Vec<String>>(),
733 ["hello", "world"],
734 )
735 }
736}