opendal/services/azfile/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use reqsign::AzureStorageConfig;
25use reqsign::AzureStorageLoader;
26use reqsign::AzureStorageSigner;
27
28use super::AZFILE_SCHEME;
29use super::config::AzfileConfig;
30use super::core::AzfileCore;
31use super::core::X_MS_META_PREFIX;
32use super::deleter::AzfileDeleter;
33use super::error::parse_error;
34use super::lister::AzfileLister;
35use super::writer::AzfileWriter;
36use super::writer::AzfileWriters;
37use crate::raw::*;
38use crate::*;
39
40impl From<AzureStorageConfig> for AzfileConfig {
41    fn from(config: AzureStorageConfig) -> Self {
42        AzfileConfig {
43            account_name: config.account_name,
44            account_key: config.account_key,
45            sas_token: config.sas_token,
46            endpoint: config.endpoint,
47            root: None,                // root is not part of AzureStorageConfig
48            share_name: String::new(), // share_name is not part of AzureStorageConfig
49        }
50    }
51}
52
53/// Azure File services support.
54#[doc = include_str!("docs.md")]
55#[derive(Default)]
56pub struct AzfileBuilder {
57    pub(super) config: AzfileConfig,
58
59    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
60    pub(super) http_client: Option<HttpClient>,
61}
62
63impl Debug for AzfileBuilder {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("AzfileBuilder")
66            .field("config", &self.config)
67            .finish_non_exhaustive()
68    }
69}
70
71impl AzfileBuilder {
72    /// Set root of this backend.
73    ///
74    /// All operations will happen under this root.
75    pub fn root(mut self, root: &str) -> Self {
76        self.config.root = if root.is_empty() {
77            None
78        } else {
79            Some(root.to_string())
80        };
81
82        self
83    }
84
85    /// Set endpoint of this backend.
86    pub fn endpoint(mut self, endpoint: &str) -> Self {
87        if !endpoint.is_empty() {
88            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
89            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
90        }
91
92        self
93    }
94
95    /// Set account_name of this backend.
96    ///
97    /// - If account_name is set, we will take user's input first.
98    /// - If not, we will try to load it from environment.
99    pub fn account_name(mut self, account_name: &str) -> Self {
100        if !account_name.is_empty() {
101            self.config.account_name = Some(account_name.to_string());
102        }
103
104        self
105    }
106
107    /// Set account_key of this backend.
108    ///
109    /// - If account_key is set, we will take user's input first.
110    /// - If not, we will try to load it from environment.
111    pub fn account_key(mut self, account_key: &str) -> Self {
112        if !account_key.is_empty() {
113            self.config.account_key = Some(account_key.to_string());
114        }
115
116        self
117    }
118
119    /// Set file share name of this backend.
120    ///
121    /// # Notes
122    /// You can find more about from: <https://learn.microsoft.com/en-us/rest/api/storageservices/operations-on-shares--file-service>
123    pub fn share_name(mut self, share_name: &str) -> Self {
124        if !share_name.is_empty() {
125            self.config.share_name = share_name.to_string();
126        }
127
128        self
129    }
130
131    /// Specify the http client that used by this service.
132    ///
133    /// # Notes
134    ///
135    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
136    /// during minor updates.
137    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
138    #[allow(deprecated)]
139    pub fn http_client(mut self, client: HttpClient) -> Self {
140        self.http_client = Some(client);
141        self
142    }
143
144    /// Create a new `AfileBuilder` instance from an [Azure Storage connection string][1].
145    ///
146    /// [1]: https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
147    ///
148    /// # Example
149    /// ```
150    /// use opendal::Builder;
151    /// use opendal::services::Azfile;
152    ///
153    /// let conn_str = "AccountName=example;DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net";
154    ///
155    /// let mut config = Azfile::from_connection_string(&conn_str)
156    ///     .unwrap()
157    ///     // Add additional configuration if needed
158    ///     .share_name("myShare")
159    ///     .build()
160    ///     .unwrap();
161    /// ```
162    pub fn from_connection_string(conn_str: &str) -> Result<Self> {
163        let config =
164            raw::azure_config_from_connection_string(conn_str, raw::AzureStorageService::File)?;
165
166        Ok(AzfileConfig::from(config).into_builder())
167    }
168}
169
170impl Builder for AzfileBuilder {
171    type Config = AzfileConfig;
172
173    fn build(self) -> Result<impl Access> {
174        debug!("backend build started: {:?}", &self);
175
176        let root = normalize_root(&self.config.root.unwrap_or_default());
177        debug!("backend use root {root}");
178
179        let endpoint = match &self.config.endpoint {
180            Some(endpoint) => Ok(endpoint.clone()),
181            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
182                .with_operation("Builder::build")
183                .with_context("service", AZFILE_SCHEME)),
184        }?;
185        debug!("backend use endpoint {}", &endpoint);
186
187        let account_name_option = self
188            .config
189            .account_name
190            .clone()
191            .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()));
192
193        let account_name = match account_name_option {
194            Some(account_name) => Ok(account_name),
195            None => Err(
196                Error::new(ErrorKind::ConfigInvalid, "account_name is empty")
197                    .with_operation("Builder::build")
198                    .with_context("service", AZFILE_SCHEME),
199            ),
200        }?;
201
202        let config_loader = AzureStorageConfig {
203            account_name: Some(account_name),
204            account_key: self.config.account_key.clone(),
205            sas_token: self.config.sas_token.clone(),
206            ..Default::default()
207        };
208
209        let cred_loader = AzureStorageLoader::new(config_loader);
210        let signer = AzureStorageSigner::new();
211        Ok(AzfileBackend {
212            core: Arc::new(AzfileCore {
213                info: {
214                    let am = AccessorInfo::default();
215                    am.set_scheme(AZFILE_SCHEME)
216                        .set_root(&root)
217                        .set_native_capability(Capability {
218                            stat: true,
219
220                            read: true,
221
222                            write: true,
223                            write_with_user_metadata: true,
224
225                            create_dir: true,
226                            delete: true,
227                            rename: true,
228
229                            list: true,
230
231                            shared: true,
232
233                            ..Default::default()
234                        });
235
236                    // allow deprecated api here for compatibility
237                    #[allow(deprecated)]
238                    if let Some(client) = self.http_client {
239                        am.update_http_client(|_| client);
240                    }
241
242                    am.into()
243                },
244                root,
245                endpoint,
246                loader: cred_loader,
247                signer,
248                share_name: self.config.share_name.clone(),
249            }),
250        })
251    }
252}
253
254/// Backend for azfile services.
255#[derive(Debug, Clone)]
256pub struct AzfileBackend {
257    core: Arc<AzfileCore>,
258}
259
260impl Access for AzfileBackend {
261    type Reader = HttpBody;
262    type Writer = AzfileWriters;
263    type Lister = oio::PageLister<AzfileLister>;
264    type Deleter = oio::OneShotDeleter<AzfileDeleter>;
265
266    fn info(&self) -> Arc<AccessorInfo> {
267        self.core.info.clone()
268    }
269
270    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
271        self.core.ensure_parent_dir_exists(path).await?;
272        let resp = self.core.azfile_create_dir(path).await?;
273        let status = resp.status();
274
275        match status {
276            StatusCode::CREATED => Ok(RpCreateDir::default()),
277            _ => {
278                // we cannot just check status code because 409 Conflict has two meaning:
279                // 1. If a directory by the same name is being deleted when Create Directory is called, the server returns status code 409 (Conflict)
280                // 2. If a directory or file with the same name already exists, the operation fails with status code 409 (Conflict).
281                // but we just need case 2 (already exists)
282                // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/create-directory
283                if resp
284                    .headers()
285                    .get("x-ms-error-code")
286                    .map(|value| value.to_str().unwrap_or(""))
287                    .unwrap_or_else(|| "")
288                    == "ResourceAlreadyExists"
289                {
290                    Ok(RpCreateDir::default())
291                } else {
292                    Err(parse_error(resp))
293                }
294            }
295        }
296    }
297
298    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
299        let resp = if path.ends_with('/') {
300            self.core.azfile_get_directory_properties(path).await?
301        } else {
302            self.core.azfile_get_file_properties(path).await?
303        };
304
305        let status = resp.status();
306        match status {
307            StatusCode::OK => {
308                let headers = resp.headers();
309                let mut meta = parse_into_metadata(path, headers)?;
310                let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
311                if !user_meta.is_empty() {
312                    meta = meta.with_user_metadata(user_meta);
313                }
314                Ok(RpStat::new(meta))
315            }
316            _ => Err(parse_error(resp)),
317        }
318    }
319
320    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
321        let resp = self.core.azfile_read(path, args.range()).await?;
322
323        let status = resp.status();
324        match status {
325            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
326            _ => {
327                let (part, mut body) = resp.into_parts();
328                let buf = body.to_buffer().await?;
329                Err(parse_error(Response::from_parts(part, buf)))
330            }
331        }
332    }
333
334    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
335        self.core.ensure_parent_dir_exists(path).await?;
336        let w = AzfileWriter::new(self.core.clone(), args.clone(), path.to_string());
337        let w = if args.append() {
338            AzfileWriters::Two(oio::AppendWriter::new(w))
339        } else {
340            AzfileWriters::One(oio::OneShotWriter::new(w))
341        };
342        Ok((RpWrite::default(), w))
343    }
344
345    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
346        Ok((
347            RpDelete::default(),
348            oio::OneShotDeleter::new(AzfileDeleter::new(self.core.clone())),
349        ))
350    }
351
352    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
353        let l = AzfileLister::new(self.core.clone(), path.to_string(), args.limit());
354
355        Ok((RpList::default(), oio::PageLister::new(l)))
356    }
357
358    async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
359        self.core.ensure_parent_dir_exists(to).await?;
360        let resp = self.core.azfile_rename(from, to).await?;
361        let status = resp.status();
362        match status {
363            StatusCode::OK => Ok(RpRename::default()),
364            _ => Err(parse_error(resp)),
365        }
366    }
367}