opendal/services/obs/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::{HeaderMap, HeaderValue, StatusCode};
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::oio::MultipartPart;
26use crate::raw::*;
27use crate::*;
28
29pub type ObsWriters = TwoWays<oio::MultipartWriter<ObsWriter>, oio::AppendWriter<ObsWriter>>;
30
31pub struct ObsWriter {
32 core: Arc<ObsCore>,
33
34 op: OpWrite,
35 path: String,
36}
37
38impl ObsWriter {
39 pub fn new(core: Arc<ObsCore>, path: &str, op: OpWrite) -> Self {
40 ObsWriter {
41 core,
42 path: path.to_string(),
43 op,
44 }
45 }
46
47 fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
48 let mut meta = Metadata::default();
49 if let Some(etag) = parse_etag(headers)? {
50 meta.set_etag(etag);
51 }
52 if let Some(md5) = parse_content_md5(headers)? {
53 meta.set_content_md5(md5);
54 }
55 if let Some(version) = parse_header_to_str(headers, constants::X_OBS_VERSION_ID)? {
56 meta.set_version(version);
57 }
58
59 Ok(meta)
60 }
61}
62
63impl oio::MultipartWrite for ObsWriter {
64 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
65 let mut req = self
66 .core
67 .obs_put_object_request(&self.path, Some(size), &self.op, body)?;
68
69 self.core.sign(&mut req).await?;
70
71 let resp = self.core.send(req).await?;
72
73 let meta = Self::parse_metadata(resp.headers())?;
74
75 let status = resp.status();
76
77 match status {
78 StatusCode::CREATED | StatusCode::OK => Ok(meta),
79 _ => Err(parse_error(resp)),
80 }
81 }
82
83 async fn initiate_part(&self) -> Result<String> {
84 let resp = self
85 .core
86 .obs_initiate_multipart_upload(&self.path, self.op.content_type())
87 .await?;
88
89 let status = resp.status();
90
91 match status {
92 StatusCode::OK => {
93 let bs = resp.into_body();
94
95 let result: InitiateMultipartUploadResult =
96 quick_xml::de::from_reader(bytes::Buf::reader(bs))
97 .map_err(new_xml_deserialize_error)?;
98
99 Ok(result.upload_id)
100 }
101 _ => Err(parse_error(resp)),
102 }
103 }
104
105 async fn write_part(
106 &self,
107 upload_id: &str,
108 part_number: usize,
109 size: u64,
110 body: Buffer,
111 ) -> Result<MultipartPart> {
112 let part_number = part_number + 1;
114
115 let resp = self
116 .core
117 .obs_upload_part_request(&self.path, upload_id, part_number, Some(size), body)
118 .await?;
119
120 let status = resp.status();
121
122 match status {
123 StatusCode::OK => {
124 let etag = parse_etag(resp.headers())?
125 .ok_or_else(|| {
126 Error::new(
127 ErrorKind::Unexpected,
128 "ETag not present in returning response",
129 )
130 })?
131 .to_string();
132
133 Ok(MultipartPart {
134 part_number,
135 etag,
136 checksum: None,
137 })
138 }
139 _ => Err(parse_error(resp)),
140 }
141 }
142
143 async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> {
144 let parts = parts
145 .iter()
146 .map(|p| CompleteMultipartUploadRequestPart {
147 part_number: p.part_number,
148 etag: p.etag.clone(),
149 })
150 .collect();
151
152 let mut resp = self
153 .core
154 .obs_complete_multipart_upload(&self.path, upload_id, parts)
155 .await?;
156
157 let mut meta = Self::parse_metadata(resp.headers())?;
158
159 let result: CompleteMultipartUploadResult =
160 quick_xml::de::from_reader(resp.body_mut().reader())
161 .map_err(new_xml_deserialize_error)?;
162 meta.set_etag(&result.etag);
163
164 let status = resp.status();
165
166 match status {
167 StatusCode::OK => Ok(meta),
168 _ => Err(parse_error(resp)),
169 }
170 }
171
172 async fn abort_part(&self, upload_id: &str) -> Result<()> {
173 let resp = self
174 .core
175 .obs_abort_multipart_upload(&self.path, upload_id)
176 .await?;
177 match resp.status() {
178 StatusCode::NO_CONTENT => Ok(()),
181 _ => Err(parse_error(resp)),
182 }
183 }
184}
185
186impl oio::AppendWrite for ObsWriter {
187 async fn offset(&self) -> Result<u64> {
188 let resp = self
189 .core
190 .obs_head_object(&self.path, &OpStat::default())
191 .await?;
192
193 let status = resp.status();
194 match status {
195 StatusCode::OK => {
196 let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
197 Error::new(
198 ErrorKind::Unexpected,
199 "Content-Length not present in returning response",
200 )
201 })?;
202 Ok(content_length)
203 }
204 StatusCode::NOT_FOUND => Ok(0),
205 _ => Err(parse_error(resp)),
206 }
207 }
208
209 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
210 let mut req = self
211 .core
212 .obs_append_object_request(&self.path, offset, size, &self.op, body)?;
213
214 self.core.sign(&mut req).await?;
215
216 let resp = self.core.send(req).await?;
217
218 let mut meta = Metadata::default();
219 if let Some(md5) = parse_content_md5(resp.headers())? {
220 meta.set_content_md5(md5);
221 }
222 if let Some(version) = parse_header_to_str(resp.headers(), constants::X_OBS_VERSION_ID)? {
223 meta.set_version(version);
224 }
225
226 let status = resp.status();
227
228 match status {
229 StatusCode::OK => Ok(meta),
230 _ => Err(parse_error(resp)),
231 }
232 }
233}