opendal/services/azfile/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use reqsign::AzureStorageConfig;
25use reqsign::AzureStorageLoader;
26use reqsign::AzureStorageSigner;
27
28use super::AZFILE_SCHEME;
29use super::config::AzfileConfig;
30use super::core::AzfileCore;
31use super::deleter::AzfileDeleter;
32use super::error::parse_error;
33use super::lister::AzfileLister;
34use super::writer::AzfileWriter;
35use super::writer::AzfileWriters;
36use crate::raw::*;
37use crate::*;
38
39impl From<AzureStorageConfig> for AzfileConfig {
40    fn from(config: AzureStorageConfig) -> Self {
41        AzfileConfig {
42            account_name: config.account_name,
43            account_key: config.account_key,
44            sas_token: config.sas_token,
45            endpoint: config.endpoint,
46            root: None,                // root is not part of AzureStorageConfig
47            share_name: String::new(), // share_name is not part of AzureStorageConfig
48        }
49    }
50}
51
/// Azure File services support.
#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct AzfileBuilder {
    /// Service configuration that the builder's setter methods fill in and
    /// that `build` consumes.
    pub(super) config: AzfileConfig,

    /// Optional custom HTTP client. When present, `build` installs it on the
    /// accessor info. Kept only for compatibility with the deprecated
    /// `http_client` setter.
    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
    pub(super) http_client: Option<HttpClient>,
}
61
62impl Debug for AzfileBuilder {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("AzfileBuilder")
65            .field("config", &self.config)
66            .finish_non_exhaustive()
67    }
68}
69
70impl AzfileBuilder {
71    /// Set root of this backend.
72    ///
73    /// All operations will happen under this root.
74    pub fn root(mut self, root: &str) -> Self {
75        self.config.root = if root.is_empty() {
76            None
77        } else {
78            Some(root.to_string())
79        };
80
81        self
82    }
83
84    /// Set endpoint of this backend.
85    pub fn endpoint(mut self, endpoint: &str) -> Self {
86        if !endpoint.is_empty() {
87            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
88            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89        }
90
91        self
92    }
93
94    /// Set account_name of this backend.
95    ///
96    /// - If account_name is set, we will take user's input first.
97    /// - If not, we will try to load it from environment.
98    pub fn account_name(mut self, account_name: &str) -> Self {
99        if !account_name.is_empty() {
100            self.config.account_name = Some(account_name.to_string());
101        }
102
103        self
104    }
105
106    /// Set account_key of this backend.
107    ///
108    /// - If account_key is set, we will take user's input first.
109    /// - If not, we will try to load it from environment.
110    pub fn account_key(mut self, account_key: &str) -> Self {
111        if !account_key.is_empty() {
112            self.config.account_key = Some(account_key.to_string());
113        }
114
115        self
116    }
117
118    /// Set file share name of this backend.
119    ///
120    /// # Notes
121    /// You can find more about from: <https://learn.microsoft.com/en-us/rest/api/storageservices/operations-on-shares--file-service>
122    pub fn share_name(mut self, share_name: &str) -> Self {
123        if !share_name.is_empty() {
124            self.config.share_name = share_name.to_string();
125        }
126
127        self
128    }
129
130    /// Specify the http client that used by this service.
131    ///
132    /// # Notes
133    ///
134    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
135    /// during minor updates.
136    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
137    #[allow(deprecated)]
138    pub fn http_client(mut self, client: HttpClient) -> Self {
139        self.http_client = Some(client);
140        self
141    }
142
143    /// Create a new `AfileBuilder` instance from an [Azure Storage connection string][1].
144    ///
145    /// [1]: https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
146    ///
147    /// # Example
148    /// ```
149    /// use opendal::Builder;
150    /// use opendal::services::Azfile;
151    ///
152    /// let conn_str = "AccountName=example;DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net";
153    ///
154    /// let mut config = Azfile::from_connection_string(&conn_str)
155    ///     .unwrap()
156    ///     // Add additional configuration if needed
157    ///     .share_name("myShare")
158    ///     .build()
159    ///     .unwrap();
160    /// ```
161    pub fn from_connection_string(conn_str: &str) -> Result<Self> {
162        let config =
163            raw::azure_config_from_connection_string(conn_str, raw::AzureStorageService::File)?;
164
165        Ok(AzfileConfig::from(config).into_builder())
166    }
167}
168
169impl Builder for AzfileBuilder {
170    type Config = AzfileConfig;
171
172    fn build(self) -> Result<impl Access> {
173        debug!("backend build started: {:?}", &self);
174
175        let root = normalize_root(&self.config.root.unwrap_or_default());
176        debug!("backend use root {root}");
177
178        let endpoint = match &self.config.endpoint {
179            Some(endpoint) => Ok(endpoint.clone()),
180            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
181                .with_operation("Builder::build")
182                .with_context("service", AZFILE_SCHEME)),
183        }?;
184        debug!("backend use endpoint {}", &endpoint);
185
186        let account_name_option = self
187            .config
188            .account_name
189            .clone()
190            .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()));
191
192        let account_name = match account_name_option {
193            Some(account_name) => Ok(account_name),
194            None => Err(
195                Error::new(ErrorKind::ConfigInvalid, "account_name is empty")
196                    .with_operation("Builder::build")
197                    .with_context("service", AZFILE_SCHEME),
198            ),
199        }?;
200
201        let config_loader = AzureStorageConfig {
202            account_name: Some(account_name),
203            account_key: self.config.account_key.clone(),
204            sas_token: self.config.sas_token.clone(),
205            ..Default::default()
206        };
207
208        let cred_loader = AzureStorageLoader::new(config_loader);
209        let signer = AzureStorageSigner::new();
210        Ok(AzfileBackend {
211            core: Arc::new(AzfileCore {
212                info: {
213                    let am = AccessorInfo::default();
214                    am.set_scheme(AZFILE_SCHEME)
215                        .set_root(&root)
216                        .set_native_capability(Capability {
217                            stat: true,
218
219                            read: true,
220
221                            write: true,
222                            create_dir: true,
223                            delete: true,
224                            rename: true,
225
226                            list: true,
227
228                            shared: true,
229
230                            ..Default::default()
231                        });
232
233                    // allow deprecated api here for compatibility
234                    #[allow(deprecated)]
235                    if let Some(client) = self.http_client {
236                        am.update_http_client(|_| client);
237                    }
238
239                    am.into()
240                },
241                root,
242                endpoint,
243                loader: cred_loader,
244                signer,
245                share_name: self.config.share_name.clone(),
246            }),
247        })
248    }
249}
250
/// Backend for azfile services.
///
/// Every `Access` operation implemented for this type goes through the
/// shared `AzfileCore`.
#[derive(Debug, Clone)]
pub struct AzfileBackend {
    /// Core state held in an `Arc`; clones of it are handed to the writers,
    /// listers and deleters this backend creates.
    core: Arc<AzfileCore>,
}
256
257impl Access for AzfileBackend {
258    type Reader = HttpBody;
259    type Writer = AzfileWriters;
260    type Lister = oio::PageLister<AzfileLister>;
261    type Deleter = oio::OneShotDeleter<AzfileDeleter>;
262
263    fn info(&self) -> Arc<AccessorInfo> {
264        self.core.info.clone()
265    }
266
267    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
268        self.core.ensure_parent_dir_exists(path).await?;
269        let resp = self.core.azfile_create_dir(path).await?;
270        let status = resp.status();
271
272        match status {
273            StatusCode::CREATED => Ok(RpCreateDir::default()),
274            _ => {
275                // we cannot just check status code because 409 Conflict has two meaning:
276                // 1. If a directory by the same name is being deleted when Create Directory is called, the server returns status code 409 (Conflict)
277                // 2. If a directory or file with the same name already exists, the operation fails with status code 409 (Conflict).
278                // but we just need case 2 (already exists)
279                // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/create-directory
280                if resp
281                    .headers()
282                    .get("x-ms-error-code")
283                    .map(|value| value.to_str().unwrap_or(""))
284                    .unwrap_or_else(|| "")
285                    == "ResourceAlreadyExists"
286                {
287                    Ok(RpCreateDir::default())
288                } else {
289                    Err(parse_error(resp))
290                }
291            }
292        }
293    }
294
295    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
296        let resp = if path.ends_with('/') {
297            self.core.azfile_get_directory_properties(path).await?
298        } else {
299            self.core.azfile_get_file_properties(path).await?
300        };
301
302        let status = resp.status();
303        match status {
304            StatusCode::OK => {
305                let meta = parse_into_metadata(path, resp.headers())?;
306                Ok(RpStat::new(meta))
307            }
308            _ => Err(parse_error(resp)),
309        }
310    }
311
312    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
313        let resp = self.core.azfile_read(path, args.range()).await?;
314
315        let status = resp.status();
316        match status {
317            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
318            _ => {
319                let (part, mut body) = resp.into_parts();
320                let buf = body.to_buffer().await?;
321                Err(parse_error(Response::from_parts(part, buf)))
322            }
323        }
324    }
325
326    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
327        self.core.ensure_parent_dir_exists(path).await?;
328        let w = AzfileWriter::new(self.core.clone(), args.clone(), path.to_string());
329        let w = if args.append() {
330            AzfileWriters::Two(oio::AppendWriter::new(w))
331        } else {
332            AzfileWriters::One(oio::OneShotWriter::new(w))
333        };
334        Ok((RpWrite::default(), w))
335    }
336
337    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
338        Ok((
339            RpDelete::default(),
340            oio::OneShotDeleter::new(AzfileDeleter::new(self.core.clone())),
341        ))
342    }
343
344    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
345        let l = AzfileLister::new(self.core.clone(), path.to_string(), args.limit());
346
347        Ok((RpList::default(), oio::PageLister::new(l)))
348    }
349
350    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
351        self.core.ensure_parent_dir_exists(to).await?;
352        let resp = self.core.azfile_rename(from, to).await?;
353        let status = resp.status();
354        match status {
355            StatusCode::OK => Ok(RpRename::default()),
356            _ => Err(parse_error(resp)),
357        }
358    }
359}