opendal_core/services/azfile/
core.rs1use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use http::HeaderName;
23use http::HeaderValue;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use http::header::CONTENT_DISPOSITION;
28use http::header::CONTENT_LENGTH;
29use http::header::CONTENT_TYPE;
30use http::header::RANGE;
31use reqsign::AzureStorageCredential;
32use reqsign::AzureStorageLoader;
33use reqsign::AzureStorageSigner;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
39const X_MS_VERSION: &str = "x-ms-version";
40const X_MS_WRITE: &str = "x-ms-write";
41const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
42const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
43const X_MS_TYPE: &str = "x-ms-type";
44const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
45pub const X_MS_META_PREFIX: &str = "x-ms-meta-";
46
47pub struct AzfileCore {
48 pub info: Arc<AccessorInfo>,
49 pub root: String,
50 pub endpoint: String,
51 pub share_name: String,
52 pub loader: AzureStorageLoader,
53 pub signer: AzureStorageSigner,
54}
55
56impl Debug for AzfileCore {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("AzfileCore")
59 .field("root", &self.root)
60 .field("endpoint", &self.endpoint)
61 .field("share_name", &self.share_name)
62 .finish_non_exhaustive()
63 }
64}
65
66impl AzfileCore {
67 async fn load_credential(&self) -> Result<AzureStorageCredential> {
68 let cred = self
69 .loader
70 .load()
71 .await
72 .map_err(new_request_credential_error)?;
73
74 if let Some(cred) = cred {
75 Ok(cred)
76 } else {
77 Err(Error::new(
78 ErrorKind::ConfigInvalid,
79 "no valid credential found",
80 ))
81 }
82 }
83
84 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85 let cred = self.load_credential().await?;
86 req.headers_mut().insert(
88 HeaderName::from_static(X_MS_VERSION),
89 HeaderValue::from_static("2022-11-02"),
91 );
92 self.signer.sign(req, &cred).map_err(new_request_sign_error)
93 }
94
95 #[inline]
96 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
97 self.info.http_client().send(req).await
98 }
99
100 pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101 let p = build_abs_path(&self.root, path);
102
103 let url = format!(
104 "{}/{}/{}",
105 self.endpoint,
106 self.share_name,
107 percent_encode_path(&p)
108 );
109
110 let mut req = Request::get(&url);
111
112 if !range.is_full() {
113 req = req.header(RANGE, range.to_header());
114 }
115
116 let req = req.extension(Operation::Read);
117
118 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
119 self.sign(&mut req).await?;
120 self.info.http_client().fetch(req).await
121 }
122
123 pub async fn azfile_create_file(
124 &self,
125 path: &str,
126 size: usize,
127 args: &OpWrite,
128 ) -> Result<Response<Buffer>> {
129 let p = build_abs_path(&self.root, path)
130 .trim_start_matches('/')
131 .to_string();
132 let url = format!(
133 "{}/{}/{}",
134 self.endpoint,
135 self.share_name,
136 percent_encode_path(&p)
137 );
138
139 let mut req = Request::put(&url);
140
141 req = req.header(X_MS_CONTENT_LENGTH, size);
144
145 req = req.header(X_MS_TYPE, "file");
146
147 req = req.header(CONTENT_LENGTH, 0);
149
150 if let Some(ty) = args.content_type() {
151 req = req.header(CONTENT_TYPE, ty);
152 }
153
154 if let Some(pos) = args.content_disposition() {
155 req = req.header(CONTENT_DISPOSITION, pos);
156 }
157
158 if let Some(user_metadata) = args.user_metadata() {
160 for (key, value) in user_metadata {
161 req = req.header(format!("{X_MS_META_PREFIX}{key}"), value);
162 }
163 }
164
165 let req = req.extension(Operation::Write);
166
167 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
168 self.sign(&mut req).await?;
169 self.send(req).await
170 }
171
172 pub async fn azfile_update(
173 &self,
174 path: &str,
175 size: u64,
176 position: u64,
177 body: Buffer,
178 ) -> Result<Response<Buffer>> {
179 let p = build_abs_path(&self.root, path)
180 .trim_start_matches('/')
181 .to_string();
182
183 let url = format!(
184 "{}/{}/{}?comp=range",
185 self.endpoint,
186 self.share_name,
187 percent_encode_path(&p)
188 );
189
190 let mut req = Request::put(&url);
191
192 req = req.header(CONTENT_LENGTH, size);
193
194 req = req.header(X_MS_WRITE, "update");
195
196 req = req.header(
197 RANGE,
198 BytesRange::from(position..position + size).to_header(),
199 );
200
201 let req = req.extension(Operation::Write);
202
203 let mut req = req.body(body).map_err(new_request_build_error)?;
204 self.sign(&mut req).await?;
205 self.send(req).await
206 }
207
208 pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
209 let p = build_abs_path(&self.root, path);
210 let url = format!(
211 "{}/{}/{}",
212 self.endpoint,
213 self.share_name,
214 percent_encode_path(&p)
215 );
216
217 let req = Request::head(&url);
218
219 let req = req.extension(Operation::Stat);
220
221 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
222 self.sign(&mut req).await?;
223 self.send(req).await
224 }
225
226 pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
227 let p = build_abs_path(&self.root, path);
228
229 let url = format!(
230 "{}/{}/{}?restype=directory",
231 self.endpoint,
232 self.share_name,
233 percent_encode_path(&p)
234 );
235
236 let req = Request::head(&url);
237
238 let req = req.extension(Operation::Stat);
239
240 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
241 self.sign(&mut req).await?;
242 self.send(req).await
243 }
244
245 pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
246 let p = build_abs_path(&self.root, path)
247 .trim_start_matches('/')
248 .to_string();
249
250 let new_p = build_abs_path(&self.root, new_path)
251 .trim_start_matches('/')
252 .to_string();
253
254 let url = if path.ends_with('/') {
255 format!(
256 "{}/{}/{}?restype=directory&comp=rename",
257 self.endpoint,
258 self.share_name,
259 percent_encode_path(&new_p)
260 )
261 } else {
262 format!(
263 "{}/{}/{}?comp=rename",
264 self.endpoint,
265 self.share_name,
266 percent_encode_path(&new_p)
267 )
268 };
269
270 let mut req = Request::put(&url);
271
272 req = req.header(CONTENT_LENGTH, 0);
273
274 let source_url = format!(
280 "{}/{}/{}",
281 self.endpoint,
282 self.share_name,
283 percent_encode_path(&p)
284 );
285
286 req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
287
288 req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
289
290 let req = req.extension(Operation::Rename);
291
292 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
293 self.sign(&mut req).await?;
294 self.send(req).await
295 }
296
297 pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
298 let p = build_abs_path(&self.root, path)
299 .trim_start_matches('/')
300 .to_string();
301
302 let url = format!(
303 "{}/{}/{}?restype=directory",
304 self.endpoint,
305 self.share_name,
306 percent_encode_path(&p)
307 );
308
309 let mut req = Request::put(&url);
310
311 req = req.header(CONTENT_LENGTH, 0);
312
313 let req = req.extension(Operation::CreateDir);
314
315 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
316 self.sign(&mut req).await?;
317 self.send(req).await
318 }
319
320 pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
321 let p = build_abs_path(&self.root, path)
322 .trim_start_matches('/')
323 .to_string();
324
325 let url = format!(
326 "{}/{}/{}",
327 self.endpoint,
328 self.share_name,
329 percent_encode_path(&p)
330 );
331
332 let req = Request::delete(&url);
333
334 let req = req.extension(Operation::Delete);
335
336 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
337 self.sign(&mut req).await?;
338 self.send(req).await
339 }
340
341 pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
342 let p = build_abs_path(&self.root, path)
343 .trim_start_matches('/')
344 .to_string();
345
346 let url = format!(
347 "{}/{}/{}?restype=directory",
348 self.endpoint,
349 self.share_name,
350 percent_encode_path(&p)
351 );
352
353 let req = Request::delete(&url);
354
355 let req = req.extension(Operation::Delete);
356
357 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
358 self.sign(&mut req).await?;
359 self.send(req).await
360 }
361
362 pub async fn azfile_list(
363 &self,
364 path: &str,
365 limit: &Option<usize>,
366 continuation: &str,
367 ) -> Result<Response<Buffer>> {
368 let p = build_abs_path(&self.root, path)
369 .trim_start_matches('/')
370 .to_string();
371
372 let url = format!(
373 "{}/{}/{}",
374 self.endpoint,
375 self.share_name,
376 percent_encode_path(&p),
377 );
378
379 let mut url = QueryPairsWriter::new(&url)
380 .push("restype", "directory")
381 .push("comp", "list")
382 .push("include", "Timestamps,ETag");
383
384 if !continuation.is_empty() {
385 url = url.push("marker", continuation);
386 }
387
388 if let Some(limit) = limit {
389 url = url.push("maxresults", &limit.to_string());
390 }
391
392 let req = Request::get(url.finish());
393
394 let req = req.extension(Operation::List);
395
396 let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
397 self.sign(&mut req).await?;
398 self.send(req).await
399 }
400
401 pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
402 let mut dirs = VecDeque::default();
403 let mut p = path;
405 while p != "/" {
406 p = get_parent(p);
407 dirs.push_front(p);
408 }
409
410 let mut pop_dir_count = dirs.len();
411 for dir in dirs.iter().rev() {
412 let resp = self.azfile_get_directory_properties(dir).await?;
413 if resp.status() == StatusCode::NOT_FOUND {
414 pop_dir_count -= 1;
415 continue;
416 }
417 break;
418 }
419
420 for dir in dirs.iter().skip(pop_dir_count) {
421 let resp = self.azfile_create_dir(dir).await?;
422
423 if resp.status() == StatusCode::CREATED {
424 continue;
425 }
426
427 if resp
428 .headers()
429 .get("x-ms-error-code")
430 .map(|value| value.to_str().unwrap_or(""))
431 .unwrap_or_else(|| "")
432 == "ResourceAlreadyExists"
433 {
434 continue;
435 }
436
437 return Err(parse_error(resp));
438 }
439
440 Ok(())
441 }
442}