1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_MODIFIED_SINCE;
30use http::header::IF_NONE_MATCH;
31use http::header::IF_UNMODIFIED_SINCE;
32use http::Request;
33use http::Response;
34use reqsign::TencentCosCredential;
35use reqsign::TencentCosCredentialLoader;
36use reqsign::TencentCosSigner;
37use serde::Deserialize;
38use serde::Serialize;
39
40use crate::raw::*;
41use crate::*;
42
/// String constants used when talking to COS.
pub mod constants {
    /// Name of the `x-cos-version-id` header.
    pub const X_COS_VERSION_ID: &str = "x-cos-version-id";

    /// Query-string key under which a version id is appended to request URLs.
    pub const COS_QUERY_VERSION_ID: &str = "versionId";
}
48
49pub struct CosCore {
50 pub info: Arc<AccessorInfo>,
51 pub bucket: String,
52 pub root: String,
53 pub endpoint: String,
54
55 pub signer: TencentCosSigner,
56 pub loader: TencentCosCredentialLoader,
57}
58
59impl Debug for CosCore {
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("Backend")
62 .field("root", &self.root)
63 .field("bucket", &self.bucket)
64 .field("endpoint", &self.endpoint)
65 .finish_non_exhaustive()
66 }
67}
68
69impl CosCore {
70 async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
71 let cred = self
72 .loader
73 .load()
74 .await
75 .map_err(new_request_credential_error)?;
76
77 if let Some(cred) = cred {
78 return Ok(Some(cred));
79 }
80
81 Err(Error::new(
82 ErrorKind::PermissionDenied,
83 "no valid credential found and anonymous access is not allowed",
84 ))
85 }
86
87 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
88 let cred = if let Some(cred) = self.load_credential().await? {
89 cred
90 } else {
91 return Ok(());
92 };
93
94 self.signer.sign(req, &cred).map_err(new_request_sign_error)
95 }
96
97 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
98 let cred = if let Some(cred) = self.load_credential().await? {
99 cred
100 } else {
101 return Ok(());
102 };
103
104 self.signer
105 .sign_query(req, duration, &cred)
106 .map_err(new_request_sign_error)
107 }
108
109 #[inline]
110 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
111 self.info.http_client().send(req).await
112 }
113}
114
115impl CosCore {
116 pub async fn cos_get_object(
117 &self,
118 path: &str,
119 range: BytesRange,
120 args: &OpRead,
121 ) -> Result<Response<HttpBody>> {
122 let mut req = self.cos_get_object_request(path, range, args)?;
123
124 self.sign(&mut req).await?;
125
126 self.info.http_client().fetch(req).await
127 }
128
129 pub fn cos_get_object_request(
130 &self,
131 path: &str,
132 range: BytesRange,
133 args: &OpRead,
134 ) -> Result<Request<Buffer>> {
135 let p = build_abs_path(&self.root, path);
136
137 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
138
139 let mut query_args = Vec::new();
140 if let Some(version) = args.version() {
141 query_args.push(format!(
142 "{}={}",
143 constants::COS_QUERY_VERSION_ID,
144 percent_decode_path(version)
145 ))
146 }
147 if !query_args.is_empty() {
148 url.push_str(&format!("?{}", query_args.join("&")));
149 }
150
151 let mut req = Request::get(&url);
152
153 if let Some(if_match) = args.if_match() {
154 req = req.header(IF_MATCH, if_match);
155 }
156
157 if !range.is_full() {
158 req = req.header(http::header::RANGE, range.to_header())
159 }
160
161 if let Some(if_none_match) = args.if_none_match() {
162 req = req.header(IF_NONE_MATCH, if_none_match);
163 }
164
165 if let Some(if_modified_since) = args.if_modified_since() {
166 req = req.header(
167 IF_MODIFIED_SINCE,
168 format_datetime_into_http_date(if_modified_since),
169 );
170 }
171
172 if let Some(if_unmodified_since) = args.if_unmodified_since() {
173 req = req.header(
174 IF_UNMODIFIED_SINCE,
175 format_datetime_into_http_date(if_unmodified_since),
176 );
177 }
178
179 let req = req.extension(Operation::Read);
180
181 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
182
183 Ok(req)
184 }
185
186 pub fn cos_put_object_request(
187 &self,
188 path: &str,
189 size: Option<u64>,
190 args: &OpWrite,
191 body: Buffer,
192 ) -> Result<Request<Buffer>> {
193 let p = build_abs_path(&self.root, path);
194
195 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
196
197 let mut req = Request::put(&url);
198
199 if let Some(size) = size {
200 req = req.header(CONTENT_LENGTH, size)
201 }
202 if let Some(cache_control) = args.cache_control() {
203 req = req.header(CACHE_CONTROL, cache_control)
204 }
205 if let Some(pos) = args.content_disposition() {
206 req = req.header(CONTENT_DISPOSITION, pos)
207 }
208 if let Some(mime) = args.content_type() {
209 req = req.header(CONTENT_TYPE, mime)
210 }
211
212 if args.if_not_exists() {
221 req = req.header("x-cos-forbid-overwrite", "true")
222 }
223
224 if let Some(user_metadata) = args.user_metadata() {
226 for (key, value) in user_metadata {
227 req = req.header(format!("x-cos-meta-{key}"), value)
228 }
229 }
230
231 let req = req.extension(Operation::Write);
232
233 let req = req.body(body).map_err(new_request_build_error)?;
234
235 Ok(req)
236 }
237
238 pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
239 let mut req = self.cos_head_object_request(path, args)?;
240
241 self.sign(&mut req).await?;
242
243 self.send(req).await
244 }
245
246 pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
247 let p = build_abs_path(&self.root, path);
248
249 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
250
251 let mut query_args = Vec::new();
252 if let Some(version) = args.version() {
253 query_args.push(format!(
254 "{}={}",
255 constants::COS_QUERY_VERSION_ID,
256 percent_decode_path(version)
257 ))
258 }
259 if !query_args.is_empty() {
260 url.push_str(&format!("?{}", query_args.join("&")));
261 }
262
263 let mut req = Request::head(&url);
264
265 if let Some(if_match) = args.if_match() {
266 req = req.header(IF_MATCH, if_match);
267 }
268
269 if let Some(if_none_match) = args.if_none_match() {
270 req = req.header(IF_NONE_MATCH, if_none_match);
271 }
272
273 let req = req.extension(Operation::Stat);
274
275 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
276
277 Ok(req)
278 }
279
280 pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
281 let p = build_abs_path(&self.root, path);
282
283 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
284
285 let mut query_args = Vec::new();
286 if let Some(version) = args.version() {
287 query_args.push(format!(
288 "{}={}",
289 constants::COS_QUERY_VERSION_ID,
290 percent_decode_path(version)
291 ))
292 }
293 if !query_args.is_empty() {
294 url.push_str(&format!("?{}", query_args.join("&")));
295 }
296
297 let req = Request::delete(&url);
298
299 let req = req.extension(Operation::Delete);
300
301 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
302
303 self.sign(&mut req).await?;
304
305 self.send(req).await
306 }
307
308 pub fn cos_append_object_request(
309 &self,
310 path: &str,
311 position: u64,
312 size: u64,
313 args: &OpWrite,
314 body: Buffer,
315 ) -> Result<Request<Buffer>> {
316 let p = build_abs_path(&self.root, path);
317 let url = format!(
318 "{}/{}?append&position={}",
319 self.endpoint,
320 percent_encode_path(&p),
321 position
322 );
323
324 let mut req = Request::post(&url);
325
326 req = req.header(CONTENT_LENGTH, size);
327
328 if let Some(mime) = args.content_type() {
329 req = req.header(CONTENT_TYPE, mime);
330 }
331
332 if let Some(pos) = args.content_disposition() {
333 req = req.header(CONTENT_DISPOSITION, pos);
334 }
335
336 if let Some(cache_control) = args.cache_control() {
337 req = req.header(CACHE_CONTROL, cache_control)
338 }
339
340 let req = req.extension(Operation::Write);
341
342 let req = req.body(body).map_err(new_request_build_error)?;
343 Ok(req)
344 }
345
346 pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
347 let source = build_abs_path(&self.root, from);
348 let target = build_abs_path(&self.root, to);
349
350 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
351 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
352
353 let mut req = Request::put(&url)
354 .extension(Operation::Copy)
355 .header("x-cos-copy-source", &source)
356 .body(Buffer::new())
357 .map_err(new_request_build_error)?;
358
359 self.sign(&mut req).await?;
360
361 self.send(req).await
362 }
363
364 pub async fn cos_list_objects(
365 &self,
366 path: &str,
367 next_marker: &str,
368 delimiter: &str,
369 limit: Option<usize>,
370 ) -> Result<Response<Buffer>> {
371 let p = build_abs_path(&self.root, path);
372
373 let mut url = QueryPairsWriter::new(&self.endpoint);
374
375 if !p.is_empty() {
376 url = url.push("prefix", &percent_encode_path(&p));
377 }
378 if !delimiter.is_empty() {
379 url = url.push("delimiter", delimiter);
380 }
381 if let Some(limit) = limit {
382 url = url.push("max-keys", &limit.to_string());
383 }
384 if !next_marker.is_empty() {
385 url = url.push("marker", next_marker);
386 }
387
388 let mut req = Request::get(url.finish())
389 .extension(Operation::List)
390 .body(Buffer::new())
391 .map_err(new_request_build_error)?;
392
393 self.sign(&mut req).await?;
394
395 self.send(req).await
396 }
397
398 pub async fn cos_initiate_multipart_upload(
399 &self,
400 path: &str,
401 args: &OpWrite,
402 ) -> Result<Response<Buffer>> {
403 let p = build_abs_path(&self.root, path);
404
405 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
406
407 let mut req = Request::post(&url);
408
409 if let Some(mime) = args.content_type() {
410 req = req.header(CONTENT_TYPE, mime)
411 }
412
413 if let Some(content_disposition) = args.content_disposition() {
414 req = req.header(CONTENT_DISPOSITION, content_disposition)
415 }
416
417 if let Some(cache_control) = args.cache_control() {
418 req = req.header(CACHE_CONTROL, cache_control)
419 }
420
421 if let Some(user_metadata) = args.user_metadata() {
423 for (key, value) in user_metadata {
424 req = req.header(format!("x-cos-meta-{key}"), value)
425 }
426 }
427
428 let req = req.extension(Operation::Write);
429
430 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
431
432 self.sign(&mut req).await?;
433
434 self.send(req).await
435 }
436
437 pub async fn cos_upload_part_request(
438 &self,
439 path: &str,
440 upload_id: &str,
441 part_number: usize,
442 size: u64,
443 body: Buffer,
444 ) -> Result<Response<Buffer>> {
445 let p = build_abs_path(&self.root, path);
446
447 let url = format!(
448 "{}/{}?partNumber={}&uploadId={}",
449 self.endpoint,
450 percent_encode_path(&p),
451 part_number,
452 percent_encode_path(upload_id)
453 );
454
455 let mut req = Request::put(&url);
456 req = req.header(CONTENT_LENGTH, size);
457
458 let req = req.extension(Operation::Write);
459
460 let mut req = req.body(body).map_err(new_request_build_error)?;
462
463 self.sign(&mut req).await?;
464
465 self.send(req).await
466 }
467
468 pub async fn cos_complete_multipart_upload(
469 &self,
470 path: &str,
471 upload_id: &str,
472 parts: Vec<CompleteMultipartUploadRequestPart>,
473 ) -> Result<Response<Buffer>> {
474 let p = build_abs_path(&self.root, path);
475
476 let url = format!(
477 "{}/{}?uploadId={}",
478 self.endpoint,
479 percent_encode_path(&p),
480 percent_encode_path(upload_id)
481 );
482
483 let req = Request::post(&url);
484
485 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
486 .map_err(new_xml_serialize_error)?;
487 let req = req.header(CONTENT_LENGTH, content.len());
489 let req = req.header(CONTENT_TYPE, "application/xml");
491
492 let req = req.extension(Operation::Write);
493
494 let mut req = req
495 .body(Buffer::from(Bytes::from(content)))
496 .map_err(new_request_build_error)?;
497
498 self.sign(&mut req).await?;
499
500 self.send(req).await
501 }
502
503 pub async fn cos_abort_multipart_upload(
505 &self,
506 path: &str,
507 upload_id: &str,
508 ) -> Result<Response<Buffer>> {
509 let p = build_abs_path(&self.root, path);
510
511 let url = format!(
512 "{}/{}?uploadId={}",
513 self.endpoint,
514 percent_encode_path(&p),
515 percent_encode_path(upload_id)
516 );
517
518 let mut req = Request::delete(&url)
519 .extension(Operation::Delete)
520 .body(Buffer::new())
521 .map_err(new_request_build_error)?;
522 self.sign(&mut req).await?;
523 self.send(req).await
524 }
525
526 pub async fn cos_list_object_versions(
527 &self,
528 prefix: &str,
529 delimiter: &str,
530 limit: Option<usize>,
531 key_marker: &str,
532 version_id_marker: &str,
533 ) -> Result<Response<Buffer>> {
534 let p = build_abs_path(&self.root, prefix);
535
536 let mut url = QueryPairsWriter::new(&self.endpoint);
537 url = url.push("versions", "");
538 if !p.is_empty() {
539 url = url.push("prefix", &percent_encode_path(p.as_str()));
540 }
541 if !delimiter.is_empty() {
542 url = url.push("delimiter", delimiter);
543 }
544
545 if let Some(limit) = limit {
546 url = url.push("max-keys", &limit.to_string());
547 }
548 if !key_marker.is_empty() {
549 url = url.push("key-marker", &percent_encode_path(key_marker));
550 }
551 if !version_id_marker.is_empty() {
552 url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
553 }
554
555 let mut req = Request::get(url.finish())
556 .extension(Operation::List)
557 .body(Buffer::new())
558 .map_err(new_request_build_error)?;
559
560 self.sign(&mut req).await?;
561
562 self.send(req).await
563 }
564}
565
566#[derive(Default, Debug, Deserialize)]
568#[serde(default, rename_all = "PascalCase")]
569pub struct InitiateMultipartUploadResult {
570 pub upload_id: String,
571}
572
573#[derive(Default, Debug, Serialize)]
575#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
576pub struct CompleteMultipartUploadRequest {
577 pub part: Vec<CompleteMultipartUploadRequestPart>,
578}
579
580#[derive(Clone, Default, Debug, Serialize)]
581#[serde(default, rename_all = "PascalCase")]
582pub struct CompleteMultipartUploadRequestPart {
583 #[serde(rename = "PartNumber")]
584 pub part_number: usize,
585 #[serde(rename = "ETag")]
614 pub etag: String,
615}
616
617#[derive(Debug, Default, Deserialize)]
619#[serde[default, rename_all = "PascalCase"]]
620pub struct CompleteMultipartUploadResult {
621 pub location: String,
622 pub bucket: String,
623 pub key: String,
624 #[serde(rename = "ETag")]
625 pub etag: String,
626}
627
628#[derive(Default, Debug, Deserialize)]
629#[serde(default, rename_all = "PascalCase")]
630pub struct ListObjectsOutput {
631 pub name: String,
632 pub prefix: String,
633 pub contents: Vec<ListObjectsOutputContent>,
634 pub common_prefixes: Vec<CommonPrefix>,
635 pub marker: String,
636 pub next_marker: Option<String>,
637}
638
639#[derive(Default, Debug, Deserialize)]
640#[serde(default, rename_all = "PascalCase")]
641pub struct CommonPrefix {
642 pub prefix: String,
643}
644
645#[derive(Default, Debug, Deserialize)]
646#[serde(default, rename_all = "PascalCase")]
647pub struct ListObjectsOutputContent {
648 pub key: String,
649 pub size: u64,
650}
651
652#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
653#[serde(rename_all = "PascalCase")]
654pub struct OutputCommonPrefix {
655 pub prefix: String,
656}
657
658#[derive(Default, Debug, Deserialize)]
660#[serde(default, rename_all = "PascalCase")]
661pub struct ListObjectVersionsOutput {
662 pub is_truncated: Option<bool>,
663 pub next_key_marker: Option<String>,
664 pub next_version_id_marker: Option<String>,
665 pub common_prefixes: Vec<OutputCommonPrefix>,
666 pub version: Vec<ListObjectVersionsOutputVersion>,
667 pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
668}
669
670#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
671#[serde(rename_all = "PascalCase")]
672pub struct ListObjectVersionsOutputVersion {
673 pub key: String,
674 pub version_id: String,
675 pub is_latest: bool,
676 pub size: u64,
677 pub last_modified: String,
678 #[serde(rename = "ETag")]
679 pub etag: Option<String>,
680}
681
682#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
683#[serde(rename_all = "PascalCase")]
684pub struct ListObjectVersionsOutputDeleteMarker {
685 pub key: String,
686 pub version_id: String,
687 pub is_latest: bool,
688 pub last_modified: String,
689}
690
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// A `ListBucketResult` document deserializes into `ListObjectsOutput`,
    /// with elements that have no matching field being ignored.
    #[test]
    fn test_parse_xml() {
        let bs = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
 <Name>examplebucket</Name>
 <Prefix>obj</Prefix>
 <Marker>obj002</Marker>
 <NextMarker>obj004</NextMarker>
 <MaxKeys>1000</MaxKeys>
 <IsTruncated>false</IsTruncated>
 <Contents>
 <Key>obj002</Key>
 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
 <Size>9</Size>
 <Owner>
 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
 </Owner>
 <StorageClass>STANDARD</StorageClass>
 </Contents>
 <Contents>
 <Key>obj003</Key>
 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
 <Size>10</Size>
 <Owner>
 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
 </Owner>
 <StorageClass>STANDARD</StorageClass>
 </Contents>
 <CommonPrefixes>
 <Prefix>hello</Prefix>
 </CommonPrefixes>
 <CommonPrefixes>
 <Prefix>world</Prefix>
 </CommonPrefixes>
</ListBucketResult>"#,
        );
        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(bs.reader()).expect("must success");

        // Scalar elements.
        assert_eq!(parsed.name, "examplebucket");
        assert_eq!(parsed.prefix, "obj");
        assert_eq!(parsed.marker, "obj002");
        assert_eq!(parsed.next_marker.as_deref(), Some("obj004"));

        // Repeated `Contents` elements.
        let keys: Vec<&str> = parsed.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` elements.
        let prefixes: Vec<&str> = parsed
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}