1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use http::Request;
31use http::Response;
32use reqsign::TencentCosCredential;
33use reqsign::TencentCosCredentialLoader;
34use reqsign::TencentCosSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
/// String constants related to COS object versioning.
pub mod constants {
    /// The literal `x-cos-version-id` (header-style name; not referenced in
    /// this part of the file).
    pub const X_COS_VERSION_ID: &str = "x-cos-version-id";

    /// Key of the `versionId` URL query parameter used by the object
    /// read / stat / delete requests.
    pub const COS_QUERY_VERSION_ID: &str = "versionId";
}
46
47pub struct CosCore {
48 pub info: Arc<AccessorInfo>,
49 pub bucket: String,
50 pub root: String,
51 pub endpoint: String,
52
53 pub signer: TencentCosSigner,
54 pub loader: TencentCosCredentialLoader,
55}
56
57impl Debug for CosCore {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("Backend")
60 .field("root", &self.root)
61 .field("bucket", &self.bucket)
62 .field("endpoint", &self.endpoint)
63 .finish_non_exhaustive()
64 }
65}
66
67impl CosCore {
68 async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
69 let cred = self
70 .loader
71 .load()
72 .await
73 .map_err(new_request_credential_error)?;
74
75 if let Some(cred) = cred {
76 return Ok(Some(cred));
77 }
78
79 Err(Error::new(
80 ErrorKind::PermissionDenied,
81 "no valid credential found and anonymous access is not allowed",
82 ))
83 }
84
85 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
86 let cred = if let Some(cred) = self.load_credential().await? {
87 cred
88 } else {
89 return Ok(());
90 };
91
92 self.signer.sign(req, &cred).map_err(new_request_sign_error)
93 }
94
95 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
96 let cred = if let Some(cred) = self.load_credential().await? {
97 cred
98 } else {
99 return Ok(());
100 };
101
102 self.signer
103 .sign_query(req, duration, &cred)
104 .map_err(new_request_sign_error)
105 }
106
107 #[inline]
108 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
109 self.info.http_client().send(req).await
110 }
111}
112
113impl CosCore {
114 pub async fn cos_get_object(
115 &self,
116 path: &str,
117 range: BytesRange,
118 args: &OpRead,
119 ) -> Result<Response<HttpBody>> {
120 let mut req = self.cos_get_object_request(path, range, args)?;
121
122 self.sign(&mut req).await?;
123
124 self.info.http_client().fetch(req).await
125 }
126
127 pub fn cos_get_object_request(
128 &self,
129 path: &str,
130 range: BytesRange,
131 args: &OpRead,
132 ) -> Result<Request<Buffer>> {
133 let p = build_abs_path(&self.root, path);
134
135 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
136
137 let mut query_args = Vec::new();
138 if let Some(version) = args.version() {
139 query_args.push(format!(
140 "{}={}",
141 constants::COS_QUERY_VERSION_ID,
142 percent_decode_path(version)
143 ))
144 }
145 if !query_args.is_empty() {
146 url.push_str(&format!("?{}", query_args.join("&")));
147 }
148
149 let mut req = Request::get(&url);
150
151 if let Some(if_match) = args.if_match() {
152 req = req.header(IF_MATCH, if_match);
153 }
154
155 if !range.is_full() {
156 req = req.header(http::header::RANGE, range.to_header())
157 }
158
159 if let Some(if_none_match) = args.if_none_match() {
160 req = req.header(IF_NONE_MATCH, if_none_match);
161 }
162
163 let req = req.extension(Operation::Read);
164
165 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
166
167 Ok(req)
168 }
169
170 pub fn cos_put_object_request(
171 &self,
172 path: &str,
173 size: Option<u64>,
174 args: &OpWrite,
175 body: Buffer,
176 ) -> Result<Request<Buffer>> {
177 let p = build_abs_path(&self.root, path);
178
179 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
180
181 let mut req = Request::put(&url);
182
183 if let Some(size) = size {
184 req = req.header(CONTENT_LENGTH, size)
185 }
186 if let Some(cache_control) = args.cache_control() {
187 req = req.header(CACHE_CONTROL, cache_control)
188 }
189 if let Some(pos) = args.content_disposition() {
190 req = req.header(CONTENT_DISPOSITION, pos)
191 }
192 if let Some(mime) = args.content_type() {
193 req = req.header(CONTENT_TYPE, mime)
194 }
195
196 if args.if_not_exists() {
205 req = req.header("x-cos-forbid-overwrite", "true")
206 }
207
208 if let Some(user_metadata) = args.user_metadata() {
210 for (key, value) in user_metadata {
211 req = req.header(format!("x-cos-meta-{key}"), value)
212 }
213 }
214
215 let req = req.extension(Operation::Write);
216
217 let req = req.body(body).map_err(new_request_build_error)?;
218
219 Ok(req)
220 }
221
222 pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
223 let mut req = self.cos_head_object_request(path, args)?;
224
225 self.sign(&mut req).await?;
226
227 self.send(req).await
228 }
229
230 pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
231 let p = build_abs_path(&self.root, path);
232
233 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
234
235 let mut query_args = Vec::new();
236 if let Some(version) = args.version() {
237 query_args.push(format!(
238 "{}={}",
239 constants::COS_QUERY_VERSION_ID,
240 percent_decode_path(version)
241 ))
242 }
243 if !query_args.is_empty() {
244 url.push_str(&format!("?{}", query_args.join("&")));
245 }
246
247 let mut req = Request::head(&url);
248
249 if let Some(if_match) = args.if_match() {
250 req = req.header(IF_MATCH, if_match);
251 }
252
253 if let Some(if_none_match) = args.if_none_match() {
254 req = req.header(IF_NONE_MATCH, if_none_match);
255 }
256
257 let req = req.extension(Operation::Stat);
258
259 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
260
261 Ok(req)
262 }
263
264 pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
265 let p = build_abs_path(&self.root, path);
266
267 let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
268
269 let mut query_args = Vec::new();
270 if let Some(version) = args.version() {
271 query_args.push(format!(
272 "{}={}",
273 constants::COS_QUERY_VERSION_ID,
274 percent_decode_path(version)
275 ))
276 }
277 if !query_args.is_empty() {
278 url.push_str(&format!("?{}", query_args.join("&")));
279 }
280
281 let req = Request::delete(&url);
282
283 let req = req.extension(Operation::Delete);
284
285 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
286
287 self.sign(&mut req).await?;
288
289 self.send(req).await
290 }
291
292 pub fn cos_append_object_request(
293 &self,
294 path: &str,
295 position: u64,
296 size: u64,
297 args: &OpWrite,
298 body: Buffer,
299 ) -> Result<Request<Buffer>> {
300 let p = build_abs_path(&self.root, path);
301 let url = format!(
302 "{}/{}?append&position={}",
303 self.endpoint,
304 percent_encode_path(&p),
305 position
306 );
307
308 let mut req = Request::post(&url);
309
310 req = req.header(CONTENT_LENGTH, size);
311
312 if let Some(mime) = args.content_type() {
313 req = req.header(CONTENT_TYPE, mime);
314 }
315
316 if let Some(pos) = args.content_disposition() {
317 req = req.header(CONTENT_DISPOSITION, pos);
318 }
319
320 if let Some(cache_control) = args.cache_control() {
321 req = req.header(CACHE_CONTROL, cache_control)
322 }
323
324 let req = req.extension(Operation::Write);
325
326 let req = req.body(body).map_err(new_request_build_error)?;
327 Ok(req)
328 }
329
330 pub async fn cos_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
331 let source = build_abs_path(&self.root, from);
332 let target = build_abs_path(&self.root, to);
333
334 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
335 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
336
337 let mut req = Request::put(&url)
338 .extension(Operation::Copy)
339 .header("x-cos-copy-source", &source)
340 .body(Buffer::new())
341 .map_err(new_request_build_error)?;
342
343 self.sign(&mut req).await?;
344
345 self.send(req).await
346 }
347
348 pub async fn cos_list_objects(
349 &self,
350 path: &str,
351 next_marker: &str,
352 delimiter: &str,
353 limit: Option<usize>,
354 ) -> Result<Response<Buffer>> {
355 let p = build_abs_path(&self.root, path);
356
357 let mut url = QueryPairsWriter::new(&self.endpoint);
358
359 if !p.is_empty() {
360 url = url.push("prefix", &percent_encode_path(&p));
361 }
362 if !delimiter.is_empty() {
363 url = url.push("delimiter", delimiter);
364 }
365 if let Some(limit) = limit {
366 url = url.push("max-keys", &limit.to_string());
367 }
368 if !next_marker.is_empty() {
369 url = url.push("marker", next_marker);
370 }
371
372 let mut req = Request::get(url.finish())
373 .extension(Operation::List)
374 .body(Buffer::new())
375 .map_err(new_request_build_error)?;
376
377 self.sign(&mut req).await?;
378
379 self.send(req).await
380 }
381
382 pub async fn cos_initiate_multipart_upload(
383 &self,
384 path: &str,
385 args: &OpWrite,
386 ) -> Result<Response<Buffer>> {
387 let p = build_abs_path(&self.root, path);
388
389 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
390
391 let mut req = Request::post(&url);
392
393 if let Some(mime) = args.content_type() {
394 req = req.header(CONTENT_TYPE, mime)
395 }
396
397 if let Some(content_disposition) = args.content_disposition() {
398 req = req.header(CONTENT_DISPOSITION, content_disposition)
399 }
400
401 if let Some(cache_control) = args.cache_control() {
402 req = req.header(CACHE_CONTROL, cache_control)
403 }
404
405 if let Some(user_metadata) = args.user_metadata() {
407 for (key, value) in user_metadata {
408 req = req.header(format!("x-cos-meta-{key}"), value)
409 }
410 }
411
412 let req = req.extension(Operation::Write);
413
414 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
415
416 self.sign(&mut req).await?;
417
418 self.send(req).await
419 }
420
421 pub async fn cos_upload_part_request(
422 &self,
423 path: &str,
424 upload_id: &str,
425 part_number: usize,
426 size: u64,
427 body: Buffer,
428 ) -> Result<Response<Buffer>> {
429 let p = build_abs_path(&self.root, path);
430
431 let url = format!(
432 "{}/{}?partNumber={}&uploadId={}",
433 self.endpoint,
434 percent_encode_path(&p),
435 part_number,
436 percent_encode_path(upload_id)
437 );
438
439 let mut req = Request::put(&url);
440 req = req.header(CONTENT_LENGTH, size);
441
442 let req = req.extension(Operation::Write);
443
444 let mut req = req.body(body).map_err(new_request_build_error)?;
446
447 self.sign(&mut req).await?;
448
449 self.send(req).await
450 }
451
452 pub async fn cos_complete_multipart_upload(
453 &self,
454 path: &str,
455 upload_id: &str,
456 parts: Vec<CompleteMultipartUploadRequestPart>,
457 ) -> Result<Response<Buffer>> {
458 let p = build_abs_path(&self.root, path);
459
460 let url = format!(
461 "{}/{}?uploadId={}",
462 self.endpoint,
463 percent_encode_path(&p),
464 percent_encode_path(upload_id)
465 );
466
467 let req = Request::post(&url);
468
469 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
470 .map_err(new_xml_serialize_error)?;
471 let req = req.header(CONTENT_LENGTH, content.len());
473 let req = req.header(CONTENT_TYPE, "application/xml");
475
476 let req = req.extension(Operation::Write);
477
478 let mut req = req
479 .body(Buffer::from(Bytes::from(content)))
480 .map_err(new_request_build_error)?;
481
482 self.sign(&mut req).await?;
483
484 self.send(req).await
485 }
486
487 pub async fn cos_abort_multipart_upload(
489 &self,
490 path: &str,
491 upload_id: &str,
492 ) -> Result<Response<Buffer>> {
493 let p = build_abs_path(&self.root, path);
494
495 let url = format!(
496 "{}/{}?uploadId={}",
497 self.endpoint,
498 percent_encode_path(&p),
499 percent_encode_path(upload_id)
500 );
501
502 let mut req = Request::delete(&url)
503 .extension(Operation::Delete)
504 .body(Buffer::new())
505 .map_err(new_request_build_error)?;
506 self.sign(&mut req).await?;
507 self.send(req).await
508 }
509
510 pub async fn cos_list_object_versions(
511 &self,
512 prefix: &str,
513 delimiter: &str,
514 limit: Option<usize>,
515 key_marker: &str,
516 version_id_marker: &str,
517 ) -> Result<Response<Buffer>> {
518 let p = build_abs_path(&self.root, prefix);
519
520 let mut url = QueryPairsWriter::new(&self.endpoint);
521 url = url.push("versions", "");
522 if !p.is_empty() {
523 url = url.push("prefix", &percent_encode_path(p.as_str()));
524 }
525 if !delimiter.is_empty() {
526 url = url.push("delimiter", delimiter);
527 }
528
529 if let Some(limit) = limit {
530 url = url.push("max-keys", &limit.to_string());
531 }
532 if !key_marker.is_empty() {
533 url = url.push("key-marker", &percent_encode_path(key_marker));
534 }
535 if !version_id_marker.is_empty() {
536 url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
537 }
538
539 let mut req = Request::get(url.finish())
540 .extension(Operation::List)
541 .body(Buffer::new())
542 .map_err(new_request_build_error)?;
543
544 self.sign(&mut req).await?;
545
546 self.send(req).await
547 }
548}
549
550#[derive(Default, Debug, Deserialize)]
552#[serde(default, rename_all = "PascalCase")]
553pub struct InitiateMultipartUploadResult {
554 pub upload_id: String,
555}
556
557#[derive(Default, Debug, Serialize)]
559#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
560pub struct CompleteMultipartUploadRequest {
561 pub part: Vec<CompleteMultipartUploadRequestPart>,
562}
563
564#[derive(Clone, Default, Debug, Serialize)]
565#[serde(default, rename_all = "PascalCase")]
566pub struct CompleteMultipartUploadRequestPart {
567 #[serde(rename = "PartNumber")]
568 pub part_number: usize,
569 #[serde(rename = "ETag")]
598 pub etag: String,
599}
600
601#[derive(Debug, Default, Deserialize)]
603#[serde[default, rename_all = "PascalCase"]]
604pub struct CompleteMultipartUploadResult {
605 pub location: String,
606 pub bucket: String,
607 pub key: String,
608 #[serde(rename = "ETag")]
609 pub etag: String,
610}
611
612#[derive(Default, Debug, Deserialize)]
613#[serde(default, rename_all = "PascalCase")]
614pub struct ListObjectsOutput {
615 pub name: String,
616 pub prefix: String,
617 pub contents: Vec<ListObjectsOutputContent>,
618 pub common_prefixes: Vec<CommonPrefix>,
619 pub marker: String,
620 pub next_marker: Option<String>,
621}
622
623#[derive(Default, Debug, Deserialize)]
624#[serde(default, rename_all = "PascalCase")]
625pub struct CommonPrefix {
626 pub prefix: String,
627}
628
629#[derive(Default, Debug, Deserialize)]
630#[serde(default, rename_all = "PascalCase")]
631pub struct ListObjectsOutputContent {
632 pub key: String,
633 pub size: u64,
634}
635
636#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
637#[serde(rename_all = "PascalCase")]
638pub struct OutputCommonPrefix {
639 pub prefix: String,
640}
641
642#[derive(Default, Debug, Deserialize)]
644#[serde(default, rename_all = "PascalCase")]
645pub struct ListObjectVersionsOutput {
646 pub is_truncated: Option<bool>,
647 pub next_key_marker: Option<String>,
648 pub next_version_id_marker: Option<String>,
649 pub common_prefixes: Vec<OutputCommonPrefix>,
650 pub version: Vec<ListObjectVersionsOutputVersion>,
651 pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
652}
653
654#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
655#[serde(rename_all = "PascalCase")]
656pub struct ListObjectVersionsOutputVersion {
657 pub key: String,
658 pub version_id: String,
659 pub is_latest: bool,
660 pub size: u64,
661 pub last_modified: String,
662 #[serde(rename = "ETag")]
663 pub etag: Option<String>,
664}
665
666#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
667#[serde(rename_all = "PascalCase")]
668pub struct ListObjectVersionsOutputDeleteMarker {
669 pub key: String,
670 pub version_id: String,
671 pub is_latest: bool,
672 pub last_modified: String,
673}
674
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Parses a sample `ListBucketResult` document and checks every field
    /// that `ListObjectsOutput` captures.
    #[test]
    fn test_parse_xml() {
        let bs = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult>
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );

        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(bs.reader()).expect("must success");

        // Scalar fields.
        assert_eq!(parsed.name, "examplebucket");
        assert_eq!(parsed.prefix, "obj");
        assert_eq!(parsed.marker, "obj002");
        assert_eq!(parsed.next_marker.as_deref(), Some("obj004"));

        // Repeated `Contents` elements.
        let keys: Vec<&str> = parsed.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` elements.
        let prefixes: Vec<&str> = parsed
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}