opendal/services/oss/
writer.rs1use std::sync::Arc;
19
20use http::HeaderMap;
21use http::HeaderValue;
22use http::StatusCode;
23
24use super::core::*;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::*;
28
29pub type OssWriters = TwoWays<oio::MultipartWriter<OssWriter>, oio::AppendWriter<OssWriter>>;
30
31pub struct OssWriter {
32 core: Arc<OssCore>,
33
34 op: OpWrite,
35 path: String,
36}
37
38impl OssWriter {
39 pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
40 OssWriter {
41 core,
42 path: path.to_string(),
43 op,
44 }
45 }
46
47 fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> {
48 let mut meta = Metadata::default();
49 if let Some(etag) = parse_etag(headers)? {
50 meta.set_etag(etag);
51 }
52 if let Some(md5) = parse_content_md5(headers)? {
53 meta.set_content_md5(md5);
54 }
55 if let Some(version) = parse_header_to_str(headers, constants::X_OSS_VERSION_ID)? {
56 meta.set_version(version);
57 }
58
59 Ok(meta)
60 }
61}
62
63impl oio::MultipartWrite for OssWriter {
64 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
65 let mut req =
66 self.core
67 .oss_put_object_request(&self.path, Some(size), &self.op, body, false)?;
68
69 self.core.sign(&mut req).await?;
70
71 let resp = self.core.send(req).await?;
72
73 let meta = Self::parse_metadata(resp.headers())?;
74 let status = resp.status();
75
76 match status {
77 StatusCode::CREATED | StatusCode::OK => Ok(meta),
78 _ => Err(parse_error(resp)),
79 }
80 }
81
82 async fn initiate_part(&self) -> Result<String> {
83 let resp = self
84 .core
85 .oss_initiate_upload(
86 &self.path,
87 self.op.content_type(),
88 self.op.content_disposition(),
89 self.op.cache_control(),
90 false,
91 )
92 .await?;
93
94 let status = resp.status();
95
96 match status {
97 StatusCode::OK => {
98 let bs = resp.into_body();
99
100 let result: InitiateMultipartUploadResult =
101 quick_xml::de::from_reader(bytes::Buf::reader(bs))
102 .map_err(new_xml_deserialize_error)?;
103
104 Ok(result.upload_id)
105 }
106 _ => Err(parse_error(resp)),
107 }
108 }
109
110 async fn write_part(
111 &self,
112 upload_id: &str,
113 part_number: usize,
114 size: u64,
115 body: Buffer,
116 ) -> Result<oio::MultipartPart> {
117 let part_number = part_number + 1;
119
120 let resp = self
121 .core
122 .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
123 .await?;
124
125 let status = resp.status();
126
127 match status {
128 StatusCode::OK => {
129 let etag = parse_etag(resp.headers())?
130 .ok_or_else(|| {
131 Error::new(
132 ErrorKind::Unexpected,
133 "ETag not present in returning response",
134 )
135 })?
136 .to_string();
137
138 Ok(oio::MultipartPart {
139 part_number,
140 etag,
141 checksum: None,
142 })
143 }
144 _ => Err(parse_error(resp)),
145 }
146 }
147
148 async fn complete_part(
149 &self,
150 upload_id: &str,
151 parts: &[oio::MultipartPart],
152 ) -> Result<Metadata> {
153 let parts = parts
154 .iter()
155 .map(|p| MultipartUploadPart {
156 part_number: p.part_number,
157 etag: p.etag.clone(),
158 })
159 .collect();
160
161 let resp = self
162 .core
163 .oss_complete_multipart_upload_request(&self.path, upload_id, false, parts)
164 .await?;
165
166 let meta = Self::parse_metadata(resp.headers())?;
167 let status = resp.status();
168
169 match status {
170 StatusCode::OK => Ok(meta),
171 _ => Err(parse_error(resp)),
172 }
173 }
174
175 async fn abort_part(&self, upload_id: &str) -> Result<()> {
176 let resp = self
177 .core
178 .oss_abort_multipart_upload(&self.path, upload_id)
179 .await?;
180 match resp.status() {
181 StatusCode::NO_CONTENT => Ok(()),
183 _ => Err(parse_error(resp)),
184 }
185 }
186}
187
188impl oio::AppendWrite for OssWriter {
189 async fn offset(&self) -> Result<u64> {
190 let resp = self
191 .core
192 .oss_head_object(&self.path, &OpStat::new())
193 .await?;
194
195 let status = resp.status();
196 match status {
197 StatusCode::OK => {
198 let content_length = parse_content_length(resp.headers())?.ok_or_else(|| {
199 Error::new(
200 ErrorKind::Unexpected,
201 "Content-Length not present in returning response",
202 )
203 })?;
204 Ok(content_length)
205 }
206 StatusCode::NOT_FOUND => Ok(0),
207 _ => Err(parse_error(resp)),
208 }
209 }
210
211 async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
212 let mut req = self
213 .core
214 .oss_append_object_request(&self.path, offset, size, &self.op, body)?;
215
216 self.core.sign(&mut req).await?;
217
218 let resp = self.core.send(req).await?;
219
220 let meta = Self::parse_metadata(resp.headers())?;
221 let status = resp.status();
222
223 match status {
224 StatusCode::OK => Ok(meta),
225 _ => Err(parse_error(resp)),
226 }
227 }
228}