opendal_core/services/obs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use http::Request;
23use http::Response;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use reqsign::HuaweicloudObsCredential;
31use reqsign::HuaweicloudObsCredentialLoader;
32use reqsign::HuaweicloudObsSigner;
33use serde::Deserialize;
34use serde::Serialize;
35
36use crate::raw::*;
37use crate::*;
38
/// Header names specific to Huawei Cloud OBS.
pub mod constants {
    /// The `x-obs-version-id` header name.
    pub const X_OBS_VERSION_ID: &str = "x-obs-version-id";
    /// Prefix prepended to every user metadata key to form its header name.
    pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
}
43
/// Shared state used to build, sign and send requests to Huawei Cloud OBS.
pub struct ObsCore {
    /// Accessor info; the HTTP client used for sending requests is taken from it.
    pub info: Arc<AccessorInfo>,
    /// Bucket name; used to build the `x-obs-copy-source` value.
    pub bucket: String,
    /// Root that every operation path is resolved against via `build_abs_path`.
    pub root: String,
    /// Base URL that every request URL starts with.
    pub endpoint: String,

    /// Signer applied to outgoing requests.
    pub signer: HuaweicloudObsSigner,
    /// Loader that provides the credential handed to `signer`.
    pub loader: HuaweicloudObsCredentialLoader,
}
53
54impl Debug for ObsCore {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("ObsCore")
57            .field("root", &self.root)
58            .field("bucket", &self.bucket)
59            .field("endpoint", &self.endpoint)
60            .finish_non_exhaustive()
61    }
62}
63
64impl ObsCore {
65    async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
66        let cred = self
67            .loader
68            .load()
69            .await
70            .map_err(new_request_credential_error)?;
71
72        if let Some(cred) = cred {
73            Ok(Some(cred))
74        } else {
75            Ok(None)
76        }
77    }
78
79    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
80        let cred = if let Some(cred) = self.load_credential().await? {
81            cred
82        } else {
83            return Ok(());
84        };
85
86        self.signer.sign(req, &cred).map_err(new_request_sign_error)
87    }
88
89    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
90        let cred = if let Some(cred) = self.load_credential().await? {
91            cred
92        } else {
93            return Ok(());
94        };
95
96        self.signer
97            .sign_query(req, duration, &cred)
98            .map_err(new_request_sign_error)
99    }
100
101    #[inline]
102    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
103        self.info.http_client().send(req).await
104    }
105}
106
107impl ObsCore {
108    pub async fn obs_get_object(
109        &self,
110        path: &str,
111        range: BytesRange,
112        args: &OpRead,
113    ) -> Result<Response<HttpBody>> {
114        let mut req = self.obs_get_object_request(path, range, args)?;
115
116        self.sign(&mut req).await?;
117
118        self.info.http_client().fetch(req).await
119    }
120
121    pub fn obs_get_object_request(
122        &self,
123        path: &str,
124        range: BytesRange,
125        args: &OpRead,
126    ) -> Result<Request<Buffer>> {
127        let p = build_abs_path(&self.root, path);
128
129        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
130
131        let mut req = Request::get(&url);
132
133        if let Some(if_match) = args.if_match() {
134            req = req.header(IF_MATCH, if_match);
135        }
136
137        if !range.is_full() {
138            req = req.header(http::header::RANGE, range.to_header())
139        }
140
141        if let Some(if_none_match) = args.if_none_match() {
142            req = req.header(IF_NONE_MATCH, if_none_match);
143        }
144
145        let req = req
146            .extension(Operation::Read)
147            .body(Buffer::new())
148            .map_err(new_request_build_error)?;
149
150        Ok(req)
151    }
152
153    pub fn obs_put_object_request(
154        &self,
155        path: &str,
156        size: Option<u64>,
157        args: &OpWrite,
158        body: Buffer,
159    ) -> Result<Request<Buffer>> {
160        let p = build_abs_path(&self.root, path);
161
162        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
163
164        let mut req = Request::put(&url);
165
166        if let Some(size) = size {
167            req = req.header(CONTENT_LENGTH, size)
168        }
169        if let Some(cache_control) = args.cache_control() {
170            req = req.header(CACHE_CONTROL, cache_control)
171        }
172
173        if let Some(mime) = args.content_type() {
174            req = req.header(CONTENT_TYPE, mime)
175        }
176
177        // Set user metadata headers.
178        if let Some(user_metadata) = args.user_metadata() {
179            for (key, value) in user_metadata {
180                req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value)
181            }
182        }
183
184        let req = req
185            .extension(Operation::Write)
186            .body(body)
187            .map_err(new_request_build_error)?;
188
189        Ok(req)
190    }
191
192    pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
193        let mut req = self.obs_head_object_request(path, args)?;
194
195        self.sign(&mut req).await?;
196
197        self.send(req).await
198    }
199
200    pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
201        let p = build_abs_path(&self.root, path);
202
203        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
204
205        // The header 'Origin' is optional for API calling, the doc has mistake, confirmed with customer service of huaweicloud.
206        // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html
207
208        let mut req = Request::head(&url);
209
210        if let Some(if_match) = args.if_match() {
211            req = req.header(IF_MATCH, if_match);
212        }
213
214        if let Some(if_none_match) = args.if_none_match() {
215            req = req.header(IF_NONE_MATCH, if_none_match);
216        }
217
218        let req = req
219            .extension(Operation::Stat)
220            .body(Buffer::new())
221            .map_err(new_request_build_error)?;
222
223        Ok(req)
224    }
225
226    pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
227        let p = build_abs_path(&self.root, path);
228
229        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
230
231        let req = Request::delete(&url);
232
233        let mut req = req
234            .extension(Operation::Delete)
235            .body(Buffer::new())
236            .map_err(new_request_build_error)?;
237
238        self.sign(&mut req).await?;
239
240        self.send(req).await
241    }
242
243    pub fn obs_append_object_request(
244        &self,
245        path: &str,
246        position: u64,
247        size: u64,
248        args: &OpWrite,
249        body: Buffer,
250    ) -> Result<Request<Buffer>> {
251        let p = build_abs_path(&self.root, path);
252        let url = format!(
253            "{}/{}?append&position={}",
254            self.endpoint,
255            percent_encode_path(&p),
256            position
257        );
258
259        let mut req = Request::post(&url);
260
261        req = req.header(CONTENT_LENGTH, size);
262
263        if let Some(mime) = args.content_type() {
264            req = req.header(CONTENT_TYPE, mime);
265        }
266
267        if let Some(pos) = args.content_disposition() {
268            req = req.header(CONTENT_DISPOSITION, pos);
269        }
270
271        if let Some(cache_control) = args.cache_control() {
272            req = req.header(CACHE_CONTROL, cache_control)
273        }
274
275        let req = req
276            .extension(Operation::Write)
277            .body(body)
278            .map_err(new_request_build_error)?;
279
280        Ok(req)
281    }
282
283    pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
284        let source = build_abs_path(&self.root, from);
285        let target = build_abs_path(&self.root, to);
286
287        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
288        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
289
290        let mut req = Request::put(&url)
291            .extension(Operation::Copy)
292            .header("x-obs-copy-source", &source)
293            .body(Buffer::new())
294            .map_err(new_request_build_error)?;
295
296        self.sign(&mut req).await?;
297
298        self.send(req).await
299    }
300
301    pub async fn obs_list_objects(
302        &self,
303        path: &str,
304        next_marker: &str,
305        delimiter: &str,
306        limit: Option<usize>,
307    ) -> Result<Response<Buffer>> {
308        let p = build_abs_path(&self.root, path);
309        let mut url = QueryPairsWriter::new(&self.endpoint);
310
311        if !path.is_empty() {
312            url = url.push("prefix", &percent_encode_path(&p));
313        }
314        if !delimiter.is_empty() {
315            url = url.push("delimiter", delimiter);
316        }
317        if let Some(limit) = limit {
318            url = url.push("max-keys", &limit.to_string());
319        }
320        if !next_marker.is_empty() {
321            url = url.push("marker", next_marker);
322        }
323
324        let mut req = Request::get(url.finish())
325            .extension(Operation::List)
326            .body(Buffer::new())
327            .map_err(new_request_build_error)?;
328
329        self.sign(&mut req).await?;
330
331        self.send(req).await
332    }
333    pub async fn obs_initiate_multipart_upload(
334        &self,
335        path: &str,
336        content_type: Option<&str>,
337    ) -> Result<Response<Buffer>> {
338        let p = build_abs_path(&self.root, path);
339
340        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
341        let mut req = Request::post(&url);
342
343        if let Some(mime) = content_type {
344            req = req.header(CONTENT_TYPE, mime)
345        }
346
347        let mut req = req
348            .extension(Operation::Write)
349            .body(Buffer::new())
350            .map_err(new_request_build_error)?;
351
352        self.sign(&mut req).await?;
353
354        self.send(req).await
355    }
356    pub async fn obs_upload_part_request(
357        &self,
358        path: &str,
359        upload_id: &str,
360        part_number: usize,
361        size: Option<u64>,
362        body: Buffer,
363    ) -> Result<Response<Buffer>> {
364        let p = build_abs_path(&self.root, path);
365
366        let url = format!(
367            "{}/{}?partNumber={}&uploadId={}",
368            self.endpoint,
369            percent_encode_path(&p),
370            part_number,
371            percent_encode_path(upload_id)
372        );
373
374        let mut req = Request::put(&url);
375
376        if let Some(size) = size {
377            req = req.header(CONTENT_LENGTH, size);
378        }
379
380        let mut req = req
381            .extension(Operation::Write)
382            // Set body
383            .body(body)
384            .map_err(new_request_build_error)?;
385
386        self.sign(&mut req).await?;
387
388        self.send(req).await
389    }
390
391    pub async fn obs_complete_multipart_upload(
392        &self,
393        path: &str,
394        upload_id: &str,
395        parts: Vec<CompleteMultipartUploadRequestPart>,
396    ) -> Result<Response<Buffer>> {
397        let p = build_abs_path(&self.root, path);
398        let url = format!(
399            "{}/{}?uploadId={}",
400            self.endpoint,
401            percent_encode_path(&p),
402            percent_encode_path(upload_id)
403        );
404
405        let req = Request::post(&url);
406
407        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
408            part: parts.to_vec(),
409        })
410        .map_err(new_xml_serialize_error)?;
411        // Make sure content length has been set to avoid post with chunked encoding.
412        let req = req.header(CONTENT_LENGTH, content.len());
413        // Set content-type to `application/xml` to avoid mixed with form post.
414        let req = req.header(CONTENT_TYPE, "application/xml");
415
416        let mut req = req
417            .extension(Operation::Write)
418            .body(Buffer::from(Bytes::from(content)))
419            .map_err(new_request_build_error)?;
420
421        self.sign(&mut req).await?;
422        self.send(req).await
423    }
424
425    /// Abort an on-going multipart upload.
426    pub async fn obs_abort_multipart_upload(
427        &self,
428        path: &str,
429        upload_id: &str,
430    ) -> Result<Response<Buffer>> {
431        let p = build_abs_path(&self.root, path);
432
433        let url = format!(
434            "{}/{}?uploadId={}",
435            self.endpoint,
436            percent_encode_path(&p),
437            percent_encode_path(upload_id)
438        );
439
440        let mut req = Request::delete(&url)
441            .extension(Operation::Write)
442            .body(Buffer::new())
443            .map_err(new_request_build_error)?;
444
445        self.sign(&mut req).await?;
446        self.send(req).await
447    }
448}
449
/// Result of CreateMultipartUpload
///
/// Field names are mapped to PascalCase elements; missing elements fall back
/// to the field's default value.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct InitiateMultipartUploadResult {
    /// Read from the `UploadId` element.
    pub upload_id: String,
}
456
/// Request body of the complete-multipart-upload call.
///
/// Serialized with `CompleteMultipartUpload` as the root name.
#[derive(Default, Debug, Serialize)]
#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequest {
    /// Parts to assemble; each entry is serialized under the name `Part`.
    pub part: Vec<CompleteMultipartUploadRequestPart>,
}
463
/// One entry of [`CompleteMultipartUploadRequest::part`].
#[derive(Clone, Default, Debug, Serialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequestPart {
    /// Serialized as `PartNumber`.
    #[serde(rename = "PartNumber")]
    pub part_number: usize,
    /// Serialized as `ETag`.
    ///
    /// quick-xml escapes `"`, so our serialized output is not the same as
    /// the example in the AWS S3 documentation.
    ///
    /// Ideally, we could use `serialize_with` to address this (but that failed):
    ///
    /// ```ignore
    /// #[derive(Default, Debug, Serialize)]
    /// #[serde(default, rename_all = "PascalCase")]
    /// struct CompleteMultipartUploadRequestPart {
    ///     #[serde(rename = "PartNumber")]
    ///     part_number: usize,
    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
    ///     etag: String,
    /// }
    ///
    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
    /// where
    ///     S: serde::Serializer,
    /// {
    ///     ser.serialize_str(&String::from_utf8_lossy(
    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
    ///     ))
    /// }
    /// ```
    ///
    /// ref: <https://github.com/tafia/quick-xml/issues/362>
    #[serde(rename = "ETag")]
    pub etag: String,
}
500
501/// Output of `CompleteMultipartUpload` operation
502#[derive(Debug, Default, Deserialize)]
503#[serde[default, rename_all = "PascalCase"]]
504pub struct CompleteMultipartUploadResult {
505    pub location: String,
506    pub bucket: String,
507    pub key: String,
508    #[serde(rename = "ETag")]
509    pub etag: String,
510}
511
/// Deserialized body of a list-objects response (root element
/// `ListBucketResult` in the test fixture).
///
/// Field names are mapped to PascalCase elements; missing elements fall back
/// to the field's default value.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectsOutput {
    /// Read from the `Name` element.
    pub name: String,
    /// Read from the `Prefix` element.
    pub prefix: String,
    /// One entry per `Contents` element.
    pub contents: Vec<ListObjectsOutputContent>,
    /// One entry per `CommonPrefixes` element.
    pub common_prefixes: Vec<CommonPrefix>,
    /// Read from the `Marker` element.
    pub marker: String,
    /// Read from the `NextMarker` element; `None` when the element is absent.
    pub next_marker: Option<String>,
}
522
/// One `CommonPrefixes` element of a list-objects response.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CommonPrefix {
    /// Read from the nested `Prefix` element.
    pub prefix: String,
}
528
/// One `Contents` element of a list-objects response.
///
/// Only `Key` and `Size` are captured; other child elements are not mapped to
/// any field.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectsOutputContent {
    /// Read from the `Key` element.
    pub key: String,
    /// Read from the `Size` element.
    pub size: u64,
}
535
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Parses a sample `ListBucketResult` document and checks every field
    /// that `ListObjectsOutput` maps.
    #[test]
    fn test_parse_xml() {
        let payload = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );
        let parsed: ListObjectsOutput =
            quick_xml::de::from_reader(payload.reader()).expect("must success");

        // Scalar fields.
        assert_eq!(parsed.name, "examplebucket");
        assert_eq!(parsed.prefix, "obj");
        assert_eq!(parsed.marker, "obj002");
        assert_eq!(parsed.next_marker.as_deref(), Some("obj004"));

        // Repeated `Contents` elements keep document order.
        let keys: Vec<&str> = parsed.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = parsed.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Repeated `CommonPrefixes` elements keep document order.
        let prefixes: Vec<&str> = parsed
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}