1use http::header::CONTENT_DISPOSITION;
19use http::header::CONTENT_LENGTH;
20use http::header::CONTENT_TYPE;
21use http::header::RANGE;
22use http::HeaderName;
23use http::HeaderValue;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use reqsign::AzureStorageCredential;
28use reqsign::AzureStorageLoader;
29use reqsign::AzureStorageSigner;
30use std::collections::VecDeque;
31use std::fmt::Debug;
32use std::fmt::Formatter;
33use std::fmt::Write;
34use std::sync::Arc;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
40const X_MS_VERSION: &str = "x-ms-version";
41const X_MS_WRITE: &str = "x-ms-write";
42const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
43const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
44const X_MS_TYPE: &str = "x-ms-type";
45const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
46
47pub struct AzfileCore {
48 pub info: Arc<AccessorInfo>,
49 pub root: String,
50 pub endpoint: String,
51 pub share_name: String,
52 pub loader: AzureStorageLoader,
53 pub signer: AzureStorageSigner,
54}
55
56impl Debug for AzfileCore {
57 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("AzfileCore")
59 .field("root", &self.root)
60 .field("endpoint", &self.endpoint)
61 .field("share_name", &self.share_name)
62 .finish_non_exhaustive()
63 }
64}
65
66impl AzfileCore {
67 async fn load_credential(&self) -> Result<AzureStorageCredential> {
68 let cred = self
69 .loader
70 .load()
71 .await
72 .map_err(new_request_credential_error)?;
73
74 if let Some(cred) = cred {
75 Ok(cred)
76 } else {
77 Err(Error::new(
78 ErrorKind::ConfigInvalid,
79 "no valid credential found",
80 ))
81 }
82 }
83
84 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85 let cred = self.load_credential().await?;
86 req.headers_mut().insert(
88 HeaderName::from_static(X_MS_VERSION),
89 HeaderValue::from_static("2022-11-02"),
91 );
92 self.signer.sign(req, &cred).map_err(new_request_sign_error)
93 }
94
95 #[inline]
96 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
97 self.info.http_client().send(req).await
98 }
99
100 pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101 let p = build_abs_path(&self.root, path);
102
103 let url = format!(
104 "{}/{}/{}",
105 self.endpoint,
106 self.share_name,
107 percent_encode_path(&p)
108 );
109
110 let mut req = Request::get(&url);
111
112 if !range.is_full() {
113 req = req.header(RANGE, range.to_header());
114 }
115
116 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
117 self.sign(&mut req).await?;
118 self.info.http_client().fetch(req).await
119 }
120
121 pub async fn azfile_create_file(
122 &self,
123 path: &str,
124 size: usize,
125 args: &OpWrite,
126 ) -> Result<Response<Buffer>> {
127 let p = build_abs_path(&self.root, path)
128 .trim_start_matches('/')
129 .to_string();
130 let url = format!(
131 "{}/{}/{}",
132 self.endpoint,
133 self.share_name,
134 percent_encode_path(&p)
135 );
136
137 let mut req = Request::put(&url);
138
139 req = req.header(X_MS_CONTENT_LENGTH, size);
142
143 req = req.header(X_MS_TYPE, "file");
144
145 req = req.header(CONTENT_LENGTH, 0);
147
148 if let Some(ty) = args.content_type() {
149 req = req.header(CONTENT_TYPE, ty);
150 }
151
152 if let Some(pos) = args.content_disposition() {
153 req = req.header(CONTENT_DISPOSITION, pos);
154 }
155
156 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
157 self.sign(&mut req).await?;
158 self.send(req).await
159 }
160
161 pub async fn azfile_update(
162 &self,
163 path: &str,
164 size: u64,
165 position: u64,
166 body: Buffer,
167 ) -> Result<Response<Buffer>> {
168 let p = build_abs_path(&self.root, path)
169 .trim_start_matches('/')
170 .to_string();
171
172 let url = format!(
173 "{}/{}/{}?comp=range",
174 self.endpoint,
175 self.share_name,
176 percent_encode_path(&p)
177 );
178
179 let mut req = Request::put(&url);
180
181 req = req.header(CONTENT_LENGTH, size);
182
183 req = req.header(X_MS_WRITE, "update");
184
185 req = req.header(
186 RANGE,
187 BytesRange::from(position..position + size).to_header(),
188 );
189
190 let mut req = req.body(body).map_err(new_request_build_error)?;
191 self.sign(&mut req).await?;
192 self.send(req).await
193 }
194
195 pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
196 let p = build_abs_path(&self.root, path);
197 let url = format!(
198 "{}/{}/{}",
199 self.endpoint,
200 self.share_name,
201 percent_encode_path(&p)
202 );
203
204 let req = Request::head(&url);
205
206 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
207 self.sign(&mut req).await?;
208 self.send(req).await
209 }
210
211 pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
212 let p = build_abs_path(&self.root, path);
213
214 let url = format!(
215 "{}/{}/{}?restype=directory",
216 self.endpoint,
217 self.share_name,
218 percent_encode_path(&p)
219 );
220
221 let req = Request::head(&url);
222
223 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
224 self.sign(&mut req).await?;
225 self.send(req).await
226 }
227
228 pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
229 let p = build_abs_path(&self.root, path)
230 .trim_start_matches('/')
231 .to_string();
232
233 let new_p = build_abs_path(&self.root, new_path)
234 .trim_start_matches('/')
235 .to_string();
236
237 let url = if path.ends_with('/') {
238 format!(
239 "{}/{}/{}?restype=directory&comp=rename",
240 self.endpoint,
241 self.share_name,
242 percent_encode_path(&new_p)
243 )
244 } else {
245 format!(
246 "{}/{}/{}?comp=rename",
247 self.endpoint,
248 self.share_name,
249 percent_encode_path(&new_p)
250 )
251 };
252
253 let mut req = Request::put(&url);
254
255 req = req.header(CONTENT_LENGTH, 0);
256
257 let source_url = format!(
263 "{}/{}/{}",
264 self.endpoint,
265 self.share_name,
266 percent_encode_path(&p)
267 );
268
269 req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
270
271 req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
272
273 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
274 self.sign(&mut req).await?;
275 self.send(req).await
276 }
277
278 pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
279 let p = build_abs_path(&self.root, path)
280 .trim_start_matches('/')
281 .to_string();
282
283 let url = format!(
284 "{}/{}/{}?restype=directory",
285 self.endpoint,
286 self.share_name,
287 percent_encode_path(&p)
288 );
289
290 let mut req = Request::put(&url);
291
292 req = req.header(CONTENT_LENGTH, 0);
293
294 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
295 self.sign(&mut req).await?;
296 self.send(req).await
297 }
298
299 pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
300 let p = build_abs_path(&self.root, path)
301 .trim_start_matches('/')
302 .to_string();
303
304 let url = format!(
305 "{}/{}/{}",
306 self.endpoint,
307 self.share_name,
308 percent_encode_path(&p)
309 );
310
311 let req = Request::delete(&url);
312
313 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
314 self.sign(&mut req).await?;
315 self.send(req).await
316 }
317
318 pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
319 let p = build_abs_path(&self.root, path)
320 .trim_start_matches('/')
321 .to_string();
322
323 let url = format!(
324 "{}/{}/{}?restype=directory",
325 self.endpoint,
326 self.share_name,
327 percent_encode_path(&p)
328 );
329
330 let req = Request::delete(&url);
331
332 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
333 self.sign(&mut req).await?;
334 self.send(req).await
335 }
336
337 pub async fn azfile_list(
338 &self,
339 path: &str,
340 limit: &Option<usize>,
341 continuation: &String,
342 ) -> Result<Response<Buffer>> {
343 let p = build_abs_path(&self.root, path)
344 .trim_start_matches('/')
345 .to_string();
346
347 let mut url = format!(
348 "{}/{}/{}?restype=directory&comp=list&include=Timestamps,ETag",
349 self.endpoint,
350 self.share_name,
351 percent_encode_path(&p),
352 );
353
354 if !continuation.is_empty() {
355 write!(url, "&marker={}", &continuation).expect("write into string must succeed");
356 }
357
358 if let Some(limit) = limit {
359 write!(url, "&maxresults={}", limit).expect("write into string must succeed");
360 }
361
362 let req = Request::get(&url);
363
364 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
365 self.sign(&mut req).await?;
366 self.send(req).await
367 }
368
369 pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
370 let mut dirs = VecDeque::default();
371 let mut p = path;
373 while p != "/" {
374 p = get_parent(p);
375 dirs.push_front(p);
376 }
377
378 let mut pop_dir_count = dirs.len();
379 for dir in dirs.iter().rev() {
380 let resp = self.azfile_get_directory_properties(dir).await?;
381 if resp.status() == StatusCode::NOT_FOUND {
382 pop_dir_count -= 1;
383 continue;
384 }
385 break;
386 }
387
388 for dir in dirs.iter().skip(pop_dir_count) {
389 let resp = self.azfile_create_dir(dir).await?;
390
391 if resp.status() == StatusCode::CREATED {
392 continue;
393 }
394
395 if resp
396 .headers()
397 .get("x-ms-error-code")
398 .map(|value| value.to_str().unwrap_or(""))
399 .unwrap_or_else(|| "")
400 == "ResourceAlreadyExists"
401 {
402 continue;
403 }
404
405 return Err(parse_error(resp));
406 }
407
408 Ok(())
409 }
410}