opendal/services/azdls/backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::AzureStorageConfig;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28
29use super::core::AzdlsCore;
30use super::core::DIRECTORY;
31use super::delete::AzdlsDeleter;
32use super::error::parse_error;
33use super::lister::AzdlsLister;
34use super::writer::AzdlsWriter;
35use super::writer::AzdlsWriters;
36use crate::raw::*;
37use crate::services::AzdlsConfig;
38use crate::*;
39
/// Known endpoint suffixes of the Azure Data Lake Storage Gen2 URI syntax,
/// used to infer the storage account name from an endpoint.
///
/// - Azure public cloud: `https://accountname.dfs.core.windows.net`
/// - Azure US Government: `https://accountname.dfs.core.usgovcloudapi.net`
/// - Azure China: `https://accountname.dfs.core.chinacloudapi.cn`
///
/// Keep entries lowercase so they match regardless of how the endpoint's host is cased.
const KNOWN_AZDLS_ENDPOINT_SUFFIX: &[&str] = &[
    "dfs.core.windows.net",
    "dfs.core.usgovcloudapi.net",
    "dfs.core.chinacloudapi.cn",
];
49
50impl Configurator for AzdlsConfig {
51    type Builder = AzdlsBuilder;
52
53    #[allow(deprecated)]
54    fn into_builder(self) -> Self::Builder {
55        AzdlsBuilder {
56            config: self,
57            http_client: None,
58        }
59    }
60}
61
/// Azure Data Lake Storage Gen2 Support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct AzdlsBuilder {
    /// Service configuration, populated by the builder's setter methods.
    config: AzdlsConfig,

    /// HTTP client supplied through the deprecated `AzdlsBuilder::http_client` setter;
    /// `None` unless that setter was called.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    http_client: Option<HttpClient>,
}
71
72impl Debug for AzdlsBuilder {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        let mut ds = f.debug_struct("AzdlsBuilder");
75
76        ds.field("config", &self.config);
77
78        ds.finish()
79    }
80}
81
82impl AzdlsBuilder {
83    /// Set root of this backend.
84    ///
85    /// All operations will happen under this root.
86    pub fn root(mut self, root: &str) -> Self {
87        self.config.root = if root.is_empty() {
88            None
89        } else {
90            Some(root.to_string())
91        };
92
93        self
94    }
95
96    /// Set filesystem name of this backend.
97    pub fn filesystem(mut self, filesystem: &str) -> Self {
98        self.config.filesystem = filesystem.to_string();
99
100        self
101    }
102
103    /// Set endpoint of this backend.
104    ///
105    /// Endpoint must be full uri, e.g.
106    ///
107    /// - Azblob: `https://accountname.blob.core.windows.net`
108    /// - Azurite: `http://127.0.0.1:10000/devstoreaccount1`
109    pub fn endpoint(mut self, endpoint: &str) -> Self {
110        if !endpoint.is_empty() {
111            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
112            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
113        }
114
115        self
116    }
117
118    /// Set account_name of this backend.
119    ///
120    /// - If account_name is set, we will take user's input first.
121    /// - If not, we will try to load it from environment.
122    pub fn account_name(mut self, account_name: &str) -> Self {
123        if !account_name.is_empty() {
124            self.config.account_name = Some(account_name.to_string());
125        }
126
127        self
128    }
129
130    /// Set account_key of this backend.
131    ///
132    /// - If account_key is set, we will take user's input first.
133    /// - If not, we will try to load it from environment.
134    pub fn account_key(mut self, account_key: &str) -> Self {
135        if !account_key.is_empty() {
136            self.config.account_key = Some(account_key.to_string());
137        }
138
139        self
140    }
141
142    /// Specify the http client that used by this service.
143    ///
144    /// # Notes
145    ///
146    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
147    /// during minor updates.
148    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
149    #[allow(deprecated)]
150    pub fn http_client(mut self, client: HttpClient) -> Self {
151        self.http_client = Some(client);
152        self
153    }
154}
155
156impl Builder for AzdlsBuilder {
157    const SCHEME: Scheme = Scheme::Azdls;
158    type Config = AzdlsConfig;
159
160    fn build(self) -> Result<impl Access> {
161        debug!("backend build started: {:?}", &self);
162
163        let root = normalize_root(&self.config.root.unwrap_or_default());
164        debug!("backend use root {}", root);
165
166        // Handle endpoint, region and container name.
167        let filesystem = match self.config.filesystem.is_empty() {
168            false => Ok(&self.config.filesystem),
169            true => Err(Error::new(ErrorKind::ConfigInvalid, "filesystem is empty")
170                .with_operation("Builder::build")
171                .with_context("service", Scheme::Azdls)),
172        }?;
173        debug!("backend use filesystem {}", &filesystem);
174
175        let endpoint = match &self.config.endpoint {
176            Some(endpoint) => Ok(endpoint.clone()),
177            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
178                .with_operation("Builder::build")
179                .with_context("service", Scheme::Azdls)),
180        }?;
181        debug!("backend use endpoint {}", &endpoint);
182
183        let config_loader = AzureStorageConfig {
184            account_name: self
185                .config
186                .account_name
187                .clone()
188                .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
189            account_key: self.config.account_key.clone(),
190            sas_token: None,
191            ..Default::default()
192        };
193
194        let cred_loader = AzureStorageLoader::new(config_loader);
195        let signer = AzureStorageSigner::new();
196        Ok(AzdlsBackend {
197            core: Arc::new(AzdlsCore {
198                info: {
199                    let am = AccessorInfo::default();
200                    am.set_scheme(Scheme::Azdls)
201                        .set_root(&root)
202                        .set_name(filesystem)
203                        .set_native_capability(Capability {
204                            stat: true,
205                            stat_has_cache_control: true,
206                            stat_has_content_length: true,
207                            stat_has_content_type: true,
208                            stat_has_content_encoding: true,
209                            stat_has_content_range: true,
210                            stat_has_etag: true,
211                            stat_has_content_md5: true,
212                            stat_has_last_modified: true,
213                            stat_has_content_disposition: true,
214
215                            read: true,
216
217                            write: true,
218                            write_can_append: true,
219                            write_with_if_none_match: true,
220                            write_with_if_not_exists: true,
221
222                            create_dir: true,
223                            delete: true,
224                            rename: true,
225
226                            list: true,
227                            list_has_etag: true,
228                            list_has_content_length: true,
229                            list_has_last_modified: true,
230
231                            shared: true,
232
233                            ..Default::default()
234                        });
235
236                    // allow deprecated api here for compatibility
237                    #[allow(deprecated)]
238                    if let Some(client) = self.http_client {
239                        am.update_http_client(|_| client);
240                    }
241
242                    am.into()
243                },
244                filesystem: self.config.filesystem.clone(),
245                root,
246                endpoint,
247                loader: cred_loader,
248                signer,
249            }),
250        })
251    }
252}
253
254/// Backend for azblob services.
255#[derive(Debug, Clone)]
256pub struct AzdlsBackend {
257    core: Arc<AzdlsCore>,
258}
259
260impl Access for AzdlsBackend {
261    type Reader = HttpBody;
262    type Writer = AzdlsWriters;
263    type Lister = oio::PageLister<AzdlsLister>;
264    type Deleter = oio::OneShotDeleter<AzdlsDeleter>;
265    type BlockingReader = ();
266    type BlockingWriter = ();
267    type BlockingLister = ();
268    type BlockingDeleter = ();
269
270    fn info(&self) -> Arc<AccessorInfo> {
271        self.core.info.clone()
272    }
273
274    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
275        let resp = self
276            .core
277            .azdls_create(path, DIRECTORY, &OpWrite::default())
278            .await?;
279
280        let status = resp.status();
281        match status {
282            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
283            _ => Err(parse_error(resp)),
284        }
285    }
286
287    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
288        // Stat root always returns a DIR.
289        // TODO: include metadata for the root (#4746)
290        if path == "/" {
291            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
292        }
293
294        let metadata = self.core.azdls_stat_metadata(path).await?;
295        Ok(RpStat::new(metadata))
296    }
297
298    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
299        let resp = self.core.azdls_read(path, args.range()).await?;
300
301        let status = resp.status();
302        match status {
303            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
304            _ => {
305                let (part, mut body) = resp.into_parts();
306                let buf = body.to_buffer().await?;
307                Err(parse_error(Response::from_parts(part, buf)))
308            }
309        }
310    }
311
312    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
313        let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
314        let w = if args.append() {
315            AzdlsWriters::Two(oio::AppendWriter::new(w))
316        } else {
317            AzdlsWriters::One(oio::OneShotWriter::new(w))
318        };
319        Ok((RpWrite::default(), w))
320    }
321
322    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
323        Ok((
324            RpDelete::default(),
325            oio::OneShotDeleter::new(AzdlsDeleter::new(self.core.clone())),
326        ))
327    }
328
329    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
330        let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit());
331
332        Ok((RpList::default(), oio::PageLister::new(l)))
333    }
334
335    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
336        if let Some(resp) = self.core.azdls_ensure_parent_path(to).await? {
337            let status = resp.status();
338            match status {
339                StatusCode::CREATED | StatusCode::CONFLICT => {}
340                _ => return Err(parse_error(resp)),
341            }
342        }
343
344        let resp = self.core.azdls_rename(from, to).await?;
345
346        let status = resp.status();
347
348        match status {
349            StatusCode::CREATED => Ok(RpRename::default()),
350            _ => Err(parse_error(resp)),
351        }
352    }
353}
354
355fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
356    let endpoint: &str = endpoint
357        .strip_prefix("http://")
358        .or_else(|| endpoint.strip_prefix("https://"))
359        .unwrap_or(endpoint);
360
361    let mut parts = endpoint.splitn(2, '.');
362    let storage_name = parts.next();
363    let endpoint_suffix = parts
364        .next()
365        .unwrap_or_default()
366        .trim_end_matches('/')
367        .to_lowercase();
368
369    if KNOWN_AZDLS_ENDPOINT_SUFFIX
370        .iter()
371        .any(|s| *s == endpoint_suffix.as_str())
372    {
373        storage_name.map(|s| s.to_string())
374    } else {
375        None
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::infer_storage_name_from_endpoint;

    #[test]
    fn test_infer_storage_name_from_endpoint() {
        let endpoint = "https://account.dfs.core.windows.net";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    #[test]
    fn test_infer_storage_name_from_endpoint_with_trailing_slash() {
        let endpoint = "https://account.dfs.core.windows.net/";
        let storage_name = infer_storage_name_from_endpoint(endpoint);
        assert_eq!(storage_name, Some("account".to_string()));
    }

    #[test]
    fn test_infer_storage_name_from_sovereign_cloud_endpoints() {
        for endpoint in [
            "https://account.dfs.core.usgovcloudapi.net",
            "https://account.dfs.core.chinacloudapi.cn",
        ] {
            let storage_name = infer_storage_name_from_endpoint(endpoint);
            assert_eq!(storage_name, Some("account".to_string()), "{endpoint}");
        }
    }

    #[test]
    fn test_infer_storage_name_ignores_scheme_and_suffix_case() {
        for endpoint in [
            "account.dfs.core.windows.net",
            "http://account.DFS.Core.Windows.NET",
        ] {
            let storage_name = infer_storage_name_from_endpoint(endpoint);
            assert_eq!(storage_name, Some("account".to_string()), "{endpoint}");
        }
    }

    #[test]
    fn test_infer_storage_name_from_unknown_endpoint() {
        // Azurite, non-DFS and empty endpoints carry no inferable account name.
        for endpoint in [
            "http://127.0.0.1:10000/devstoreaccount1",
            "https://account.blob.core.windows.net",
            "",
        ] {
            assert_eq!(infer_storage_name_from_endpoint(endpoint), None, "{endpoint}");
        }
    }
}