1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use http::Request;
23use http::Response;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use reqsign::HuaweicloudObsCredential;
31use reqsign::HuaweicloudObsCredentialLoader;
32use reqsign::HuaweicloudObsSigner;
33use serde::Deserialize;
34use serde::Serialize;
35
36use crate::raw::*;
37use crate::*;
38
/// String constants used when talking to the OBS HTTP API.
pub mod constants {
    /// The literal `x-obs-version-id`.
    ///
    /// Not referenced elsewhere in this file; presumably the name of the
    /// object-version header (confirm against callers).
    pub const X_OBS_VERSION_ID: &'static str = "x-obs-version-id";

    /// Prefix prepended to each user-metadata key to form its header name.
    pub const X_OBS_META_PREFIX: &'static str = "x-obs-meta-";
}
43
44pub struct ObsCore {
45 pub info: Arc<AccessorInfo>,
46 pub bucket: String,
47 pub root: String,
48 pub endpoint: String,
49
50 pub signer: HuaweicloudObsSigner,
51 pub loader: HuaweicloudObsCredentialLoader,
52}
53
54impl Debug for ObsCore {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("ObsCore")
57 .field("root", &self.root)
58 .field("bucket", &self.bucket)
59 .field("endpoint", &self.endpoint)
60 .finish_non_exhaustive()
61 }
62}
63
64impl ObsCore {
65 async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
66 let cred = self
67 .loader
68 .load()
69 .await
70 .map_err(new_request_credential_error)?;
71
72 if let Some(cred) = cred {
73 Ok(Some(cred))
74 } else {
75 Ok(None)
76 }
77 }
78
79 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
80 let cred = if let Some(cred) = self.load_credential().await? {
81 cred
82 } else {
83 return Ok(());
84 };
85
86 self.signer.sign(req, &cred).map_err(new_request_sign_error)
87 }
88
89 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
90 let cred = if let Some(cred) = self.load_credential().await? {
91 cred
92 } else {
93 return Ok(());
94 };
95
96 self.signer
97 .sign_query(req, duration, &cred)
98 .map_err(new_request_sign_error)
99 }
100
101 #[inline]
102 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
103 self.info.http_client().send(req).await
104 }
105}
106
107impl ObsCore {
108 pub async fn obs_get_object(
109 &self,
110 path: &str,
111 range: BytesRange,
112 args: &OpRead,
113 ) -> Result<Response<HttpBody>> {
114 let mut req = self.obs_get_object_request(path, range, args)?;
115
116 self.sign(&mut req).await?;
117
118 self.info.http_client().fetch(req).await
119 }
120
121 pub fn obs_get_object_request(
122 &self,
123 path: &str,
124 range: BytesRange,
125 args: &OpRead,
126 ) -> Result<Request<Buffer>> {
127 let p = build_abs_path(&self.root, path);
128
129 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
130
131 let mut req = Request::get(&url);
132
133 if let Some(if_match) = args.if_match() {
134 req = req.header(IF_MATCH, if_match);
135 }
136
137 if !range.is_full() {
138 req = req.header(http::header::RANGE, range.to_header())
139 }
140
141 if let Some(if_none_match) = args.if_none_match() {
142 req = req.header(IF_NONE_MATCH, if_none_match);
143 }
144
145 let req = req
146 .extension(Operation::Read)
147 .body(Buffer::new())
148 .map_err(new_request_build_error)?;
149
150 Ok(req)
151 }
152
153 pub fn obs_put_object_request(
154 &self,
155 path: &str,
156 size: Option<u64>,
157 args: &OpWrite,
158 body: Buffer,
159 ) -> Result<Request<Buffer>> {
160 let p = build_abs_path(&self.root, path);
161
162 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
163
164 let mut req = Request::put(&url);
165
166 if let Some(size) = size {
167 req = req.header(CONTENT_LENGTH, size)
168 }
169 if let Some(cache_control) = args.cache_control() {
170 req = req.header(CACHE_CONTROL, cache_control)
171 }
172
173 if let Some(mime) = args.content_type() {
174 req = req.header(CONTENT_TYPE, mime)
175 }
176
177 if let Some(user_metadata) = args.user_metadata() {
179 for (key, value) in user_metadata {
180 req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value)
181 }
182 }
183
184 let req = req
185 .extension(Operation::Write)
186 .body(body)
187 .map_err(new_request_build_error)?;
188
189 Ok(req)
190 }
191
192 pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
193 let mut req = self.obs_head_object_request(path, args)?;
194
195 self.sign(&mut req).await?;
196
197 self.send(req).await
198 }
199
200 pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
201 let p = build_abs_path(&self.root, path);
202
203 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
204
205 let mut req = Request::head(&url);
209
210 if let Some(if_match) = args.if_match() {
211 req = req.header(IF_MATCH, if_match);
212 }
213
214 if let Some(if_none_match) = args.if_none_match() {
215 req = req.header(IF_NONE_MATCH, if_none_match);
216 }
217
218 let req = req
219 .extension(Operation::Stat)
220 .body(Buffer::new())
221 .map_err(new_request_build_error)?;
222
223 Ok(req)
224 }
225
226 pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
227 let p = build_abs_path(&self.root, path);
228
229 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
230
231 let req = Request::delete(&url);
232
233 let mut req = req
234 .extension(Operation::Delete)
235 .body(Buffer::new())
236 .map_err(new_request_build_error)?;
237
238 self.sign(&mut req).await?;
239
240 self.send(req).await
241 }
242
243 pub fn obs_append_object_request(
244 &self,
245 path: &str,
246 position: u64,
247 size: u64,
248 args: &OpWrite,
249 body: Buffer,
250 ) -> Result<Request<Buffer>> {
251 let p = build_abs_path(&self.root, path);
252 let url = format!(
253 "{}/{}?append&position={}",
254 self.endpoint,
255 percent_encode_path(&p),
256 position
257 );
258
259 let mut req = Request::post(&url);
260
261 req = req.header(CONTENT_LENGTH, size);
262
263 if let Some(mime) = args.content_type() {
264 req = req.header(CONTENT_TYPE, mime);
265 }
266
267 if let Some(pos) = args.content_disposition() {
268 req = req.header(CONTENT_DISPOSITION, pos);
269 }
270
271 if let Some(cache_control) = args.cache_control() {
272 req = req.header(CACHE_CONTROL, cache_control)
273 }
274
275 let req = req
276 .extension(Operation::Write)
277 .body(body)
278 .map_err(new_request_build_error)?;
279
280 Ok(req)
281 }
282
283 pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
284 let source = build_abs_path(&self.root, from);
285 let target = build_abs_path(&self.root, to);
286
287 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
288 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
289
290 let mut req = Request::put(&url)
291 .extension(Operation::Copy)
292 .header("x-obs-copy-source", &source)
293 .body(Buffer::new())
294 .map_err(new_request_build_error)?;
295
296 self.sign(&mut req).await?;
297
298 self.send(req).await
299 }
300
301 pub async fn obs_list_objects(
302 &self,
303 path: &str,
304 next_marker: &str,
305 delimiter: &str,
306 limit: Option<usize>,
307 ) -> Result<Response<Buffer>> {
308 let p = build_abs_path(&self.root, path);
309 let mut url = QueryPairsWriter::new(&self.endpoint);
310
311 if !path.is_empty() {
312 url = url.push("prefix", &percent_encode_path(&p));
313 }
314 if !delimiter.is_empty() {
315 url = url.push("delimiter", delimiter);
316 }
317 if let Some(limit) = limit {
318 url = url.push("max-keys", &limit.to_string());
319 }
320 if !next_marker.is_empty() {
321 url = url.push("marker", next_marker);
322 }
323
324 let mut req = Request::get(url.finish())
325 .extension(Operation::List)
326 .body(Buffer::new())
327 .map_err(new_request_build_error)?;
328
329 self.sign(&mut req).await?;
330
331 self.send(req).await
332 }
333 pub async fn obs_initiate_multipart_upload(
334 &self,
335 path: &str,
336 content_type: Option<&str>,
337 ) -> Result<Response<Buffer>> {
338 let p = build_abs_path(&self.root, path);
339
340 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
341 let mut req = Request::post(&url);
342
343 if let Some(mime) = content_type {
344 req = req.header(CONTENT_TYPE, mime)
345 }
346
347 let mut req = req
348 .extension(Operation::Write)
349 .body(Buffer::new())
350 .map_err(new_request_build_error)?;
351
352 self.sign(&mut req).await?;
353
354 self.send(req).await
355 }
356 pub async fn obs_upload_part_request(
357 &self,
358 path: &str,
359 upload_id: &str,
360 part_number: usize,
361 size: Option<u64>,
362 body: Buffer,
363 ) -> Result<Response<Buffer>> {
364 let p = build_abs_path(&self.root, path);
365
366 let url = format!(
367 "{}/{}?partNumber={}&uploadId={}",
368 self.endpoint,
369 percent_encode_path(&p),
370 part_number,
371 percent_encode_path(upload_id)
372 );
373
374 let mut req = Request::put(&url);
375
376 if let Some(size) = size {
377 req = req.header(CONTENT_LENGTH, size);
378 }
379
380 let mut req = req
381 .extension(Operation::Write)
382 .body(body)
384 .map_err(new_request_build_error)?;
385
386 self.sign(&mut req).await?;
387
388 self.send(req).await
389 }
390
391 pub async fn obs_complete_multipart_upload(
392 &self,
393 path: &str,
394 upload_id: &str,
395 parts: Vec<CompleteMultipartUploadRequestPart>,
396 ) -> Result<Response<Buffer>> {
397 let p = build_abs_path(&self.root, path);
398 let url = format!(
399 "{}/{}?uploadId={}",
400 self.endpoint,
401 percent_encode_path(&p),
402 percent_encode_path(upload_id)
403 );
404
405 let req = Request::post(&url);
406
407 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
408 part: parts.to_vec(),
409 })
410 .map_err(new_xml_serialize_error)?;
411 let req = req.header(CONTENT_LENGTH, content.len());
413 let req = req.header(CONTENT_TYPE, "application/xml");
415
416 let mut req = req
417 .extension(Operation::Write)
418 .body(Buffer::from(Bytes::from(content)))
419 .map_err(new_request_build_error)?;
420
421 self.sign(&mut req).await?;
422 self.send(req).await
423 }
424
425 pub async fn obs_abort_multipart_upload(
427 &self,
428 path: &str,
429 upload_id: &str,
430 ) -> Result<Response<Buffer>> {
431 let p = build_abs_path(&self.root, path);
432
433 let url = format!(
434 "{}/{}?uploadId={}",
435 self.endpoint,
436 percent_encode_path(&p),
437 percent_encode_path(upload_id)
438 );
439
440 let mut req = Request::delete(&url)
441 .extension(Operation::Write)
442 .body(Buffer::new())
443 .map_err(new_request_build_error)?;
444
445 self.sign(&mut req).await?;
446 self.send(req).await
447 }
448}
449
450#[derive(Default, Debug, Deserialize)]
452#[serde(default, rename_all = "PascalCase")]
453pub struct InitiateMultipartUploadResult {
454 pub upload_id: String,
455}
456
457#[derive(Default, Debug, Serialize)]
459#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
460pub struct CompleteMultipartUploadRequest {
461 pub part: Vec<CompleteMultipartUploadRequestPart>,
462}
463
464#[derive(Clone, Default, Debug, Serialize)]
465#[serde(default, rename_all = "PascalCase")]
466pub struct CompleteMultipartUploadRequestPart {
467 #[serde(rename = "PartNumber")]
468 pub part_number: usize,
469 #[serde(rename = "ETag")]
498 pub etag: String,
499}
500
501#[derive(Debug, Default, Deserialize)]
503#[serde[default, rename_all = "PascalCase"]]
504pub struct CompleteMultipartUploadResult {
505 pub location: String,
506 pub bucket: String,
507 pub key: String,
508 #[serde(rename = "ETag")]
509 pub etag: String,
510}
511
512#[derive(Default, Debug, Deserialize)]
513#[serde(default, rename_all = "PascalCase")]
514pub struct ListObjectsOutput {
515 pub name: String,
516 pub prefix: String,
517 pub contents: Vec<ListObjectsOutputContent>,
518 pub common_prefixes: Vec<CommonPrefix>,
519 pub marker: String,
520 pub next_marker: Option<String>,
521}
522
523#[derive(Default, Debug, Deserialize)]
524#[serde(default, rename_all = "PascalCase")]
525pub struct CommonPrefix {
526 pub prefix: String,
527}
528
529#[derive(Default, Debug, Deserialize)]
530#[serde(default, rename_all = "PascalCase")]
531pub struct ListObjectsOutputContent {
532 pub key: String,
533 pub size: u64,
534}
535
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Deserializes a sample `ListBucketResult` document and checks every
    /// field that `ListObjectsOutput` maps.
    #[test]
    fn test_parse_xml() {
        let raw = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );

        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(raw.reader()).expect("must success");

        // Scalar fields.
        assert_eq!(parsed.name, "examplebucket");
        assert_eq!(parsed.prefix, "obj");
        assert_eq!(parsed.marker, "obj002");
        assert_eq!(parsed.next_marker.as_deref(), Some("obj004"));

        // Repeated `Contents` elements.
        let keys: Vec<&str> = parsed.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` elements.
        let prefixes: Vec<&str> = parsed
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}