use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
use http::HeaderName;
use http::HeaderValue;
use http::Request;
use http::Response;
use reqsign::AzureStorageCredential;
use reqsign::AzureStorageLoader;
use reqsign::AzureStorageSigner;
use crate::raw::*;
use crate::*;
const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
const X_MS_VERSION: &str = "x-ms-version";
pub struct AzdlsCore {
pub filesystem: String,
pub root: String,
pub endpoint: String,
pub client: HttpClient,
pub loader: AzureStorageLoader,
pub signer: AzureStorageSigner,
}
impl Debug for AzdlsCore {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("AzdlsCore")
.field("filesystem", &self.filesystem)
.field("root", &self.root)
.field("endpoint", &self.endpoint)
.finish_non_exhaustive()
}
}
impl AzdlsCore {
async fn load_credential(&self) -> Result<AzureStorageCredential> {
let cred = self
.loader
.load()
.await
.map_err(new_request_credential_error)?;
if let Some(cred) = cred {
Ok(cred)
} else {
Err(Error::new(
ErrorKind::ConfigInvalid,
"no valid credential found",
))
}
}
pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
let cred = self.load_credential().await?;
req.headers_mut().insert(
HeaderName::from_static(X_MS_VERSION),
HeaderValue::from_static("2022-11-02"),
);
self.signer.sign(req, &cred).map_err(new_request_sign_error)
}
#[inline]
pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
self.client.send(req).await
}
}
impl AzdlsCore {
pub async fn azdls_read(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/{}/{}",
self.endpoint,
self.filesystem,
percent_encode_path(&p)
);
let mut req = Request::get(&url);
if !range.is_full() {
req = req.header(http::header::RANGE, range.to_header());
}
let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.client.fetch(req).await
}
pub fn azdls_create_request(
&self,
path: &str,
resource: &str,
args: &OpWrite,
body: Buffer,
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path)
.trim_end_matches('/')
.to_string();
let url = format!(
"{}/{}/{}?resource={resource}",
self.endpoint,
self.filesystem,
percent_encode_path(&p)
);
let mut req = Request::put(&url);
req = req.header(CONTENT_LENGTH, 0);
if let Some(ty) = args.content_type() {
req = req.header(CONTENT_TYPE, ty)
}
if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos)
}
if args.if_not_exists() {
req = req.header(IF_NONE_MATCH, "*")
}
if let Some(v) = args.if_none_match() {
req = req.header(IF_NONE_MATCH, v)
}
let req = req.body(body).map_err(new_request_build_error)?;
Ok(req)
}
pub async fn azdls_rename(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
let source = build_abs_path(&self.root, from);
let target = build_abs_path(&self.root, to);
let url = format!(
"{}/{}/{}",
self.endpoint,
self.filesystem,
percent_encode_path(&target)
);
let mut req = Request::put(&url)
.header(
X_MS_RENAME_SOURCE,
format!("/{}/{}", self.filesystem, percent_encode_path(&source)),
)
.header(CONTENT_LENGTH, 0)
.body(Buffer::new())
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.send(req).await
}
pub fn azdls_update_request(
&self,
path: &str,
size: Option<u64>,
position: u64,
body: Buffer,
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);
let url = format!(
"{}/{}/{}?action=append&close=true&flush=true&position={}",
self.endpoint,
self.filesystem,
percent_encode_path(&p),
position
);
let mut req = Request::patch(&url);
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
}
let req = req.body(body).map_err(new_request_build_error)?;
Ok(req)
}
pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path)
.trim_end_matches('/')
.to_string();
let url = format!(
"{}/{}/{}?action=getStatus",
self.endpoint,
self.filesystem,
percent_encode_path(&p)
);
let req = Request::head(&url);
let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.client.send(req).await
}
pub async fn azdls_delete(&self, path: &str) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path)
.trim_end_matches('/')
.to_string();
let url = format!(
"{}/{}/{}",
self.endpoint,
self.filesystem,
percent_encode_path(&p)
);
let req = Request::delete(&url);
let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.send(req).await
}
pub async fn azdls_list(
&self,
path: &str,
continuation: &str,
limit: Option<usize>,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path)
.trim_end_matches('/')
.to_string();
let mut url = format!(
"{}/{}?resource=filesystem&recursive=false",
self.endpoint, self.filesystem
);
if !p.is_empty() {
write!(url, "&directory={}", percent_encode_path(&p))
.expect("write into string must succeed");
}
if let Some(limit) = limit {
write!(url, "&maxResults={limit}").expect("write into string must succeed");
}
if !continuation.is_empty() {
write!(url, "&continuation={}", percent_encode_path(continuation))
.expect("write into string must succeed");
}
let mut req = Request::get(&url)
.body(Buffer::new())
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.send(req).await
}
pub async fn azdls_ensure_parent_path(&self, path: &str) -> Result<Option<Response<Buffer>>> {
let abs_target_path = path.trim_end_matches('/').to_string();
let abs_target_path = abs_target_path.as_str();
let mut parts: Vec<&str> = abs_target_path
.split('/')
.filter(|x| !x.is_empty())
.collect();
if !parts.is_empty() {
parts.pop();
}
if !parts.is_empty() {
let parent_path = parts.join("/");
let mut req = self.azdls_create_request(
&parent_path,
"directory",
&OpWrite::default(),
Buffer::new(),
)?;
self.sign(&mut req).await?;
Ok(Some(self.send(req).await?))
} else {
Ok(None)
}
}
}