opendal/services/azdls/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use http::header::CONTENT_DISPOSITION;
24use http::header::CONTENT_LENGTH;
25use http::header::CONTENT_TYPE;
26use http::header::IF_NONE_MATCH;
27use http::HeaderName;
28use http::HeaderValue;
29use http::Request;
30use http::Response;
31use http::StatusCode;
32use reqsign::AzureStorageCredential;
33use reqsign::AzureStorageLoader;
34use reqsign::AzureStorageSigner;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
/// Name of the request header that carries the source path of a rename
/// (set in `azdls_rename`).
const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
/// Name of the request header that carries the service API version
/// (set in `sign`).
const X_MS_VERSION: &str = "x-ms-version";
/// Resource kind for directories: accepted as the `resource` argument of
/// `azdls_create` and matched against the `x-ms-resource-type` response header.
pub const DIRECTORY: &str = "directory";
/// Resource kind for files: the other documented `resource` value of
/// `azdls_create`, also matched against the `x-ms-resource-type` response header.
pub const FILE: &str = "file";
44
/// Shared state used to build, sign and send Azure Data Lake Storage requests.
pub struct AzdlsCore {
    /// Accessor info; the HTTP client used for requests is obtained from it.
    pub info: Arc<AccessorInfo>,
    /// Filesystem name, placed right after the endpoint in request URLs.
    pub filesystem: String,
    /// Root that operation paths are resolved against.
    pub root: String,
    /// Service endpoint used as the prefix of request URLs.
    pub endpoint: String,

    /// Loads the credential used for signing.
    pub loader: AzureStorageLoader,
    /// Signs outgoing requests with the loaded credential.
    pub signer: AzureStorageSigner,
}
54
55impl Debug for AzdlsCore {
56    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
57        f.debug_struct("AzdlsCore")
58            .field("filesystem", &self.filesystem)
59            .field("root", &self.root)
60            .field("endpoint", &self.endpoint)
61            .finish_non_exhaustive()
62    }
63}
64
65impl AzdlsCore {
66    async fn load_credential(&self) -> Result<AzureStorageCredential> {
67        let cred = self
68            .loader
69            .load()
70            .await
71            .map_err(new_request_credential_error)?;
72
73        if let Some(cred) = cred {
74            Ok(cred)
75        } else {
76            Err(Error::new(
77                ErrorKind::ConfigInvalid,
78                "no valid credential found",
79            ))
80        }
81    }
82
83    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
84        let cred = self.load_credential().await?;
85        // Insert x-ms-version header for normal requests.
86        req.headers_mut().insert(
87            HeaderName::from_static(X_MS_VERSION),
88            // 2022-11-02 is the version supported by Azurite V3 and
89            // used by Azure Portal, We use this version to make
90            // sure most our developer happy.
91            //
92            // In the future, we could allow users to configure this value.
93            HeaderValue::from_static("2022-11-02"),
94        );
95        self.signer.sign(req, &cred).map_err(new_request_sign_error)
96    }
97
98    #[inline]
99    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
100        self.info.http_client().send(req).await
101    }
102}
103
104impl AzdlsCore {
105    pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
106        let p = build_abs_path(&self.root, path);
107
108        let url = format!(
109            "{}/{}/{}",
110            self.endpoint,
111            self.filesystem,
112            percent_encode_path(&p)
113        );
114
115        let mut req = Request::get(&url);
116
117        if !range.is_full() {
118            req = req.header(http::header::RANGE, range.to_header());
119        }
120
121        let mut req = req
122            .extension(Operation::Read)
123            .body(Buffer::new())
124            .map_err(new_request_build_error)?;
125
126        self.sign(&mut req).await?;
127        self.info.http_client().fetch(req).await
128    }
129
130    /// resource should be one of `file` or `directory`
131    ///
132    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
133    pub async fn azdls_create(
134        &self,
135        path: &str,
136        resource: &str,
137        args: &OpWrite,
138    ) -> Result<Response<Buffer>> {
139        let p = build_abs_path(&self.root, path)
140            .trim_end_matches('/')
141            .to_string();
142
143        let url = format!(
144            "{}/{}/{}?resource={resource}",
145            self.endpoint,
146            self.filesystem,
147            percent_encode_path(&p)
148        );
149
150        let mut req = Request::put(&url);
151
152        // Content length must be 0 for create request.
153        req = req.header(CONTENT_LENGTH, 0);
154
155        if let Some(ty) = args.content_type() {
156            req = req.header(CONTENT_TYPE, ty)
157        }
158
159        if let Some(pos) = args.content_disposition() {
160            req = req.header(CONTENT_DISPOSITION, pos)
161        }
162
163        if args.if_not_exists() {
164            req = req.header(IF_NONE_MATCH, "*")
165        }
166
167        if let Some(v) = args.if_none_match() {
168            req = req.header(IF_NONE_MATCH, v)
169        }
170
171        let operation = if resource == DIRECTORY {
172            Operation::CreateDir
173        } else {
174            Operation::Write
175        };
176
177        let mut req = req
178            .extension(operation)
179            .body(Buffer::new())
180            .map_err(new_request_build_error)?;
181
182        self.sign(&mut req).await?;
183        self.send(req).await
184    }
185
186    pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
187        let source = build_abs_path(&self.root, from);
188        let target = build_abs_path(&self.root, to);
189
190        let url = format!(
191            "{}/{}/{}",
192            self.endpoint,
193            self.filesystem,
194            percent_encode_path(&target)
195        );
196
197        let source_path = format!("/{}/{}", self.filesystem, percent_encode_path(&source));
198
199        let mut req = Request::put(&url)
200            .header(X_MS_RENAME_SOURCE, source_path)
201            .header(CONTENT_LENGTH, 0)
202            .extension(Operation::Rename)
203            .body(Buffer::new())
204            .map_err(new_request_build_error)?;
205
206        self.sign(&mut req).await?;
207        self.send(req).await
208    }
209
210    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
211    pub async fn azdls_update(
212        &self,
213        path: &str,
214        size: Option<u64>,
215        position: u64,
216        body: Buffer,
217    ) -> Result<Response<Buffer>> {
218        let p = build_abs_path(&self.root, path);
219
220        // - close: Make this is the final action to this file.
221        // - flush: Flush the file directly.
222        let url = format!(
223            "{}/{}/{}?action=append&close=true&flush=true&position={}",
224            self.endpoint,
225            self.filesystem,
226            percent_encode_path(&p),
227            position
228        );
229
230        let mut req = Request::patch(&url);
231
232        if let Some(size) = size {
233            req = req.header(CONTENT_LENGTH, size)
234        }
235
236        let mut req = req
237            .extension(Operation::Write)
238            .body(body)
239            .map_err(new_request_build_error)?;
240
241        self.sign(&mut req).await?;
242        self.send(req).await
243    }
244
245    pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
246        let p = build_abs_path(&self.root, path)
247            .trim_end_matches('/')
248            .to_string();
249
250        let url = format!(
251            "{}/{}/{}?action=getStatus",
252            self.endpoint,
253            self.filesystem,
254            percent_encode_path(&p)
255        );
256
257        let req = Request::head(&url);
258
259        let mut req = req
260            .extension(Operation::Stat)
261            .body(Buffer::new())
262            .map_err(new_request_build_error)?;
263
264        self.sign(&mut req).await?;
265        self.send(req).await
266    }
267
268    pub async fn azdls_stat_metadata(&self, path: &str) -> Result<Metadata> {
269        let resp = self.azdls_get_properties(path).await?;
270
271        if resp.status() != StatusCode::OK {
272            return Err(parse_error(resp));
273        }
274
275        let meta = parse_into_metadata(path, resp.headers())?;
276        let resource = resp
277            .headers()
278            .get("x-ms-resource-type")
279            .ok_or_else(|| {
280                Error::new(
281                    ErrorKind::Unexpected,
282                    "azdls should return x-ms-resource-type header, but it's missing",
283                )
284            })?
285            .to_str()
286            .map_err(|err| {
287                Error::new(
288                    ErrorKind::Unexpected,
289                    "azdls should return x-ms-resource-type header, but it's not a valid string",
290                )
291                .set_source(err)
292            })?;
293
294        match resource {
295            FILE => Ok(meta.with_mode(EntryMode::FILE)),
296            DIRECTORY => Ok(meta.with_mode(EntryMode::DIR)),
297            v => Err(Error::new(
298                ErrorKind::Unexpected,
299                "azdls returns an unknown x-ms-resource-type",
300            )
301            .with_context("resource", v)),
302        }
303    }
304
305    pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
306        let p = build_abs_path(&self.root, path)
307            .trim_end_matches('/')
308            .to_string();
309
310        let url = format!(
311            "{}/{}/{}",
312            self.endpoint,
313            self.filesystem,
314            percent_encode_path(&p)
315        );
316
317        let mut req = Request::delete(&url)
318            .extension(Operation::Delete)
319            .body(Buffer::new())
320            .map_err(new_request_build_error)?;
321
322        self.sign(&mut req).await?;
323        self.send(req).await
324    }
325
326    pub async fn azdls_list(
327        &self,
328        path: &str,
329        continuation: &str,
330        limit: Option<usize>,
331    ) -> Result<Response<Buffer>> {
332        let p = build_abs_path(&self.root, path)
333            .trim_end_matches('/')
334            .to_string();
335
336        let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
337            .push("resource", "filesystem")
338            .push("recursive", "false");
339        if !p.is_empty() {
340            url = url.push("directory", &percent_encode_path(&p));
341        }
342        if let Some(limit) = limit {
343            url = url.push("maxResults", &limit.to_string());
344        }
345        if !continuation.is_empty() {
346            url = url.push("continuation", &percent_encode_path(continuation));
347        }
348
349        let mut req = Request::get(url.finish())
350            .extension(Operation::List)
351            .body(Buffer::new())
352            .map_err(new_request_build_error)?;
353
354        self.sign(&mut req).await?;
355        self.send(req).await
356    }
357
358    pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
359        let abs_target_path = path.trim_end_matches('/').to_string();
360        let abs_target_path = abs_target_path.as_str();
361        let mut parts: Vec<&str> = abs_target_path
362            .split('/')
363            .filter(|x| !x.is_empty())
364            .collect();
365
366        if !parts.is_empty() {
367            parts.pop();
368        }
369
370        if !parts.is_empty() {
371            let parent_path = parts.join("/");
372            let resp = self
373                .azdls_create(&parent_path, DIRECTORY, &OpWrite::default())
374                .await?;
375
376            Ok(Some(resp))
377        } else {
378            Ok(None)
379        }
380    }
381}