opendal/services/s3/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use constants::X_AMZ_OBJECT_SIZE;
22use constants::X_AMZ_VERSION_ID;
23use http::StatusCode;
24
25use super::core::*;
26use super::error::from_s3_error;
27use super::error::parse_error;
28use super::error::S3Error;
29use crate::raw::*;
30use crate::*;
31
32pub type S3Writers = TwoWays<oio::MultipartWriter<S3Writer>, oio::AppendWriter<S3Writer>>;
33
34pub struct S3Writer {
35 core: Arc<S3Core>,
36
37 op: OpWrite,
38 path: String,
39}
40
41impl S3Writer {
42 pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
43 S3Writer {
44 core,
45 path: path.to_string(),
46 op,
47 }
48 }
49
50 fn parse_header_into_meta(path: &str, headers: &http::HeaderMap) -> Result<Metadata> {
51 let mut meta = Metadata::new(EntryMode::from_path(path));
52 if let Some(etag) = parse_etag(headers)? {
53 meta.set_etag(etag);
54 }
55 if let Some(version) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
56 meta.set_version(version);
57 }
58 if let Some(size) = parse_header_to_str(headers, X_AMZ_OBJECT_SIZE)? {
59 if let Ok(value) = size.parse() {
60 meta.set_content_length(value);
61 }
62 }
63 Ok(meta)
64 }
65}
66
67impl oio::MultipartWrite for S3Writer {
68 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
69 let mut req = self
70 .core
71 .s3_put_object_request(&self.path, Some(size), &self.op, body)?;
72
73 self.core.sign(&mut req).await?;
74
75 let resp = self.core.send(req).await?;
76
77 let status = resp.status();
78
79 let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
80
81 match status {
82 StatusCode::CREATED | StatusCode::OK => Ok(meta),
83 _ => Err(parse_error(resp)),
84 }
85 }
86
87 async fn initiate_part(&self) -> Result<String> {
88 let resp = self
89 .core
90 .s3_initiate_multipart_upload(&self.path, &self.op)
91 .await?;
92
93 let status = resp.status();
94
95 match status {
96 StatusCode::OK => {
97 let bs = resp.into_body();
98
99 let result: InitiateMultipartUploadResult =
100 quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
101
102 Ok(result.upload_id)
103 }
104 _ => Err(parse_error(resp)),
105 }
106 }
107
108 async fn write_part(
109 &self,
110 upload_id: &str,
111 part_number: usize,
112 size: u64,
113 body: Buffer,
114 ) -> Result<oio::MultipartPart> {
115 let part_number = part_number + 1;
117
118 let checksum = self.core.calculate_checksum(&body);
119
120 let mut req = self.core.s3_upload_part_request(
121 &self.path,
122 upload_id,
123 part_number,
124 size,
125 body,
126 checksum.clone(),
127 )?;
128
129 self.core.sign(&mut req).await?;
130
131 let resp = self.core.send(req).await?;
132
133 let status = resp.status();
134
135 match status {
136 StatusCode::OK => {
137 let etag = parse_etag(resp.headers())?
138 .ok_or_else(|| {
139 Error::new(
140 ErrorKind::Unexpected,
141 "ETag not present in returning response",
142 )
143 })?
144 .to_string();
145
146 Ok(oio::MultipartPart {
147 part_number,
148 etag,
149 checksum,
150 })
151 }
152 _ => Err(parse_error(resp)),
153 }
154 }
155
156 async fn complete_part(
157 &self,
158 upload_id: &str,
159 parts: &[oio::MultipartPart],
160 ) -> Result<Metadata> {
161 let parts = parts
162 .iter()
163 .map(|p| match &self.core.checksum_algorithm {
164 None => CompleteMultipartUploadRequestPart {
165 part_number: p.part_number,
166 etag: p.etag.clone(),
167 ..Default::default()
168 },
169 Some(checksum_algorithm) => match checksum_algorithm {
170 ChecksumAlgorithm::Crc32c => CompleteMultipartUploadRequestPart {
171 part_number: p.part_number,
172 etag: p.etag.clone(),
173 checksum_crc32c: p.checksum.clone(),
174 },
175 },
176 })
177 .collect();
178
179 let resp = self
180 .core
181 .s3_complete_multipart_upload(&self.path, upload_id, parts)
182 .await?;
183
184 let status = resp.status();
185
186 let mut meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
187
188 match status {
189 StatusCode::OK => {
190 let (parts, body) = resp.into_parts();
193
194 let ret: CompleteMultipartUploadResult =
195 quick_xml::de::from_reader(body.reader()).map_err(new_xml_deserialize_error)?;
196 if !ret.code.is_empty() {
197 return Err(from_s3_error(
198 S3Error {
199 code: ret.code,
200 message: ret.message,
201 resource: "".to_string(),
202 request_id: ret.request_id,
203 },
204 parts,
205 ));
206 }
207 meta.set_etag(&ret.etag);
208
209 Ok(meta)
210 }
211 _ => Err(parse_error(resp)),
212 }
213 }
214
215 async fn abort_part(&self, upload_id: &str) -> Result<()> {
216 let resp = self
217 .core
218 .s3_abort_multipart_upload(&self.path, upload_id)
219 .await?;
220 match resp.status() {
221 StatusCode::NO_CONTENT => Ok(()),
223 _ => Err(parse_error(resp)),
224 }
225 }
226}
227
228impl oio::AppendWrite for S3Writer {
229 async fn offset(&self) -> Result<u64> {
230 let resp = self
231 .core
232 .s3_head_object(&self.path, OpStat::default())
233 .await?;
234
235 let status = resp.status();
236
237 match status {
238 StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()),
239 StatusCode::NOT_FOUND => Ok(0),
240 _ => Err(parse_error(resp)),
241 }
242 }
243
244 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
245 let mut req = self
246 .core
247 .s3_append_object_request(&self.path, offset, size, &self.op, body)?;
248
249 self.core.sign(&mut req).await?;
250
251 let resp = self.core.send(req).await?;
252
253 let status = resp.status();
254
255 let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
256
257 match status {
258 StatusCode::CREATED | StatusCode::OK => Ok(meta),
259 _ => Err(parse_error(resp)),
260 }
261 }
262}