opendal/services/cos/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::{HeaderMap, HeaderValue, StatusCode};
22
23use super::core::*;
24use super::error::parse_error;
25use crate::raw::*;
26use crate::*;
27
28pub type CosWriters = TwoWays<oio::MultipartWriter<CosWriter>, oio::AppendWriter<CosWriter>>;
29
30pub struct CosWriter {
31 core: Arc<CosCore>,
32
33 op: OpWrite,
34 path: String,
35}
36
37impl CosWriter {
38 pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> Self {
39 CosWriter {
40 core,
41 path: path.to_string(),
42 op,
43 }
44 }
45
46 fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
47 let mut meta = Metadata::default();
48 if let Some(etag) = parse_etag(headers)? {
49 meta.set_etag(etag);
50 }
51 if let Some(md5) = parse_content_md5(headers)? {
52 meta.set_content_md5(md5);
53 }
54 if let Some(version) = parse_header_to_str(headers, constants::X_COS_VERSION_ID)? {
55 if version != "null" {
56 meta.set_version(version);
57 }
58 }
59
60 Ok(meta)
61 }
62}
63
64impl oio::MultipartWrite for CosWriter {
65 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
66 let mut req = self
67 .core
68 .cos_put_object_request(&self.path, Some(size), &self.op, body)?;
69
70 self.core.sign(&mut req).await?;
71
72 let resp = self.core.send(req).await?;
73
74 let meta = Self::parse_metadata(resp.headers())?;
75
76 let status = resp.status();
77
78 match status {
79 StatusCode::CREATED | StatusCode::OK => Ok(meta),
80 _ => Err(parse_error(resp)),
81 }
82 }
83
84 async fn initiate_part(&self) -> Result<String> {
85 let resp = self
86 .core
87 .cos_initiate_multipart_upload(&self.path, &self.op)
88 .await?;
89
90 let status = resp.status();
91
92 match status {
93 StatusCode::OK => {
94 let bs = resp.into_body();
95
96 let result: InitiateMultipartUploadResult =
97 quick_xml::de::from_reader(bytes::Buf::reader(bs))
98 .map_err(new_xml_deserialize_error)?;
99
100 Ok(result.upload_id)
101 }
102 _ => Err(parse_error(resp)),
103 }
104 }
105
106 async fn write_part(
107 &self,
108 upload_id: &str,
109 part_number: usize,
110 size: u64,
111 body: Buffer,
112 ) -> Result<oio::MultipartPart> {
113 let part_number = part_number + 1;
115
116 let resp = self
117 .core
118 .cos_upload_part_request(&self.path, upload_id, part_number, size, body)
119 .await?;
120
121 let status = resp.status();
122
123 match status {
124 StatusCode::OK => {
125 let etag = parse_etag(resp.headers())?
126 .ok_or_else(|| {
127 Error::new(
128 ErrorKind::Unexpected,
129 "ETag not present in returning response",
130 )
131 })?
132 .to_string();
133
134 Ok(oio::MultipartPart {
135 part_number,
136 etag,
137 checksum: None,
138 })
139 }
140 _ => Err(parse_error(resp)),
141 }
142 }
143
144 async fn complete_part(
145 &self,
146 upload_id: &str,
147 parts: &[oio::MultipartPart],
148 ) -> Result<Metadata> {
149 let parts = parts
150 .iter()
151 .map(|p| CompleteMultipartUploadRequestPart {
152 part_number: p.part_number,
153 etag: p.etag.clone(),
154 })
155 .collect();
156
157 let mut resp = self
158 .core
159 .cos_complete_multipart_upload(&self.path, upload_id, parts)
160 .await?;
161
162 let mut meta = Self::parse_metadata(resp.headers())?;
163
164 let result: CompleteMultipartUploadResult =
165 quick_xml::de::from_reader(resp.body_mut().reader())
166 .map_err(new_xml_deserialize_error)?;
167 meta.set_etag(&result.etag);
168
169 let status = resp.status();
170
171 match status {
172 StatusCode::OK => Ok(meta),
173 _ => Err(parse_error(resp)),
174 }
175 }
176
177 async fn abort_part(&self, upload_id: &str) -> Result<()> {
178 let resp = self
179 .core
180 .cos_abort_multipart_upload(&self.path, upload_id)
181 .await?;
182 match resp.status() {
183 StatusCode::NO_CONTENT => Ok(()),
186 _ => Err(parse_error(resp)),
187 }
188 }
189}
190
191impl oio::AppendWrite for CosWriter {
192 async fn offset(&self) -> Result<u64> {
193 let resp = self
194 .core
195 .cos_head_object(&self.path, &OpStat::default())
196 .await?;
197
198 let status = resp.status();
199 match status {
200 StatusCode::OK => {
201 let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
202 Error::new(
203 ErrorKind::Unexpected,
204 "Content-Length not present in returning response",
205 )
206 })?;
207 Ok(content_length)
208 }
209 StatusCode::NOT_FOUND => Ok(0),
210 _ => Err(parse_error(resp)),
211 }
212 }
213
214 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
215 let mut req = self
216 .core
217 .cos_append_object_request(&self.path, offset, size, &self.op, body)?;
218
219 self.core.sign(&mut req).await?;
220
221 let resp = self.core.send(req).await?;
222
223 let meta = Self::parse_metadata(resp.headers())?;
224
225 let status = resp.status();
226
227 match status {
228 StatusCode::OK => Ok(meta),
229 _ => Err(parse_error(resp)),
230 }
231 }
232}