1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use http::Request;
31use http::Response;
32use reqsign::HuaweicloudObsCredential;
33use reqsign::HuaweicloudObsCredentialLoader;
34use reqsign::HuaweicloudObsSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
41pub mod constants {
42 pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
43 pub const X_OBS_VERSION_ID: &str = "x-obs-version-id";
44}
45
46pub struct ObsCore {
47 pub info: Arc<AccessorInfo>,
48 pub bucket: String,
49 pub root: String,
50 pub endpoint: String,
51
52 pub signer: HuaweicloudObsSigner,
53 pub loader: HuaweicloudObsCredentialLoader,
54}
55
56impl Debug for ObsCore {
57 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("Backend")
59 .field("root", &self.root)
60 .field("bucket", &self.bucket)
61 .field("endpoint", &self.endpoint)
62 .finish_non_exhaustive()
63 }
64}
65
66impl ObsCore {
67 async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
68 let cred = self
69 .loader
70 .load()
71 .await
72 .map_err(new_request_credential_error)?;
73
74 if let Some(cred) = cred {
75 Ok(Some(cred))
76 } else {
77 Ok(None)
78 }
79 }
80
81 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
82 let cred = if let Some(cred) = self.load_credential().await? {
83 cred
84 } else {
85 return Ok(());
86 };
87
88 self.signer.sign(req, &cred).map_err(new_request_sign_error)
89 }
90
91 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
92 let cred = if let Some(cred) = self.load_credential().await? {
93 cred
94 } else {
95 return Ok(());
96 };
97
98 self.signer
99 .sign_query(req, duration, &cred)
100 .map_err(new_request_sign_error)
101 }
102
103 #[inline]
104 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
105 self.info.http_client().send(req).await
106 }
107}
108
109impl ObsCore {
110 pub async fn obs_get_object(
111 &self,
112 path: &str,
113 range: BytesRange,
114 args: &OpRead,
115 ) -> Result<Response<HttpBody>> {
116 let mut req = self.obs_get_object_request(path, range, args)?;
117
118 self.sign(&mut req).await?;
119
120 self.info.http_client().fetch(req).await
121 }
122
123 pub fn obs_get_object_request(
124 &self,
125 path: &str,
126 range: BytesRange,
127 args: &OpRead,
128 ) -> Result<Request<Buffer>> {
129 let p = build_abs_path(&self.root, path);
130
131 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
132
133 let mut req = Request::get(&url);
134
135 if let Some(if_match) = args.if_match() {
136 req = req.header(IF_MATCH, if_match);
137 }
138
139 if range.is_full() {
140 req = req.header(http::header::RANGE, range.to_header())
141 }
142
143 if let Some(if_none_match) = args.if_none_match() {
144 req = req.header(IF_NONE_MATCH, if_none_match);
145 }
146
147 let req = req
148 .extension(Operation::Read)
149 .body(Buffer::new())
150 .map_err(new_request_build_error)?;
151
152 Ok(req)
153 }
154
155 pub fn obs_put_object_request(
156 &self,
157 path: &str,
158 size: Option<u64>,
159 args: &OpWrite,
160 body: Buffer,
161 ) -> Result<Request<Buffer>> {
162 let p = build_abs_path(&self.root, path);
163
164 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
165
166 let mut req = Request::put(&url);
167
168 if let Some(size) = size {
169 req = req.header(CONTENT_LENGTH, size)
170 }
171 if let Some(cache_control) = args.cache_control() {
172 req = req.header(CACHE_CONTROL, cache_control)
173 }
174
175 if let Some(mime) = args.content_type() {
176 req = req.header(CONTENT_TYPE, mime)
177 }
178
179 if let Some(user_metadata) = args.user_metadata() {
181 for (key, value) in user_metadata {
182 req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value)
183 }
184 }
185
186 let req = req
187 .extension(Operation::Write)
188 .body(body)
189 .map_err(new_request_build_error)?;
190
191 Ok(req)
192 }
193
194 pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
195 let mut req = self.obs_head_object_request(path, args)?;
196
197 self.sign(&mut req).await?;
198
199 self.send(req).await
200 }
201
202 pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
203 let p = build_abs_path(&self.root, path);
204
205 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
206
207 let mut req = Request::head(&url);
211
212 if let Some(if_match) = args.if_match() {
213 req = req.header(IF_MATCH, if_match);
214 }
215
216 if let Some(if_none_match) = args.if_none_match() {
217 req = req.header(IF_NONE_MATCH, if_none_match);
218 }
219
220 let req = req
221 .extension(Operation::Stat)
222 .body(Buffer::new())
223 .map_err(new_request_build_error)?;
224
225 Ok(req)
226 }
227
228 pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
229 let p = build_abs_path(&self.root, path);
230
231 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
232
233 let req = Request::delete(&url);
234
235 let mut req = req
236 .extension(Operation::Delete)
237 .body(Buffer::new())
238 .map_err(new_request_build_error)?;
239
240 self.sign(&mut req).await?;
241
242 self.send(req).await
243 }
244
245 pub fn obs_append_object_request(
246 &self,
247 path: &str,
248 position: u64,
249 size: u64,
250 args: &OpWrite,
251 body: Buffer,
252 ) -> Result<Request<Buffer>> {
253 let p = build_abs_path(&self.root, path);
254 let url = format!(
255 "{}/{}?append&position={}",
256 self.endpoint,
257 percent_encode_path(&p),
258 position
259 );
260
261 let mut req = Request::post(&url);
262
263 req = req.header(CONTENT_LENGTH, size);
264
265 if let Some(mime) = args.content_type() {
266 req = req.header(CONTENT_TYPE, mime);
267 }
268
269 if let Some(pos) = args.content_disposition() {
270 req = req.header(CONTENT_DISPOSITION, pos);
271 }
272
273 if let Some(cache_control) = args.cache_control() {
274 req = req.header(CACHE_CONTROL, cache_control)
275 }
276
277 let req = req
278 .extension(Operation::Write)
279 .body(body)
280 .map_err(new_request_build_error)?;
281
282 Ok(req)
283 }
284
285 pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
286 let source = build_abs_path(&self.root, from);
287 let target = build_abs_path(&self.root, to);
288
289 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
290 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
291
292 let mut req = Request::put(&url)
293 .extension(Operation::Copy)
294 .header("x-obs-copy-source", &source)
295 .body(Buffer::new())
296 .map_err(new_request_build_error)?;
297
298 self.sign(&mut req).await?;
299
300 self.send(req).await
301 }
302
303 pub async fn obs_list_objects(
304 &self,
305 path: &str,
306 next_marker: &str,
307 delimiter: &str,
308 limit: Option<usize>,
309 ) -> Result<Response<Buffer>> {
310 let p = build_abs_path(&self.root, path);
311 let mut url = QueryPairsWriter::new(&self.endpoint);
312
313 if !path.is_empty() {
314 url = url.push("prefix", &percent_encode_path(&p));
315 }
316 if !delimiter.is_empty() {
317 url = url.push("delimiter", delimiter);
318 }
319 if let Some(limit) = limit {
320 url = url.push("max-keys", &limit.to_string());
321 }
322 if !next_marker.is_empty() {
323 url = url.push("marker", next_marker);
324 }
325
326 let mut req = Request::get(url.finish())
327 .extension(Operation::List)
328 .body(Buffer::new())
329 .map_err(new_request_build_error)?;
330
331 self.sign(&mut req).await?;
332
333 self.send(req).await
334 }
335 pub async fn obs_initiate_multipart_upload(
336 &self,
337 path: &str,
338 content_type: Option<&str>,
339 ) -> Result<Response<Buffer>> {
340 let p = build_abs_path(&self.root, path);
341
342 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
343 let mut req = Request::post(&url);
344
345 if let Some(mime) = content_type {
346 req = req.header(CONTENT_TYPE, mime)
347 }
348
349 let mut req = req
350 .extension(Operation::Write)
351 .body(Buffer::new())
352 .map_err(new_request_build_error)?;
353
354 self.sign(&mut req).await?;
355
356 self.send(req).await
357 }
358 pub async fn obs_upload_part_request(
359 &self,
360 path: &str,
361 upload_id: &str,
362 part_number: usize,
363 size: Option<u64>,
364 body: Buffer,
365 ) -> Result<Response<Buffer>> {
366 let p = build_abs_path(&self.root, path);
367
368 let url = format!(
369 "{}/{}?partNumber={}&uploadId={}",
370 self.endpoint,
371 percent_encode_path(&p),
372 part_number,
373 percent_encode_path(upload_id)
374 );
375
376 let mut req = Request::put(&url);
377
378 if let Some(size) = size {
379 req = req.header(CONTENT_LENGTH, size);
380 }
381
382 let mut req = req
383 .extension(Operation::Write)
384 .body(body)
386 .map_err(new_request_build_error)?;
387
388 self.sign(&mut req).await?;
389
390 self.send(req).await
391 }
392
393 pub async fn obs_complete_multipart_upload(
394 &self,
395 path: &str,
396 upload_id: &str,
397 parts: Vec<CompleteMultipartUploadRequestPart>,
398 ) -> Result<Response<Buffer>> {
399 let p = build_abs_path(&self.root, path);
400 let url = format!(
401 "{}/{}?uploadId={}",
402 self.endpoint,
403 percent_encode_path(&p),
404 percent_encode_path(upload_id)
405 );
406
407 let req = Request::post(&url);
408
409 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
410 part: parts.to_vec(),
411 })
412 .map_err(new_xml_serialize_error)?;
413 let req = req.header(CONTENT_LENGTH, content.len());
415 let req = req.header(CONTENT_TYPE, "application/xml");
417
418 let mut req = req
419 .extension(Operation::Write)
420 .body(Buffer::from(Bytes::from(content)))
421 .map_err(new_request_build_error)?;
422
423 self.sign(&mut req).await?;
424 self.send(req).await
425 }
426
427 pub async fn obs_abort_multipart_upload(
429 &self,
430 path: &str,
431 upload_id: &str,
432 ) -> Result<Response<Buffer>> {
433 let p = build_abs_path(&self.root, path);
434
435 let url = format!(
436 "{}/{}?uploadId={}",
437 self.endpoint,
438 percent_encode_path(&p),
439 percent_encode_path(upload_id)
440 );
441
442 let mut req = Request::delete(&url)
443 .extension(Operation::Write)
444 .body(Buffer::new())
445 .map_err(new_request_build_error)?;
446
447 self.sign(&mut req).await?;
448 self.send(req).await
449 }
450}
451
452#[derive(Default, Debug, Deserialize)]
454#[serde(default, rename_all = "PascalCase")]
455pub struct InitiateMultipartUploadResult {
456 pub upload_id: String,
457}
458
459#[derive(Default, Debug, Serialize)]
461#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
462pub struct CompleteMultipartUploadRequest {
463 pub part: Vec<CompleteMultipartUploadRequestPart>,
464}
465
466#[derive(Clone, Default, Debug, Serialize)]
467#[serde(default, rename_all = "PascalCase")]
468pub struct CompleteMultipartUploadRequestPart {
469 #[serde(rename = "PartNumber")]
470 pub part_number: usize,
471 #[serde(rename = "ETag")]
500 pub etag: String,
501}
502
503#[derive(Debug, Default, Deserialize)]
505#[serde[default, rename_all = "PascalCase"]]
506pub struct CompleteMultipartUploadResult {
507 pub location: String,
508 pub bucket: String,
509 pub key: String,
510 #[serde(rename = "ETag")]
511 pub etag: String,
512}
513
514#[derive(Default, Debug, Deserialize)]
515#[serde(default, rename_all = "PascalCase")]
516pub struct ListObjectsOutput {
517 pub name: String,
518 pub prefix: String,
519 pub contents: Vec<ListObjectsOutputContent>,
520 pub common_prefixes: Vec<CommonPrefix>,
521 pub marker: String,
522 pub next_marker: Option<String>,
523}
524
525#[derive(Default, Debug, Deserialize)]
526#[serde(default, rename_all = "PascalCase")]
527pub struct CommonPrefix {
528 pub prefix: String,
529}
530
531#[derive(Default, Debug, Deserialize)]
532#[serde(default, rename_all = "PascalCase")]
533pub struct ListObjectsOutputContent {
534 pub key: String,
535 pub size: u64,
536}
537
538#[cfg(test)]
539mod tests {
540 use bytes::Buf;
541
542 use super::*;
543
544 #[test]
545 fn test_parse_xml() {
546 let bs = bytes::Bytes::from(
547 r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
548<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
549 <Name>examplebucket</Name>
550 <Prefix>obj</Prefix>
551 <Marker>obj002</Marker>
552 <NextMarker>obj004</NextMarker>
553 <MaxKeys>1000</MaxKeys>
554 <IsTruncated>false</IsTruncated>
555 <Contents>
556 <Key>obj002</Key>
557 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
558 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
559 <Size>9</Size>
560 <Owner>
561 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
562 </Owner>
563 <StorageClass>STANDARD</StorageClass>
564 </Contents>
565 <Contents>
566 <Key>obj003</Key>
567 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
568 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
569 <Size>10</Size>
570 <Owner>
571 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
572 </Owner>
573 <StorageClass>STANDARD</StorageClass>
574 </Contents>
575 <CommonPrefixes>
576 <Prefix>hello</Prefix>
577 </CommonPrefixes>
578 <CommonPrefixes>
579 <Prefix>world</Prefix>
580 </CommonPrefixes>
581</ListBucketResult>"#,
582 );
583 let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");
584
585 assert_eq!(out.name, "examplebucket".to_string());
586 assert_eq!(out.prefix, "obj".to_string());
587 assert_eq!(out.marker, "obj002".to_string());
588 assert_eq!(out.next_marker, Some("obj004".to_string()),);
589 assert_eq!(
590 out.contents
591 .iter()
592 .map(|v| v.key.clone())
593 .collect::<Vec<String>>(),
594 ["obj002", "obj003"],
595 );
596 assert_eq!(
597 out.contents.iter().map(|v| v.size).collect::<Vec<u64>>(),
598 [9, 10],
599 );
600 assert_eq!(
601 out.common_prefixes
602 .iter()
603 .map(|v| v.prefix.clone())
604 .collect::<Vec<String>>(),
605 ["hello", "world"],
606 )
607 }
608}