opendal/services/obs/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use http::Uri;
25use log::debug;
26use reqsign::HuaweicloudObsConfig;
27use reqsign::HuaweicloudObsCredentialLoader;
28use reqsign::HuaweicloudObsSigner;
29
30use super::OBS_SCHEME;
31use super::config::ObsConfig;
32use super::core::ObsCore;
33use super::core::constants;
34use super::deleter::ObsDeleter;
35use super::error::parse_error;
36use super::lister::ObsLister;
37use super::writer::ObsWriter;
38use super::writer::ObsWriters;
39use crate::raw::*;
40use crate::*;
41
42/// Huawei-Cloud Object Storage Service (OBS) support
43#[doc = include_str!("docs.md")]
44#[derive(Default)]
45pub struct ObsBuilder {
46    pub(super) config: ObsConfig,
47
48    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
49    pub(super) http_client: Option<HttpClient>,
50}
51
52impl Debug for ObsBuilder {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("ObsBuilder")
55            .field("config", &self.config)
56            .finish_non_exhaustive()
57    }
58}
59
60impl ObsBuilder {
61    /// Set root of this backend.
62    ///
63    /// All operations will happen under this root.
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// Set endpoint of this backend.
75    ///
76    /// Both huaweicloud default domain and user domain endpoints are allowed.
77    /// Please DO NOT add the bucket name to the endpoint.
78    ///
79    /// - `https://obs.cn-north-4.myhuaweicloud.com`
80    /// - `obs.cn-north-4.myhuaweicloud.com` (https by default)
81    /// - `https://custom.obs.com` (port should not be set)
82    pub fn endpoint(mut self, endpoint: &str) -> Self {
83        if !endpoint.is_empty() {
84            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
85        }
86
87        self
88    }
89
90    /// Set access_key_id of this backend.
91    /// - If it is set, we will take user's input first.
92    /// - If not, we will try to load it from environment.
93    pub fn access_key_id(mut self, access_key_id: &str) -> Self {
94        if !access_key_id.is_empty() {
95            self.config.access_key_id = Some(access_key_id.to_string());
96        }
97
98        self
99    }
100
101    /// Set secret_access_key of this backend.
102    /// - If it is set, we will take user's input first.
103    /// - If not, we will try to load it from environment.
104    pub fn secret_access_key(mut self, secret_access_key: &str) -> Self {
105        if !secret_access_key.is_empty() {
106            self.config.secret_access_key = Some(secret_access_key.to_string());
107        }
108
109        self
110    }
111
112    /// Set bucket of this backend.
113    /// The param is required.
114    pub fn bucket(mut self, bucket: &str) -> Self {
115        if !bucket.is_empty() {
116            self.config.bucket = Some(bucket.to_string());
117        }
118
119        self
120    }
121
122    /// Set bucket versioning status for this backend
123    pub fn enable_versioning(mut self, enabled: bool) -> Self {
124        self.config.enable_versioning = enabled;
125
126        self
127    }
128
129    /// Specify the http client that used by this service.
130    ///
131    /// # Notes
132    ///
133    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
134    /// during minor updates.
135    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
136    #[allow(deprecated)]
137    pub fn http_client(mut self, client: HttpClient) -> Self {
138        self.http_client = Some(client);
139        self
140    }
141}
142
143impl Builder for ObsBuilder {
144    type Config = ObsConfig;
145
146    fn build(self) -> Result<impl Access> {
147        debug!("backend build started: {:?}", &self);
148
149        let root = normalize_root(&self.config.root.unwrap_or_default());
150        debug!("backend use root {root}");
151
152        let bucket = match &self.config.bucket {
153            Some(bucket) => Ok(bucket.to_string()),
154            None => Err(
155                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
156                    .with_context("service", OBS_SCHEME),
157            ),
158        }?;
159        debug!("backend use bucket {}", &bucket);
160
161        let uri = match &self.config.endpoint {
162            Some(endpoint) => endpoint.parse::<Uri>().map_err(|err| {
163                Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid")
164                    .with_context("service", OBS_SCHEME)
165                    .set_source(err)
166            }),
167            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
168                .with_context("service", OBS_SCHEME)),
169        }?;
170
171        let scheme = match uri.scheme_str() {
172            Some(scheme) => scheme.to_string(),
173            None => "https".to_string(),
174        };
175
176        let (endpoint, is_obs_default) = {
177            let host = uri.host().unwrap_or_default().to_string();
178            if host.starts_with("obs.")
179                && (host.ends_with(".myhuaweicloud.com") || host.ends_with(".huawei.com"))
180            {
181                (format!("{bucket}.{host}"), true)
182            } else {
183                (host, false)
184            }
185        };
186        debug!("backend use endpoint {}", &endpoint);
187
188        let mut cfg = HuaweicloudObsConfig::default();
189        // Load cfg from env first.
190        cfg = cfg.from_env();
191
192        if let Some(v) = self.config.access_key_id {
193            cfg.access_key_id = Some(v);
194        }
195
196        if let Some(v) = self.config.secret_access_key {
197            cfg.secret_access_key = Some(v);
198        }
199
200        let loader = HuaweicloudObsCredentialLoader::new(cfg);
201
202        // Set the bucket name in CanonicalizedResource.
203        // 1. If the bucket is bound to a user domain name, use the user domain name as the bucket name,
204        // for example, `/obs.ccc.com/object`. `obs.ccc.com` is the user domain name bound to the bucket.
205        // 2. If you do not access OBS using a user domain name, this field is in the format of `/bucket/object`.
206        //
207        // Please refer to this doc for more details:
208        // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0010.html
209        let signer = HuaweicloudObsSigner::new(if is_obs_default { &bucket } else { &endpoint });
210
211        debug!("backend build finished");
212        Ok(ObsBackend {
213            core: Arc::new(ObsCore {
214                info: {
215                    let am = AccessorInfo::default();
216                    am.set_scheme(OBS_SCHEME)
217                        .set_root(&root)
218                        .set_name(&bucket)
219                        .set_native_capability(Capability {
220                            stat: true,
221                            stat_with_if_match: true,
222                            stat_with_if_none_match: true,
223
224                            read: true,
225
226                            read_with_if_match: true,
227                            read_with_if_none_match: true,
228
229                            write: true,
230                            write_can_empty: true,
231                            write_can_append: true,
232                            write_can_multi: true,
233                            write_with_content_type: true,
234                            write_with_cache_control: true,
235                            // The min multipart size of OBS is 5 MiB.
236                            //
237                            // ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
238                            write_multi_min_size: Some(5 * 1024 * 1024),
239                            // The max multipart size of OBS is 5 GiB.
240                            //
241                            // ref: <https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
242                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
243                                Some(5 * 1024 * 1024 * 1024)
244                            } else {
245                                Some(usize::MAX)
246                            },
247                            write_with_user_metadata: true,
248
249                            delete: true,
250                            copy: true,
251
252                            list: true,
253                            list_with_recursive: true,
254
255                            presign: true,
256                            presign_stat: true,
257                            presign_read: true,
258                            presign_write: true,
259
260                            shared: true,
261
262                            ..Default::default()
263                        });
264
265                    // allow deprecated api here for compatibility
266                    #[allow(deprecated)]
267                    if let Some(client) = self.http_client {
268                        am.update_http_client(|_| client);
269                    }
270
271                    am.into()
272                },
273                bucket,
274                root,
275                endpoint: format!("{}://{}", &scheme, &endpoint),
276                signer,
277                loader,
278            }),
279        })
280    }
281}
282
283/// Backend for Huaweicloud OBS services.
284#[derive(Debug, Clone)]
285pub struct ObsBackend {
286    core: Arc<ObsCore>,
287}
288
289impl Access for ObsBackend {
290    type Reader = HttpBody;
291    type Writer = ObsWriters;
292    type Lister = oio::PageLister<ObsLister>;
293    type Deleter = oio::OneShotDeleter<ObsDeleter>;
294
295    fn info(&self) -> Arc<AccessorInfo> {
296        self.core.info.clone()
297    }
298
299    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
300        let resp = self.core.obs_head_object(path, &args).await?;
301        let headers = resp.headers();
302
303        let status = resp.status();
304
305        // The response is very similar to azblob.
306        match status {
307            StatusCode::OK => {
308                let mut meta = parse_into_metadata(path, headers)?;
309                let user_meta = headers
310                    .iter()
311                    .filter_map(|(name, _)| {
312                        name.as_str()
313                            .strip_prefix(constants::X_OBS_META_PREFIX)
314                            .and_then(|stripped_key| {
315                                parse_header_to_str(headers, name)
316                                    .unwrap_or(None)
317                                    .map(|val| (stripped_key.to_string(), val.to_string()))
318                            })
319                    })
320                    .collect::<HashMap<_, _>>();
321
322                if !user_meta.is_empty() {
323                    meta = meta.with_user_metadata(user_meta);
324                }
325
326                if let Some(v) = parse_header_to_str(headers, constants::X_OBS_VERSION_ID)? {
327                    meta.set_version(v);
328                }
329
330                Ok(RpStat::new(meta))
331            }
332            StatusCode::NOT_FOUND if path.ends_with('/') => {
333                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
334            }
335            _ => Err(parse_error(resp)),
336        }
337    }
338
339    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
340        let resp = self.core.obs_get_object(path, args.range(), &args).await?;
341
342        let status = resp.status();
343
344        match status {
345            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
346                Ok((RpRead::default(), resp.into_body()))
347            }
348            _ => {
349                let (part, mut body) = resp.into_parts();
350                let buf = body.to_buffer().await?;
351                Err(parse_error(Response::from_parts(part, buf)))
352            }
353        }
354    }
355
356    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
357        let writer = ObsWriter::new(self.core.clone(), path, args.clone());
358
359        let w = if args.append() {
360            ObsWriters::Two(oio::AppendWriter::new(writer))
361        } else {
362            ObsWriters::One(oio::MultipartWriter::new(
363                self.core.info.clone(),
364                writer,
365                args.concurrent(),
366            ))
367        };
368
369        Ok((RpWrite::default(), w))
370    }
371
372    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
373        Ok((
374            RpDelete::default(),
375            oio::OneShotDeleter::new(ObsDeleter::new(self.core.clone())),
376        ))
377    }
378
379    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
380        let l = ObsLister::new(self.core.clone(), path, args.recursive(), args.limit());
381        Ok((RpList::default(), oio::PageLister::new(l)))
382    }
383
384    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
385        let resp = self.core.obs_copy_object(from, to).await?;
386
387        let status = resp.status();
388
389        match status {
390            StatusCode::OK => Ok(RpCopy::default()),
391            _ => Err(parse_error(resp)),
392        }
393    }
394
395    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
396        let req = match args.operation() {
397            PresignOperation::Stat(v) => self.core.obs_head_object_request(path, v),
398            PresignOperation::Read(v) => {
399                self.core
400                    .obs_get_object_request(path, BytesRange::default(), v)
401            }
402            PresignOperation::Write(v) => {
403                self.core
404                    .obs_put_object_request(path, None, v, Buffer::new())
405            }
406            PresignOperation::Delete(_) => Err(Error::new(
407                ErrorKind::Unsupported,
408                "operation is not supported",
409            )),
410        };
411        let mut req = req?;
412        self.core.sign_query(&mut req, args.expire()).await?;
413
414        // We don't need this request anymore, consume it directly.
415        let (parts, _) = req.into_parts();
416
417        Ok(RpPresign::new(PresignedRequest::new(
418            parts.method,
419            parts.uri,
420            parts.headers,
421        )))
422    }
423}