1use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::Response;
22use http::StatusCode;
23use log::debug;
24use reqsign::AzureStorageConfig;
25use reqsign::AzureStorageLoader;
26use reqsign::AzureStorageSigner;
27
28use super::AZFILE_SCHEME;
29use super::config::AzfileConfig;
30use super::core::AzfileCore;
31use super::deleter::AzfileDeleter;
32use super::error::parse_error;
33use super::lister::AzfileLister;
34use super::writer::AzfileWriter;
35use super::writer::AzfileWriters;
36use crate::raw::*;
37use crate::*;
38
39impl From<AzureStorageConfig> for AzfileConfig {
40 fn from(config: AzureStorageConfig) -> Self {
41 AzfileConfig {
42 account_name: config.account_name,
43 account_key: config.account_key,
44 sas_token: config.sas_token,
45 endpoint: config.endpoint,
46 root: None, share_name: String::new(), }
49 }
50}
51
52#[doc = include_str!("docs.md")]
54#[derive(Default)]
55pub struct AzfileBuilder {
56 pub(super) config: AzfileConfig,
57
58 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
59 pub(super) http_client: Option<HttpClient>,
60}
61
62impl Debug for AzfileBuilder {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("AzfileBuilder")
65 .field("config", &self.config)
66 .finish_non_exhaustive()
67 }
68}
69
70impl AzfileBuilder {
71 pub fn root(mut self, root: &str) -> Self {
75 self.config.root = if root.is_empty() {
76 None
77 } else {
78 Some(root.to_string())
79 };
80
81 self
82 }
83
84 pub fn endpoint(mut self, endpoint: &str) -> Self {
86 if !endpoint.is_empty() {
87 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string());
89 }
90
91 self
92 }
93
94 pub fn account_name(mut self, account_name: &str) -> Self {
99 if !account_name.is_empty() {
100 self.config.account_name = Some(account_name.to_string());
101 }
102
103 self
104 }
105
106 pub fn account_key(mut self, account_key: &str) -> Self {
111 if !account_key.is_empty() {
112 self.config.account_key = Some(account_key.to_string());
113 }
114
115 self
116 }
117
118 pub fn share_name(mut self, share_name: &str) -> Self {
123 if !share_name.is_empty() {
124 self.config.share_name = share_name.to_string();
125 }
126
127 self
128 }
129
130 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
137 #[allow(deprecated)]
138 pub fn http_client(mut self, client: HttpClient) -> Self {
139 self.http_client = Some(client);
140 self
141 }
142
143 pub fn from_connection_string(conn_str: &str) -> Result<Self> {
162 let config =
163 raw::azure_config_from_connection_string(conn_str, raw::AzureStorageService::File)?;
164
165 Ok(AzfileConfig::from(config).into_builder())
166 }
167}
168
169impl Builder for AzfileBuilder {
170 type Config = AzfileConfig;
171
172 fn build(self) -> Result<impl Access> {
173 debug!("backend build started: {:?}", &self);
174
175 let root = normalize_root(&self.config.root.unwrap_or_default());
176 debug!("backend use root {root}");
177
178 let endpoint = match &self.config.endpoint {
179 Some(endpoint) => Ok(endpoint.clone()),
180 None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
181 .with_operation("Builder::build")
182 .with_context("service", AZFILE_SCHEME)),
183 }?;
184 debug!("backend use endpoint {}", &endpoint);
185
186 let account_name_option = self
187 .config
188 .account_name
189 .clone()
190 .or_else(|| raw::azure_account_name_from_endpoint(endpoint.as_str()));
191
192 let account_name = match account_name_option {
193 Some(account_name) => Ok(account_name),
194 None => Err(
195 Error::new(ErrorKind::ConfigInvalid, "account_name is empty")
196 .with_operation("Builder::build")
197 .with_context("service", AZFILE_SCHEME),
198 ),
199 }?;
200
201 let config_loader = AzureStorageConfig {
202 account_name: Some(account_name),
203 account_key: self.config.account_key.clone(),
204 sas_token: self.config.sas_token.clone(),
205 ..Default::default()
206 };
207
208 let cred_loader = AzureStorageLoader::new(config_loader);
209 let signer = AzureStorageSigner::new();
210 Ok(AzfileBackend {
211 core: Arc::new(AzfileCore {
212 info: {
213 let am = AccessorInfo::default();
214 am.set_scheme(AZFILE_SCHEME)
215 .set_root(&root)
216 .set_native_capability(Capability {
217 stat: true,
218
219 read: true,
220
221 write: true,
222 create_dir: true,
223 delete: true,
224 rename: true,
225
226 list: true,
227
228 shared: true,
229
230 ..Default::default()
231 });
232
233 #[allow(deprecated)]
235 if let Some(client) = self.http_client {
236 am.update_http_client(|_| client);
237 }
238
239 am.into()
240 },
241 root,
242 endpoint,
243 loader: cred_loader,
244 signer,
245 share_name: self.config.share_name.clone(),
246 }),
247 })
248 }
249}
250
251#[derive(Debug, Clone)]
253pub struct AzfileBackend {
254 core: Arc<AzfileCore>,
255}
256
257impl Access for AzfileBackend {
258 type Reader = HttpBody;
259 type Writer = AzfileWriters;
260 type Lister = oio::PageLister<AzfileLister>;
261 type Deleter = oio::OneShotDeleter<AzfileDeleter>;
262
263 fn info(&self) -> Arc<AccessorInfo> {
264 self.core.info.clone()
265 }
266
267 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
268 self.core.ensure_parent_dir_exists(path).await?;
269 let resp = self.core.azfile_create_dir(path).await?;
270 let status = resp.status();
271
272 match status {
273 StatusCode::CREATED => Ok(RpCreateDir::default()),
274 _ => {
275 if resp
281 .headers()
282 .get("x-ms-error-code")
283 .map(|value| value.to_str().unwrap_or(""))
284 .unwrap_or_else(|| "")
285 == "ResourceAlreadyExists"
286 {
287 Ok(RpCreateDir::default())
288 } else {
289 Err(parse_error(resp))
290 }
291 }
292 }
293 }
294
295 async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
296 let resp = if path.ends_with('/') {
297 self.core.azfile_get_directory_properties(path).await?
298 } else {
299 self.core.azfile_get_file_properties(path).await?
300 };
301
302 let status = resp.status();
303 match status {
304 StatusCode::OK => {
305 let meta = parse_into_metadata(path, resp.headers())?;
306 Ok(RpStat::new(meta))
307 }
308 _ => Err(parse_error(resp)),
309 }
310 }
311
312 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
313 let resp = self.core.azfile_read(path, args.range()).await?;
314
315 let status = resp.status();
316 match status {
317 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
318 _ => {
319 let (part, mut body) = resp.into_parts();
320 let buf = body.to_buffer().await?;
321 Err(parse_error(Response::from_parts(part, buf)))
322 }
323 }
324 }
325
326 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
327 self.core.ensure_parent_dir_exists(path).await?;
328 let w = AzfileWriter::new(self.core.clone(), args.clone(), path.to_string());
329 let w = if args.append() {
330 AzfileWriters::Two(oio::AppendWriter::new(w))
331 } else {
332 AzfileWriters::One(oio::OneShotWriter::new(w))
333 };
334 Ok((RpWrite::default(), w))
335 }
336
337 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
338 Ok((
339 RpDelete::default(),
340 oio::OneShotDeleter::new(AzfileDeleter::new(self.core.clone())),
341 ))
342 }
343
344 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
345 let l = AzfileLister::new(self.core.clone(), path.to_string(), args.limit());
346
347 Ok((RpList::default(), oio::PageLister::new(l)))
348 }
349
350 async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
351 self.core.ensure_parent_dir_exists(to).await?;
352 let resp = self.core.azfile_rename(from, to).await?;
353 let status = resp.status();
354 match status {
355 StatusCode::OK => Ok(RpRename::default()),
356 _ => Err(parse_error(resp)),
357 }
358 }
359}