1use std::fmt::Debug;
19use std::sync::Arc;
20use std::time::Duration;
21
22use bytes::Bytes;
23use http::Request;
24use http::Response;
25use http::header::CACHE_CONTROL;
26use http::header::CONTENT_DISPOSITION;
27use http::header::CONTENT_LENGTH;
28use http::header::CONTENT_TYPE;
29use http::header::IF_MATCH;
30use http::header::IF_MODIFIED_SINCE;
31use http::header::IF_NONE_MATCH;
32use http::header::IF_UNMODIFIED_SINCE;
33use reqsign::TencentCosCredential;
34use reqsign::TencentCosCredentialLoader;
35use reqsign::TencentCosSigner;
36use serde::Deserialize;
37use serde::Serialize;
38
39use crate::raw::*;
40use crate::*;
41
/// String constants for names used when talking to COS.
pub mod constants {
    /// The literal `x-cos-version-id`.
    ///
    /// NOTE(review): not referenced in the visible part of this file;
    /// presumably the header that carries an object's version id — confirm
    /// against callers.
    pub const X_COS_VERSION_ID: &str = "x-cos-version-id";

    /// Query parameter name used to address a specific object version in
    /// get, head and delete requests.
    pub const COS_QUERY_VERSION_ID: &str = "versionId";
}
47
48pub struct CosCore {
49 pub info: Arc<AccessorInfo>,
50 pub bucket: String,
51 pub root: String,
52 pub endpoint: String,
53
54 pub signer: TencentCosSigner,
55 pub loader: TencentCosCredentialLoader,
56}
57
58impl Debug for CosCore {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("CosCore")
61 .field("root", &self.root)
62 .field("bucket", &self.bucket)
63 .field("endpoint", &self.endpoint)
64 .finish_non_exhaustive()
65 }
66}
67
68impl CosCore {
69 async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
70 let cred = self
71 .loader
72 .load()
73 .await
74 .map_err(new_request_credential_error)?;
75
76 if let Some(cred) = cred {
77 return Ok(Some(cred));
78 }
79
80 Err(Error::new(
81 ErrorKind::PermissionDenied,
82 "no valid credential found and anonymous access is not allowed",
83 ))
84 }
85
86 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
87 let cred = if let Some(cred) = self.load_credential().await? {
88 cred
89 } else {
90 return Ok(());
91 };
92
93 self.signer.sign(req, &cred).map_err(new_request_sign_error)
94 }
95
96 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
97 let cred = if let Some(cred) = self.load_credential().await? {
98 cred
99 } else {
100 return Ok(());
101 };
102
103 self.signer
104 .sign_query(req, duration, &cred)
105 .map_err(new_request_sign_error)
106 }
107
108 #[inline]
109 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
110 self.info.http_client().send(req).await
111 }
112}
113
114impl CosCore {
115 pub async fn cos_get_object(
116 &self,
117 path: &str,
118 range: BytesRange,
119 args: &OpRead,
120 ) -> Result<Response<HttpBody>> {
121 let mut req = self.cos_get_object_request(path, range, args)?;
122
123 self.sign(&mut req).await?;
124
125 self.info.http_client().fetch(req).await
126 }
127
128 pub fn cos_get_object_request(
129 &self,
130 path: &str,
131 range: BytesRange,
132 args: &OpRead,
133 ) -> Result<Request<Buffer>> {
134 let p = build_abs_path(&self.root, path);
135
136 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
137
138 let mut query_args = Vec::new();
139 if let Some(version) = args.version() {
140 query_args.push(format!(
141 "{}={}",
142 constants::COS_QUERY_VERSION_ID,
143 percent_decode_path(version)
144 ))
145 }
146 if !query_args.is_empty() {
147 url.push_str(&format!("?{}", query_args.join("&")));
148 }
149
150 let mut req = Request::get(&url);
151
152 if let Some(if_match) = args.if_match() {
153 req = req.header(IF_MATCH, if_match);
154 }
155
156 if !range.is_full() {
157 req = req.header(http::header::RANGE, range.to_header())
158 }
159
160 if let Some(if_none_match) = args.if_none_match() {
161 req = req.header(IF_NONE_MATCH, if_none_match);
162 }
163
164 if let Some(if_modified_since) = args.if_modified_since() {
165 req = req.header(IF_MODIFIED_SINCE, if_modified_since.format_http_date());
166 }
167
168 if let Some(if_unmodified_since) = args.if_unmodified_since() {
169 req = req.header(IF_UNMODIFIED_SINCE, if_unmodified_since.format_http_date());
170 }
171
172 let req = req.extension(Operation::Read);
173
174 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
175
176 Ok(req)
177 }
178
179 pub fn cos_put_object_request(
180 &self,
181 path: &str,
182 size: Option<u64>,
183 args: &OpWrite,
184 body: Buffer,
185 ) -> Result<Request<Buffer>> {
186 let p = build_abs_path(&self.root, path);
187
188 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
189
190 let mut req = Request::put(&url);
191
192 if let Some(size) = size {
193 req = req.header(CONTENT_LENGTH, size)
194 }
195 if let Some(cache_control) = args.cache_control() {
196 req = req.header(CACHE_CONTROL, cache_control)
197 }
198 if let Some(pos) = args.content_disposition() {
199 req = req.header(CONTENT_DISPOSITION, pos)
200 }
201 if let Some(mime) = args.content_type() {
202 req = req.header(CONTENT_TYPE, mime)
203 }
204
205 if args.if_not_exists() {
214 req = req.header("x-cos-forbid-overwrite", "true")
215 }
216
217 if let Some(user_metadata) = args.user_metadata() {
219 for (key, value) in user_metadata {
220 req = req.header(format!("x-cos-meta-{key}"), value)
221 }
222 }
223
224 let req = req.extension(Operation::Write);
225
226 let req = req.body(body).map_err(new_request_build_error)?;
227
228 Ok(req)
229 }
230
231 pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
232 let mut req = self.cos_head_object_request(path, args)?;
233
234 self.sign(&mut req).await?;
235
236 self.send(req).await
237 }
238
239 pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
240 let p = build_abs_path(&self.root, path);
241
242 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
243
244 let mut query_args = Vec::new();
245 if let Some(version) = args.version() {
246 query_args.push(format!(
247 "{}={}",
248 constants::COS_QUERY_VERSION_ID,
249 percent_decode_path(version)
250 ))
251 }
252 if !query_args.is_empty() {
253 url.push_str(&format!("?{}", query_args.join("&")));
254 }
255
256 let mut req = Request::head(&url);
257
258 if let Some(if_match) = args.if_match() {
259 req = req.header(IF_MATCH, if_match);
260 }
261
262 if let Some(if_none_match) = args.if_none_match() {
263 req = req.header(IF_NONE_MATCH, if_none_match);
264 }
265
266 let req = req.extension(Operation::Stat);
267
268 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
269
270 Ok(req)
271 }
272
273 pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
274 let p = build_abs_path(&self.root, path);
275
276 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
277
278 let mut query_args = Vec::new();
279 if let Some(version) = args.version() {
280 query_args.push(format!(
281 "{}={}",
282 constants::COS_QUERY_VERSION_ID,
283 percent_decode_path(version)
284 ))
285 }
286 if !query_args.is_empty() {
287 url.push_str(&format!("?{}", query_args.join("&")));
288 }
289
290 let req = Request::delete(&url);
291
292 let req = req.extension(Operation::Delete);
293
294 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
295
296 self.sign(&mut req).await?;
297
298 self.send(req).await
299 }
300
301 pub fn cos_append_object_request(
302 &self,
303 path: &str,
304 position: u64,
305 size: u64,
306 args: &OpWrite,
307 body: Buffer,
308 ) -> Result<Request<Buffer>> {
309 let p = build_abs_path(&self.root, path);
310 let url = format!(
311 "{}/{}?append&position={}",
312 self.endpoint,
313 percent_encode_path(&p),
314 position
315 );
316
317 let mut req = Request::post(&url);
318
319 req = req.header(CONTENT_LENGTH, size);
320
321 if let Some(mime) = args.content_type() {
322 req = req.header(CONTENT_TYPE, mime);
323 }
324
325 if let Some(pos) = args.content_disposition() {
326 req = req.header(CONTENT_DISPOSITION, pos);
327 }
328
329 if let Some(cache_control) = args.cache_control() {
330 req = req.header(CACHE_CONTROL, cache_control)
331 }
332
333 let req = req.extension(Operation::Write);
334
335 let req = req.body(body).map_err(new_request_build_error)?;
336 Ok(req)
337 }
338
339 pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
340 let source = build_abs_path(&self.root, from);
341 let target = build_abs_path(&self.root, to);
342
343 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
344 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
345
346 let mut req = Request::put(&url)
347 .extension(Operation::Copy)
348 .header("x-cos-copy-source", &source)
349 .body(Buffer::new())
350 .map_err(new_request_build_error)?;
351
352 self.sign(&mut req).await?;
353
354 self.send(req).await
355 }
356
357 pub async fn cos_list_objects(
358 &self,
359 path: &str,
360 next_marker: &str,
361 delimiter: &str,
362 limit: Option<usize>,
363 ) -> Result<Response<Buffer>> {
364 let p = build_abs_path(&self.root, path);
365
366 let mut url = QueryPairsWriter::new(&self.endpoint);
367
368 if !p.is_empty() {
369 url = url.push("prefix", &percent_encode_path(&p));
370 }
371 if !delimiter.is_empty() {
372 url = url.push("delimiter", delimiter);
373 }
374 if let Some(limit) = limit {
375 url = url.push("max-keys", &limit.to_string());
376 }
377 if !next_marker.is_empty() {
378 url = url.push("marker", next_marker);
379 }
380
381 let mut req = Request::get(url.finish())
382 .extension(Operation::List)
383 .body(Buffer::new())
384 .map_err(new_request_build_error)?;
385
386 self.sign(&mut req).await?;
387
388 self.send(req).await
389 }
390
391 pub async fn cos_initiate_multipart_upload(
392 &self,
393 path: &str,
394 args: &OpWrite,
395 ) -> Result<Response<Buffer>> {
396 let p = build_abs_path(&self.root, path);
397
398 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
399
400 let mut req = Request::post(&url);
401
402 if let Some(mime) = args.content_type() {
403 req = req.header(CONTENT_TYPE, mime)
404 }
405
406 if let Some(content_disposition) = args.content_disposition() {
407 req = req.header(CONTENT_DISPOSITION, content_disposition)
408 }
409
410 if let Some(cache_control) = args.cache_control() {
411 req = req.header(CACHE_CONTROL, cache_control)
412 }
413
414 if let Some(user_metadata) = args.user_metadata() {
416 for (key, value) in user_metadata {
417 req = req.header(format!("x-cos-meta-{key}"), value)
418 }
419 }
420
421 let req = req.extension(Operation::Write);
422
423 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
424
425 self.sign(&mut req).await?;
426
427 self.send(req).await
428 }
429
430 pub async fn cos_upload_part_request(
431 &self,
432 path: &str,
433 upload_id: &str,
434 part_number: usize,
435 size: u64,
436 body: Buffer,
437 ) -> Result<Response<Buffer>> {
438 let p = build_abs_path(&self.root, path);
439
440 let url = format!(
441 "{}/{}?partNumber={}&uploadId={}",
442 self.endpoint,
443 percent_encode_path(&p),
444 part_number,
445 percent_encode_path(upload_id)
446 );
447
448 let mut req = Request::put(&url);
449 req = req.header(CONTENT_LENGTH, size);
450
451 let req = req.extension(Operation::Write);
452
453 let mut req = req.body(body).map_err(new_request_build_error)?;
455
456 self.sign(&mut req).await?;
457
458 self.send(req).await
459 }
460
461 pub async fn cos_complete_multipart_upload(
462 &self,
463 path: &str,
464 upload_id: &str,
465 parts: Vec<CompleteMultipartUploadRequestPart>,
466 ) -> Result<Response<Buffer>> {
467 let p = build_abs_path(&self.root, path);
468
469 let url = format!(
470 "{}/{}?uploadId={}",
471 self.endpoint,
472 percent_encode_path(&p),
473 percent_encode_path(upload_id)
474 );
475
476 let req = Request::post(&url);
477
478 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
479 .map_err(new_xml_serialize_error)?;
480 let req = req.header(CONTENT_LENGTH, content.len());
482 let req = req.header(CONTENT_TYPE, "application/xml");
484
485 let req = req.extension(Operation::Write);
486
487 let mut req = req
488 .body(Buffer::from(Bytes::from(content)))
489 .map_err(new_request_build_error)?;
490
491 self.sign(&mut req).await?;
492
493 self.send(req).await
494 }
495
496 pub async fn cos_abort_multipart_upload(
498 &self,
499 path: &str,
500 upload_id: &str,
501 ) -> Result<Response<Buffer>> {
502 let p = build_abs_path(&self.root, path);
503
504 let url = format!(
505 "{}/{}?uploadId={}",
506 self.endpoint,
507 percent_encode_path(&p),
508 percent_encode_path(upload_id)
509 );
510
511 let mut req = Request::delete(&url)
512 .extension(Operation::Delete)
513 .body(Buffer::new())
514 .map_err(new_request_build_error)?;
515 self.sign(&mut req).await?;
516 self.send(req).await
517 }
518
519 pub async fn cos_list_object_versions(
520 &self,
521 prefix: &str,
522 delimiter: &str,
523 limit: Option<usize>,
524 key_marker: &str,
525 version_id_marker: &str,
526 ) -> Result<Response<Buffer>> {
527 let p = build_abs_path(&self.root, prefix);
528
529 let mut url = QueryPairsWriter::new(&self.endpoint);
530 url = url.push("versions", "");
531 if !p.is_empty() {
532 url = url.push("prefix", &percent_encode_path(p.as_str()));
533 }
534 if !delimiter.is_empty() {
535 url = url.push("delimiter", delimiter);
536 }
537
538 if let Some(limit) = limit {
539 url = url.push("max-keys", &limit.to_string());
540 }
541 if !key_marker.is_empty() {
542 url = url.push("key-marker", &percent_encode_path(key_marker));
543 }
544 if !version_id_marker.is_empty() {
545 url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
546 }
547
548 let mut req = Request::get(url.finish())
549 .extension(Operation::List)
550 .body(Buffer::new())
551 .map_err(new_request_build_error)?;
552
553 self.sign(&mut req).await?;
554
555 self.send(req).await
556 }
557}
558
559#[derive(Default, Debug, Deserialize)]
561#[serde(default, rename_all = "PascalCase")]
562pub struct InitiateMultipartUploadResult {
563 pub upload_id: String,
564}
565
566#[derive(Default, Debug, Serialize)]
568#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
569pub struct CompleteMultipartUploadRequest {
570 pub part: Vec<CompleteMultipartUploadRequestPart>,
571}
572
573#[derive(Clone, Default, Debug, Serialize)]
574#[serde(default, rename_all = "PascalCase")]
575pub struct CompleteMultipartUploadRequestPart {
576 #[serde(rename = "PartNumber")]
577 pub part_number: usize,
578 #[serde(rename = "ETag")]
607 pub etag: String,
608}
609
610#[derive(Debug, Default, Deserialize)]
612#[serde[default, rename_all = "PascalCase"]]
613pub struct CompleteMultipartUploadResult {
614 pub location: String,
615 pub bucket: String,
616 pub key: String,
617 #[serde(rename = "ETag")]
618 pub etag: String,
619}
620
621#[derive(Default, Debug, Deserialize)]
622#[serde(default, rename_all = "PascalCase")]
623pub struct ListObjectsOutput {
624 pub name: String,
625 pub prefix: String,
626 pub contents: Vec<ListObjectsOutputContent>,
627 pub common_prefixes: Vec<CommonPrefix>,
628 pub marker: String,
629 pub next_marker: Option<String>,
630}
631
632#[derive(Default, Debug, Deserialize)]
633#[serde(default, rename_all = "PascalCase")]
634pub struct CommonPrefix {
635 pub prefix: String,
636}
637
638#[derive(Default, Debug, Deserialize)]
639#[serde(default, rename_all = "PascalCase")]
640pub struct ListObjectsOutputContent {
641 pub key: String,
642 pub size: u64,
643}
644
645#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
646#[serde(rename_all = "PascalCase")]
647pub struct OutputCommonPrefix {
648 pub prefix: String,
649}
650
651#[derive(Default, Debug, Deserialize)]
653#[serde(default, rename_all = "PascalCase")]
654pub struct ListObjectVersionsOutput {
655 pub is_truncated: Option<bool>,
656 pub next_key_marker: Option<String>,
657 pub next_version_id_marker: Option<String>,
658 pub common_prefixes: Vec<OutputCommonPrefix>,
659 pub version: Vec<ListObjectVersionsOutputVersion>,
660 pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
661}
662
663#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
664#[serde(rename_all = "PascalCase")]
665pub struct ListObjectVersionsOutputVersion {
666 pub key: String,
667 pub version_id: String,
668 pub is_latest: bool,
669 pub size: u64,
670 pub last_modified: String,
671 #[serde(rename = "ETag")]
672 pub etag: Option<String>,
673}
674
675#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
676#[serde(rename_all = "PascalCase")]
677pub struct ListObjectVersionsOutputDeleteMarker {
678 pub key: String,
679 pub version_id: String,
680 pub is_latest: bool,
681 pub last_modified: String,
682}
683
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Parses a sample `ListBucketResult` document and checks every field that
    /// `ListObjectsOutput` captures.
    #[test]
    fn test_parse_xml() {
        let payload = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
 <Name>examplebucket</Name>
 <Prefix>obj</Prefix>
 <Marker>obj002</Marker>
 <NextMarker>obj004</NextMarker>
 <MaxKeys>1000</MaxKeys>
 <IsTruncated>false</IsTruncated>
 <Contents>
 <Key>obj002</Key>
 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
 <Size>9</Size>
 <Owner>
 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
 </Owner>
 <StorageClass>STANDARD</StorageClass>
 </Contents>
 <Contents>
 <Key>obj003</Key>
 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
 <Size>10</Size>
 <Owner>
 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
 </Owner>
 <StorageClass>STANDARD</StorageClass>
 </Contents>
 <CommonPrefixes>
 <Prefix>hello</Prefix>
 </CommonPrefixes>
 <CommonPrefixes>
 <Prefix>world</Prefix>
 </CommonPrefixes>
</ListBucketResult>"#,
        );

        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(payload.reader()).expect("must success");

        // Scalar fields.
        assert_eq!(parsed.name, "examplebucket");
        assert_eq!(parsed.prefix, "obj");
        assert_eq!(parsed.marker, "obj002");
        assert_eq!(parsed.next_marker.as_deref(), Some("obj004"));

        // Repeated `Contents` elements, in document order.
        let keys: Vec<&str> = parsed.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` elements, in document order.
        let prefixes: Vec<&str> = parsed
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}