1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use reqsign::AzureStorageConfig;
25use reqsign::AzureStorageLoader;
26use reqsign::AzureStorageSigner;
27
28use super::AZFILE_SCHEME;
29use super::config::AzfileConfig;
30use super::core::AzfileCore;
31use super::core::X_MS_META_PREFIX;
32use super::deleter::AzfileDeleter;
33use super::error::parse_error;
34use super::lister::AzfileLister;
35use super::writer::AzfileWriter;
36use super::writer::AzfileWriters;
37use crate::raw::*;
38use crate::*;
39
40impl From<AzureStorageConfig> for AzfileConfig {
41 fn from(config: AzureStorageConfig) -> Self {
42 AzfileConfig {
43 account_name: config.account_name,
44 account_key: config.account_key,
45 sas_token: config.sas_token,
46 endpoint: config.endpoint,
47 root: None, share_name: String::new(), }
50 }
51}
52
53#[doc = include_str!("docs.md")]
55#[derive(Default)]
56pub struct AzfileBuilder {
57 pub(super) config: AzfileConfig,
58
59 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
60 pub(super) http_client: Option<HttpClient>,
61}
62
63impl Debug for AzfileBuilder {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("AzfileBuilder")
66 .field("config", &self.config)
67 .finish_non_exhaustive()
68 }
69}
70
71impl AzfileBuilder {
72 pub fn root(mut self, root: &str) -> Self {
76 self.config.root = if root.is_empty() {
77 None
78 } else {
79 Some(root.to_string())
80 };
81
82 self
83 }
84
85 pub fn endpoint(mut self, endpoint: &str) -> Self {
87 if !endpoint.is_empty() {
88 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
90 }
91
92 self
93 }
94
95 pub fn account_name(mut self, account_name: &str) -> Self {
100 if !account_name.is_empty() {
101 self.config.account_name = Some(account_name.to_string());
102 }
103
104 self
105 }
106
107 pub fn account_key(mut self, account_key: &str) -> Self {
112 if !account_key.is_empty() {
113 self.config.account_key = Some(account_key.to_string());
114 }
115
116 self
117 }
118
119 pub fn share_name(mut self, share_name: &str) -> Self {
124 if !share_name.is_empty() {
125 self.config.share_name = share_name.to_string();
126 }
127
128 self
129 }
130
131 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
138 #[allow(deprecated)]
139 pub fn http_client(mut self, client: HttpClient) -> Self {
140 self.http_client = Some(client);
141 self
142 }
143
144 pub fn from_connection_string(conn_str: &str) -> Result<Self> {
163 let config =
164 raw::azure_config_from_connection_string(conn_str, raw::AzureStorageService::File)?;
165
166 Ok(AzfileConfig::from(config).into_builder())
167 }
168}
169
170impl Builder for AzfileBuilder {
171 type Config = AzfileConfig;
172
173 fn build(self) -> Result<impl Access> {
174 debug!("backend build started: {:?}", &self);
175
176 let root = normalize_root(&self.config.root.unwrap_or_default());
177 debug!("backend use root {root}");
178
179 let endpoint = match &self.config.endpoint {
180 Some(endpoint) => Ok(endpoint.clone()),
181 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
182 .with_operation("Builder::build")
183 .with_context("service", AZFILE_SCHEME)),
184 }?;
185 debug!("backend use endpoint {}", &endpoint);
186
187 let account_name_option = self
188 .config
189 .account_name
190 .clone()
191 .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()));
192
193 let account_name = match account_name_option {
194 Some(account_name) => Ok(account_name),
195 None => Err(
196 Error::new(ErrorKind::ConfigInvalid, "account_name is empty")
197 .with_operation("Builder::build")
198 .with_context("service", AZFILE_SCHEME),
199 ),
200 }?;
201
202 let config_loader = AzureStorageConfig {
203 account_name: Some(account_name),
204 account_key: self.config.account_key.clone(),
205 sas_token: self.config.sas_token.clone(),
206 ..Default::default()
207 };
208
209 let cred_loader = AzureStorageLoader::new(config_loader);
210 let signer = AzureStorageSigner::new();
211 Ok(AzfileBackend {
212 core: Arc::new(AzfileCore {
213 info: {
214 let am = AccessorInfo::default();
215 am.set_scheme(AZFILE_SCHEME)
216 .set_root(&root)
217 .set_native_capability(Capability {
218 stat: true,
219
220 read: true,
221
222 write: true,
223 write_with_user_metadata: true,
224
225 create_dir: true,
226 delete: true,
227 rename: true,
228
229 list: true,
230
231 shared: true,
232
233 ..Default::default()
234 });
235
236 #[allow(deprecated)]
238 if let Some(client) = self.http_client {
239 am.update_http_client(|_| client);
240 }
241
242 am.into()
243 },
244 root,
245 endpoint,
246 loader: cred_loader,
247 signer,
248 share_name: self.config.share_name.clone(),
249 }),
250 })
251 }
252}
253
254#[derive(Debug, Clone)]
256pub struct AzfileBackend {
257 core: Arc<AzfileCore>,
258}
259
260impl Access for AzfileBackend {
261 type Reader = HttpBody;
262 type Writer = AzfileWriters;
263 type Lister = oio::PageLister<AzfileLister>;
264 type Deleter = oio::OneShotDeleter<AzfileDeleter>;
265
266 fn info(&self) -> Arc<AccessorInfo> {
267 self.core.info.clone()
268 }
269
270 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
271 self.core.ensure_parent_dir_exists(path).await?;
272 let resp = self.core.azfile_create_dir(path).await?;
273 let status = resp.status();
274
275 match status {
276 StatusCode::CREATED => Ok(RpCreateDir::default()),
277 _ => {
278 if resp
284 .headers()
285 .get("x-ms-error-code")
286 .map(|value| value.to_str().unwrap_or(""))
287 .unwrap_or_else(|| "")
288 == "ResourceAlreadyExists"
289 {
290 Ok(RpCreateDir::default())
291 } else {
292 Err(parse_error(resp))
293 }
294 }
295 }
296 }
297
298 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
299 let resp = if path.ends_with('/') {
300 self.core.azfile_get_directory_properties(path).await?
301 } else {
302 self.core.azfile_get_file_properties(path).await?
303 };
304
305 let status = resp.status();
306 match status {
307 StatusCode::OK => {
308 let headers = resp.headers();
309 let mut meta = parse_into_metadata(path, headers)?;
310 let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX);
311 if !user_meta.is_empty() {
312 meta = meta.with_user_metadata(user_meta);
313 }
314 Ok(RpStat::new(meta))
315 }
316 _ => Err(parse_error(resp)),
317 }
318 }
319
320 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
321 let resp = self.core.azfile_read(path, args.range()).await?;
322
323 let status = resp.status();
324 match status {
325 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
326 _ => {
327 let (part, mut body) = resp.into_parts();
328 let buf = body.to_buffer().await?;
329 Err(parse_error(Response::from_parts(part, buf)))
330 }
331 }
332 }
333
334 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
335 self.core.ensure_parent_dir_exists(path).await?;
336 let w = AzfileWriter::new(self.core.clone(), args.clone(), path.to_string());
337 let w = if args.append() {
338 AzfileWriters::Two(oio::AppendWriter::new(w))
339 } else {
340 AzfileWriters::One(oio::OneShotWriter::new(w))
341 };
342 Ok((RpWrite::default(), w))
343 }
344
345 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
346 Ok((
347 RpDelete::default(),
348 oio::OneShotDeleter::new(AzfileDeleter::new(self.core.clone())),
349 ))
350 }
351
352 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
353 let l = AzfileLister::new(self.core.clone(), path.to_string(), args.limit());
354
355 Ok((RpList::default(), oio::PageLister::new(l)))
356 }
357
358 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
359 self.core.ensure_parent_dir_exists(to).await?;
360 let resp = self.core.azfile_rename(from, to).await?;
361 let status = resp.status();
362 match status {
363 StatusCode::OK => Ok(RpRename::default()),
364 _ => Err(parse_error(resp)),
365 }
366 }
367}