opendal/services/azdls/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use http::Response;
23use http::StatusCode;
24use log::debug;
25use reqsign::AzureStorageConfig;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28
29use super::core::AzdlsCore;
30use super::core::DIRECTORY;
31use super::delete::AzdlsDeleter;
32use super::error::parse_error;
33use super::lister::AzdlsLister;
34use super::writer::AzdlsWriter;
35use super::writer::AzdlsWriters;
36use crate::raw::*;
37use crate::services::AzdlsConfig;
38use crate::*;
39
40impl From<AzureStorageConfig> for AzdlsConfig {
41    fn from(config: AzureStorageConfig) -> Self {
42        AzdlsConfig {
43            endpoint: config.endpoint,
44            account_name: config.account_name,
45            account_key: config.account_key,
46            client_secret: config.client_secret,
47            tenant_id: config.tenant_id,
48            client_id: config.client_id,
49            sas_token: config.sas_token,
50            authority_host: config.authority_host,
51            ..Default::default()
52        }
53    }
54}
55
56impl Configurator for AzdlsConfig {
57    type Builder = AzdlsBuilder;
58
59    #[allow(deprecated)]
60    fn into_builder(self) -> Self::Builder {
61        AzdlsBuilder {
62            config: self,
63            http_client: None,
64        }
65    }
66}
67
68/// Azure Data Lake Storage Gen2 Support.
69#[doc = include_str!("docs.md")]
70#[derive(Default, Clone)]
71pub struct AzdlsBuilder {
72    config: AzdlsConfig,
73
74    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
75    http_client: Option<HttpClient>,
76}
77
78impl Debug for AzdlsBuilder {
79    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
80        let mut ds = f.debug_struct("AzdlsBuilder");
81
82        ds.field("config", &self.config);
83
84        ds.finish()
85    }
86}
87
88impl AzdlsBuilder {
89    /// Set root of this backend.
90    ///
91    /// All operations will happen under this root.
92    pub fn root(mut self, root: &str) -> Self {
93        self.config.root = if root.is_empty() {
94            None
95        } else {
96            Some(root.to_string())
97        };
98
99        self
100    }
101
102    /// Set filesystem name of this backend.
103    pub fn filesystem(mut self, filesystem: &str) -> Self {
104        self.config.filesystem = filesystem.to_string();
105
106        self
107    }
108
109    /// Set endpoint of this backend.
110    ///
111    /// Endpoint must be full uri, e.g.
112    ///
113    /// - Azblob: `https://accountname.blob.core.windows.net`
114    /// - Azurite: `http://127.0.0.1:10000/devstoreaccount1`
115    pub fn endpoint(mut self, endpoint: &str) -> Self {
116        if !endpoint.is_empty() {
117            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
118            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
119        }
120
121        self
122    }
123
124    /// Set account_name of this backend.
125    ///
126    /// - If account_name is set, we will take user's input first.
127    /// - If not, we will try to load it from environment.
128    pub fn account_name(mut self, account_name: &str) -> Self {
129        if !account_name.is_empty() {
130            self.config.account_name = Some(account_name.to_string());
131        }
132
133        self
134    }
135
136    /// Set account_key of this backend.
137    ///
138    /// - If account_key is set, we will take user's input first.
139    /// - If not, we will try to load it from environment.
140    pub fn account_key(mut self, account_key: &str) -> Self {
141        if !account_key.is_empty() {
142            self.config.account_key = Some(account_key.to_string());
143        }
144
145        self
146    }
147
148    /// Set client_secret of this backend.
149    ///
150    /// - If client_secret is set, we will take user's input first.
151    /// - If not, we will try to load it from environment.
152    /// - required for client_credentials authentication
153    pub fn client_secret(mut self, client_secret: &str) -> Self {
154        if !client_secret.is_empty() {
155            self.config.client_secret = Some(client_secret.to_string());
156        }
157
158        self
159    }
160
161    /// Set tenant_id of this backend.
162    ///
163    /// - If tenant_id is set, we will take user's input first.
164    /// - If not, we will try to load it from environment.
165    /// - required for client_credentials authentication
166    pub fn tenant_id(mut self, tenant_id: &str) -> Self {
167        if !tenant_id.is_empty() {
168            self.config.tenant_id = Some(tenant_id.to_string());
169        }
170
171        self
172    }
173
174    /// Set client_id of this backend.
175    ///
176    /// - If client_id is set, we will take user's input first.
177    /// - If not, we will try to load it from environment.
178    /// - required for client_credentials authentication
179    pub fn client_id(mut self, client_id: &str) -> Self {
180        if !client_id.is_empty() {
181            self.config.client_id = Some(client_id.to_string());
182        }
183
184        self
185    }
186
187    /// Set the sas_token of this backend.
188    pub fn sas_token(mut self, sas_token: &str) -> Self {
189        if !sas_token.is_empty() {
190            self.config.sas_token = Some(sas_token.to_string());
191        }
192
193        self
194    }
195
196    /// Set authority_host of this backend.
197    ///
198    /// - If authority_host is set, we will take user's input first.
199    /// - If not, we will try to load it from environment.
200    /// - default value: `https://login.microsoftonline.com`
201    pub fn authority_host(mut self, authority_host: &str) -> Self {
202        if !authority_host.is_empty() {
203            self.config.authority_host = Some(authority_host.to_string());
204        }
205
206        self
207    }
208
209    /// Specify the http client that used by this service.
210    ///
211    /// # Notes
212    ///
213    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
214    /// during minor updates.
215    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
216    #[allow(deprecated)]
217    pub fn http_client(mut self, client: HttpClient) -> Self {
218        self.http_client = Some(client);
219        self
220    }
221
222    /// Create a new `AzdlsBuilder` instance from an [Azure Storage connection string][1].
223    ///
224    /// [1]: https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
225    ///
226    /// # Example
227    /// ```
228    /// use opendal::Builder;
229    /// use opendal::services::Azdls;
230    ///
231    /// let conn_str = "AccountName=example;DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net";
232    ///
233    /// let mut config = Azdls::from_connection_string(&conn_str)
234    ///     .unwrap()
235    ///     // Add additional configuration if needed
236    ///     .filesystem("myFilesystem")
237    ///     .client_id("myClientId")
238    ///     .client_secret("myClientSecret")
239    ///     .tenant_id("myTenantId")
240    ///     .build()
241    ///     .unwrap();
242    /// ```
243    pub fn from_connection_string(conn_str: &str) -> Result<Self> {
244        let config =
245            raw::azure_config_from_connection_string(conn_str, raw::AzureStorageService::Adls)?;
246
247        Ok(AzdlsConfig::from(config).into_builder())
248    }
249}
250
251impl Builder for AzdlsBuilder {
252    const SCHEME: Scheme = Scheme::Azdls;
253    type Config = AzdlsConfig;
254
255    fn build(self) -> Result<impl Access> {
256        debug!("backend build started: {:?}", &self);
257
258        let root = normalize_root(&self.config.root.unwrap_or_default());
259        debug!("backend use root {}", root);
260
261        // Handle endpoint, region and container name.
262        let filesystem = match self.config.filesystem.is_empty() {
263            false => Ok(&self.config.filesystem),
264            true => Err(Error::new(ErrorKind::ConfigInvalid, "filesystem is empty")
265                .with_operation("Builder::build")
266                .with_context("service", Scheme::Azdls)),
267        }?;
268        debug!("backend use filesystem {}", &filesystem);
269
270        let endpoint = match &self.config.endpoint {
271            Some(endpoint) => Ok(endpoint.clone().trim_end_matches('/').to_string()),
272            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
273                .with_operation("Builder::build")
274                .with_context("service", Scheme::Azdls)),
275        }?;
276        debug!("backend use endpoint {}", &endpoint);
277
278        let config_loader = AzureStorageConfig {
279            account_name: self
280                .config
281                .account_name
282                .clone()
283                .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str())),
284            account_key: self.config.account_key.clone(),
285            sas_token: self.config.sas_token,
286            client_id: self.config.client_id.clone(),
287            client_secret: self.config.client_secret.clone(),
288            tenant_id: self.config.tenant_id.clone(),
289            authority_host: self.config.authority_host.clone(),
290            ..Default::default()
291        };
292
293        let cred_loader = AzureStorageLoader::new(config_loader);
294        let signer = AzureStorageSigner::new();
295        Ok(AzdlsBackend {
296            core: Arc::new(AzdlsCore {
297                info: {
298                    let am = AccessorInfo::default();
299                    am.set_scheme(Scheme::Azdls)
300                        .set_root(&root)
301                        .set_name(filesystem)
302                        .set_native_capability(Capability {
303                            stat: true,
304                            stat_has_cache_control: true,
305                            stat_has_content_length: true,
306                            stat_has_content_type: true,
307                            stat_has_content_encoding: true,
308                            stat_has_content_range: true,
309                            stat_has_etag: true,
310                            stat_has_content_md5: true,
311                            stat_has_last_modified: true,
312                            stat_has_content_disposition: true,
313
314                            read: true,
315
316                            write: true,
317                            write_can_append: true,
318                            write_with_if_none_match: true,
319                            write_with_if_not_exists: true,
320
321                            create_dir: true,
322                            delete: true,
323                            rename: true,
324
325                            list: true,
326                            list_has_etag: true,
327                            list_has_content_length: true,
328                            list_has_last_modified: true,
329
330                            shared: true,
331
332                            ..Default::default()
333                        });
334
335                    // allow deprecated api here for compatibility
336                    #[allow(deprecated)]
337                    if let Some(client) = self.http_client {
338                        am.update_http_client(|_| client);
339                    }
340
341                    am.into()
342                },
343                filesystem: self.config.filesystem.clone(),
344                root,
345                endpoint,
346                loader: cred_loader,
347                signer,
348            }),
349        })
350    }
351}
352
353/// Backend for azblob services.
354#[derive(Debug, Clone)]
355pub struct AzdlsBackend {
356    core: Arc<AzdlsCore>,
357}
358
359impl Access for AzdlsBackend {
360    type Reader = HttpBody;
361    type Writer = AzdlsWriters;
362    type Lister = oio::PageLister<AzdlsLister>;
363    type Deleter = oio::OneShotDeleter<AzdlsDeleter>;
364
365    fn info(&self) -> Arc<AccessorInfo> {
366        self.core.info.clone()
367    }
368
369    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
370        let resp = self
371            .core
372            .azdls_create(path, DIRECTORY, &OpWrite::default())
373            .await?;
374
375        let status = resp.status();
376        match status {
377            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
378            _ => Err(parse_error(resp)),
379        }
380    }
381
382    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
383        // Stat root always returns a DIR.
384        // TODO: include metadata for the root (#4746)
385        if path == "/" {
386            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
387        }
388
389        let metadata = self.core.azdls_stat_metadata(path).await?;
390        Ok(RpStat::new(metadata))
391    }
392
393    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
394        let resp = self.core.azdls_read(path, args.range()).await?;
395
396        let status = resp.status();
397        match status {
398            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
399            _ => {
400                let (part, mut body) = resp.into_parts();
401                let buf = body.to_buffer().await?;
402                Err(parse_error(Response::from_parts(part, buf)))
403            }
404        }
405    }
406
407    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
408        let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
409        let w = if args.append() {
410            AzdlsWriters::Two(oio::AppendWriter::new(w))
411        } else {
412            AzdlsWriters::One(oio::OneShotWriter::new(w))
413        };
414        Ok((RpWrite::default(), w))
415    }
416
417    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
418        Ok((
419            RpDelete::default(),
420            oio::OneShotDeleter::new(AzdlsDeleter::new(self.core.clone())),
421        ))
422    }
423
424    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
425        let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit());
426
427        Ok((RpList::default(), oio::PageLister::new(l)))
428    }
429
430    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
431        if let Some(resp) = self.core.azdls_ensure_parent_path(to).await? {
432            let status = resp.status();
433            match status {
434                StatusCode::CREATED | StatusCode::CONFLICT => {}
435                _ => return Err(parse_error(resp)),
436            }
437        }
438
439        let resp = self.core.azdls_rename(from, to).await?;
440
441        let status = resp.status();
442
443        match status {
444            StatusCode::CREATED => Ok(RpRename::default()),
445            _ => Err(parse_error(resp)),
446        }
447    }
448}