object_store_opendal/service/
writer.rs1use std::sync::Arc;
19
20use object_store::path::Path as ObjectStorePath;
21use object_store::MultipartUpload;
22use object_store::ObjectStore;
23use object_store::PutPayload;
24use object_store::{Attribute, AttributeValue};
25
26use opendal::raw::oio::MultipartPart;
27use opendal::raw::*;
28use opendal::*;
29use tokio::sync::Mutex;
30
31use super::core::{format_put_multipart_options, format_put_result, parse_op_write};
32use super::error::parse_error;
33
34pub struct ObjectStoreWriter {
35 store: Arc<dyn ObjectStore + 'static>,
36 path: ObjectStorePath,
37 args: OpWrite,
38 upload: Mutex<Option<Box<dyn MultipartUpload>>>,
39}
40
41impl ObjectStoreWriter {
42 pub fn new(store: Arc<dyn ObjectStore + 'static>, path: &str, args: OpWrite) -> Self {
43 Self {
44 store,
45 path: ObjectStorePath::from(path),
46 args,
47 upload: Mutex::new(None),
48 }
49 }
50}
51
52impl oio::MultipartWrite for ObjectStoreWriter {
53 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
56 let actual_size = body.len() as u64;
58 if actual_size != size {
59 return Err(Error::new(
60 ErrorKind::Unexpected,
61 format!("Expected size {size} but got {actual_size}"),
62 ));
63 }
64
65 let bytes = body.to_bytes();
66 let payload = PutPayload::from(bytes);
67 let mut opts = parse_op_write(&self.args)?;
68
69 opts.attributes.insert(
71 Attribute::Metadata("content-size".into()),
72 AttributeValue::from(size.to_string()),
73 );
74
75 let result = self
76 .store
77 .put_opts(&self.path, payload, opts)
78 .await
79 .map_err(parse_error)?;
80
81 let mut metadata = Metadata::new(EntryMode::FILE);
83 if let Some(etag) = &result.e_tag {
84 metadata.set_etag(etag);
85 }
86 if let Some(version) = &result.version {
87 metadata.set_version(version);
88 }
89
90 Ok(metadata)
91 }
92
93 async fn initiate_part(&self) -> Result<String> {
95 let opts = parse_op_write(&self.args)?;
97 let multipart_opts = format_put_multipart_options(opts);
98 let upload = self
99 .store
100 .put_multipart_opts(&self.path, multipart_opts)
101 .await
102 .map_err(parse_error)?;
103
104 let mut guard = self.upload.lock().await;
106 if guard.is_some() {
107 return Err(Error::new(
108 ErrorKind::Unexpected,
109 "Upload already initiated, abort the previous upload first",
110 ));
111 }
112 *guard = Some(upload);
113
114 Ok("".to_string())
118 }
119
120 async fn write_part(
124 &self,
125 _upload_id: &str,
126 part_number: usize,
127 size: u64,
128 body: Buffer,
129 ) -> Result<MultipartPart> {
130 let actual_size = body.len() as u64;
132 if actual_size != size {
133 return Err(Error::new(
134 ErrorKind::Unexpected,
135 format!("Expected size {size} but got {actual_size}"),
136 ));
137 }
138
139 let bytes = body.to_bytes();
141
142 let etag = String::new();
144
145 let payload = PutPayload::from(bytes);
146
147 let mut guard = self.upload.lock().await;
149 let upload = guard
150 .as_mut()
151 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
152 upload.put_part(payload).await.map_err(parse_error)?;
153
154 let multipart_part = MultipartPart {
156 part_number,
157 etag,
158 checksum: None, };
160 Ok(multipart_part)
161 }
162
163 async fn complete_part(
164 &self,
165 _upload_id: &str,
166 parts: &[oio::MultipartPart],
167 ) -> Result<Metadata> {
168 if parts.is_empty() {
170 return Err(Error::new(
171 ErrorKind::Unexpected,
172 "Cannot complete multipart upload with no parts",
173 ));
174 }
175
176 let mut guard = self.upload.lock().await;
178 let upload = guard
179 .as_mut()
180 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
181
182 let result = upload.complete().await.map_err(parse_error)?;
184 *guard = None;
185
186 let metadata = format_put_result(result);
188 Ok(metadata)
189 }
190
191 async fn abort_part(&self, _upload_id: &str) -> Result<()> {
192 let mut guard = self.upload.lock().await;
194 let upload = guard
195 .as_mut()
196 .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
197
198 upload.abort().await.map_err(parse_error)?;
200 *guard = None;
201
202 Ok(())
203 }
204}