opendal/services/azfile/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use http::header::CONTENT_DISPOSITION;
19use http::header::CONTENT_LENGTH;
20use http::header::CONTENT_TYPE;
21use http::header::RANGE;
22use http::HeaderName;
23use http::HeaderValue;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use reqsign::AzureStorageCredential;
28use reqsign::AzureStorageLoader;
29use reqsign::AzureStorageSigner;
30use std::collections::VecDeque;
31use std::fmt::Debug;
32use std::fmt::Formatter;
33use std::sync::Arc;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
39const X_MS_VERSION: &str = "x-ms-version";
40const X_MS_WRITE: &str = "x-ms-write";
41const X_MS_FILE_RENAME_SOURCE: &str = "x-ms-file-rename-source";
42const X_MS_CONTENT_LENGTH: &str = "x-ms-content-length";
43const X_MS_TYPE: &str = "x-ms-type";
44const X_MS_FILE_RENAME_REPLACE_IF_EXISTS: &str = "x-ms-file-rename-replace-if-exists";
45
46pub struct AzfileCore {
47    pub info: Arc<AccessorInfo>,
48    pub root: String,
49    pub endpoint: String,
50    pub share_name: String,
51    pub loader: AzureStorageLoader,
52    pub signer: AzureStorageSigner,
53}
54
55impl Debug for AzfileCore {
56    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("AzfileCore")
58            .field("root", &self.root)
59            .field("endpoint", &self.endpoint)
60            .field("share_name", &self.share_name)
61            .finish_non_exhaustive()
62    }
63}
64
65impl AzfileCore {
66    async fn load_credential(&self) -> Result<AzureStorageCredential> {
67        let cred = self
68            .loader
69            .load()
70            .await
71            .map_err(new_request_credential_error)?;
72
73        if let Some(cred) = cred {
74            Ok(cred)
75        } else {
76            Err(Error::new(
77                ErrorKind::ConfigInvalid,
78                "no valid credential found",
79            ))
80        }
81    }
82
83    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
84        let cred = self.load_credential().await?;
85        // Insert x-ms-version header for normal requests.
86        req.headers_mut().insert(
87            HeaderName::from_static(X_MS_VERSION),
88            // consistent with azdls and azblob
89            HeaderValue::from_static("2022-11-02"),
90        );
91        self.signer.sign(req, &cred).map_err(new_request_sign_error)
92    }
93
94    #[inline]
95    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
96        self.info.http_client().send(req).await
97    }
98
99    pub async fn azfile_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
100        let p = build_abs_path(&self.root, path);
101
102        let url = format!(
103            "{}/{}/{}",
104            self.endpoint,
105            self.share_name,
106            percent_encode_path(&p)
107        );
108
109        let mut req = Request::get(&url);
110
111        if !range.is_full() {
112            req = req.header(RANGE, range.to_header());
113        }
114
115        let req = req.extension(Operation::Read);
116
117        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
118        self.sign(&mut req).await?;
119        self.info.http_client().fetch(req).await
120    }
121
122    pub async fn azfile_create_file(
123        &self,
124        path: &str,
125        size: usize,
126        args: &OpWrite,
127    ) -> Result<Response<Buffer>> {
128        let p = build_abs_path(&self.root, path)
129            .trim_start_matches('/')
130            .to_string();
131        let url = format!(
132            "{}/{}/{}",
133            self.endpoint,
134            self.share_name,
135            percent_encode_path(&p)
136        );
137
138        let mut req = Request::put(&url);
139
140        // x-ms-content-length specifies the maximum size for the file, up to 4 tebibytes (TiB)
141        // https://learn.microsoft.com/en-us/rest/api/storageservices/create-file
142        req = req.header(X_MS_CONTENT_LENGTH, size);
143
144        req = req.header(X_MS_TYPE, "file");
145
146        // Content length must be 0 for create request.
147        req = req.header(CONTENT_LENGTH, 0);
148
149        if let Some(ty) = args.content_type() {
150            req = req.header(CONTENT_TYPE, ty);
151        }
152
153        if let Some(pos) = args.content_disposition() {
154            req = req.header(CONTENT_DISPOSITION, pos);
155        }
156
157        let req = req.extension(Operation::Write);
158
159        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
160        self.sign(&mut req).await?;
161        self.send(req).await
162    }
163
164    pub async fn azfile_update(
165        &self,
166        path: &str,
167        size: u64,
168        position: u64,
169        body: Buffer,
170    ) -> Result<Response<Buffer>> {
171        let p = build_abs_path(&self.root, path)
172            .trim_start_matches('/')
173            .to_string();
174
175        let url = format!(
176            "{}/{}/{}?comp=range",
177            self.endpoint,
178            self.share_name,
179            percent_encode_path(&p)
180        );
181
182        let mut req = Request::put(&url);
183
184        req = req.header(CONTENT_LENGTH, size);
185
186        req = req.header(X_MS_WRITE, "update");
187
188        req = req.header(
189            RANGE,
190            BytesRange::from(position..position + size).to_header(),
191        );
192
193        let req = req.extension(Operation::Write);
194
195        let mut req = req.body(body).map_err(new_request_build_error)?;
196        self.sign(&mut req).await?;
197        self.send(req).await
198    }
199
200    pub async fn azfile_get_file_properties(&self, path: &str) -> Result<Response<Buffer>> {
201        let p = build_abs_path(&self.root, path);
202        let url = format!(
203            "{}/{}/{}",
204            self.endpoint,
205            self.share_name,
206            percent_encode_path(&p)
207        );
208
209        let req = Request::head(&url);
210
211        let req = req.extension(Operation::Stat);
212
213        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
214        self.sign(&mut req).await?;
215        self.send(req).await
216    }
217
218    pub async fn azfile_get_directory_properties(&self, path: &str) -> Result<Response<Buffer>> {
219        let p = build_abs_path(&self.root, path);
220
221        let url = format!(
222            "{}/{}/{}?restype=directory",
223            self.endpoint,
224            self.share_name,
225            percent_encode_path(&p)
226        );
227
228        let req = Request::head(&url);
229
230        let req = req.extension(Operation::Stat);
231
232        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
233        self.sign(&mut req).await?;
234        self.send(req).await
235    }
236
237    pub async fn azfile_rename(&self, path: &str, new_path: &str) -> Result<Response<Buffer>> {
238        let p = build_abs_path(&self.root, path)
239            .trim_start_matches('/')
240            .to_string();
241
242        let new_p = build_abs_path(&self.root, new_path)
243            .trim_start_matches('/')
244            .to_string();
245
246        let url = if path.ends_with('/') {
247            format!(
248                "{}/{}/{}?restype=directory&comp=rename",
249                self.endpoint,
250                self.share_name,
251                percent_encode_path(&new_p)
252            )
253        } else {
254            format!(
255                "{}/{}/{}?comp=rename",
256                self.endpoint,
257                self.share_name,
258                percent_encode_path(&new_p)
259            )
260        };
261
262        let mut req = Request::put(&url);
263
264        req = req.header(CONTENT_LENGTH, 0);
265
266        // x-ms-file-rename-source specifies the file or directory to be renamed.
267        // the value must be a URL style path
268        // the official document does not mention the URL style path
269        // find the solution from the community FAQ and implementation of the Java-SDK
270        // ref: https://learn.microsoft.com/en-us/answers/questions/799611/azure-file-service-rest-api(rename)?page=1
271        let source_url = format!(
272            "{}/{}/{}",
273            self.endpoint,
274            self.share_name,
275            percent_encode_path(&p)
276        );
277
278        req = req.header(X_MS_FILE_RENAME_SOURCE, &source_url);
279
280        req = req.header(X_MS_FILE_RENAME_REPLACE_IF_EXISTS, "true");
281
282        let req = req.extension(Operation::Rename);
283
284        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
285        self.sign(&mut req).await?;
286        self.send(req).await
287    }
288
289    pub async fn azfile_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
290        let p = build_abs_path(&self.root, path)
291            .trim_start_matches('/')
292            .to_string();
293
294        let url = format!(
295            "{}/{}/{}?restype=directory",
296            self.endpoint,
297            self.share_name,
298            percent_encode_path(&p)
299        );
300
301        let mut req = Request::put(&url);
302
303        req = req.header(CONTENT_LENGTH, 0);
304
305        let req = req.extension(Operation::CreateDir);
306
307        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
308        self.sign(&mut req).await?;
309        self.send(req).await
310    }
311
312    pub async fn azfile_delete_file(&self, path: &str) -> Result<Response<Buffer>> {
313        let p = build_abs_path(&self.root, path)
314            .trim_start_matches('/')
315            .to_string();
316
317        let url = format!(
318            "{}/{}/{}",
319            self.endpoint,
320            self.share_name,
321            percent_encode_path(&p)
322        );
323
324        let req = Request::delete(&url);
325
326        let req = req.extension(Operation::Delete);
327
328        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
329        self.sign(&mut req).await?;
330        self.send(req).await
331    }
332
333    pub async fn azfile_delete_dir(&self, path: &str) -> Result<Response<Buffer>> {
334        let p = build_abs_path(&self.root, path)
335            .trim_start_matches('/')
336            .to_string();
337
338        let url = format!(
339            "{}/{}/{}?restype=directory",
340            self.endpoint,
341            self.share_name,
342            percent_encode_path(&p)
343        );
344
345        let req = Request::delete(&url);
346
347        let req = req.extension(Operation::Delete);
348
349        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
350        self.sign(&mut req).await?;
351        self.send(req).await
352    }
353
354    pub async fn azfile_list(
355        &self,
356        path: &str,
357        limit: &Option<usize>,
358        continuation: &str,
359    ) -> Result<Response<Buffer>> {
360        let p = build_abs_path(&self.root, path)
361            .trim_start_matches('/')
362            .to_string();
363
364        let url = format!(
365            "{}/{}/{}",
366            self.endpoint,
367            self.share_name,
368            percent_encode_path(&p),
369        );
370
371        let mut url = QueryPairsWriter::new(&url)
372            .push("restype", "directory")
373            .push("comp", "list")
374            .push("include", "Timestamps,ETag");
375
376        if !continuation.is_empty() {
377            url = url.push("marker", continuation);
378        }
379
380        if let Some(limit) = limit {
381            url = url.push("maxresults", &limit.to_string());
382        }
383
384        let req = Request::get(url.finish());
385
386        let req = req.extension(Operation::List);
387
388        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
389        self.sign(&mut req).await?;
390        self.send(req).await
391    }
392
393    pub async fn ensure_parent_dir_exists(&self, path: &str) -> Result<()> {
394        let mut dirs = VecDeque::default();
395        // azure file service does not support recursive directory creation
396        let mut p = path;
397        while p != "/" {
398            p = get_parent(p);
399            dirs.push_front(p);
400        }
401
402        let mut pop_dir_count = dirs.len();
403        for dir in dirs.iter().rev() {
404            let resp = self.azfile_get_directory_properties(dir).await?;
405            if resp.status() == StatusCode::NOT_FOUND {
406                pop_dir_count -= 1;
407                continue;
408            }
409            break;
410        }
411
412        for dir in dirs.iter().skip(pop_dir_count) {
413            let resp = self.azfile_create_dir(dir).await?;
414
415            if resp.status() == StatusCode::CREATED {
416                continue;
417            }
418
419            if resp
420                .headers()
421                .get("x-ms-error-code")
422                .map(|value| value.to_str().unwrap_or(""))
423                .unwrap_or_else(|| "")
424                == "ResourceAlreadyExists"
425            {
426                continue;
427            }
428
429            return Err(parse_error(resp));
430        }
431
432        Ok(())
433    }
434}