opendal/services/azdls/
writer.rs1use std::sync::Arc;
19
20use http::StatusCode;
21
22use super::core::AzdlsCore;
23use super::core::FILE;
24use super::core::X_MS_VERSION_ID;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
29pub type AzdlsWriters = TwoWays<oio::OneShotWriter<AzdlsWriter>, oio::AppendWriter<AzdlsWriter>>;
30
31pub struct AzdlsWriter {
32 core: Arc<AzdlsCore>,
33
34 op: OpWrite,
35 path: String,
36}
37
38impl AzdlsWriter {
39 pub fn new(core: Arc<AzdlsCore>, op: OpWrite, path: String) -> Self {
40 AzdlsWriter { core, op, path }
41 }
42
43 fn parse_metadata(headers: &http::HeaderMap) -> Result<Metadata> {
44 let mut metadata = Metadata::default();
45
46 if let Some(last_modified) = parse_last_modified(headers)? {
47 metadata.set_last_modified(last_modified);
48 }
49 let etag = parse_etag(headers)?;
50 if let Some(etag) = etag {
51 metadata.set_etag(etag);
52 }
53 let version_id = parse_header_to_str(headers, X_MS_VERSION_ID)?;
54 if let Some(version_id) = version_id {
55 metadata.set_version(version_id);
56 }
57
58 Ok(metadata)
59 }
60}
61
62impl oio::OneShotWrite for AzdlsWriter {
63 async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
64 let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
65
66 let status = resp.status();
67 match status {
68 StatusCode::CREATED | StatusCode::OK => {}
69 _ => {
70 return Err(parse_error(resp).with_operation("Backend::azdls_create_request"));
71 }
72 }
73
74 let resp = self
75 .core
76 .azdls_update(&self.path, Some(bs.len() as u64), 0, bs)
77 .await?;
78
79 let status = resp.status();
80 match status {
81 StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()),
82 _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")),
83 }
84 }
85}
86
87impl oio::AppendWrite for AzdlsWriter {
88 async fn offset(&self) -> Result<u64> {
89 let resp = self.core.azdls_get_properties(&self.path).await?;
90
91 let status = resp.status();
92 let headers = resp.headers();
93
94 match status {
95 StatusCode::OK => Ok(parse_content_length(headers)?.unwrap_or_default()),
96 StatusCode::NOT_FOUND => Ok(0),
97 _ => Err(parse_error(resp)),
98 }
99 }
100
101 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
102 if offset == 0 {
103 let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
104 let status = resp.status();
105 match status {
106 StatusCode::CREATED | StatusCode::OK => {}
107 _ => {
108 return Err(parse_error(resp).with_operation("Backend::azdls_create_request"));
109 }
110 }
111 }
112
113 let resp = self
114 .core
115 .azdls_update(&self.path, Some(size), offset, body)
116 .await?;
117
118 let mut meta = AzdlsWriter::parse_metadata(resp.headers())?;
119 let md5 = parse_content_md5(resp.headers())?;
120 if let Some(md5) = md5 {
121 meta.set_content_md5(md5);
122 }
123 let status = resp.status();
124 match status {
125 StatusCode::OK | StatusCode::ACCEPTED => Ok(meta),
126 _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")),
127 }
128 }
129}