opendal/services/b2/
writer.rs
1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22
23use super::core::B2Core;
24use super::core::StartLargeFileResponse;
25use super::core::UploadPartResponse;
26use super::core::UploadResponse;
27use super::error::parse_error;
28use crate::raw::*;
29use crate::*;
30
31pub type B2Writers = oio::MultipartWriter<B2Writer>;
32
33pub struct B2Writer {
34 core: Arc<B2Core>,
35
36 op: OpWrite,
37 path: String,
38}
39
40impl B2Writer {
41 pub fn new(core: Arc<B2Core>, path: &str, op: OpWrite) -> Self {
42 B2Writer {
43 core,
44 path: path.to_string(),
45 op,
46 }
47 }
48
49 pub fn parse_body_into_meta(path: &str, resp: UploadResponse) -> Metadata {
50 let mut meta = Metadata::new(EntryMode::from_path(path));
51
52 if let Some(md5) = resp.content_md5 {
53 meta.set_content_md5(&md5);
54 }
55
56 if let Some(content_type) = resp.content_type {
57 meta.set_content_type(&content_type);
58 }
59
60 meta.set_content_length(resp.content_length);
61
62 meta
63 }
64}
65
66impl oio::MultipartWrite for B2Writer {
67 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
68 let resp = self
69 .core
70 .upload_file(&self.path, Some(size), &self.op, body)
71 .await?;
72
73 let status = resp.status();
74
75 match status {
76 StatusCode::OK => {
77 let bs = resp.into_body();
78
79 let result: UploadResponse =
80 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
81
82 let meta = Self::parse_body_into_meta(&self.path, result);
83
84 Ok(meta)
85 }
86 _ => Err(parse_error(resp)),
87 }
88 }
89
90 async fn initiate_part(&self) -> Result<String> {
91 let resp = self.core.start_large_file(&self.path, &self.op).await?;
92
93 let status = resp.status();
94
95 match status {
96 StatusCode::OK => {
97 let bs = resp.into_body();
98
99 let result: StartLargeFileResponse =
100 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
101
102 Ok(result.file_id)
103 }
104 _ => Err(parse_error(resp)),
105 }
106 }
107
108 async fn write_part(
109 &self,
110 upload_id: &str,
111 part_number: usize,
112 size: u64,
113 body: Buffer,
114 ) -> Result<oio::MultipartPart> {
115 let part_number = part_number + 1;
117
118 let resp = self
119 .core
120 .upload_part(upload_id, part_number, size, body)
121 .await?;
122
123 let status = resp.status();
124
125 match status {
126 StatusCode::OK => {
127 let bs = resp.into_body();
128
129 let result: UploadPartResponse =
130 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
131
132 Ok(oio::MultipartPart {
133 etag: result.content_sha1,
134 part_number,
135 checksum: None,
136 })
137 }
138 _ => Err(parse_error(resp)),
139 }
140 }
141
142 async fn complete_part(
143 &self,
144 upload_id: &str,
145 parts: &[oio::MultipartPart],
146 ) -> Result<Metadata> {
147 let part_sha1_array = parts
148 .iter()
149 .map(|p| {
150 let binding = p.etag.clone();
151 let sha1 = binding.strip_prefix("unverified:");
152 let Some(sha1) = sha1 else {
153 return "".to_string();
154 };
155 sha1.to_string()
156 })
157 .collect();
158
159 let resp = self
160 .core
161 .finish_large_file(upload_id, part_sha1_array)
162 .await?;
163
164 let status = resp.status();
165
166 match status {
167 StatusCode::OK => {
168 let bs = resp.into_body();
169
170 let result: UploadResponse =
171 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
172
173 let meta = Self::parse_body_into_meta(&self.path, result);
174
175 Ok(meta)
176 }
177 _ => Err(parse_error(resp)),
178 }
179 }
180
181 async fn abort_part(&self, upload_id: &str) -> Result<()> {
182 let resp = self.core.cancel_large_file(upload_id).await?;
183 match resp.status() {
184 StatusCode::OK => Ok(()),
186 _ => Err(parse_error(resp)),
187 }
188 }
189}