opendal/services/oss/
writer.rs1use std::sync::Arc;
19
20use http::{HeaderMap, HeaderValue, StatusCode};
21
22use super::core::*;
23use super::error::parse_error;
24use crate::raw::*;
25use crate::*;
26
27pub type OssWriters = TwoWays<oio::MultipartWriter<OssWriter>, oio::AppendWriter<OssWriter>>;
28
29pub struct OssWriter {
30 core: Arc<OssCore>,
31
32 op: OpWrite,
33 path: String,
34}
35
36impl OssWriter {
37 pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
38 OssWriter {
39 core,
40 path: path.to_string(),
41 op,
42 }
43 }
44
45 fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
46 let mut meta = Metadata::default();
47 if let Some(etag) = parse_etag(headers)? {
48 meta.set_etag(etag);
49 }
50 if let Some(md5) = parse_content_md5(headers)? {
51 meta.set_content_md5(md5);
52 }
53 if let Some(version) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
54 meta.set_version(version);
55 }
56
57 Ok(meta)
58 }
59}
60
61impl oio::MultipartWrite for OssWriter {
62 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
63 let mut req =
64 self.core
65 .oss_put_object_request(&self.path, Some(size), &self.op, body, false)?;
66
67 self.core.sign(&mut req).await?;
68
69 let resp = self.core.send(req).await?;
70
71 let meta = Self::parse_metadata(resp.headers())?;
72 let status = resp.status();
73
74 match status {
75 StatusCode::CREATED | StatusCode::OK => Ok(meta),
76 _ => Err(parse_error(resp)),
77 }
78 }
79
80 async fn initiate_part(&self) -> Result<String> {
81 let resp = self
82 .core
83 .oss_initiate_upload(
84 &self.path,
85 self.op.content_type(),
86 self.op.content_disposition(),
87 self.op.cache_control(),
88 false,
89 )
90 .await?;
91
92 let status = resp.status();
93
94 match status {
95 StatusCode::OK => {
96 let bs = resp.into_body();
97
98 let result: InitiateMultipartUploadResult =
99 quick_xml::de::from_reader(bytes::Buf::reader(bs))
100 .map_err(new_xml_deserialize_error)?;
101
102 Ok(result.upload_id)
103 }
104 _ => Err(parse_error(resp)),
105 }
106 }
107
108 async fn write_part(
109 &self,
110 upload_id: &str,
111 part_number: usize,
112 size: u64,
113 body: Buffer,
114 ) -> Result<oio::MultipartPart> {
115 let part_number = part_number + 1;
117
118 let resp = self
119 .core
120 .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
121 .await?;
122
123 let status = resp.status();
124
125 match status {
126 StatusCode::OK => {
127 let etag = parse_etag(resp.headers())?
128 .ok_or_else(|| {
129 Error::new(
130 ErrorKind::Unexpected,
131 "ETag not present in returning response",
132 )
133 })?
134 .to_string();
135
136 Ok(oio::MultipartPart {
137 part_number,
138 etag,
139 checksum: None,
140 })
141 }
142 _ => Err(parse_error(resp)),
143 }
144 }
145
146 async fn complete_part(
147 &self,
148 upload_id: &str,
149 parts: &[oio::MultipartPart],
150 ) -> Result<Metadata> {
151 let parts = parts
152 .iter()
153 .map(|p| MultipartUploadPart {
154 part_number: p.part_number,
155 etag: p.etag.clone(),
156 })
157 .collect();
158
159 let resp = self
160 .core
161 .oss_complete_multipart_upload_request(&self.path, upload_id, false, parts)
162 .await?;
163
164 let meta = Self::parse_metadata(resp.headers())?;
165 let status = resp.status();
166
167 match status {
168 StatusCode::OK => Ok(meta),
169 _ => Err(parse_error(resp)),
170 }
171 }
172
173 async fn abort_part(&self, upload_id: &str) -> Result<()> {
174 let resp = self
175 .core
176 .oss_abort_multipart_upload(&self.path, upload_id)
177 .await?;
178 match resp.status() {
179 StatusCode::NO_CONTENT => Ok(()),
181 _ => Err(parse_error(resp)),
182 }
183 }
184}
185
186impl oio::AppendWrite for OssWriter {
187 async fn offset(&self) -> Result<u64> {
188 let resp = self
189 .core
190 .oss_head_object(&self.path, &OpStat::new())
191 .await?;
192
193 let status = resp.status();
194 match status {
195 StatusCode::OK => {
196 let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
197 Error::new(
198 ErrorKind::Unexpected,
199 "Content-Length not present in returning response",
200 )
201 })?;
202 Ok(content_length)
203 }
204 StatusCode::NOT_FOUND => Ok(0),
205 _ => Err(parse_error(resp)),
206 }
207 }
208
209 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
210 let mut req = self
211 .core
212 .oss_append_object_request(&self.path, offset, size, &self.op, body)?;
213
214 self.core.sign(&mut req).await?;
215
216 let resp = self.core.send(req).await?;
217
218 let meta = Self::parse_metadata(resp.headers())?;
219 let status = resp.status();
220
221 match status {
222 StatusCode::OK => Ok(meta),
223 _ => Err(parse_error(resp)),
224 }
225 }
226}