1use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use http::header::CONTENT_DISPOSITION;
24use http::header::CONTENT_LENGTH;
25use http::header::CONTENT_TYPE;
26use http::header::IF_NONE_MATCH;
27use http::HeaderName;
28use http::HeaderValue;
29use http::Request;
30use http::Response;
31use http::StatusCode;
32use reqsign::AzureStorageCredential;
33use reqsign::AzureStorageLoader;
34use reqsign::AzureStorageSigner;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
/// Resource kind for directories: compared against the `resource` argument of
/// `azdls_create` and against the `x-ms-resource-type` response header.
pub const DIRECTORY: &str = "directory";
/// Resource kind for files, matched against the `x-ms-resource-type` response header.
pub const FILE: &str = "file";

/// Request header that carries the source path of a rename request.
const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
/// Request header that is set on every request right before it is signed.
const X_MS_VERSION: &str = "x-ms-version";
44
45pub struct AzdlsCore {
46 pub info: Arc<AccessorInfo>,
47 pub filesystem: String,
48 pub root: String,
49 pub endpoint: String,
50
51 pub loader: AzureStorageLoader,
52 pub signer: AzureStorageSigner,
53}
54
55impl Debug for AzdlsCore {
56 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
57 f.debug_struct("AzdlsCore")
58 .field("filesystem", &self.filesystem)
59 .field("root", &self.root)
60 .field("endpoint", &self.endpoint)
61 .finish_non_exhaustive()
62 }
63}
64
65impl AzdlsCore {
66 async fn load_credential(&self) -> Result<AzureStorageCredential> {
67 let cred = self
68 .loader
69 .load()
70 .await
71 .map_err(new_request_credential_error)?;
72
73 if let Some(cred) = cred {
74 Ok(cred)
75 } else {
76 Err(Error::new(
77 ErrorKind::ConfigInvalid,
78 "no valid credential found",
79 ))
80 }
81 }
82
83 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
84 let cred = self.load_credential().await?;
85 req.headers_mut().insert(
87 HeaderName::from_static(X_MS_VERSION),
88 HeaderValue::from_static("2022-11-02"),
94 );
95 self.signer.sign(req, &cred).map_err(new_request_sign_error)
96 }
97
98 #[inline]
99 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
100 self.info.http_client().send(req).await
101 }
102}
103
104impl AzdlsCore {
105 pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
106 let p = build_abs_path(&self.root, path);
107
108 let url = format!(
109 "{}/{}/{}",
110 self.endpoint,
111 self.filesystem,
112 percent_encode_path(&p)
113 );
114
115 let mut req = Request::get(&url);
116
117 if !range.is_full() {
118 req = req.header(http::header::RANGE, range.to_header());
119 }
120
121 let mut req = req
122 .extension(Operation::Read)
123 .body(Buffer::new())
124 .map_err(new_request_build_error)?;
125
126 self.sign(&mut req).await?;
127 self.info.http_client().fetch(req).await
128 }
129
130 pub async fn azdls_create(
134 &self,
135 path: &str,
136 resource: &str,
137 args: &OpWrite,
138 ) -> Result<Response<Buffer>> {
139 let p = build_abs_path(&self.root, path)
140 .trim_end_matches('/')
141 .to_string();
142
143 let url = format!(
144 "{}/{}/{}?resource={resource}",
145 self.endpoint,
146 self.filesystem,
147 percent_encode_path(&p)
148 );
149
150 let mut req = Request::put(&url);
151
152 req = req.header(CONTENT_LENGTH, 0);
154
155 if let Some(ty) = args.content_type() {
156 req = req.header(CONTENT_TYPE, ty)
157 }
158
159 if let Some(pos) = args.content_disposition() {
160 req = req.header(CONTENT_DISPOSITION, pos)
161 }
162
163 if args.if_not_exists() {
164 req = req.header(IF_NONE_MATCH, "*")
165 }
166
167 if let Some(v) = args.if_none_match() {
168 req = req.header(IF_NONE_MATCH, v)
169 }
170
171 let operation = if resource == DIRECTORY {
172 Operation::CreateDir
173 } else {
174 Operation::Write
175 };
176
177 let mut req = req
178 .extension(operation)
179 .body(Buffer::new())
180 .map_err(new_request_build_error)?;
181
182 self.sign(&mut req).await?;
183 self.send(req).await
184 }
185
186 pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
187 let source = build_abs_path(&self.root, from);
188 let target = build_abs_path(&self.root, to);
189
190 let url = format!(
191 "{}/{}/{}",
192 self.endpoint,
193 self.filesystem,
194 percent_encode_path(&target)
195 );
196
197 let source_path = format!("/{}/{}", self.filesystem, percent_encode_path(&source));
198
199 let mut req = Request::put(&url)
200 .header(X_MS_RENAME_SOURCE, source_path)
201 .header(CONTENT_LENGTH, 0)
202 .extension(Operation::Rename)
203 .body(Buffer::new())
204 .map_err(new_request_build_error)?;
205
206 self.sign(&mut req).await?;
207 self.send(req).await
208 }
209
210 pub async fn azdls_update(
212 &self,
213 path: &str,
214 size: Option<u64>,
215 position: u64,
216 body: Buffer,
217 ) -> Result<Response<Buffer>> {
218 let p = build_abs_path(&self.root, path);
219
220 let url = format!(
223 "{}/{}/{}?action=append&close=true&flush=true&position={}",
224 self.endpoint,
225 self.filesystem,
226 percent_encode_path(&p),
227 position
228 );
229
230 let mut req = Request::patch(&url);
231
232 if let Some(size) = size {
233 req = req.header(CONTENT_LENGTH, size)
234 }
235
236 let mut req = req
237 .extension(Operation::Write)
238 .body(body)
239 .map_err(new_request_build_error)?;
240
241 self.sign(&mut req).await?;
242 self.send(req).await
243 }
244
245 pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
246 let p = build_abs_path(&self.root, path)
247 .trim_end_matches('/')
248 .to_string();
249
250 let url = format!(
251 "{}/{}/{}?action=getStatus",
252 self.endpoint,
253 self.filesystem,
254 percent_encode_path(&p)
255 );
256
257 let req = Request::head(&url);
258
259 let mut req = req
260 .extension(Operation::Stat)
261 .body(Buffer::new())
262 .map_err(new_request_build_error)?;
263
264 self.sign(&mut req).await?;
265 self.send(req).await
266 }
267
268 pub async fn azdls_stat_metadata(&self, path: &str) -> Result<Metadata> {
269 let resp = self.azdls_get_properties(path).await?;
270
271 if resp.status() != StatusCode::OK {
272 return Err(parse_error(resp));
273 }
274
275 let meta = parse_into_metadata(path, resp.headers())?;
276 let resource = resp
277 .headers()
278 .get("x-ms-resource-type")
279 .ok_or_else(|| {
280 Error::new(
281 ErrorKind::Unexpected,
282 "azdls should return x-ms-resource-type header, but it's missing",
283 )
284 })?
285 .to_str()
286 .map_err(|err| {
287 Error::new(
288 ErrorKind::Unexpected,
289 "azdls should return x-ms-resource-type header, but it's not a valid string",
290 )
291 .set_source(err)
292 })?;
293
294 match resource {
295 FILE => Ok(meta.with_mode(EntryMode::FILE)),
296 DIRECTORY => Ok(meta.with_mode(EntryMode::DIR)),
297 v => Err(Error::new(
298 ErrorKind::Unexpected,
299 "azdls returns an unknown x-ms-resource-type",
300 )
301 .with_context("resource", v)),
302 }
303 }
304
305 pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
306 let p = build_abs_path(&self.root, path)
307 .trim_end_matches('/')
308 .to_string();
309
310 let url = format!(
311 "{}/{}/{}",
312 self.endpoint,
313 self.filesystem,
314 percent_encode_path(&p)
315 );
316
317 let mut req = Request::delete(&url)
318 .extension(Operation::Delete)
319 .body(Buffer::new())
320 .map_err(new_request_build_error)?;
321
322 self.sign(&mut req).await?;
323 self.send(req).await
324 }
325
326 pub async fn azdls_list(
327 &self,
328 path: &str,
329 continuation: &str,
330 limit: Option<usize>,
331 ) -> Result<Response<Buffer>> {
332 let p = build_abs_path(&self.root, path)
333 .trim_end_matches('/')
334 .to_string();
335
336 let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
337 .push("resource", "filesystem")
338 .push("recursive", "false");
339 if !p.is_empty() {
340 url = url.push("directory", &percent_encode_path(&p));
341 }
342 if let Some(limit) = limit {
343 url = url.push("maxResults", &limit.to_string());
344 }
345 if !continuation.is_empty() {
346 url = url.push("continuation", &percent_encode_path(continuation));
347 }
348
349 let mut req = Request::get(url.finish())
350 .extension(Operation::List)
351 .body(Buffer::new())
352 .map_err(new_request_build_error)?;
353
354 self.sign(&mut req).await?;
355 self.send(req).await
356 }
357
358 pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
359 let abs_target_path = path.trim_end_matches('/').to_string();
360 let abs_target_path = abs_target_path.as_str();
361 let mut parts: Vec<&str> = abs_target_path
362 .split('/')
363 .filter(|x| !x.is_empty())
364 .collect();
365
366 if !parts.is_empty() {
367 parts.pop();
368 }
369
370 if !parts.is_empty() {
371 let parent_path = parts.join("/");
372 let resp = self
373 .azdls_create(&parent_path, DIRECTORY, &OpWrite::default())
374 .await?;
375
376 Ok(Some(resp))
377 } else {
378 Ok(None)
379 }
380 }
381}