1use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use http::header::CONTENT_DISPOSITION;
24use http::header::CONTENT_LENGTH;
25use http::header::CONTENT_TYPE;
26use http::header::RANGE;
27use http::HeaderName;
28use http::HeaderValue;
29use http::Request;
30use http::Response;
31use http::StatusCode;
32use reqsign::AzureStorageCredential;
33use reqsign::AzureStorageLoader;
34use reqsign::AzureStorageSigner;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
/// Header carrying the service API version; set on every request by `AzfileCore::sign`.
const X_MS_VERSION: &str = "x-ms-version";
/// Header selecting the write mode of a range write; sent as `update` by `azfile_update`.
const X_MS_WRITE: &str = "x-ms-write";
/// Header holding the URL of the file or directory being renamed.
const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
/// Header declaring the size of the file being created (the create request body itself is empty).
const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
/// Header naming the kind of resource being created; sent as `file` by `azfile_create_file`.
const X_MS_TYPE: &str = "x-ms-type";
/// Header controlling whether a rename may overwrite an existing target; sent as `true`.
const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
46
/// Shared state for issuing requests against an Azure file share: where the requests go and
/// how they are signed.
pub struct AzfileCore {
    /// Accessor metadata; also the source of the HTTP client used to send requests.
    pub info: Arc<AccessorInfo>,
    /// Root against which caller-supplied paths are resolved (via `build_abs_path`).
    pub root: String,
    /// Base URL that prefixes every request URL.
    pub endpoint: String,
    /// Share name; the URL path segment placed directly after `endpoint`.
    pub share_name: String,
    /// Loads the credential used for signing; a load may succeed yet yield no credential.
    pub loader: AzureStorageLoader,
    /// Signs outgoing requests with the loaded credential.
    pub signer: AzureStorageSigner,
}
55
56impl Debug for AzfileCore {
57 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("AzfileCore")
59 .field("root", &self.root)
60 .field("endpoint", &self.endpoint)
61 .field("share_name", &self.share_name)
62 .finish_non_exhaustive()
63 }
64}
65
66impl AzfileCore {
67 async fn load_credential(&self) -> Result<AzureStorageCredential> {
68 let cred = self
69 .loader
70 .load()
71 .await
72 .map_err(new_request_credential_error)?;
73
74 if let Some(cred) = cred {
75 Ok(cred)
76 } else {
77 Err(Error::new(
78 ErrorKind::ConfigInvalid,
79 "no valid credential found",
80 ))
81 }
82 }
83
84 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85 let cred = self.load_credential().await?;
86 req.headers_mut().insert(
88 HeaderName::from_static(X_MS_VERSION),
89 HeaderValue::from_static("2022-11-02"),
91 );
92 self.signer.sign(req, &cred).map_err(new_request_sign_error)
93 }
94
95 #[inline]
96 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
97 self.info.http_client().send(req).await
98 }
99
100 pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101 let p = build_abs_path(&self.root, path);
102
103 let url = format!(
104 "{}/{}/{}",
105 self.endpoint,
106 self.share_name,
107 percent_encode_path(&p)
108 );
109
110 let mut req = Request::get(&url);
111
112 if !range.is_full() {
113 req = req.header(RANGE, range.to_header());
114 }
115
116 let req = req.extension(Operation::Read);
117
118 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
119 self.sign(&mut req).await?;
120 self.info.http_client().fetch(req).await
121 }
122
123 pub async fn azfile_create_file(
124 &self,
125 path: &str,
126 size: usize,
127 args: &OpWrite,
128 ) -> Result<Response<Buffer>> {
129 let p = build_abs_path(&self.root, path)
130 .trim_start_matches('/')
131 .to_string();
132 let url = format!(
133 "{}/{}/{}",
134 self.endpoint,
135 self.share_name,
136 percent_encode_path(&p)
137 );
138
139 let mut req = Request::put(&url);
140
141 req = req.header(X_MS_CONTENT_LENGTH, size);
144
145 req = req.header(X_MS_TYPE, "file");
146
147 req = req.header(CONTENT_LENGTH, 0);
149
150 if let Some(ty) = args.content_type() {
151 req = req.header(CONTENT_TYPE, ty);
152 }
153
154 if let Some(pos) = args.content_disposition() {
155 req = req.header(CONTENT_DISPOSITION, pos);
156 }
157
158 let req = req.extension(Operation::Write);
159
160 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
161 self.sign(&mut req).await?;
162 self.send(req).await
163 }
164
165 pub async fn azfile_update(
166 &self,
167 path: &str,
168 size: u64,
169 position: u64,
170 body: Buffer,
171 ) -> Result<Response<Buffer>> {
172 let p = build_abs_path(&self.root, path)
173 .trim_start_matches('/')
174 .to_string();
175
176 let url = format!(
177 "{}/{}/{}?comp=range",
178 self.endpoint,
179 self.share_name,
180 percent_encode_path(&p)
181 );
182
183 let mut req = Request::put(&url);
184
185 req = req.header(CONTENT_LENGTH, size);
186
187 req = req.header(X_MS_WRITE, "update");
188
189 req = req.header(
190 RANGE,
191 BytesRange::from(position..position + size).to_header(),
192 );
193
194 let req = req.extension(Operation::Write);
195
196 let mut req = req.body(body).map_err(new_request_build_error)?;
197 self.sign(&mut req).await?;
198 self.send(req).await
199 }
200
201 pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
202 let p = build_abs_path(&self.root, path);
203 let url = format!(
204 "{}/{}/{}",
205 self.endpoint,
206 self.share_name,
207 percent_encode_path(&p)
208 );
209
210 let req = Request::head(&url);
211
212 let req = req.extension(Operation::Stat);
213
214 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
215 self.sign(&mut req).await?;
216 self.send(req).await
217 }
218
219 pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
220 let p = build_abs_path(&self.root, path);
221
222 let url = format!(
223 "{}/{}/{}?restype=directory",
224 self.endpoint,
225 self.share_name,
226 percent_encode_path(&p)
227 );
228
229 let req = Request::head(&url);
230
231 let req = req.extension(Operation::Stat);
232
233 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
234 self.sign(&mut req).await?;
235 self.send(req).await
236 }
237
238 pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
239 let p = build_abs_path(&self.root, path)
240 .trim_start_matches('/')
241 .to_string();
242
243 let new_p = build_abs_path(&self.root, new_path)
244 .trim_start_matches('/')
245 .to_string();
246
247 let url = if path.ends_with('/') {
248 format!(
249 "{}/{}/{}?restype=directory&comp=rename",
250 self.endpoint,
251 self.share_name,
252 percent_encode_path(&new_p)
253 )
254 } else {
255 format!(
256 "{}/{}/{}?comp=rename",
257 self.endpoint,
258 self.share_name,
259 percent_encode_path(&new_p)
260 )
261 };
262
263 let mut req = Request::put(&url);
264
265 req = req.header(CONTENT_LENGTH, 0);
266
267 let source_url = format!(
273 "{}/{}/{}",
274 self.endpoint,
275 self.share_name,
276 percent_encode_path(&p)
277 );
278
279 req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
280
281 req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
282
283 let req = req.extension(Operation::Rename);
284
285 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
286 self.sign(&mut req).await?;
287 self.send(req).await
288 }
289
290 pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
291 let p = build_abs_path(&self.root, path)
292 .trim_start_matches('/')
293 .to_string();
294
295 let url = format!(
296 "{}/{}/{}?restype=directory",
297 self.endpoint,
298 self.share_name,
299 percent_encode_path(&p)
300 );
301
302 let mut req = Request::put(&url);
303
304 req = req.header(CONTENT_LENGTH, 0);
305
306 let req = req.extension(Operation::CreateDir);
307
308 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
309 self.sign(&mut req).await?;
310 self.send(req).await
311 }
312
313 pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
314 let p = build_abs_path(&self.root, path)
315 .trim_start_matches('/')
316 .to_string();
317
318 let url = format!(
319 "{}/{}/{}",
320 self.endpoint,
321 self.share_name,
322 percent_encode_path(&p)
323 );
324
325 let req = Request::delete(&url);
326
327 let req = req.extension(Operation::Delete);
328
329 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
330 self.sign(&mut req).await?;
331 self.send(req).await
332 }
333
334 pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
335 let p = build_abs_path(&self.root, path)
336 .trim_start_matches('/')
337 .to_string();
338
339 let url = format!(
340 "{}/{}/{}?restype=directory",
341 self.endpoint,
342 self.share_name,
343 percent_encode_path(&p)
344 );
345
346 let req = Request::delete(&url);
347
348 let req = req.extension(Operation::Delete);
349
350 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
351 self.sign(&mut req).await?;
352 self.send(req).await
353 }
354
355 pub async fn azfile_list(
356 &self,
357 path: &str,
358 limit: &Option<usize>,
359 continuation: &str,
360 ) -> Result<Response<Buffer>> {
361 let p = build_abs_path(&self.root, path)
362 .trim_start_matches('/')
363 .to_string();
364
365 let url = format!(
366 "{}/{}/{}",
367 self.endpoint,
368 self.share_name,
369 percent_encode_path(&p),
370 );
371
372 let mut url = QueryPairsWriter::new(&url)
373 .push("restype", "directory")
374 .push("comp", "list")
375 .push("include", "Timestamps,ETag");
376
377 if !continuation.is_empty() {
378 url = url.push("marker", continuation);
379 }
380
381 if let Some(limit) = limit {
382 url = url.push("maxresults", &limit.to_string());
383 }
384
385 let req = Request::get(url.finish());
386
387 let req = req.extension(Operation::List);
388
389 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
390 self.sign(&mut req).await?;
391 self.send(req).await
392 }
393
394 pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
395 let mut dirs = VecDeque::default();
396 let mut p = path;
398 while p != "/" {
399 p = get_parent(p);
400 dirs.push_front(p);
401 }
402
403 let mut pop_dir_count = dirs.len();
404 for dir in dirs.iter().rev() {
405 let resp = self.azfile_get_directory_properties(dir).await?;
406 if resp.status() == StatusCode::NOT_FOUND {
407 pop_dir_count -= 1;
408 continue;
409 }
410 break;
411 }
412
413 for dir in dirs.iter().skip(pop_dir_count) {
414 let resp = self.azfile_create_dir(dir).await?;
415
416 if resp.status() == StatusCode::CREATED {
417 continue;
418 }
419
420 if resp
421 .headers()
422 .get("x-ms-error-code")
423 .map(|value| value.to_str().unwrap_or(""))
424 .unwrap_or_else(|| "")
425 == "ResourceAlreadyExists"
426 {
427 continue;
428 }
429
430 return Err(parse_error(resp));
431 }
432
433 Ok(())
434 }
435}