1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use http::Request;
31use http::Response;
32use reqsign::HuaweicloudObsCredential;
33use reqsign::HuaweicloudObsCredentialLoader;
34use reqsign::HuaweicloudObsSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
41pub mod constants {
42 pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
43 pub const X_OBS_VERSION_ID: &str = "x-obs-version-id";
44}
45
46pub struct ObsCore {
47 pub info: Arc<AccessorInfo>,
48 pub bucket: String,
49 pub root: String,
50 pub endpoint: String,
51
52 pub signer: HuaweicloudObsSigner,
53 pub loader: HuaweicloudObsCredentialLoader,
54}
55
56impl Debug for ObsCore {
57 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("Backend")
59 .field("root", &self.root)
60 .field("bucket", &self.bucket)
61 .field("endpoint", &self.endpoint)
62 .finish_non_exhaustive()
63 }
64}
65
66impl ObsCore {
67 async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
68 let cred = self
69 .loader
70 .load()
71 .await
72 .map_err(new_request_credential_error)?;
73
74 if let Some(cred) = cred {
75 Ok(Some(cred))
76 } else {
77 Ok(None)
78 }
79 }
80
81 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
82 let cred = if let Some(cred) = self.load_credential().await? {
83 cred
84 } else {
85 return Ok(());
86 };
87
88 self.signer.sign(req, &cred).map_err(new_request_sign_error)
89 }
90
91 pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
92 let cred = if let Some(cred) = self.load_credential().await? {
93 cred
94 } else {
95 return Ok(());
96 };
97
98 self.signer
99 .sign_query(req, duration, &cred)
100 .map_err(new_request_sign_error)
101 }
102
103 #[inline]
104 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
105 self.info.http_client().send(req).await
106 }
107}
108
109impl ObsCore {
110 pub async fn obs_get_object(
111 &self,
112 path: &str,
113 range: BytesRange,
114 args: &OpRead,
115 ) -> Result<Response<HttpBody>> {
116 let mut req = self.obs_get_object_request(path, range, args)?;
117
118 self.sign(&mut req).await?;
119
120 self.info.http_client().fetch(req).await
121 }
122
123 pub fn obs_get_object_request(
124 &self,
125 path: &str,
126 range: BytesRange,
127 args: &OpRead,
128 ) -> Result<Request<Buffer>> {
129 let p = build_abs_path(&self.root, path);
130
131 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
132
133 let mut req = Request::get(&url);
134
135 if let Some(if_match) = args.if_match() {
136 req = req.header(IF_MATCH, if_match);
137 }
138
139 if range.is_full() {
140 req = req.header(http::header::RANGE, range.to_header())
141 }
142
143 if let Some(if_none_match) = args.if_none_match() {
144 req = req.header(IF_NONE_MATCH, if_none_match);
145 }
146
147 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
148
149 Ok(req)
150 }
151
152 pub fn obs_put_object_request(
153 &self,
154 path: &str,
155 size: Option<u64>,
156 args: &OpWrite,
157 body: Buffer,
158 ) -> Result<Request<Buffer>> {
159 let p = build_abs_path(&self.root, path);
160
161 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
162
163 let mut req = Request::put(&url);
164
165 if let Some(size) = size {
166 req = req.header(CONTENT_LENGTH, size)
167 }
168 if let Some(cache_control) = args.cache_control() {
169 req = req.header(CACHE_CONTROL, cache_control)
170 }
171
172 if let Some(mime) = args.content_type() {
173 req = req.header(CONTENT_TYPE, mime)
174 }
175
176 if let Some(user_metadata) = args.user_metadata() {
178 for (key, value) in user_metadata {
179 req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value)
180 }
181 }
182
183 let req = req.body(body).map_err(new_request_build_error)?;
184
185 Ok(req)
186 }
187
188 pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
189 let mut req = self.obs_head_object_request(path, args)?;
190
191 self.sign(&mut req).await?;
192
193 self.send(req).await
194 }
195
196 pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
197 let p = build_abs_path(&self.root, path);
198
199 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
200
201 let mut req = Request::head(&url);
205
206 if let Some(if_match) = args.if_match() {
207 req = req.header(IF_MATCH, if_match);
208 }
209
210 if let Some(if_none_match) = args.if_none_match() {
211 req = req.header(IF_NONE_MATCH, if_none_match);
212 }
213
214 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
215
216 Ok(req)
217 }
218
219 pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
220 let p = build_abs_path(&self.root, path);
221
222 let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
223
224 let req = Request::delete(&url);
225
226 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
227
228 self.sign(&mut req).await?;
229
230 self.send(req).await
231 }
232
233 pub fn obs_append_object_request(
234 &self,
235 path: &str,
236 position: u64,
237 size: u64,
238 args: &OpWrite,
239 body: Buffer,
240 ) -> Result<Request<Buffer>> {
241 let p = build_abs_path(&self.root, path);
242 let url = format!(
243 "{}/{}?append&position={}",
244 self.endpoint,
245 percent_encode_path(&p),
246 position
247 );
248
249 let mut req = Request::post(&url);
250
251 req = req.header(CONTENT_LENGTH, size);
252
253 if let Some(mime) = args.content_type() {
254 req = req.header(CONTENT_TYPE, mime);
255 }
256
257 if let Some(pos) = args.content_disposition() {
258 req = req.header(CONTENT_DISPOSITION, pos);
259 }
260
261 if let Some(cache_control) = args.cache_control() {
262 req = req.header(CACHE_CONTROL, cache_control)
263 }
264
265 let req = req.body(body).map_err(new_request_build_error)?;
266 Ok(req)
267 }
268
269 pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
270 let source = build_abs_path(&self.root, from);
271 let target = build_abs_path(&self.root, to);
272
273 let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
274 let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
275
276 let mut req = Request::put(&url)
277 .header("x-obs-copy-source", &source)
278 .body(Buffer::new())
279 .map_err(new_request_build_error)?;
280
281 self.sign(&mut req).await?;
282
283 self.send(req).await
284 }
285
286 pub async fn obs_list_objects(
287 &self,
288 path: &str,
289 next_marker: &str,
290 delimiter: &str,
291 limit: Option<usize>,
292 ) -> Result<Response<Buffer>> {
293 let p = build_abs_path(&self.root, path);
294
295 let mut queries = vec![];
296 if !path.is_empty() {
297 queries.push(format!("prefix={}", percent_encode_path(&p)));
298 }
299 if !delimiter.is_empty() {
300 queries.push(format!("delimiter={delimiter}"));
301 }
302 if let Some(limit) = limit {
303 queries.push(format!("max-keys={limit}"));
304 }
305 if !next_marker.is_empty() {
306 queries.push(format!("marker={next_marker}"));
307 }
308
309 let url = if queries.is_empty() {
310 self.endpoint.to_string()
311 } else {
312 format!("{}?{}", self.endpoint, queries.join("&"))
313 };
314
315 let mut req = Request::get(&url)
316 .body(Buffer::new())
317 .map_err(new_request_build_error)?;
318
319 self.sign(&mut req).await?;
320
321 self.send(req).await
322 }
323 pub async fn obs_initiate_multipart_upload(
324 &self,
325 path: &str,
326 content_type: Option<&str>,
327 ) -> Result<Response<Buffer>> {
328 let p = build_abs_path(&self.root, path);
329
330 let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
331 let mut req = Request::post(&url);
332
333 if let Some(mime) = content_type {
334 req = req.header(CONTENT_TYPE, mime)
335 }
336 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
337
338 self.sign(&mut req).await?;
339
340 self.send(req).await
341 }
342 pub async fn obs_upload_part_request(
343 &self,
344 path: &str,
345 upload_id: &str,
346 part_number: usize,
347 size: Option<u64>,
348 body: Buffer,
349 ) -> Result<Response<Buffer>> {
350 let p = build_abs_path(&self.root, path);
351
352 let url = format!(
353 "{}/{}?partNumber={}&uploadId={}",
354 self.endpoint,
355 percent_encode_path(&p),
356 part_number,
357 percent_encode_path(upload_id)
358 );
359
360 let mut req = Request::put(&url);
361
362 if let Some(size) = size {
363 req = req.header(CONTENT_LENGTH, size);
364 }
365
366 let mut req = req.body(body).map_err(new_request_build_error)?;
368
369 self.sign(&mut req).await?;
370
371 self.send(req).await
372 }
373
374 pub async fn obs_complete_multipart_upload(
375 &self,
376 path: &str,
377 upload_id: &str,
378 parts: Vec<CompleteMultipartUploadRequestPart>,
379 ) -> Result<Response<Buffer>> {
380 let p = build_abs_path(&self.root, path);
381 let url = format!(
382 "{}/{}?uploadId={}",
383 self.endpoint,
384 percent_encode_path(&p),
385 percent_encode_path(upload_id)
386 );
387
388 let req = Request::post(&url);
389
390 let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
391 part: parts.to_vec(),
392 })
393 .map_err(new_xml_deserialize_error)?;
394 let req = req.header(CONTENT_LENGTH, content.len());
396 let req = req.header(CONTENT_TYPE, "application/xml");
398
399 let mut req = req
400 .body(Buffer::from(Bytes::from(content)))
401 .map_err(new_request_build_error)?;
402
403 self.sign(&mut req).await?;
404 self.send(req).await
405 }
406
407 pub async fn obs_abort_multipart_upload(
409 &self,
410 path: &str,
411 upload_id: &str,
412 ) -> Result<Response<Buffer>> {
413 let p = build_abs_path(&self.root, path);
414
415 let url = format!(
416 "{}/{}?uploadId={}",
417 self.endpoint,
418 percent_encode_path(&p),
419 percent_encode_path(upload_id)
420 );
421
422 let mut req = Request::delete(&url)
423 .body(Buffer::new())
424 .map_err(new_request_build_error)?;
425
426 self.sign(&mut req).await?;
427 self.send(req).await
428 }
429}
430
431#[derive(Default, Debug, Deserialize)]
433#[serde(default, rename_all = "PascalCase")]
434pub struct InitiateMultipartUploadResult {
435 pub upload_id: String,
436}
437
438#[derive(Default, Debug, Serialize)]
440#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
441pub struct CompleteMultipartUploadRequest {
442 pub part: Vec<CompleteMultipartUploadRequestPart>,
443}
444
445#[derive(Clone, Default, Debug, Serialize)]
446#[serde(default, rename_all = "PascalCase")]
447pub struct CompleteMultipartUploadRequestPart {
448 #[serde(rename = "PartNumber")]
449 pub part_number: usize,
450 #[serde(rename = "ETag")]
479 pub etag: String,
480}
481
482#[derive(Debug, Default, Deserialize)]
484#[serde[default, rename_all = "PascalCase"]]
485pub struct CompleteMultipartUploadResult {
486 pub location: String,
487 pub bucket: String,
488 pub key: String,
489 #[serde(rename = "ETag")]
490 pub etag: String,
491}
492
493#[derive(Default, Debug, Deserialize)]
494#[serde(default, rename_all = "PascalCase")]
495pub struct ListObjectsOutput {
496 pub name: String,
497 pub prefix: String,
498 pub contents: Vec<ListObjectsOutputContent>,
499 pub common_prefixes: Vec<CommonPrefix>,
500 pub marker: String,
501 pub next_marker: Option<String>,
502}
503
504#[derive(Default, Debug, Deserialize)]
505#[serde(default, rename_all = "PascalCase")]
506pub struct CommonPrefix {
507 pub prefix: String,
508}
509
510#[derive(Default, Debug, Deserialize)]
511#[serde(default, rename_all = "PascalCase")]
512pub struct ListObjectsOutputContent {
513 pub key: String,
514 pub size: u64,
515}
516
517#[cfg(test)]
518mod tests {
519 use bytes::Buf;
520
521 use super::*;
522
523 #[test]
524 fn test_parse_xml() {
525 let bs = bytes::Bytes::from(
526 r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
527<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
528 <Name>examplebucket</Name>
529 <Prefix>obj</Prefix>
530 <Marker>obj002</Marker>
531 <NextMarker>obj004</NextMarker>
532 <MaxKeys>1000</MaxKeys>
533 <IsTruncated>false</IsTruncated>
534 <Contents>
535 <Key>obj002</Key>
536 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
537 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
538 <Size>9</Size>
539 <Owner>
540 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
541 </Owner>
542 <StorageClass>STANDARD</StorageClass>
543 </Contents>
544 <Contents>
545 <Key>obj003</Key>
546 <LastModified>2015-07-01T02:11:19.775Z</LastModified>
547 <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
548 <Size>10</Size>
549 <Owner>
550 <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
551 </Owner>
552 <StorageClass>STANDARD</StorageClass>
553 </Contents>
554 <CommonPrefixes>
555 <Prefix>hello</Prefix>
556 </CommonPrefixes>
557 <CommonPrefixes>
558 <Prefix>world</Prefix>
559 </CommonPrefixes>
560</ListBucketResult>"#,
561 );
562 let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");
563
564 assert_eq!(out.name, "examplebucket".to_string());
565 assert_eq!(out.prefix, "obj".to_string());
566 assert_eq!(out.marker, "obj002".to_string());
567 assert_eq!(out.next_marker, Some("obj004".to_string()),);
568 assert_eq!(
569 out.contents
570 .iter()
571 .map(|v| v.key.clone())
572 .collect::<Vec<String>>(),
573 ["obj002", "obj003"],
574 );
575 assert_eq!(
576 out.contents.iter().map(|v| v.size).collect::<Vec<u64>>(),
577 [9, 10],
578 );
579 assert_eq!(
580 out.common_prefixes
581 .iter()
582 .map(|v| v.prefix.clone())
583 .collect::<Vec<String>>(),
584 ["hello", "world"],
585 )
586 }
587}