// opendal/services/azdls/backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::AzureStorageConfig;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28
29use super::core::AzdlsCore;
30use super::delete::AzdlsDeleter;
31use super::error::parse_error;
32use super::lister::AzdlsLister;
33use super::writer::AzdlsWriter;
34use super::writer::AzdlsWriters;
35use crate::raw::*;
36use crate::services::AzdlsConfig;
37use crate::*;
38
/// Known endpoint suffixes for the Azure Data Lake Storage Gen2 URI syntax
/// `https://<accountname>.<suffix>`:
///
/// - Azure public cloud: `https://accountname.dfs.core.windows.net`
/// - Azure US Government: `https://accountname.dfs.core.usgovcloudapi.net`
/// - Azure China: `https://accountname.dfs.core.chinacloudapi.cn`
///
/// `infer_storage_name_from_endpoint` compares an endpoint's host suffix against this list
/// to infer the storage account name when none is configured explicitly.
const KNOWN_AZDLS_ENDPOINT_SUFFIX: &[&str] = &[
    "dfs.core.windows.net",
    "dfs.core.usgovcloudapi.net",
    "dfs.core.chinacloudapi.cn",
];
48
49impl Configurator for AzdlsConfig {
50    type Builder = AzdlsBuilder;
51
52    #[allow(deprecated)]
53    fn into_builder(self) -> Self::Builder {
54        AzdlsBuilder {
55            config: self,
56            http_client: None,
57        }
58    }
59}
60
/// Azure Data Lake Storage Gen2 Support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzdlsBuilder {
    // Service configuration filled by the setter methods and validated in `Builder::build`.
    config: AzdlsConfig,

    // Optional HTTP client override. When set, `Builder::build` installs it on the
    // accessor info via `update_http_client`.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
70
71impl Debug for AzdlsBuilder {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        let mut ds = f.debug_struct("AzdlsBuilder");
74
75        ds.field("config", &self.config);
76
77        ds.finish()
78    }
79}
80
81impl AzdlsBuilder {
82    /// Set root of this backend.
83    ///
84    /// All operations will happen under this root.
85    pub fn root(mut self, root: &str) -> Self {
86        self.config.root = if root.is_empty() {
87            None
88        } else {
89            Some(root.to_string())
90        };
91
92        self
93    }
94
95    /// Set filesystem name of this backend.
96    pub fn filesystem(mut self, filesystem: &str) -> Self {
97        self.config.filesystem = filesystem.to_string();
98
99        self
100    }
101
102    /// Set endpoint of this backend.
103    ///
104    /// Endpoint must be full uri, e.g.
105    ///
106    /// - Azblob: `https://accountname.blob.core.windows.net`
107    /// - Azurite: `http://127.0.0.1:10000/devstoreaccount1`
108    pub fn endpoint(mut self, endpoint: &str) -> Self {
109        if !endpoint.is_empty() {
110            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
111            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
112        }
113
114        self
115    }
116
117    /// Set account_name of this backend.
118    ///
119    /// - If account_name is set, we will take user's input first.
120    /// - If not, we will try to load it from environment.
121    pub fn account_name(mut self, account_name: &str) -> Self {
122        if !account_name.is_empty() {
123            self.config.account_name = Some(account_name.to_string());
124        }
125
126        self
127    }
128
129    /// Set account_key of this backend.
130    ///
131    /// - If account_key is set, we will take user's input first.
132    /// - If not, we will try to load it from environment.
133    pub fn account_key(mut self, account_key: &str) -> Self {
134        if !account_key.is_empty() {
135            self.config.account_key = Some(account_key.to_string());
136        }
137
138        self
139    }
140
141    /// Specify the http client that used by this service.
142    ///
143    /// # Notes
144    ///
145    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
146    /// during minor updates.
147    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
148    #[allow(deprecated)]
149    pub fn http_client(mut self, client: HttpClient) -> Self {
150        self.http_client = Some(client);
151        self
152    }
153}
154
155impl Builder for AzdlsBuilder {
156    const SCHEME: Scheme = Scheme::Azdls;
157    type Config = AzdlsConfig;
158
159    fn build(self) -> Result<impl Access> {
160        debug!("backend build started: {:?}", &self);
161
162        let root = normalize_root(&self.config.root.unwrap_or_default());
163        debug!("backend use root {}", root);
164
165        // Handle endpoint, region and container name.
166        let filesystem = match self.config.filesystem.is_empty() {
167            false => Ok(&self.config.filesystem),
168            true => Err(Error::new(ErrorKind::ConfigInvalid, "filesystem is empty")
169                .with_operation("Builder::build")
170                .with_context("service", Scheme::Azdls)),
171        }?;
172        debug!("backend use filesystem {}", &filesystem);
173
174        let endpoint = match &self.config.endpoint {
175            Some(endpoint) => Ok(endpoint.clone()),
176            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
177                .with_operation("Builder::build")
178                .with_context("service", Scheme::Azdls)),
179        }?;
180        debug!("backend use endpoint {}", &endpoint);
181
182        let config_loader = AzureStorageConfig {
183            account_name: self
184                .config
185                .account_name
186                .clone()
187                .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
188            account_key: self.config.account_key.clone(),
189            sas_token: None,
190            ..Default::default()
191        };
192
193        let cred_loader = AzureStorageLoader::new(config_loader);
194        let signer = AzureStorageSigner::new();
195        Ok(AzdlsBackend {
196            core: Arc::new(AzdlsCore {
197                info: {
198                    let am = AccessorInfo::default();
199                    am.set_scheme(Scheme::Azdls)
200                        .set_root(&root)
201                        .set_name(filesystem)
202                        .set_native_capability(Capability {
203                            stat: true,
204                            stat_has_cache_control: true,
205                            stat_has_content_length: true,
206                            stat_has_content_type: true,
207                            stat_has_content_encoding: true,
208                            stat_has_content_range: true,
209                            stat_has_etag: true,
210                            stat_has_content_md5: true,
211                            stat_has_last_modified: true,
212                            stat_has_content_disposition: true,
213
214                            read: true,
215
216                            write: true,
217                            write_can_append: true,
218                            write_with_if_none_match: true,
219                            write_with_if_not_exists: true,
220
221                            create_dir: true,
222                            delete: true,
223                            rename: true,
224
225                            list: true,
226                            list_has_etag: true,
227                            list_has_content_length: true,
228                            list_has_last_modified: true,
229
230                            shared: true,
231
232                            ..Default::default()
233                        });
234
235                    // allow deprecated api here for compatibility
236                    #[allow(deprecated)]
237                    if let Some(client) = self.http_client {
238                        am.update_http_client(|_| client);
239                    }
240
241                    am.into()
242                },
243                filesystem: self.config.filesystem.clone(),
244                root,
245                endpoint,
246                loader: cred_loader,
247                signer,
248            }),
249        })
250    }
251}
252
/// Backend for Azure Data Lake Storage Gen2 (azdls) services.
#[derive(Debug, Clone)]
pub struct AzdlsBackend {
    // Shared state (accessor info, filesystem, root, endpoint, credential loader and
    // signer). It is reference-counted so writers, listers and deleters created by this
    // backend can hold their own handle to it.
    core: Arc<AzdlsCore>,
}
258
259impl Access for AzdlsBackend {
260    type Reader = HttpBody;
261    type Writer = AzdlsWriters;
262    type Lister = oio::PageLister<AzdlsLister>;
263    type Deleter = oio::OneShotDeleter<AzdlsDeleter>;
264    type BlockingReader = ();
265    type BlockingWriter = ();
266    type BlockingLister = ();
267    type BlockingDeleter = ();
268
269    fn info(&self) -> Arc<AccessorInfo> {
270        self.core.info.clone()
271    }
272
273    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
274        let mut req = self.core.azdls_create_request(
275            path,
276            "directory",
277            &OpWrite::default(),
278            Buffer::new(),
279        )?;
280
281        self.core.sign(&mut req).await?;
282
283        let resp = self.core.send(req).await?;
284
285        let status = resp.status();
286
287        match status {
288            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
289            _ => Err(parse_error(resp)),
290        }
291    }
292
293    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
294        // Stat root always returns a DIR.
295        if path == "/" {
296            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
297        }
298
299        let resp = self.core.azdls_get_properties(path).await?;
300
301        if resp.status() != StatusCode::OK {
302            return Err(parse_error(resp));
303        }
304
305        let mut meta = parse_into_metadata(path, resp.headers())?;
306        let resource = resp
307            .headers()
308            .get("x-ms-resource-type")
309            .ok_or_else(|| {
310                Error::new(
311                    ErrorKind::Unexpected,
312                    "azdls should return x-ms-resource-type header, but it's missing",
313                )
314            })?
315            .to_str()
316            .map_err(|err| {
317                Error::new(
318                    ErrorKind::Unexpected,
319                    "azdls should return x-ms-resource-type header, but it's not a valid string",
320                )
321                .set_source(err)
322            })?;
323
324        meta = match resource {
325            "file" => meta.with_mode(EntryMode::FILE),
326            "directory" => meta.with_mode(EntryMode::DIR),
327            v => {
328                return Err(Error::new(
329                    ErrorKind::Unexpected,
330                    "azdls returns not supported x-ms-resource-type",
331                )
332                .with_context("resource", v))
333            }
334        };
335
336        Ok(RpStat::new(meta))
337    }
338
339    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
340        let resp = self.core.azdls_read(path, args.range()).await?;
341
342        let status = resp.status();
343        match status {
344            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
345            _ => {
346                let (part, mut body) = resp.into_parts();
347                let buf = body.to_buffer().await?;
348                Err(parse_error(Response::from_parts(part, buf)))
349            }
350        }
351    }
352
353    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
354        let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
355        let w = if args.append() {
356            AzdlsWriters::Two(oio::AppendWriter::new(w))
357        } else {
358            AzdlsWriters::One(oio::OneShotWriter::new(w))
359        };
360        Ok((RpWrite::default(), w))
361    }
362
363    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
364        Ok((
365            RpDelete::default(),
366            oio::OneShotDeleter::new(AzdlsDeleter::new(self.core.clone())),
367        ))
368    }
369
370    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
371        let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit());
372
373        Ok((RpList::default(), oio::PageLister::new(l)))
374    }
375
376    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
377        if let Some(resp) = self.core.azdls_ensure_parent_path(to).await? {
378            let status = resp.status();
379            match status {
380                StatusCode::CREATED | StatusCode::CONFLICT => {}
381                _ => return Err(parse_error(resp)),
382            }
383        }
384
385        let resp = self.core.azdls_rename(from, to).await?;
386
387        let status = resp.status();
388
389        match status {
390            StatusCode::CREATED => Ok(RpRename::default()),
391            _ => Err(parse_error(resp)),
392        }
393    }
394}
395
396fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
397    let endpoint: &str = endpoint
398        .strip_prefix("http://")
399        .or_else(|| endpoint.strip_prefix("https://"))
400        .unwrap_or(endpoint);
401
402    let mut parts = endpoint.splitn(2, '.');
403    let storage_name = parts.next();
404    let endpoint_suffix = parts
405        .next()
406        .unwrap_or_default()
407        .trim_end_matches('/')
408        .to_lowercase();
409
410    if KNOWN_AZDLS_ENDPOINT_SUFFIX
411        .iter()
412        .any(|s| *s == endpoint_suffix.as_str())
413    {
414        storage_name.map(|s| s.to_string())
415    } else {
416        None
417    }
418}
419
#[cfg(test)]
mod tests {
    use super::infer_storage_name_from_endpoint;

    #[test]
    fn test_infer_storage_name_from_endpoint() {
        let endpoint = "https://account.dfs.core.windows.net";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    #[test]
    fn test_infer_storage_name_from_endpoint_with_trailing_slash() {
        let endpoint = "https://account.dfs.core.windows.net/";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    /// Sovereign-cloud suffixes are recognized as well as the public cloud one.
    #[test]
    fn test_infer_storage_name_from_sovereign_cloud_endpoints() {
        for endpoint in [
            "https://account.dfs.core.usgovcloudapi.net",
            "https://account.dfs.core.chinacloudapi.cn",
        ] {
            assert_eq!(
                infer_storage_name_from_endpoint(endpoint),
                Some("account".to_string()),
                "endpoint: {endpoint}"
            );
        }
    }

    /// An endpoint without a scheme is still accepted.
    #[test]
    fn test_infer_storage_name_from_endpoint_without_scheme() {
        let storage_name = infer_storage_name_from_endpoint("account.dfs.core.windows.net");
        assert_eq!(storage_name, Some("account".to_string()));
    }

    /// The suffix comparison ignores case.
    #[test]
    fn test_infer_storage_name_from_endpoint_with_mixed_case_suffix() {
        let storage_name = infer_storage_name_from_endpoint("https://account.DFS.Core.Windows.Net");
        assert_eq!(storage_name, Some("account".to_string()));
    }

    /// Custom or unknown endpoints yield no inferred account name.
    #[test]
    fn test_infer_storage_name_from_unknown_endpoints() {
        for endpoint in [
            "http://127.0.0.1:10000/devstoreaccount1",
            "https://account.blob.core.windows.net",
            "",
        ] {
            assert_eq!(
                infer_storage_name_from_endpoint(endpoint),
                None,
                "endpoint: {endpoint}"
            );
        }
    }
}