opendal/services/azdls/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use http::header::CONTENT_LENGTH;
19use http::header::CONTENT_TYPE;
20use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
21use http::HeaderName;
22use http::HeaderValue;
23use http::Request;
24use http::Response;
25use reqsign::AzureStorageCredential;
26use reqsign::AzureStorageLoader;
27use reqsign::AzureStorageSigner;
28use std::fmt;
29use std::fmt::Debug;
30use std::fmt::Formatter;
31use std::fmt::Write;
32use std::sync::Arc;
33
34use crate::raw::*;
35use crate::*;
36
37const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
38const X_MS_VERSION: &str = "x-ms-version";
39
/// Shared state used to build, sign and send requests for the azdls service.
pub struct AzdlsCore {
    /// Accessor info; the HTTP client used to send requests is obtained from it.
    pub info: Arc<AccessorInfo>,
    /// Filesystem name; placed right after the endpoint in every request URL.
    pub filesystem: String,
    /// Root path that caller-supplied paths are joined with via `build_abs_path`.
    pub root: String,
    /// Endpoint; the leading part of every request URL.
    pub endpoint: String,

    /// Loader that provides the credential used when signing.
    pub loader: AzureStorageLoader,
    /// Signer applied to each request together with the loaded credential.
    pub signer: AzureStorageSigner,
}
49
50impl Debug for AzdlsCore {
51    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
52        f.debug_struct("AzdlsCore")
53            .field("filesystem", &self.filesystem)
54            .field("root", &self.root)
55            .field("endpoint", &self.endpoint)
56            .finish_non_exhaustive()
57    }
58}
59
60impl AzdlsCore {
61    async fn load_credential(&self) -> Result<AzureStorageCredential> {
62        let cred = self
63            .loader
64            .load()
65            .await
66            .map_err(new_request_credential_error)?;
67
68        if let Some(cred) = cred {
69            Ok(cred)
70        } else {
71            Err(Error::new(
72                ErrorKind::ConfigInvalid,
73                "no valid credential found",
74            ))
75        }
76    }
77
78    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
79        let cred = self.load_credential().await?;
80        // Insert x-ms-version header for normal requests.
81        req.headers_mut().insert(
82            HeaderName::from_static(X_MS_VERSION),
83            // 2022-11-02 is the version supported by Azurite V3 and
84            // used by Azure Portal, We use this version to make
85            // sure most our developer happy.
86            //
87            // In the future, we could allow users to configure this value.
88            HeaderValue::from_static("2022-11-02"),
89        );
90        self.signer.sign(req, &cred).map_err(new_request_sign_error)
91    }
92
93    #[inline]
94    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
95        self.info.http_client().send(req).await
96    }
97}
98
99impl AzdlsCore {
100    pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
101        let p = build_abs_path(&self.root, path);
102
103        let url = format!(
104            "{}/{}/{}",
105            self.endpoint,
106            self.filesystem,
107            percent_encode_path(&p)
108        );
109
110        let mut req = Request::get(&url);
111
112        if !range.is_full() {
113            req = req.header(http::header::RANGE, range.to_header());
114        }
115
116        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
117
118        self.sign(&mut req).await?;
119        self.info.http_client().fetch(req).await
120    }
121
122    /// resource should be one of `file` or `directory`
123    ///
124    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
125    pub fn azdls_create_request(
126        &self,
127        path: &str,
128        resource: &str,
129        args: &OpWrite,
130        body: Buffer,
131    ) -> Result<Request<Buffer>> {
132        let p = build_abs_path(&self.root, path)
133            .trim_end_matches('/')
134            .to_string();
135
136        let url = format!(
137            "{}/{}/{}?resource={resource}",
138            self.endpoint,
139            self.filesystem,
140            percent_encode_path(&p)
141        );
142
143        let mut req = Request::put(&url);
144
145        // Content length must be 0 for create request.
146        req = req.header(CONTENT_LENGTH, 0);
147
148        if let Some(ty) = args.content_type() {
149            req = req.header(CONTENT_TYPE, ty)
150        }
151
152        if let Some(pos) = args.content_disposition() {
153            req = req.header(CONTENT_DISPOSITION, pos)
154        }
155
156        if args.if_not_exists() {
157            req = req.header(IF_NONE_MATCH, "*")
158        }
159
160        if let Some(v) = args.if_none_match() {
161            req = req.header(IF_NONE_MATCH, v)
162        }
163
164        // Set body
165        let req = req.body(body).map_err(new_request_build_error)?;
166
167        Ok(req)
168    }
169
170    pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
171        let source = build_abs_path(&self.root, from);
172        let target = build_abs_path(&self.root, to);
173
174        let url = format!(
175            "{}/{}/{}",
176            self.endpoint,
177            self.filesystem,
178            percent_encode_path(&target)
179        );
180
181        let mut req = Request::put(&url)
182            .header(
183                X_MS_RENAME_SOURCE,
184                format!("/{}/{}", self.filesystem, percent_encode_path(&source)),
185            )
186            .header(CONTENT_LENGTH, 0)
187            .body(Buffer::new())
188            .map_err(new_request_build_error)?;
189
190        self.sign(&mut req).await?;
191        self.send(req).await
192    }
193
194    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
195    pub fn azdls_update_request(
196        &self,
197        path: &str,
198        size: Option<u64>,
199        position: u64,
200        body: Buffer,
201    ) -> Result<Request<Buffer>> {
202        let p = build_abs_path(&self.root, path);
203
204        // - close: Make this is the final action to this file.
205        // - flush: Flush the file directly.
206        let url = format!(
207            "{}/{}/{}?action=append&close=true&flush=true&position={}",
208            self.endpoint,
209            self.filesystem,
210            percent_encode_path(&p),
211            position
212        );
213
214        let mut req = Request::patch(&url);
215
216        if let Some(size) = size {
217            req = req.header(CONTENT_LENGTH, size)
218        }
219
220        // Set body
221        let req = req.body(body).map_err(new_request_build_error)?;
222
223        Ok(req)
224    }
225
226    pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
227        let p = build_abs_path(&self.root, path)
228            .trim_end_matches('/')
229            .to_string();
230
231        let url = format!(
232            "{}/{}/{}?action=getStatus",
233            self.endpoint,
234            self.filesystem,
235            percent_encode_path(&p)
236        );
237
238        let req = Request::head(&url);
239
240        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
241
242        self.sign(&mut req).await?;
243        self.info.http_client().send(req).await
244    }
245
246    pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
247        let p = build_abs_path(&self.root, path)
248            .trim_end_matches('/')
249            .to_string();
250
251        let url = format!(
252            "{}/{}/{}",
253            self.endpoint,
254            self.filesystem,
255            percent_encode_path(&p)
256        );
257
258        let req = Request::delete(&url);
259
260        let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
261
262        self.sign(&mut req).await?;
263        self.send(req).await
264    }
265
266    pub async fn azdls_list(
267        &self,
268        path: &str,
269        continuation: &str,
270        limit: Option<usize>,
271    ) -> Result<Response<Buffer>> {
272        let p = build_abs_path(&self.root, path)
273            .trim_end_matches('/')
274            .to_string();
275
276        let mut url = format!(
277            "{}/{}?resource=filesystem&recursive=false",
278            self.endpoint, self.filesystem
279        );
280        if !p.is_empty() {
281            write!(url, "&directory={}", percent_encode_path(&p))
282                .expect("write into string must succeed");
283        }
284        if let Some(limit) = limit {
285            write!(url, "&maxResults={limit}").expect("write into string must succeed");
286        }
287        if !continuation.is_empty() {
288            write!(url, "&continuation={}", percent_encode_path(continuation))
289                .expect("write into string must succeed");
290        }
291
292        let mut req = Request::get(&url)
293            .body(Buffer::new())
294            .map_err(new_request_build_error)?;
295
296        self.sign(&mut req).await?;
297        self.send(req).await
298    }
299
300    pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
301        let abs_target_path = path.trim_end_matches('/').to_string();
302        let abs_target_path = abs_target_path.as_str();
303        let mut parts: Vec<&str> = abs_target_path
304            .split('/')
305            .filter(|x| !x.is_empty())
306            .collect();
307
308        if !parts.is_empty() {
309            parts.pop();
310        }
311
312        if !parts.is_empty() {
313            let parent_path = parts.join("/");
314            let mut req = self.azdls_create_request(
315                &parent_path,
316                "directory",
317                &OpWrite::default(),
318                Buffer::new(),
319            )?;
320
321            self.sign(&mut req).await?;
322
323            Ok(Some(self.send(req).await?))
324        } else {
325            Ok(None)
326        }
327    }
328}