1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use http::Request;
23use http::Response;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_MODIFIED_SINCE;
30use http::header::IF_NONE_MATCH;
31use http::header::IF_UNMODIFIED_SINCE;
32use reqsign::TencentCosCredential;
33use reqsign::TencentCosCredentialLoader;
34use reqsign::TencentCosSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
41pub mod constants {
42 pub const COS_QUERY_VERSION_ID: &str = "versionId";
43
44 pub const X_COS_VERSION_ID: &str = "x-cos-version-id";
45}
46
47pub struct CosCore {
48 pub info: Arc<AccessorInfo>,
49 pub bucket: String,
50 pub root: String,
51 pub endpoint: String,
52
53 pub signer: TencentCosSigner,
54 pub loader: TencentCosCredentialLoader,
55}
56
57impl Debug for CosCore {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("CosCore")
60 .field("root", &self.root)
61 .field("bucket", &self.bucket)
62 .field("endpoint", &self.endpoint)
63 .finish_non_exhaustive()
64 }
65}
66
67impl CosCore {
68 async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
69 let cred = self
70 .loader
71 .load()
72 .await
73 .map_err(new_request_credential_error)?;
74
75 if let Some(cred) = cred {
76 return Ok(Some(cred));
77 }
78
79 Err(Error::new(
80 ErrorKind::PermissionDenied,
81 "no valid credential found and anonymous access is not allowed",
82 ))
83 }
84
85 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
86 let cred = if let Some(cred) = self.load_credential().await? {
87 cred
88 } else {
89 return Ok(());
90 };
91
92 self.signer.sign(req, &cred).map_err(new_request_sign_error)
93 }
94
95 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
96 let cred = if let Some(cred) = self.load_credential().await? {
97 cred
98 } else {
99 return Ok(());
100 };
101
102 self.signer
103 .sign_query(req, duration, &cred)
104 .map_err(new_request_sign_error)
105 }
106
107 #[inline]
108 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
109 self.info.http_client().send(req).await
110 }
111}
112
113impl CosCore {
114 pub async fn cos_get_object(
115 &self,
116 path: &str,
117 range: BytesRange,
118 args: &OpRead,
119 ) -> Result<Response<HttpBody>> {
120 let mut req = self.cos_get_object_request(path, range, args)?;
121
122 self.sign(&mut req).await?;
123
124 self.info.http_client().fetch(req).await
125 }
126
127 pub fn cos_get_object_request(
128 &self,
129 path: &str,
130 range: BytesRange,
131 args: &OpRead,
132 ) -> Result<Request<Buffer>> {
133 let p = build_abs_path(&self.root, path);
134
135 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
136
137 let mut query_args = Vec::new();
138 if let Some(version) = args.version() {
139 query_args.push(format!(
140 "{}={}",
141 constants::COS_QUERY_VERSION_ID,
142 percent_decode_path(version)
143 ))
144 }
145 if !query_args.is_empty() {
146 url.push_str(&format!("?{}", query_args.join("&")));
147 }
148
149 let mut req = Request::get(&url);
150
151 if let Some(if_match) = args.if_match() {
152 req = req.header(IF_MATCH, if_match);
153 }
154
155 if !range.is_full() {
156 req = req.header(http::header::RANGE, range.to_header())
157 }
158
159 if let Some(if_none_match) = args.if_none_match() {
160 req = req.header(IF_NONE_MATCH, if_none_match);
161 }
162
163 if let Some(if_modified_since) = args.if_modified_since() {
164 req = req.header(IF_MODIFIED_SINCE, if_modified_since.format_http_date());
165 }
166
167 if let Some(if_unmodified_since) = args.if_unmodified_since() {
168 req = req.header(IF_UNMODIFIED_SINCE, if_unmodified_since.format_http_date());
169 }
170
171 let req = req.extension(Operation::Read);
172
173 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
174
175 Ok(req)
176 }
177
178 pub fn cos_put_object_request(
179 &self,
180 path: &str,
181 size: Option<u64>,
182 args: &OpWrite,
183 body: Buffer,
184 ) -> Result<Request<Buffer>> {
185 let p = build_abs_path(&self.root, path);
186
187 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
188
189 let mut req = Request::put(&url);
190
191 if let Some(size) = size {
192 req = req.header(CONTENT_LENGTH, size)
193 }
194 if let Some(cache_control) = args.cache_control() {
195 req = req.header(CACHE_CONTROL, cache_control)
196 }
197 if let Some(pos) = args.content_disposition() {
198 req = req.header(CONTENT_DISPOSITION, pos)
199 }
200 if let Some(mime) = args.content_type() {
201 req = req.header(CONTENT_TYPE, mime)
202 }
203
204 if args.if_not_exists() {
213 req = req.header("x-cos-forbid-overwrite", "true")
214 }
215
216 if let Some(user_metadata) = args.user_metadata() {
218 for (key, value) in user_metadata {
219 req = req.header(format!("x-cos-meta-{key}"), value)
220 }
221 }
222
223 let req = req.extension(Operation::Write);
224
225 let req = req.body(body).map_err(new_request_build_error)?;
226
227 Ok(req)
228 }
229
230 pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
231 let mut req = self.cos_head_object_request(path, args)?;
232
233 self.sign(&mut req).await?;
234
235 self.send(req).await
236 }
237
238 pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
239 let p = build_abs_path(&self.root, path);
240
241 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
242
243 let mut query_args = Vec::new();
244 if let Some(version) = args.version() {
245 query_args.push(format!(
246 "{}={}",
247 constants::COS_QUERY_VERSION_ID,
248 percent_decode_path(version)
249 ))
250 }
251 if !query_args.is_empty() {
252 url.push_str(&format!("?{}", query_args.join("&")));
253 }
254
255 let mut req = Request::head(&url);
256
257 if let Some(if_match) = args.if_match() {
258 req = req.header(IF_MATCH, if_match);
259 }
260
261 if let Some(if_none_match) = args.if_none_match() {
262 req = req.header(IF_NONE_MATCH, if_none_match);
263 }
264
265 let req = req.extension(Operation::Stat);
266
267 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
268
269 Ok(req)
270 }
271
272 pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
273 let p = build_abs_path(&self.root, path);
274
275 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
276
277 let mut query_args = Vec::new();
278 if let Some(version) = args.version() {
279 query_args.push(format!(
280 "{}={}",
281 constants::COS_QUERY_VERSION_ID,
282 percent_decode_path(version)
283 ))
284 }
285 if !query_args.is_empty() {
286 url.push_str(&format!("?{}", query_args.join("&")));
287 }
288
289 let req = Request::delete(&url);
290
291 let req = req.extension(Operation::Delete);
292
293 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
294
295 self.sign(&mut req).await?;
296
297 self.send(req).await
298 }
299
300 pub fn cos_append_object_request(
301 &self,
302 path: &str,
303 position: u64,
304 size: u64,
305 args: &OpWrite,
306 body: Buffer,
307 ) -> Result<Request<Buffer>> {
308 let p = build_abs_path(&self.root, path);
309 let url = format!(
310 "{}/{}?append&position={}",
311 self.endpoint,
312 percent_encode_path(&p),
313 position
314 );
315
316 let mut req = Request::post(&url);
317
318 req = req.header(CONTENT_LENGTH, size);
319
320 if let Some(mime) = args.content_type() {
321 req = req.header(CONTENT_TYPE, mime);
322 }
323
324 if let Some(pos) = args.content_disposition() {
325 req = req.header(CONTENT_DISPOSITION, pos);
326 }
327
328 if let Some(cache_control) = args.cache_control() {
329 req = req.header(CACHE_CONTROL, cache_control)
330 }
331
332 let req = req.extension(Operation::Write);
333
334 let req = req.body(body).map_err(new_request_build_error)?;
335 Ok(req)
336 }
337
338 pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
339 let source = build_abs_path(&self.root, from);
340 let target = build_abs_path(&self.root, to);
341
342 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
343 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
344
345 let mut req = Request::put(&url)
346 .extension(Operation::Copy)
347 .header("x-cos-copy-source", &source)
348 .body(Buffer::new())
349 .map_err(new_request_build_error)?;
350
351 self.sign(&mut req).await?;
352
353 self.send(req).await
354 }
355
356 pub async fn cos_list_objects(
357 &self,
358 path: &str,
359 next_marker: &str,
360 delimiter: &str,
361 limit: Option<usize>,
362 ) -> Result<Response<Buffer>> {
363 let p = build_abs_path(&self.root, path);
364
365 let mut url = QueryPairsWriter::new(&self.endpoint);
366
367 if !p.is_empty() {
368 url = url.push("prefix", &percent_encode_path(&p));
369 }
370 if !delimiter.is_empty() {
371 url = url.push("delimiter", delimiter);
372 }
373 if let Some(limit) = limit {
374 url = url.push("max-keys", &limit.to_string());
375 }
376 if !next_marker.is_empty() {
377 url = url.push("marker", next_marker);
378 }
379
380 let mut req = Request::get(url.finish())
381 .extension(Operation::List)
382 .body(Buffer::new())
383 .map_err(new_request_build_error)?;
384
385 self.sign(&mut req).await?;
386
387 self.send(req).await
388 }
389
390 pub async fn cos_initiate_multipart_upload(
391 &self,
392 path: &str,
393 args: &OpWrite,
394 ) -> Result<Response<Buffer>> {
395 let p = build_abs_path(&self.root, path);
396
397 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
398
399 let mut req = Request::post(&url);
400
401 if let Some(mime) = args.content_type() {
402 req = req.header(CONTENT_TYPE, mime)
403 }
404
405 if let Some(content_disposition) = args.content_disposition() {
406 req = req.header(CONTENT_DISPOSITION, content_disposition)
407 }
408
409 if let Some(cache_control) = args.cache_control() {
410 req = req.header(CACHE_CONTROL, cache_control)
411 }
412
413 if let Some(user_metadata) = args.user_metadata() {
415 for (key, value) in user_metadata {
416 req = req.header(format!("x-cos-meta-{key}"), value)
417 }
418 }
419
420 let req = req.extension(Operation::Write);
421
422 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
423
424 self.sign(&mut req).await?;
425
426 self.send(req).await
427 }
428
429 pub async fn cos_upload_part_request(
430 &self,
431 path: &str,
432 upload_id: &str,
433 part_number: usize,
434 size: u64,
435 body: Buffer,
436 ) -> Result<Response<Buffer>> {
437 let p = build_abs_path(&self.root, path);
438
439 let url = format!(
440 "{}/{}?partNumber={}&uploadId={}",
441 self.endpoint,
442 percent_encode_path(&p),
443 part_number,
444 percent_encode_path(upload_id)
445 );
446
447 let mut req = Request::put(&url);
448 req = req.header(CONTENT_LENGTH, size);
449
450 let req = req.extension(Operation::Write);
451
452 let mut req = req.body(body).map_err(new_request_build_error)?;
454
455 self.sign(&mut req).await?;
456
457 self.send(req).await
458 }
459
460 pub async fn cos_complete_multipart_upload(
461 &self,
462 path: &str,
463 upload_id: &str,
464 parts: Vec<CompleteMultipartUploadRequestPart>,
465 ) -> Result<Response<Buffer>> {
466 let p = build_abs_path(&self.root, path);
467
468 let url = format!(
469 "{}/{}?uploadId={}",
470 self.endpoint,
471 percent_encode_path(&p),
472 percent_encode_path(upload_id)
473 );
474
475 let req = Request::post(&url);
476
477 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
478 .map_err(new_xml_serialize_error)?;
479 let req = req.header(CONTENT_LENGTH, content.len());
481 let req = req.header(CONTENT_TYPE, "application/xml");
483
484 let req = req.extension(Operation::Write);
485
486 let mut req = req
487 .body(Buffer::from(Bytes::from(content)))
488 .map_err(new_request_build_error)?;
489
490 self.sign(&mut req).await?;
491
492 self.send(req).await
493 }
494
495 pub async fn cos_abort_multipart_upload(
497 &self,
498 path: &str,
499 upload_id: &str,
500 ) -> Result<Response<Buffer>> {
501 let p = build_abs_path(&self.root, path);
502
503 let url = format!(
504 "{}/{}?uploadId={}",
505 self.endpoint,
506 percent_encode_path(&p),
507 percent_encode_path(upload_id)
508 );
509
510 let mut req = Request::delete(&url)
511 .extension(Operation::Delete)
512 .body(Buffer::new())
513 .map_err(new_request_build_error)?;
514 self.sign(&mut req).await?;
515 self.send(req).await
516 }
517
518 pub async fn cos_list_object_versions(
519 &self,
520 prefix: &str,
521 delimiter: &str,
522 limit: Option<usize>,
523 key_marker: &str,
524 version_id_marker: &str,
525 ) -> Result<Response<Buffer>> {
526 let p = build_abs_path(&self.root, prefix);
527
528 let mut url = QueryPairsWriter::new(&self.endpoint);
529 url = url.push("versions", "");
530 if !p.is_empty() {
531 url = url.push("prefix", &percent_encode_path(p.as_str()));
532 }
533 if !delimiter.is_empty() {
534 url = url.push("delimiter", delimiter);
535 }
536
537 if let Some(limit) = limit {
538 url = url.push("max-keys", &limit.to_string());
539 }
540 if !key_marker.is_empty() {
541 url = url.push("key-marker", &percent_encode_path(key_marker));
542 }
543 if !version_id_marker.is_empty() {
544 url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
545 }
546
547 let mut req = Request::get(url.finish())
548 .extension(Operation::List)
549 .body(Buffer::new())
550 .map_err(new_request_build_error)?;
551
552 self.sign(&mut req).await?;
553
554 self.send(req).await
555 }
556}
557
558#[derive(Default, Debug, Deserialize)]
560#[serde(default, rename_all = "PascalCase")]
561pub struct InitiateMultipartUploadResult {
562 pub upload_id: String,
563}
564
565#[derive(Default, Debug, Serialize)]
567#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
568pub struct CompleteMultipartUploadRequest {
569 pub part: Vec<CompleteMultipartUploadRequestPart>,
570}
571
572#[derive(Clone, Default, Debug, Serialize)]
573#[serde(default, rename_all = "PascalCase")]
574pub struct CompleteMultipartUploadRequestPart {
575 #[serde(rename = "PartNumber")]
576 pub part_number: usize,
577 #[serde(rename = "ETag")]
606 pub etag: String,
607}
608
609#[derive(Debug, Default, Deserialize)]
611#[serde[default, rename_all = "PascalCase"]]
612pub struct CompleteMultipartUploadResult {
613 pub location: String,
614 pub bucket: String,
615 pub key: String,
616 #[serde(rename = "ETag")]
617 pub etag: String,
618}
619
620#[derive(Default, Debug, Deserialize)]
621#[serde(default, rename_all = "PascalCase")]
622pub struct ListObjectsOutput {
623 pub name: String,
624 pub prefix: String,
625 pub contents: Vec<ListObjectsOutputContent>,
626 pub common_prefixes: Vec<CommonPrefix>,
627 pub marker: String,
628 pub next_marker: Option<String>,
629}
630
631#[derive(Default, Debug, Deserialize)]
632#[serde(default, rename_all = "PascalCase")]
633pub struct CommonPrefix {
634 pub prefix: String,
635}
636
637#[derive(Default, Debug, Deserialize)]
638#[serde(default, rename_all = "PascalCase")]
639pub struct ListObjectsOutputContent {
640 pub key: String,
641 pub size: u64,
642}
643
644#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
645#[serde(rename_all = "PascalCase")]
646pub struct OutputCommonPrefix {
647 pub prefix: String,
648}
649
650#[derive(Default, Debug, Deserialize)]
652#[serde(default, rename_all = "PascalCase")]
653pub struct ListObjectVersionsOutput {
654 pub is_truncated: Option<bool>,
655 pub next_key_marker: Option<String>,
656 pub next_version_id_marker: Option<String>,
657 pub common_prefixes: Vec<OutputCommonPrefix>,
658 pub version: Vec<ListObjectVersionsOutputVersion>,
659 pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
660}
661
662#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
663#[serde(rename_all = "PascalCase")]
664pub struct ListObjectVersionsOutputVersion {
665 pub key: String,
666 pub version_id: String,
667 pub is_latest: bool,
668 pub size: u64,
669 pub last_modified: String,
670 #[serde(rename = "ETag")]
671 pub etag: Option<String>,
672}
673
674#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
675#[serde(rename_all = "PascalCase")]
676pub struct ListObjectVersionsOutputDeleteMarker {
677 pub key: String,
678 pub version_id: String,
679 pub is_latest: bool,
680 pub last_modified: String,
681}
682
683#[cfg(test)]
684mod tests {
685 use bytes::Buf;
686
687 use super::*;
688
689 #[test]
690 fn test_parse_xml() {
691 let bs = bytes::Bytes::from(
692 r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
693<ListBucketResult>
694 <Name>examplebucket</Name>
695 <Prefix>obj</Prefix>
696 <Marker>obj002</Marker>
697 <NextMarker>obj004</NextMarker>
698 <MaxKeys>1000</MaxKeys>
699 <IsTruncated>false</IsTruncated>
700 <Contents>
701 <Key>obj002</Key>
702 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
703 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
704 <Size>9</Size>
705 <Owner>
706 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
707 </Owner>
708 <StorageClass>STANDARD</StorageClass>
709 </Contents>
710 <Contents>
711 <Key>obj003</Key>
712 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
713 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
714 <Size>10</Size>
715 <Owner>
716 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
717 </Owner>
718 <StorageClass>STANDARD</StorageClass>
719 </Contents>
720 <CommonPrefixes>
721 <Prefix>hello</Prefix>
722 </CommonPrefixes>
723 <CommonPrefixes>
724 <Prefix>world</Prefix>
725 </CommonPrefixes>
726</ListBucketResult>"#,
727 );
728 let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");
729
730 assert_eq!(out.name, "examplebucket".to_string());
731 assert_eq!(out.prefix, "obj".to_string());
732 assert_eq!(out.marker, "obj002".to_string());
733 assert_eq!(out.next_marker, Some("obj004".to_string()),);
734 assert_eq!(
735 out.contents
736 .iter()
737 .map(|v| v.key.clone())
738 .collect::<Vec<String>>(),
739 ["obj002", "obj003"],
740 );
741 assert_eq!(
742 out.contents.iter().map(|v| v.size).collect::<Vec<u64>>(),
743 [9, 10],
744 );
745 assert_eq!(
746 out.common_prefixes
747 .iter()
748 .map(|v| v.prefix.clone())
749 .collect::<Vec<String>>(),
750 ["hello", "world"],
751 )
752 }
753}