opendal/services/s3/
writer.rs1use std::sync::Arc;
19
20use super::core::*;
21use super::error::from_s3_error;
22use super::error::parse_error;
23use super::error::S3Error;
24use crate::raw::*;
25use crate::*;
26use bytes::Buf;
27use constants::{X_AMZ_OBJECT_SIZE, X_AMZ_VERSION_ID};
28use http::StatusCode;
29
30pub type S3Writers = TwoWays<oio::MultipartWriter<S3Writer>, oio::AppendWriter<S3Writer>>;
31
32pub struct S3Writer {
33 core: Arc<S3Core>,
34
35 op: OpWrite,
36 path: String,
37}
38
39impl S3Writer {
40 pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
41 S3Writer {
42 core,
43 path: path.to_string(),
44 op,
45 }
46 }
47
48 fn parse_header_into_meta(path: &str, headers: &http::HeaderMap) -> Result<Metadata> {
49 let mut meta = Metadata::new(EntryMode::from_path(path));
50 if let Some(etag) = parse_etag(headers)? {
51 meta.set_etag(etag);
52 }
53 if let Some(version) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
54 meta.set_version(version);
55 }
56 if let Some(size) = parse_header_to_str(headers, X_AMZ_OBJECT_SIZE)? {
57 if let Ok(value) = size.parse() {
58 meta.set_content_length(value);
59 }
60 }
61 Ok(meta)
62 }
63}
64
65impl oio::MultipartWrite for S3Writer {
66 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
67 let mut req = self
68 .core
69 .s3_put_object_request(&self.path, Some(size), &self.op, body)?;
70
71 self.core.sign(&mut req).await?;
72
73 let resp = self.core.send(req).await?;
74
75 let status = resp.status();
76
77 let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
78
79 match status {
80 StatusCode::CREATED | StatusCode::OK => Ok(meta),
81 _ => Err(parse_error(resp)),
82 }
83 }
84
85 async fn initiate_part(&self) -> Result<String> {
86 let resp = self
87 .core
88 .s3_initiate_multipart_upload(&self.path, &self.op)
89 .await?;
90
91 let status = resp.status();
92
93 match status {
94 StatusCode::OK => {
95 let bs = resp.into_body();
96
97 let result: InitiateMultipartUploadResult =
98 quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
99
100 Ok(result.upload_id)
101 }
102 _ => Err(parse_error(resp)),
103 }
104 }
105
106 async fn write_part(
107 &self,
108 upload_id: &str,
109 part_number: usize,
110 size: u64,
111 body: Buffer,
112 ) -> Result<oio::MultipartPart> {
113 let part_number = part_number + 1;
115
116 let checksum = self.core.calculate_checksum(&body);
117
118 let mut req = self.core.s3_upload_part_request(
119 &self.path,
120 upload_id,
121 part_number,
122 size,
123 body,
124 checksum.clone(),
125 )?;
126
127 self.core.sign(&mut req).await?;
128
129 let resp = self.core.send(req).await?;
130
131 let status = resp.status();
132
133 match status {
134 StatusCode::OK => {
135 let etag = parse_etag(resp.headers())?
136 .ok_or_else(|| {
137 Error::new(
138 ErrorKind::Unexpected,
139 "ETag not present in returning response",
140 )
141 })?
142 .to_string();
143
144 Ok(oio::MultipartPart {
145 part_number,
146 etag,
147 checksum,
148 })
149 }
150 _ => Err(parse_error(resp)),
151 }
152 }
153
154 async fn complete_part(
155 &self,
156 upload_id: &str,
157 parts: &[oio::MultipartPart],
158 ) -> Result<Metadata> {
159 let parts = parts
160 .iter()
161 .map(|p| match &self.core.checksum_algorithm {
162 None => CompleteMultipartUploadRequestPart {
163 part_number: p.part_number,
164 etag: p.etag.clone(),
165 ..Default::default()
166 },
167 Some(checksum_algorithm) => match checksum_algorithm {
168 ChecksumAlgorithm::Crc32c => CompleteMultipartUploadRequestPart {
169 part_number: p.part_number,
170 etag: p.etag.clone(),
171 checksum_crc32c: p.checksum.clone(),
172 },
173 },
174 })
175 .collect();
176
177 let resp = self
178 .core
179 .s3_complete_multipart_upload(&self.path, upload_id, parts)
180 .await?;
181
182 let status = resp.status();
183
184 let mut meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
185
186 match status {
187 StatusCode::OK => {
188 let (parts, body) = resp.into_parts();
191
192 let ret: CompleteMultipartUploadResult =
193 quick_xml::de::from_reader(body.reader()).map_err(new_xml_deserialize_error)?;
194 if !ret.code.is_empty() {
195 return Err(from_s3_error(
196 S3Error {
197 code: ret.code,
198 message: ret.message,
199 resource: "".to_string(),
200 request_id: ret.request_id,
201 },
202 parts,
203 ));
204 }
205 meta.set_etag(&ret.etag);
206
207 Ok(meta)
208 }
209 _ => Err(parse_error(resp)),
210 }
211 }
212
213 async fn abort_part(&self, upload_id: &str) -> Result<()> {
214 let resp = self
215 .core
216 .s3_abort_multipart_upload(&self.path, upload_id)
217 .await?;
218 match resp.status() {
219 StatusCode::NO_CONTENT => Ok(()),
221 _ => Err(parse_error(resp)),
222 }
223 }
224}
225
226impl oio::AppendWrite for S3Writer {
227 async fn offset(&self) -> Result<u64> {
228 let resp = self
229 .core
230 .s3_head_object(&self.path, OpStat::default())
231 .await?;
232
233 let status = resp.status();
234
235 match status {
236 StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()),
237 StatusCode::NOT_FOUND => Ok(0),
238 _ => Err(parse_error(resp)),
239 }
240 }
241
242 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
243 let mut req = self
244 .core
245 .s3_append_object_request(&self.path, offset, size, &self.op, body)?;
246
247 self.core.sign(&mut req).await?;
248
249 let resp = self.core.send(req).await?;
250
251 let status = resp.status();
252
253 let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?;
254
255 match status {
256 StatusCode::CREATED | StatusCode::OK => Ok(meta),
257 _ => Err(parse_error(resp)),
258 }
259 }
260}