opendal_core/services/azdls/
writer.rs1use std::sync::Arc;
19
20use http::StatusCode;
21
22use super::core::AzdlsCore;
23use super::core::FILE;
24use super::core::X_MS_VERSION_ID;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
29pub type AzdlsWriters = TwoWays<oio::PositionWriter<AzdlsWriter>, oio::AppendWriter<AzdlsWriter>>;
31
32#[derive(Clone)]
33pub struct AzdlsWriter {
34 core: Arc<AzdlsCore>,
35 op: OpWrite,
36 path: String,
37}
38
39impl AzdlsWriter {
40 pub fn new(core: Arc<AzdlsCore>, op: OpWrite, path: String) -> Self {
41 Self { core, op, path }
42 }
43
44 pub async fn create(core: Arc<AzdlsCore>, op: OpWrite, path: String) -> Result<Self> {
45 let writer = Self::new(core, op, path);
46 writer.create_if_needed().await?;
47 Ok(writer)
48 }
49
50 async fn create_if_needed(&self) -> Result<()> {
51 let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
52 match resp.status() {
53 StatusCode::CREATED | StatusCode::OK => Ok(()),
54 StatusCode::CONFLICT if self.op.if_not_exists() => {
55 Err(parse_error(resp).with_operation("Backend::azdls_create_request"))
56 }
57 StatusCode::CONFLICT => Ok(()),
58 _ => Err(parse_error(resp).with_operation("Backend::azdls_create_request")),
59 }
60 }
61
62 fn parse_metadata(headers: &http::HeaderMap) -> Result<Metadata> {
63 let mut metadata = Metadata::default();
64
65 if let Some(last_modified) = parse_last_modified(headers)? {
66 metadata.set_last_modified(last_modified);
67 }
68 let etag = parse_etag(headers)?;
69 if let Some(etag) = etag {
70 metadata.set_etag(etag);
71 }
72 let version_id = parse_header_to_str(headers, X_MS_VERSION_ID)?;
73 if let Some(version_id) = version_id {
74 metadata.set_version(version_id);
75 }
76
77 Ok(metadata)
78 }
79}
80
81impl oio::PositionWrite for AzdlsWriter {
82 async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
83 let size = buf.len() as u64;
84 let resp = self
85 .core
86 .azdls_append(&self.path, Some(size), offset, false, false, buf)
87 .await?;
88
89 match resp.status() {
90 StatusCode::OK | StatusCode::ACCEPTED => Ok(()),
91 _ => Err(parse_error(resp).with_operation("Backend::azdls_append_request")),
92 }
93 }
94
95 async fn close(&self, size: u64) -> Result<Metadata> {
96 let resp = self.core.azdls_flush(&self.path, size, true).await?;
98
99 let mut meta = AzdlsWriter::parse_metadata(resp.headers())?;
100 meta.set_content_length(size);
101
102 match resp.status() {
103 StatusCode::OK | StatusCode::ACCEPTED => Ok(meta),
104 _ => Err(parse_error(resp).with_operation("Backend::azdls_flush_request")),
105 }
106 }
107
108 async fn abort(&self) -> Result<()> {
109 Err(Error::new(
110 ErrorKind::Unsupported,
111 "Abort is not supported for azdls writer",
112 ))
113 }
114}
115
116impl oio::AppendWrite for AzdlsWriter {
117 async fn offset(&self) -> Result<u64> {
118 let resp = self.core.azdls_get_properties(&self.path).await?;
119
120 let status = resp.status();
121 let headers = resp.headers();
122
123 match status {
124 StatusCode::OK => Ok(parse_content_length(headers)?.unwrap_or_default()),
125 StatusCode::NOT_FOUND => Ok(0),
126 _ => Err(parse_error(resp)),
127 }
128 }
129
130 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
131 if offset == 0 {
132 self.create_if_needed().await?;
134 }
135
136 let resp = self
138 .core
139 .azdls_append(&self.path, Some(size), offset, true, false, body)
140 .await?;
141
142 let mut meta = AzdlsWriter::parse_metadata(resp.headers())?;
143 let md5 = parse_content_md5(resp.headers())?;
144 if let Some(md5) = md5 {
145 meta.set_content_md5(md5);
146 }
147 meta.set_content_length(offset + size);
148
149 match resp.status() {
150 StatusCode::OK | StatusCode::ACCEPTED => Ok(meta),
151 _ => Err(parse_error(resp).with_operation("Backend::azdls_append_request")),
152 }
153 }
154}