opendal/services/obs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use http::Request;
31use http::Response;
32use reqsign::HuaweicloudObsCredential;
33use reqsign::HuaweicloudObsCredentialLoader;
34use reqsign::HuaweicloudObsSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
/// Header names specific to Huawei Cloud OBS.
pub mod constants {
    /// Prefix prepended to a user-metadata key to form its request header name.
    pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
    /// The `x-obs-version-id` header name.
    pub const X_OBS_VERSION_ID: &str = "x-obs-version-id";
}
45
46pub struct ObsCore {
47    pub info: Arc<AccessorInfo>,
48    pub bucket: String,
49    pub root: String,
50    pub endpoint: String,
51
52    pub signer: HuaweicloudObsSigner,
53    pub loader: HuaweicloudObsCredentialLoader,
54}
55
56impl Debug for ObsCore {
57    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("Backend")
59            .field("root", &self.root)
60            .field("bucket", &self.bucket)
61            .field("endpoint", &self.endpoint)
62            .finish_non_exhaustive()
63    }
64}
65
66impl ObsCore {
67    async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
68        let cred = self
69            .loader
70            .load()
71            .await
72            .map_err(new_request_credential_error)?;
73
74        if let Some(cred) = cred {
75            Ok(Some(cred))
76        } else {
77            Ok(None)
78        }
79    }
80
81    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
82        let cred = if let Some(cred) = self.load_credential().await? {
83            cred
84        } else {
85            return Ok(());
86        };
87
88        self.signer.sign(req, &cred).map_err(new_request_sign_error)
89    }
90
91    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
92        let cred = if let Some(cred) = self.load_credential().await? {
93            cred
94        } else {
95            return Ok(());
96        };
97
98        self.signer
99            .sign_query(req, duration, &cred)
100            .map_err(new_request_sign_error)
101    }
102
103    #[inline]
104    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
105        self.info.http_client().send(req).await
106    }
107}
108
109impl ObsCore {
110    pub async fn obs_get_object(
111        &self,
112        path: &str,
113        range: BytesRange,
114        args: &OpRead,
115    ) -> Result<Response<HttpBody>> {
116        let mut req = self.obs_get_object_request(path, range, args)?;
117
118        self.sign(&mut req).await?;
119
120        self.info.http_client().fetch(req).await
121    }
122
123    pub fn obs_get_object_request(
124        &self,
125        path: &str,
126        range: BytesRange,
127        args: &OpRead,
128    ) -> Result<Request<Buffer>> {
129        let p = build_abs_path(&self.root, path);
130
131        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
132
133        let mut req = Request::get(&url);
134
135        if let Some(if_match) = args.if_match() {
136            req = req.header(IF_MATCH, if_match);
137        }
138
139        if range.is_full() {
140            req = req.header(http::header::RANGE, range.to_header())
141        }
142
143        if let Some(if_none_match) = args.if_none_match() {
144            req = req.header(IF_NONE_MATCH, if_none_match);
145        }
146
147        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
148
149        Ok(req)
150    }
151
152    pub fn obs_put_object_request(
153        &self,
154        path: &str,
155        size: Option<u64>,
156        args: &OpWrite,
157        body: Buffer,
158    ) -> Result<Request<Buffer>> {
159        let p = build_abs_path(&self.root, path);
160
161        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
162
163        let mut req = Request::put(&url);
164
165        if let Some(size) = size {
166            req = req.header(CONTENT_LENGTH, size)
167        }
168        if let Some(cache_control) = args.cache_control() {
169            req = req.header(CACHE_CONTROL, cache_control)
170        }
171
172        if let Some(mime) = args.content_type() {
173            req = req.header(CONTENT_TYPE, mime)
174        }
175
176        // Set user metadata headers.
177        if let Some(user_metadata) = args.user_metadata() {
178            for (key, value) in user_metadata {
179                req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value)
180            }
181        }
182
183        let req = req.body(body).map_err(new_request_build_error)?;
184
185        Ok(req)
186    }
187
188    pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
189        let mut req = self.obs_head_object_request(path, args)?;
190
191        self.sign(&mut req).await?;
192
193        self.send(req).await
194    }
195
196    pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
197        let p = build_abs_path(&self.root, path);
198
199        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
200
201        // The header 'Origin' is optional for API calling, the doc has mistake, confirmed with customer service of huaweicloud.
202        // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html
203
204        let mut req = Request::head(&url);
205
206        if let Some(if_match) = args.if_match() {
207            req = req.header(IF_MATCH, if_match);
208        }
209
210        if let Some(if_none_match) = args.if_none_match() {
211            req = req.header(IF_NONE_MATCH, if_none_match);
212        }
213
214        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
215
216        Ok(req)
217    }
218
219    pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
220        let p = build_abs_path(&self.root, path);
221
222        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
223
224        let req = Request::delete(&url);
225
226        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
227
228        self.sign(&mut req).await?;
229
230        self.send(req).await
231    }
232
233    pub fn obs_append_object_request(
234        &self,
235        path: &str,
236        position: u64,
237        size: u64,
238        args: &OpWrite,
239        body: Buffer,
240    ) -> Result<Request<Buffer>> {
241        let p = build_abs_path(&self.root, path);
242        let url = format!(
243            "{}/{}?append&position={}",
244            self.endpoint,
245            percent_encode_path(&p),
246            position
247        );
248
249        let mut req = Request::post(&url);
250
251        req = req.header(CONTENT_LENGTH, size);
252
253        if let Some(mime) = args.content_type() {
254            req = req.header(CONTENT_TYPE, mime);
255        }
256
257        if let Some(pos) = args.content_disposition() {
258            req = req.header(CONTENT_DISPOSITION, pos);
259        }
260
261        if let Some(cache_control) = args.cache_control() {
262            req = req.header(CACHE_CONTROL, cache_control)
263        }
264
265        let req = req.body(body).map_err(new_request_build_error)?;
266        Ok(req)
267    }
268
269    pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
270        let source = build_abs_path(&self.root, from);
271        let target = build_abs_path(&self.root, to);
272
273        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
274        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
275
276        let mut req = Request::put(&url)
277            .header("x-obs-copy-source", &source)
278            .body(Buffer::new())
279            .map_err(new_request_build_error)?;
280
281        self.sign(&mut req).await?;
282
283        self.send(req).await
284    }
285
286    pub async fn obs_list_objects(
287        &self,
288        path: &str,
289        next_marker: &str,
290        delimiter: &str,
291        limit: Option<usize>,
292    ) -> Result<Response<Buffer>> {
293        let p = build_abs_path(&self.root, path);
294
295        let mut queries = vec![];
296        if !path.is_empty() {
297            queries.push(format!("prefix={}", percent_encode_path(&p)));
298        }
299        if !delimiter.is_empty() {
300            queries.push(format!("delimiter={delimiter}"));
301        }
302        if let Some(limit) = limit {
303            queries.push(format!("max-keys={limit}"));
304        }
305        if !next_marker.is_empty() {
306            queries.push(format!("marker={next_marker}"));
307        }
308
309        let url = if queries.is_empty() {
310            self.endpoint.to_string()
311        } else {
312            format!("{}?{}", self.endpoint, queries.join("&"))
313        };
314
315        let mut req = Request::get(&url)
316            .body(Buffer::new())
317            .map_err(new_request_build_error)?;
318
319        self.sign(&mut req).await?;
320
321        self.send(req).await
322    }
323    pub async fn obs_initiate_multipart_upload(
324        &self,
325        path: &str,
326        content_type: Option<&str>,
327    ) -> Result<Response<Buffer>> {
328        let p = build_abs_path(&self.root, path);
329
330        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
331        let mut req = Request::post(&url);
332
333        if let Some(mime) = content_type {
334            req = req.header(CONTENT_TYPE, mime)
335        }
336        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
337
338        self.sign(&mut req).await?;
339
340        self.send(req).await
341    }
342    pub async fn obs_upload_part_request(
343        &self,
344        path: &str,
345        upload_id: &str,
346        part_number: usize,
347        size: Option<u64>,
348        body: Buffer,
349    ) -> Result<Response<Buffer>> {
350        let p = build_abs_path(&self.root, path);
351
352        let url = format!(
353            "{}/{}?partNumber={}&uploadId={}",
354            self.endpoint,
355            percent_encode_path(&p),
356            part_number,
357            percent_encode_path(upload_id)
358        );
359
360        let mut req = Request::put(&url);
361
362        if let Some(size) = size {
363            req = req.header(CONTENT_LENGTH, size);
364        }
365
366        // Set body
367        let mut req = req.body(body).map_err(new_request_build_error)?;
368
369        self.sign(&mut req).await?;
370
371        self.send(req).await
372    }
373
374    pub async fn obs_complete_multipart_upload(
375        &self,
376        path: &str,
377        upload_id: &str,
378        parts: Vec<CompleteMultipartUploadRequestPart>,
379    ) -> Result<Response<Buffer>> {
380        let p = build_abs_path(&self.root, path);
381        let url = format!(
382            "{}/{}?uploadId={}",
383            self.endpoint,
384            percent_encode_path(&p),
385            percent_encode_path(upload_id)
386        );
387
388        let req = Request::post(&url);
389
390        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
391            part: parts.to_vec(),
392        })
393        .map_err(new_xml_deserialize_error)?;
394        // Make sure content length has been set to avoid post with chunked encoding.
395        let req = req.header(CONTENT_LENGTH, content.len());
396        // Set content-type to `application/xml` to avoid mixed with form post.
397        let req = req.header(CONTENT_TYPE, "application/xml");
398
399        let mut req = req
400            .body(Buffer::from(Bytes::from(content)))
401            .map_err(new_request_build_error)?;
402
403        self.sign(&mut req).await?;
404        self.send(req).await
405    }
406
407    /// Abort an on-going multipart upload.
408    pub async fn obs_abort_multipart_upload(
409        &self,
410        path: &str,
411        upload_id: &str,
412    ) -> Result<Response<Buffer>> {
413        let p = build_abs_path(&self.root, path);
414
415        let url = format!(
416            "{}/{}?uploadId={}",
417            self.endpoint,
418            percent_encode_path(&p),
419            percent_encode_path(upload_id)
420        );
421
422        let mut req = Request::delete(&url)
423            .body(Buffer::new())
424            .map_err(new_request_build_error)?;
425
426        self.sign(&mut req).await?;
427        self.send(req).await
428    }
429}
430
431/// Result of CreateMultipartUpload
432#[derive(Default, Debug, Deserialize)]
433#[serde(default, rename_all = "PascalCase")]
434pub struct InitiateMultipartUploadResult {
435    pub upload_id: String,
436}
437
438/// Request of CompleteMultipartUploadRequest
439#[derive(Default, Debug, Serialize)]
440#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
441pub struct CompleteMultipartUploadRequest {
442    pub part: Vec<CompleteMultipartUploadRequestPart>,
443}
444
445#[derive(Clone, Default, Debug, Serialize)]
446#[serde(default, rename_all = "PascalCase")]
447pub struct CompleteMultipartUploadRequestPart {
448    #[serde(rename = "PartNumber")]
449    pub part_number: usize,
450    ///
451    ///
452    /// quick-xml will do escape on `"` which leads to our serialized output is
453    /// not the same as aws s3's example.
454    ///
455    /// Ideally, we could use `serialize_with` to address this (buf failed)
456    ///
457    /// ```ignore
458    /// #[derive(Default, Debug, Serialize)]
459    /// #[serde(default, rename_all = "PascalCase")]
460    /// struct CompleteMultipartUploadRequestPart {
461    ///     #[serde(rename = "PartNumber")]
462    ///     part_number: usize,
463    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
464    ///     etag: String,
465    /// }
466    ///
467    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
468    /// where
469    ///     S: serde::Serializer,
470    /// {
471    ///     ser.serialize_str(&String::from_utf8_lossy(
472    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
473    ///     ))
474    /// }
475    /// ```
476    ///
477    /// ref: <https://github.com/tafia/quick-xml/issues/362>
478    #[serde(rename = "ETag")]
479    pub etag: String,
480}
481
482/// Output of `CompleteMultipartUpload` operation
483#[derive(Debug, Default, Deserialize)]
484#[serde[default, rename_all = "PascalCase"]]
485pub struct CompleteMultipartUploadResult {
486    pub location: String,
487    pub bucket: String,
488    pub key: String,
489    #[serde(rename = "ETag")]
490    pub etag: String,
491}
492
493#[derive(Default, Debug, Deserialize)]
494#[serde(default, rename_all = "PascalCase")]
495pub struct ListObjectsOutput {
496    pub name: String,
497    pub prefix: String,
498    pub contents: Vec<ListObjectsOutputContent>,
499    pub common_prefixes: Vec<CommonPrefix>,
500    pub marker: String,
501    pub next_marker: Option<String>,
502}
503
504#[derive(Default, Debug, Deserialize)]
505#[serde(default, rename_all = "PascalCase")]
506pub struct CommonPrefix {
507    pub prefix: String,
508}
509
510#[derive(Default, Debug, Deserialize)]
511#[serde(default, rename_all = "PascalCase")]
512pub struct ListObjectsOutputContent {
513    pub key: String,
514    pub size: u64,
515}
516
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Sample list-objects response with two objects and two common prefixes.
    const LIST_RESPONSE: &str = r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#;

    /// Parses the sample response and checks every captured field.
    #[test]
    fn test_parse_xml() {
        let reader = bytes::Bytes::from(LIST_RESPONSE).reader();
        let out: ListObjectsOutput = quick_xml::de::from_reader(reader).expect("must success");

        assert_eq!(out.name, "examplebucket");
        assert_eq!(out.prefix, "obj");
        assert_eq!(out.marker, "obj002");
        assert_eq!(out.next_marker.as_deref(), Some("obj004"));

        let keys: Vec<&str> = out.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = out.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        let prefixes: Vec<&str> = out
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}
587}