opendal/services/azblob/
writer.rs1use std::sync::Arc;
19
20use http::StatusCode;
21use uuid::Uuid;
22
23use super::core::AzblobCore;
24use super::error::parse_error;
25use crate::raw::*;
26use crate::*;
27
/// Name of the response header inspected by `AppendWrite::offset` to check
/// whether an existing blob is an `AppendBlob`.
28const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
29
/// Writer type for azblob: a `TwoWays` over a block writer and an append
/// writer, both driven by [`AzblobWriter`].
///
/// NOTE(review): `TwoWays` presumably selects exactly one of the two variants
/// per write — confirm against its definition in `crate::raw`.
30pub type AzblobWriters = TwoWays<oio::BlockWriter<AzblobWriter>, oio::AppendWriter<AzblobWriter>>;
31
/// State shared by the `oio::AppendWrite` and `oio::BlockWrite`
/// implementations for a single blob.
32pub struct AzblobWriter {
// Shared service core used to build, sign and send every request.
33 core: Arc<AzblobCore>,
34
// Write options forwarded to the request builders.
35 op: OpWrite,
// Path of the blob being written.
36 path: String,
37}
38
39impl AzblobWriter {
40 pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
41 AzblobWriter { core, op, path }
42 }
43}
44
45impl oio::AppendWrite for AzblobWriter {
46 async fn offset(&self) -> Result<u64> {
47 let resp = self
48 .core
49 .azblob_get_blob_properties(&self.path, &OpStat::default())
50 .await?;
51
52 let status = resp.status();
53
54 match status {
55 StatusCode::OK => {
56 let headers = resp.headers();
57 let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok());
58 if blob_type != Some("AppendBlob") {
59 return Err(Error::new(
60 ErrorKind::ConditionNotMatch,
61 "the blob is not an appendable blob.",
62 ));
63 }
64
65 Ok(parse_content_length(headers)?.unwrap_or_default())
66 }
67 StatusCode::NOT_FOUND => {
68 let mut req = self
69 .core
70 .azblob_init_appendable_blob_request(&self.path, &self.op)?;
71
72 self.core.sign(&mut req).await?;
73
74 let resp = self.core.info.http_client().send(req).await?;
75
76 let status = resp.status();
77 match status {
78 StatusCode::CREATED => {
79 }
81 _ => {
82 return Err(parse_error(resp));
83 }
84 }
85 Ok(0)
86 }
87 _ => Err(parse_error(resp)),
88 }
89 }
90
91 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
92 let mut req = self
93 .core
94 .azblob_append_blob_request(&self.path, offset, size, body)?;
95
96 self.core.sign(&mut req).await?;
97
98 let resp = self.core.send(req).await?;
99
100 let status = resp.status();
101 match status {
102 StatusCode::CREATED => Ok(Metadata::default()),
103 _ => Err(parse_error(resp)),
104 }
105 }
106}
107
108impl oio::BlockWrite for AzblobWriter {
109 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
110 let mut req: http::Request<Buffer> =
111 self.core
112 .azblob_put_blob_request(&self.path, Some(size), &self.op, body)?;
113 self.core.sign(&mut req).await?;
114
115 let resp = self.core.send(req).await?;
116
117 let status = resp.status();
118
119 match status {
120 StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
121 _ => Err(parse_error(resp)),
122 }
123 }
124
125 async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
126 let resp = self
127 .core
128 .azblob_put_block(&self.path, block_id, Some(size), &self.op, body)
129 .await?;
130
131 let status = resp.status();
132 match status {
133 StatusCode::CREATED | StatusCode::OK => Ok(()),
134 _ => Err(parse_error(resp)),
135 }
136 }
137
138 async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
139 let resp = self
140 .core
141 .azblob_complete_put_block_list(&self.path, block_ids, &self.op)
142 .await?;
143
144 let status = resp.status();
145 match status {
146 StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
147 _ => Err(parse_error(resp)),
148 }
149 }
150
151 async fn abort_block(&self, _block_ids: Vec<Uuid>) -> Result<()> {
152 Ok(())
156 }
157}