opendal/services/obs/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Bytes;
24use http::header::CACHE_CONTROL;
25use http::header::CONTENT_DISPOSITION;
26use http::header::CONTENT_LENGTH;
27use http::header::CONTENT_TYPE;
28use http::header::IF_MATCH;
29use http::header::IF_NONE_MATCH;
30use http::Request;
31use http::Response;
32use reqsign::HuaweicloudObsCredential;
33use reqsign::HuaweicloudObsCredentialLoader;
34use reqsign::HuaweicloudObsSigner;
35use serde::Deserialize;
36use serde::Serialize;
37
38use crate::raw::*;
39use crate::*;
40
/// Header names specific to the Huawei Cloud OBS API.
pub mod constants {
    /// Prefix prepended to a user-metadata key to form its request header name.
    pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
    /// Name of the `x-obs-version-id` header.
    pub const X_OBS_VERSION_ID: &str = "x-obs-version-id";
}
45
/// Shared state used to build, sign and send requests to an OBS bucket.
pub struct ObsCore {
    /// Accessor info; its HTTP client is used to send and fetch requests.
    pub info: Arc<AccessorInfo>,
    /// Bucket name, used when building the copy-source header value.
    pub bucket: String,
    /// Root path that every object path is joined with via `build_abs_path`.
    pub root: String,
    /// Base URL that object paths and query strings are appended to.
    pub endpoint: String,

    /// Signer applied to outgoing requests once a credential has been loaded.
    pub signer: HuaweicloudObsSigner,
    /// Loader that supplies the credential used for signing, if any.
    pub loader: HuaweicloudObsCredentialLoader,
}
55
56impl Debug for ObsCore {
57    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("Backend")
59            .field("root", &self.root)
60            .field("bucket", &self.bucket)
61            .field("endpoint", &self.endpoint)
62            .finish_non_exhaustive()
63    }
64}
65
66impl ObsCore {
67    async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
68        let cred = self
69            .loader
70            .load()
71            .await
72            .map_err(new_request_credential_error)?;
73
74        if let Some(cred) = cred {
75            Ok(Some(cred))
76        } else {
77            Ok(None)
78        }
79    }
80
81    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
82        let cred = if let Some(cred) = self.load_credential().await? {
83            cred
84        } else {
85            return Ok(());
86        };
87
88        self.signer.sign(req, &cred).map_err(new_request_sign_error)
89    }
90
91    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
92        let cred = if let Some(cred) = self.load_credential().await? {
93            cred
94        } else {
95            return Ok(());
96        };
97
98        self.signer
99            .sign_query(req, duration, &cred)
100            .map_err(new_request_sign_error)
101    }
102
103    #[inline]
104    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
105        self.info.http_client().send(req).await
106    }
107}
108
109impl ObsCore {
110    pub async fn obs_get_object(
111        &self,
112        path: &str,
113        range: BytesRange,
114        args: &OpRead,
115    ) -> Result<Response<HttpBody>> {
116        let mut req = self.obs_get_object_request(path, range, args)?;
117
118        self.sign(&mut req).await?;
119
120        self.info.http_client().fetch(req).await
121    }
122
123    pub fn obs_get_object_request(
124        &self,
125        path: &str,
126        range: BytesRange,
127        args: &OpRead,
128    ) -> Result<Request<Buffer>> {
129        let p = build_abs_path(&self.root, path);
130
131        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
132
133        let mut req = Request::get(&url);
134
135        if let Some(if_match) = args.if_match() {
136            req = req.header(IF_MATCH, if_match);
137        }
138
139        if range.is_full() {
140            req = req.header(http::header::RANGE, range.to_header())
141        }
142
143        if let Some(if_none_match) = args.if_none_match() {
144            req = req.header(IF_NONE_MATCH, if_none_match);
145        }
146
147        let req = req
148            .extension(Operation::Read)
149            .body(Buffer::new())
150            .map_err(new_request_build_error)?;
151
152        Ok(req)
153    }
154
155    pub fn obs_put_object_request(
156        &self,
157        path: &str,
158        size: Option<u64>,
159        args: &OpWrite,
160        body: Buffer,
161    ) -> Result<Request<Buffer>> {
162        let p = build_abs_path(&self.root, path);
163
164        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
165
166        let mut req = Request::put(&url);
167
168        if let Some(size) = size {
169            req = req.header(CONTENT_LENGTH, size)
170        }
171        if let Some(cache_control) = args.cache_control() {
172            req = req.header(CACHE_CONTROL, cache_control)
173        }
174
175        if let Some(mime) = args.content_type() {
176            req = req.header(CONTENT_TYPE, mime)
177        }
178
179        // Set user metadata headers.
180        if let Some(user_metadata) = args.user_metadata() {
181            for (key, value) in user_metadata {
182                req = req.header(format!("{}{}", constants::X_OBS_META_PREFIX, key), value)
183            }
184        }
185
186        let req = req
187            .extension(Operation::Write)
188            .body(body)
189            .map_err(new_request_build_error)?;
190
191        Ok(req)
192    }
193
194    pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
195        let mut req = self.obs_head_object_request(path, args)?;
196
197        self.sign(&mut req).await?;
198
199        self.send(req).await
200    }
201
202    pub fn obs_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
203        let p = build_abs_path(&self.root, path);
204
205        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
206
207        // The header 'Origin' is optional for API calling, the doc has mistake, confirmed with customer service of huaweicloud.
208        // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html
209
210        let mut req = Request::head(&url);
211
212        if let Some(if_match) = args.if_match() {
213            req = req.header(IF_MATCH, if_match);
214        }
215
216        if let Some(if_none_match) = args.if_none_match() {
217            req = req.header(IF_NONE_MATCH, if_none_match);
218        }
219
220        let req = req
221            .extension(Operation::Stat)
222            .body(Buffer::new())
223            .map_err(new_request_build_error)?;
224
225        Ok(req)
226    }
227
228    pub async fn obs_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
229        let p = build_abs_path(&self.root, path);
230
231        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
232
233        let req = Request::delete(&url);
234
235        let mut req = req
236            .extension(Operation::Delete)
237            .body(Buffer::new())
238            .map_err(new_request_build_error)?;
239
240        self.sign(&mut req).await?;
241
242        self.send(req).await
243    }
244
245    pub fn obs_append_object_request(
246        &self,
247        path: &str,
248        position: u64,
249        size: u64,
250        args: &OpWrite,
251        body: Buffer,
252    ) -> Result<Request<Buffer>> {
253        let p = build_abs_path(&self.root, path);
254        let url = format!(
255            "{}/{}?append&position={}",
256            self.endpoint,
257            percent_encode_path(&p),
258            position
259        );
260
261        let mut req = Request::post(&url);
262
263        req = req.header(CONTENT_LENGTH, size);
264
265        if let Some(mime) = args.content_type() {
266            req = req.header(CONTENT_TYPE, mime);
267        }
268
269        if let Some(pos) = args.content_disposition() {
270            req = req.header(CONTENT_DISPOSITION, pos);
271        }
272
273        if let Some(cache_control) = args.cache_control() {
274            req = req.header(CACHE_CONTROL, cache_control)
275        }
276
277        let req = req
278            .extension(Operation::Write)
279            .body(body)
280            .map_err(new_request_build_error)?;
281
282        Ok(req)
283    }
284
285    pub async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
286        let source = build_abs_path(&self.root, from);
287        let target = build_abs_path(&self.root, to);
288
289        let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
290        let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));
291
292        let mut req = Request::put(&url)
293            .extension(Operation::Copy)
294            .header("x-obs-copy-source", &source)
295            .body(Buffer::new())
296            .map_err(new_request_build_error)?;
297
298        self.sign(&mut req).await?;
299
300        self.send(req).await
301    }
302
303    pub async fn obs_list_objects(
304        &self,
305        path: &str,
306        next_marker: &str,
307        delimiter: &str,
308        limit: Option<usize>,
309    ) -> Result<Response<Buffer>> {
310        let p = build_abs_path(&self.root, path);
311        let mut url = QueryPairsWriter::new(&self.endpoint);
312
313        if !path.is_empty() {
314            url = url.push("prefix", &percent_encode_path(&p));
315        }
316        if !delimiter.is_empty() {
317            url = url.push("delimiter", delimiter);
318        }
319        if let Some(limit) = limit {
320            url = url.push("max-keys", &limit.to_string());
321        }
322        if !next_marker.is_empty() {
323            url = url.push("marker", next_marker);
324        }
325
326        let mut req = Request::get(url.finish())
327            .extension(Operation::List)
328            .body(Buffer::new())
329            .map_err(new_request_build_error)?;
330
331        self.sign(&mut req).await?;
332
333        self.send(req).await
334    }
335    pub async fn obs_initiate_multipart_upload(
336        &self,
337        path: &str,
338        content_type: Option<&str>,
339    ) -> Result<Response<Buffer>> {
340        let p = build_abs_path(&self.root, path);
341
342        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
343        let mut req = Request::post(&url);
344
345        if let Some(mime) = content_type {
346            req = req.header(CONTENT_TYPE, mime)
347        }
348
349        let mut req = req
350            .extension(Operation::Write)
351            .body(Buffer::new())
352            .map_err(new_request_build_error)?;
353
354        self.sign(&mut req).await?;
355
356        self.send(req).await
357    }
358    pub async fn obs_upload_part_request(
359        &self,
360        path: &str,
361        upload_id: &str,
362        part_number: usize,
363        size: Option<u64>,
364        body: Buffer,
365    ) -> Result<Response<Buffer>> {
366        let p = build_abs_path(&self.root, path);
367
368        let url = format!(
369            "{}/{}?partNumber={}&uploadId={}",
370            self.endpoint,
371            percent_encode_path(&p),
372            part_number,
373            percent_encode_path(upload_id)
374        );
375
376        let mut req = Request::put(&url);
377
378        if let Some(size) = size {
379            req = req.header(CONTENT_LENGTH, size);
380        }
381
382        let mut req = req
383            .extension(Operation::Write)
384            // Set body
385            .body(body)
386            .map_err(new_request_build_error)?;
387
388        self.sign(&mut req).await?;
389
390        self.send(req).await
391    }
392
393    pub async fn obs_complete_multipart_upload(
394        &self,
395        path: &str,
396        upload_id: &str,
397        parts: Vec<CompleteMultipartUploadRequestPart>,
398    ) -> Result<Response<Buffer>> {
399        let p = build_abs_path(&self.root, path);
400        let url = format!(
401            "{}/{}?uploadId={}",
402            self.endpoint,
403            percent_encode_path(&p),
404            percent_encode_path(upload_id)
405        );
406
407        let req = Request::post(&url);
408
409        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
410            part: parts.to_vec(),
411        })
412        .map_err(new_xml_serialize_error)?;
413        // Make sure content length has been set to avoid post with chunked encoding.
414        let req = req.header(CONTENT_LENGTH, content.len());
415        // Set content-type to `application/xml` to avoid mixed with form post.
416        let req = req.header(CONTENT_TYPE, "application/xml");
417
418        let mut req = req
419            .extension(Operation::Write)
420            .body(Buffer::from(Bytes::from(content)))
421            .map_err(new_request_build_error)?;
422
423        self.sign(&mut req).await?;
424        self.send(req).await
425    }
426
427    /// Abort an on-going multipart upload.
428    pub async fn obs_abort_multipart_upload(
429        &self,
430        path: &str,
431        upload_id: &str,
432    ) -> Result<Response<Buffer>> {
433        let p = build_abs_path(&self.root, path);
434
435        let url = format!(
436            "{}/{}?uploadId={}",
437            self.endpoint,
438            percent_encode_path(&p),
439            percent_encode_path(upload_id)
440        );
441
442        let mut req = Request::delete(&url)
443            .extension(Operation::Write)
444            .body(Buffer::new())
445            .map_err(new_request_build_error)?;
446
447        self.sign(&mut req).await?;
448        self.send(req).await
449    }
450}
451
/// Result of CreateMultipartUpload.
///
/// Field names are matched in PascalCase; a missing field falls back to its
/// default value.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct InitiateMultipartUploadResult {
    /// Deserialized from the `UploadId` element.
    pub upload_id: String,
}
458
/// Request body of the complete-multipart-upload call.
///
/// Serialized under the name `CompleteMultipartUpload`.
#[derive(Default, Debug, Serialize)]
#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequest {
    /// Parts to include, each serialized as a `Part` entry.
    pub part: Vec<CompleteMultipartUploadRequestPart>,
}
465
/// A single part entry inside a `CompleteMultipartUploadRequest`.
#[derive(Clone, Default, Debug, Serialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequestPart {
    /// Part number, serialized as `PartNumber`.
    #[serde(rename = "PartNumber")]
    pub part_number: usize,
    /// ETag of the part, serialized as `ETag`.
    ///
    /// quick-xml escapes `"`, so our serialized output is not the same as
    /// the AWS S3 example.
    ///
    /// Ideally, we could use `serialize_with` to address this (but that failed):
    ///
    /// ```ignore
    /// #[derive(Default, Debug, Serialize)]
    /// #[serde(default, rename_all = "PascalCase")]
    /// struct CompleteMultipartUploadRequestPart {
    ///     #[serde(rename = "PartNumber")]
    ///     part_number: usize,
    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
    ///     etag: String,
    /// }
    ///
    /// fn partial_escape<S>(s: &str, ser: S) -> Result<S::Ok, S::Error>
    /// where
    ///     S: serde::Serializer,
    /// {
    ///     ser.serialize_str(&String::from_utf8_lossy(
    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
    ///     ))
    /// }
    /// ```
    ///
    /// ref: <https://github.com/tafia/quick-xml/issues/362>
    #[serde(rename = "ETag")]
    pub etag: String,
}
502
503/// Output of `CompleteMultipartUpload` operation
504#[derive(Debug, Default, Deserialize)]
505#[serde[default, rename_all = "PascalCase"]]
506pub struct CompleteMultipartUploadResult {
507    pub location: String,
508    pub bucket: String,
509    pub key: String,
510    #[serde(rename = "ETag")]
511    pub etag: String,
512}
513
/// Deserialized body of a list-objects response.
///
/// Field names are matched in PascalCase; a missing field falls back to its
/// default value.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectsOutput {
    /// Deserialized from the `Name` element.
    pub name: String,
    /// Deserialized from the `Prefix` element.
    pub prefix: String,
    /// One entry per `Contents` element.
    pub contents: Vec<ListObjectsOutputContent>,
    /// One entry per `CommonPrefixes` element.
    pub common_prefixes: Vec<CommonPrefix>,
    /// Deserialized from the `Marker` element.
    pub marker: String,
    /// Deserialized from the `NextMarker` element; `None` when it is absent.
    pub next_marker: Option<String>,
}
524
/// One `CommonPrefixes` entry of a list-objects response.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CommonPrefix {
    /// Deserialized from the `Prefix` element.
    pub prefix: String,
}
530
/// One `Contents` entry of a list-objects response.
///
/// Only `Key` and `Size` are captured here.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectsOutputContent {
    /// Deserialized from the `Key` element.
    pub key: String,
    /// Deserialized from the `Size` element.
    pub size: u64,
}
537
#[cfg(test)]
mod tests {
    use bytes::Buf;

    use super::*;

    /// Parses a sample `ListBucketResult` document and checks every field that
    /// `ListObjectsOutput` captures.
    #[test]
    fn test_parse_xml() {
        let bs = bytes::Bytes::from(
            r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ListBucketResult xmlns="http://obs.cn-north-4.myhuaweicloud.com/doc/2015-06-30/">
    <Name>examplebucket</Name>
    <Prefix>obj</Prefix>
    <Marker>obj002</Marker>
    <NextMarker>obj004</NextMarker>
    <MaxKeys>1000</MaxKeys>
    <IsTruncated>false</IsTruncated>
    <Contents>
        <Key>obj002</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>9</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <Contents>
        <Key>obj003</Key>
        <LastModified>2015-07-01T02:11:19.775Z</LastModified>
        <ETag>"a72e382246ac83e86bd203389849e71d"</ETag>
        <Size>10</Size>
        <Owner>
            <ID>b4bf1b36d9ca43d984fbcb9491b6fce9</ID>
        </Owner>
        <StorageClass>STANDARD</StorageClass>
    </Contents>
    <CommonPrefixes>
        <Prefix>hello</Prefix>
    </CommonPrefixes>
    <CommonPrefixes>
        <Prefix>world</Prefix>
    </CommonPrefixes>
</ListBucketResult>"#,
        );
        let out: ListObjectsOutput = quick_xml::de::from_reader(bs.reader()).expect("must success");

        // Top-level scalar fields.
        assert_eq!(out.name, "examplebucket");
        assert_eq!(out.prefix, "obj");
        assert_eq!(out.marker, "obj002");
        assert_eq!(out.next_marker.as_deref(), Some("obj004"));

        // Object entries: keys and sizes, in document order.
        let keys: Vec<&str> = out.contents.iter().map(|c| c.key.as_str()).collect();
        assert_eq!(keys, ["obj002", "obj003"]);

        let sizes: Vec<u64> = out.contents.iter().map(|c| c.size).collect();
        assert_eq!(sizes, [9, 10]);

        // Common prefixes, in document order.
        let prefixes: Vec<&str> = out
            .common_prefixes
            .iter()
            .map(|c| c.prefix.as_str())
            .collect();
        assert_eq!(prefixes, ["hello", "world"]);
    }
}