1use std::fmt;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use http::header::CONTENT_DISPOSITION;
24use http::header::CONTENT_LENGTH;
25use http::header::CONTENT_TYPE;
26use http::header::IF_NONE_MATCH;
27use http::HeaderName;
28use http::HeaderValue;
29use http::Request;
30use http::Response;
31use http::StatusCode;
32use reqsign::AzureStorageCredential;
33use reqsign::AzureStorageLoader;
34use reqsign::AzureStorageSigner;
35
36use super::error::parse_error;
37use crate::raw::*;
38use crate::*;
39
/// Request header carrying the source path of a rename (see `azdls_rename`).
const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
/// Request header added to every request by `sign`.
const X_MS_VERSION: &str = "x-ms-version";
/// Response header read by `azdls_stat_metadata` to fill in the metadata version.
pub const X_MS_VERSION_ID: &str = "x-ms-version-id";
/// Resource kind for directories: used as the `resource` query value on create
/// and compared against the `x-ms-resource-type` response header on stat.
pub const DIRECTORY: &str = "directory";
/// Resource kind for files; compared against the `x-ms-resource-type` response
/// header on stat.
pub const FILE: &str = "file";
45
46pub struct AzdlsCore {
47 pub info: Arc<AccessorInfo>,
48 pub filesystem: String,
49 pub root: String,
50 pub endpoint: String,
51
52 pub loader: AzureStorageLoader,
53 pub signer: AzureStorageSigner,
54}
55
56impl Debug for AzdlsCore {
57 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
58 f.debug_struct("AzdlsCore")
59 .field("filesystem", &self.filesystem)
60 .field("root", &self.root)
61 .field("endpoint", &self.endpoint)
62 .finish_non_exhaustive()
63 }
64}
65
66impl AzdlsCore {
67 async fn load_credential(&self) -> Result<AzureStorageCredential> {
68 let cred = self
69 .loader
70 .load()
71 .await
72 .map_err(new_request_credential_error)?;
73
74 if let Some(cred) = cred {
75 Ok(cred)
76 } else {
77 Err(Error::new(
78 ErrorKind::ConfigInvalid,
79 "no valid credential found",
80 ))
81 }
82 }
83
84 pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
85 let cred = self.load_credential().await?;
86 req.headers_mut().insert(
88 HeaderName::from_static(X_MS_VERSION),
89 HeaderValue::from_static("2022-11-02"),
95 );
96 self.signer.sign(req, &cred).map_err(new_request_sign_error)
97 }
98
99 #[inline]
100 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
101 self.info.http_client().send(req).await
102 }
103}
104
105impl AzdlsCore {
106 pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
107 let p = build_abs_path(&self.root, path);
108
109 let url = format!(
110 "{}/{}/{}",
111 self.endpoint,
112 self.filesystem,
113 percent_encode_path(&p)
114 );
115
116 let mut req = Request::get(&url);
117
118 if !range.is_full() {
119 req = req.header(http::header::RANGE, range.to_header());
120 }
121
122 let mut req = req
123 .extension(Operation::Read)
124 .body(Buffer::new())
125 .map_err(new_request_build_error)?;
126
127 self.sign(&mut req).await?;
128 self.info.http_client().fetch(req).await
129 }
130
131 pub async fn azdls_create(
135 &self,
136 path: &str,
137 resource: &str,
138 args: &OpWrite,
139 ) -> Result<Response<Buffer>> {
140 let p = build_abs_path(&self.root, path)
141 .trim_end_matches('/')
142 .to_string();
143
144 let url = format!(
145 "{}/{}/{}?resource={resource}",
146 self.endpoint,
147 self.filesystem,
148 percent_encode_path(&p)
149 );
150
151 let mut req = Request::put(&url);
152
153 req = req.header(CONTENT_LENGTH, 0);
155
156 if let Some(ty) = args.content_type() {
157 req = req.header(CONTENT_TYPE, ty)
158 }
159
160 if let Some(pos) = args.content_disposition() {
161 req = req.header(CONTENT_DISPOSITION, pos)
162 }
163
164 if args.if_not_exists() {
165 req = req.header(IF_NONE_MATCH, "*")
166 }
167
168 if let Some(v) = args.if_none_match() {
169 req = req.header(IF_NONE_MATCH, v)
170 }
171
172 let operation = if resource == DIRECTORY {
173 Operation::CreateDir
174 } else {
175 Operation::Write
176 };
177
178 let mut req = req
179 .extension(operation)
180 .body(Buffer::new())
181 .map_err(new_request_build_error)?;
182
183 self.sign(&mut req).await?;
184 self.send(req).await
185 }
186
187 pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
188 let source = build_abs_path(&self.root, from);
189 let target = build_abs_path(&self.root, to);
190
191 let url = format!(
192 "{}/{}/{}",
193 self.endpoint,
194 self.filesystem,
195 percent_encode_path(&target)
196 );
197
198 let source_path = format!("/{}/{}", self.filesystem, percent_encode_path(&source));
199
200 let mut req = Request::put(&url)
201 .header(X_MS_RENAME_SOURCE, source_path)
202 .header(CONTENT_LENGTH, 0)
203 .extension(Operation::Rename)
204 .body(Buffer::new())
205 .map_err(new_request_build_error)?;
206
207 self.sign(&mut req).await?;
208 self.send(req).await
209 }
210
211 pub async fn azdls_update(
213 &self,
214 path: &str,
215 size: Option<u64>,
216 position: u64,
217 body: Buffer,
218 ) -> Result<Response<Buffer>> {
219 let p = build_abs_path(&self.root, path);
220
221 let url = format!(
224 "{}/{}/{}?action=append&close=true&flush=true&position={}",
225 self.endpoint,
226 self.filesystem,
227 percent_encode_path(&p),
228 position
229 );
230
231 let mut req = Request::patch(&url);
232
233 if let Some(size) = size {
234 req = req.header(CONTENT_LENGTH, size)
235 }
236
237 let mut req = req
238 .extension(Operation::Write)
239 .body(body)
240 .map_err(new_request_build_error)?;
241
242 self.sign(&mut req).await?;
243 self.send(req).await
244 }
245
246 pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
247 let p = build_abs_path(&self.root, path)
248 .trim_end_matches('/')
249 .to_string();
250
251 let url = format!(
252 "{}/{}/{}?action=getStatus",
253 self.endpoint,
254 self.filesystem,
255 percent_encode_path(&p)
256 );
257
258 let req = Request::head(&url);
259
260 let mut req = req
261 .extension(Operation::Stat)
262 .body(Buffer::new())
263 .map_err(new_request_build_error)?;
264
265 self.sign(&mut req).await?;
266 self.send(req).await
267 }
268
269 pub async fn azdls_stat_metadata(&self, path: &str) -> Result<Metadata> {
270 let resp = self.azdls_get_properties(path).await?;
271
272 if resp.status() != StatusCode::OK {
273 return Err(parse_error(resp));
274 }
275
276 let headers = resp.headers();
277 let mut meta = parse_into_metadata(path, headers)?;
278
279 if let Some(version_id) = parse_header_to_str(headers, X_MS_VERSION_ID)? {
280 meta.set_version(version_id);
281 }
282
283 let resource = resp
284 .headers()
285 .get("x-ms-resource-type")
286 .ok_or_else(|| {
287 Error::new(
288 ErrorKind::Unexpected,
289 "azdls should return x-ms-resource-type header, but it's missing",
290 )
291 })?
292 .to_str()
293 .map_err(|err| {
294 Error::new(
295 ErrorKind::Unexpected,
296 "azdls should return x-ms-resource-type header, but it's not a valid string",
297 )
298 .set_source(err)
299 })?;
300
301 match resource {
302 FILE => Ok(meta.with_mode(EntryMode::FILE)),
303 DIRECTORY => Ok(meta.with_mode(EntryMode::DIR)),
304 v => Err(Error::new(
305 ErrorKind::Unexpected,
306 "azdls returns an unknown x-ms-resource-type",
307 )
308 .with_context("resource", v)),
309 }
310 }
311
312 pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
313 let p = build_abs_path(&self.root, path)
314 .trim_end_matches('/')
315 .to_string();
316
317 let url = format!(
318 "{}/{}/{}",
319 self.endpoint,
320 self.filesystem,
321 percent_encode_path(&p)
322 );
323
324 let mut req = Request::delete(&url)
325 .extension(Operation::Delete)
326 .body(Buffer::new())
327 .map_err(new_request_build_error)?;
328
329 self.sign(&mut req).await?;
330 self.send(req).await
331 }
332
333 pub async fn azdls_list(
334 &self,
335 path: &str,
336 continuation: &str,
337 limit: Option<usize>,
338 ) -> Result<Response<Buffer>> {
339 let p = build_abs_path(&self.root, path)
340 .trim_end_matches('/')
341 .to_string();
342
343 let mut url = QueryPairsWriter::new(&format!("{}/{}", self.endpoint, self.filesystem))
344 .push("resource", "filesystem")
345 .push("recursive", "false");
346 if !p.is_empty() {
347 url = url.push("directory", &percent_encode_path(&p));
348 }
349 if let Some(limit) = limit {
350 url = url.push("maxResults", &limit.to_string());
351 }
352 if !continuation.is_empty() {
353 url = url.push("continuation", &percent_encode_path(continuation));
354 }
355
356 let mut req = Request::get(url.finish())
357 .extension(Operation::List)
358 .body(Buffer::new())
359 .map_err(new_request_build_error)?;
360
361 self.sign(&mut req).await?;
362 self.send(req).await
363 }
364
365 pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
366 let abs_target_path = path.trim_end_matches('/').to_string();
367 let abs_target_path = abs_target_path.as_str();
368 let mut parts: Vec<&str> = abs_target_path
369 .split('/')
370 .filter(|x| !x.is_empty())
371 .collect();
372
373 if !parts.is_empty() {
374 parts.pop();
375 }
376
377 if !parts.is_empty() {
378 let parent_path = parts.join("/");
379 let resp = self
380 .azdls_create(&parent_path, DIRECTORY, &OpWrite::default())
381 .await?;
382
383 Ok(Some(resp))
384 } else {
385 Ok(None)
386 }
387 }
388}