opendal/services/azblob/
writer.rs1use std::sync::Arc;
19
20use http::StatusCode;
21use uuid::Uuid;
22
23use super::core::constants::X_MS_VERSION_ID;
24use super::core::AzblobCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
/// Name of the response header inspected to determine a blob's type.
const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";

/// Writer type exposed for this service: a `TwoWays` combining a block-based writer and an
/// append-based writer, both backed by [`AzblobWriter`].
pub type AzblobWriters = TwoWays<oio::BlockWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
32
33pub struct AzblobWriter {
34 core: Arc<AzblobCore>,
35
36 op: OpWrite,
37 path: String,
38}
39
40impl AzblobWriter {
41 pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
42 AzblobWriter { core, op, path }
43 }
44
45 fn parse_metadata(headers: &http::HeaderMap) -> Result<Metadata> {
48 let mut metadata = Metadata::default();
49
50 if let Some(last_modified) = parse_last_modified(headers)? {
51 metadata.set_last_modified(last_modified);
52 }
53 let etag = parse_etag(headers)?;
54 if let Some(etag) = etag {
55 metadata.set_etag(etag);
56 }
57 let version_id = parse_header_to_str(headers, X_MS_VERSION_ID)?;
58 if let Some(version_id) = version_id {
59 metadata.set_version(version_id);
60 }
61
62 Ok(metadata)
63 }
64}
65
66impl oio::AppendWrite for AzblobWriter {
67 async fn offset(&self) -> Result<u64> {
68 let resp = self
69 .core
70 .azblob_get_blob_properties(&self.path, &OpStat::default())
71 .await?;
72
73 let status = resp.status();
74
75 match status {
76 StatusCode::OK => {
77 let headers = resp.headers();
78 let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok());
79 if blob_type != Some("AppendBlob") {
80 return Err(Error::new(
81 ErrorKind::ConditionNotMatch,
82 "the blob is not an appendable blob.",
83 ));
84 }
85
86 Ok(parse_content_length(headers)?.unwrap_or_default())
87 }
88 StatusCode::NOT_FOUND => {
89 let resp = self
90 .core
91 .azblob_init_appendable_blob(&self.path, &self.op)
92 .await?;
93
94 let status = resp.status();
95 match status {
96 StatusCode::CREATED => {
97 }
99 _ => {
100 return Err(parse_error(resp));
101 }
102 }
103 Ok(0)
104 }
105 _ => Err(parse_error(resp)),
106 }
107 }
108
109 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
110 let resp = self
111 .core
112 .azblob_append_blob(&self.path, offset, size, body)
113 .await?;
114
115 let meta = AzblobWriter::parse_metadata(resp.headers())?;
116 let status = resp.status();
117 match status {
118 StatusCode::CREATED => Ok(meta),
119 _ => Err(parse_error(resp)),
120 }
121 }
122}
123
124impl oio::BlockWrite for AzblobWriter {
125 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
126 let resp = self
127 .core
128 .azblob_put_blob(&self.path, Some(size), &self.op, body)
129 .await?;
130
131 let status = resp.status();
132
133 let mut meta = AzblobWriter::parse_metadata(resp.headers())?;
134 let md5 = parse_content_md5(resp.headers())?;
135 if let Some(md5) = md5 {
136 meta.set_content_md5(md5);
137 }
138 match status {
139 StatusCode::CREATED | StatusCode::OK => Ok(meta),
140 _ => Err(parse_error(resp)),
141 }
142 }
143
144 async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
145 let resp = self
146 .core
147 .azblob_put_block(&self.path, block_id, Some(size), &self.op, body)
148 .await?;
149
150 let status = resp.status();
151 match status {
152 StatusCode::CREATED | StatusCode::OK => Ok(()),
153 _ => Err(parse_error(resp)),
154 }
155 }
156
157 async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
158 let resp = self
159 .core
160 .azblob_complete_put_block_list(&self.path, block_ids, &self.op)
161 .await?;
162
163 let meta = AzblobWriter::parse_metadata(resp.headers())?;
164 let status = resp.status();
165 match status {
166 StatusCode::CREATED | StatusCode::OK => Ok(meta),
167 _ => Err(parse_error(resp)),
168 }
169 }
170
171 async fn abort_block(&self, _block_ids: Vec<Uuid>) -> Result<()> {
172 Ok(())
176 }
177}