opendal/services/obs/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::HeaderMap;
22use http::HeaderValue;
23use http::StatusCode;
24
25use super::core::*;
26use super::error::parse_error;
27use crate::raw::oio::MultipartPart;
28use crate::raw::*;
29use crate::*;
30
31pub type ObsWriters = TwoWays<oio::MultipartWriter<ObsWriter>, oio::AppendWriter<ObsWriter>>;
32
33pub struct ObsWriter {
34 core: Arc<ObsCore>,
35
36 op: OpWrite,
37 path: String,
38}
39
40impl ObsWriter {
41 pub fn new(core: Arc<ObsCore>, path: &str, op: OpWrite) -> Self {
42 ObsWriter {
43 core,
44 path: path.to_string(),
45 op,
46 }
47 }
48
49 fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
50 let mut meta = Metadata::default();
51 if let Some(etag) = parse_etag(headers)? {
52 meta.set_etag(etag);
53 }
54 if let Some(md5) = parse_content_md5(headers)? {
55 meta.set_content_md5(md5);
56 }
57 if let Some(version) = parse_header_to_str(headers, constants::X_OBS_VERSION_ID)? {
58 meta.set_version(version);
59 }
60
61 Ok(meta)
62 }
63}
64
65impl oio::MultipartWrite for ObsWriter {
66 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
67 let mut req = self
68 .core
69 .obs_put_object_request(&self.path, Some(size), &self.op, body)?;
70
71 self.core.sign(&mut req).await?;
72
73 let resp = self.core.send(req).await?;
74
75 let meta = Self::parse_metadata(resp.headers())?;
76
77 let status = resp.status();
78
79 match status {
80 StatusCode::CREATED | StatusCode::OK => Ok(meta),
81 _ => Err(parse_error(resp)),
82 }
83 }
84
85 async fn initiate_part(&self) -> Result<String> {
86 let resp = self
87 .core
88 .obs_initiate_multipart_upload(&self.path, self.op.content_type())
89 .await?;
90
91 let status = resp.status();
92
93 match status {
94 StatusCode::OK => {
95 let bs = resp.into_body();
96
97 let result: InitiateMultipartUploadResult =
98 quick_xml::de::from_reader(bytes::Buf::reader(bs))
99 .map_err(new_xml_deserialize_error)?;
100
101 Ok(result.upload_id)
102 }
103 _ => Err(parse_error(resp)),
104 }
105 }
106
107 async fn write_part(
108 &self,
109 upload_id: &str,
110 part_number: usize,
111 size: u64,
112 body: Buffer,
113 ) -> Result<MultipartPart> {
114 let part_number = part_number + 1;
116
117 let resp = self
118 .core
119 .obs_upload_part_request(&self.path, upload_id, part_number, Some(size), body)
120 .await?;
121
122 let status = resp.status();
123
124 match status {
125 StatusCode::OK => {
126 let etag = parse_etag(resp.headers())?
127 .ok_or_else(|| {
128 Error::new(
129 ErrorKind::Unexpected,
130 "ETag not present in returning response",
131 )
132 })?
133 .to_string();
134
135 Ok(MultipartPart {
136 part_number,
137 etag,
138 checksum: None,
139 })
140 }
141 _ => Err(parse_error(resp)),
142 }
143 }
144
145 async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> {
146 let parts = parts
147 .iter()
148 .map(|p| CompleteMultipartUploadRequestPart {
149 part_number: p.part_number,
150 etag: p.etag.clone(),
151 })
152 .collect();
153
154 let mut resp = self
155 .core
156 .obs_complete_multipart_upload(&self.path, upload_id, parts)
157 .await?;
158
159 let mut meta = Self::parse_metadata(resp.headers())?;
160
161 let result: CompleteMultipartUploadResult =
162 quick_xml::de::from_reader(resp.body_mut().reader())
163 .map_err(new_xml_deserialize_error)?;
164 meta.set_etag(&result.etag);
165
166 let status = resp.status();
167
168 match status {
169 StatusCode::OK => Ok(meta),
170 _ => Err(parse_error(resp)),
171 }
172 }
173
174 async fn abort_part(&self, upload_id: &str) -> Result<()> {
175 let resp = self
176 .core
177 .obs_abort_multipart_upload(&self.path, upload_id)
178 .await?;
179 match resp.status() {
180 StatusCode::NO_CONTENT => Ok(()),
183 _ => Err(parse_error(resp)),
184 }
185 }
186}
187
188impl oio::AppendWrite for ObsWriter {
189 async fn offset(&self) -> Result<u64> {
190 let resp = self
191 .core
192 .obs_head_object(&self.path, &OpStat::default())
193 .await?;
194
195 let status = resp.status();
196 match status {
197 StatusCode::OK => {
198 let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
199 Error::new(
200 ErrorKind::Unexpected,
201 "Content-Length not present in returning response",
202 )
203 })?;
204 Ok(content_length)
205 }
206 StatusCode::NOT_FOUND => Ok(0),
207 _ => Err(parse_error(resp)),
208 }
209 }
210
211 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
212 let mut req = self
213 .core
214 .obs_append_object_request(&self.path, offset, size, &self.op, body)?;
215
216 self.core.sign(&mut req).await?;
217
218 let resp = self.core.send(req).await?;
219
220 let mut meta = Metadata::default();
221 if let Some(md5) = parse_content_md5(resp.headers())? {
222 meta.set_content_md5(md5);
223 }
224 if let Some(version) = parse_header_to_str(resp.headers(), constants::X_OBS_VERSION_ID)? {
225 meta.set_version(version);
226 }
227
228 let status = resp.status();
229
230 match status {
231 StatusCode::OK => Ok(meta),
232 _ => Err(parse_error(resp)),
233 }
234 }
235}