opendal_core/services/azdls/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use http::HeaderName;
22use http::HeaderValue;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use http::header::CONTENT_DISPOSITION;
27use http::header::CONTENT_LENGTH;
28use http::header::CONTENT_TYPE;
29use http::header::IF_NONE_MATCH;
30use reqsign::AzureStorageCredential;
31use reqsign::AzureStorageLoader;
32use reqsign::AzureStorageSigner;
33
34use super::error::parse_error;
35use crate::raw::*;
36use crate::*;
37
38const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
39const X_MS_VERSION: &str = "x-ms-version";
40pub const X_MS_VERSION_ID: &str = "x-ms-version-id";
41pub const DIRECTORY: &str = "directory";
42pub const FILE: &str = "file";
43
44pub struct AzdlsCore {
45    pub info: Arc<AccessorInfo>,
46    pub filesystem: String,
47    pub root: String,
48    pub endpoint: String,
49
50    pub loader: AzureStorageLoader,
51    pub signer: AzureStorageSigner,
52}
53
54impl Debug for AzdlsCore {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("AzdlsCore")
57            .field("filesystem", &self.filesystem)
58            .field("root", &self.root)
59            .field("endpoint", &self.endpoint)
60            .finish_non_exhaustive()
61    }
62}
63
64impl AzdlsCore {
65    async fn load_credential(&self) -> Result<AzureStorageCredential> {
66        let cred = self
67            .loader
68            .load()
69            .await
70            .map_err(new_request_credential_error)?;
71
72        if let Some(cred) = cred {
73            Ok(cred)
74        } else {
75            Err(Error::new(
76                ErrorKind::ConfigInvalid,
77                "no valid credential found",
78            ))
79        }
80    }
81
82    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
83        let cred = self.load_credential().await?;
84        // Insert x-ms-version header for normal requests.
85        req.headers_mut().insert(
86            HeaderName::from_static(X_MS_VERSION),
87            // 2022-11-02 is the version supported by Azurite V3 and
88            // used by Azure Portal, We use this version to make
89            // sure most our developer happy.
90            //
91            // In the future, we could allow users to configure this value.
92            HeaderValue::from_static("2022-11-02"),
93        );
94        self.signer.sign(req, &cred).map_err(new_request_sign_error)
95    }
96
97    #[inline]
98    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
99        self.info.http_client().send(req).await
100    }
101}
102
103impl AzdlsCore {
104    pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
105        let p = build_abs_path(&self.root, path);
106
107        let url = format!(
108            "{}/{}/{}",
109            self.endpoint,
110            self.filesystem,
111            percent_encode_path(&p)
112        );
113
114        let mut req = Request::get(&url);
115
116        if !range.is_full() {
117            req = req.header(http::header::RANGE, range.to_header());
118        }
119
120        let mut req = req
121            .extension(Operation::Read)
122            .body(Buffer::new())
123            .map_err(new_request_build_error)?;
124
125        self.sign(&mut req).await?;
126        self.info.http_client().fetch(req).await
127    }
128
129    /// resource should be one of `file` or `directory`
130    ///
131    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
132    pub async fn azdls_create(
133        &self,
134        path: &str,
135        resource: &str,
136        args: &OpWrite,
137    ) -> Result<Response<Buffer>> {
138        let p = build_abs_path(&self.root, path)
139            .trim_end_matches('/')
140            .to_string();
141
142        let url = format!(
143            "{}/{}/{}?resource={resource}",
144            self.endpoint,
145            self.filesystem,
146            percent_encode_path(&p)
147        );
148
149        let mut req = Request::put(&url);
150
151        // Content length must be 0 for create request.
152        req = req.header(CONTENT_LENGTH, 0);
153
154        if let Some(ty) = args.content_type() {
155            req = req.header(CONTENT_TYPE, ty)
156        }
157
158        if let Some(pos) = args.content_disposition() {
159            req = req.header(CONTENT_DISPOSITION, pos)
160        }
161
162        if args.if_not_exists() {
163            req = req.header(IF_NONE_MATCH, "*")
164        }
165
166        if let Some(v) = args.if_none_match() {
167            req = req.header(IF_NONE_MATCH, v)
168        }
169
170        let operation = if resource == DIRECTORY {
171            Operation::CreateDir
172        } else {
173            Operation::Write
174        };
175
176        let mut req = req
177            .extension(operation)
178            .body(Buffer::new())
179            .map_err(new_request_build_error)?;
180
181        self.sign(&mut req).await?;
182        self.send(req).await
183    }
184
185    pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
186        let source = build_abs_path(&self.root, from);
187        let target = build_abs_path(&self.root, to);
188
189        let url = format!(
190            "{}/{}/{}",
191            self.endpoint,
192            self.filesystem,
193            percent_encode_path(&target)
194        );
195
196        let source_path = format!("/{}/{}", self.filesystem, percent_encode_path(&source));
197
198        let mut req = Request::put(&url)
199            .header(X_MS_RENAME_SOURCE, source_path)
200            .header(CONTENT_LENGTH, 0)
201            .extension(Operation::Rename)
202            .body(Buffer::new())
203            .map_err(new_request_build_error)?;
204
205        self.sign(&mut req).await?;
206        self.send(req).await
207    }
208
209    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
210    pub async fn azdls_append(
211        &self,
212        path: &str,
213        size: Option<u64>,
214        position: u64,
215        flush: bool,
216        close: bool,
217        body: Buffer,
218    ) -> Result<Response<Buffer>> {
219        let p = build_abs_path(&self.root, path);
220
221        let mut url = format!(
222            "{}/{}/{}?action=append&position={}",
223            self.endpoint,
224            self.filesystem,
225            percent_encode_path(&p),
226            position
227        );
228
229        if flush {
230            url.push_str("&flush=true");
231        }
232        if close {
233            url.push_str("&close=true");
234        }
235
236        let mut req = Request::patch(&url);
237
238        if let Some(size) = size {
239            req = req.header(CONTENT_LENGTH, size)
240        }
241
242        let mut req = req
243            .extension(Operation::Write)
244            .body(body)
245            .map_err(new_request_build_error)?;
246
247        self.sign(&mut req).await?;
248        self.send(req).await
249    }
250
251    /// Flush pending data appended by [`azdls_append`].
252    pub async fn azdls_flush(
253        &self,
254        path: &str,
255        position: u64,
256        close: bool,
257    ) -> Result<Response<Buffer>> {
258        let p = build_abs_path(&self.root, path);
259
260        let mut url = format!(
261            "{}/{}/{}?action=flush&position={}",
262            self.endpoint,
263            self.filesystem,
264            percent_encode_path(&p),
265            position
266        );
267
268        if close {
269            url.push_str("&close=true");
270        }
271
272        let mut req = Request::patch(&url)
273            .header(CONTENT_LENGTH, 0)
274            .extension(Operation::Write)
275            .body(Buffer::new())
276            .map_err(new_request_build_error)?;
277
278        self.sign(&mut req).await?;
279        self.send(req).await
280    }
281
282    pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
283        let p = build_abs_path(&self.root, path)
284            .trim_end_matches('/')
285            .to_string();
286
287        let url = format!(
288            "{}/{}/{}?action=getStatus",
289            self.endpoint,
290            self.filesystem,
291            percent_encode_path(&p)
292        );
293
294        let req = Request::head(&url);
295
296        let mut req = req
297            .extension(Operation::Stat)
298            .body(Buffer::new())
299            .map_err(new_request_build_error)?;
300
301        self.sign(&mut req).await?;
302        self.send(req).await
303    }
304
305    pub async fn azdls_stat_metadata(&self, path: &str) -> Result<Metadata> {
306        let resp = self.azdls_get_properties(path).await?;
307
308        if resp.status() != StatusCode::OK {
309            return Err(parse_error(resp));
310        }
311
312        let headers = resp.headers();
313        let mut meta = parse_into_metadata(path, headers)?;
314
315        if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
316            meta.set_version(version_id);
317        }
318
319        let resource = resp
320            .headers()
321            .get("x-ms-resource-type")
322            .ok_or_else(|| {
323                Error::new(
324                    ErrorKind::Unexpected,
325                    "azdls should return x-ms-resource-type header, but it's missing",
326                )
327            })?
328            .to_str()
329            .map_err(|err| {
330                Error::new(
331                    ErrorKind::Unexpected,
332                    "azdls should return x-ms-resource-type header, but it's not a valid string",
333                )
334                .set_source(err)
335            })?;
336
337        match resource {
338            FILE => Ok(meta.with_mode(EntryMode::FILE)),
339            DIRECTORY => Ok(meta.with_mode(EntryMode::DIR)),
340            v => Err(Error::new(
341                ErrorKind::Unexpected,
342                "azdls returns an unknown x-ms-resource-type",
343            )
344            .with_context("resource", v)),
345        }
346    }
347
348    pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
349        let p = build_abs_path(&self.root, path)
350            .trim_end_matches('/')
351            .to_string();
352
353        let url = format!(
354            "{}/{}/{}",
355            self.endpoint,
356            self.filesystem,
357            percent_encode_path(&p)
358        );
359
360        let mut req = Request::delete(&url)
361            .extension(Operation::Delete)
362            .body(Buffer::new())
363            .map_err(new_request_build_error)?;
364
365        self.sign(&mut req).await?;
366        self.send(req).await
367    }
368
369    pub async fn azdls_list(
370        &self,
371        path: &str,
372        continuation: &str,
373        limit: Option<usize>,
374    ) -> Result<Response<Buffer>> {
375        let p = build_abs_path(&self.root, path)
376            .trim_end_matches('/')
377            .to_string();
378
379        let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
380            .push("resource", "filesystem")
381            .push("recursive", "false");
382        if !p.is_empty() {
383            url = url.push("directory", &percent_encode_path(&p));
384        }
385        if let Some(limit) = limit {
386            url = url.push("maxResults", &limit.to_string());
387        }
388        if !continuation.is_empty() {
389            url = url.push("continuation", &percent_encode_path(continuation));
390        }
391
392        let mut req = Request::get(url.finish())
393            .extension(Operation::List)
394            .body(Buffer::new())
395            .map_err(new_request_build_error)?;
396
397        self.sign(&mut req).await?;
398        self.send(req).await
399    }
400
401    pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
402        let abs_target_path = path.trim_end_matches('/').to_string();
403        let abs_target_path = abs_target_path.as_str();
404        let mut parts: Vec<&str> = abs_target_path
405            .split('/')
406            .filter(|x| !x.is_empty())
407            .collect();
408
409        if !parts.is_empty() {
410            parts.pop();
411        }
412
413        if !parts.is_empty() {
414            let parent_path = parts.join("/");
415            let resp = self
416                .azdls_create(&parent_path, DIRECTORY, &OpWrite::default())
417                .await?;
418
419            Ok(Some(resp))
420        } else {
421            Ok(None)
422        }
423    }
424}