object_store_opendal/service/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use object_store::path::Path as ObjectStorePath;
21use object_store::MultipartUpload;
22use object_store::ObjectStore;
23use object_store::PutPayload;
24use object_store::{Attribute, AttributeValue};
25
26use opendal::raw::oio::MultipartPart;
27use opendal::raw::*;
28use opendal::*;
29use tokio::sync::Mutex;
30
31use super::core::{format_put_multipart_options, format_put_result, parse_op_write};
32use super::error::parse_error;
33
34pub struct ObjectStoreWriter {
35    store: Arc<dyn ObjectStore + 'static>,
36    path: ObjectStorePath,
37    args: OpWrite,
38    upload: Mutex<Option<Box<dyn MultipartUpload>>>,
39}
40
41impl ObjectStoreWriter {
42    pub fn new(store: Arc<dyn ObjectStore + 'static>, path: &str, args: OpWrite) -> Self {
43        Self {
44            store,
45            path: ObjectStorePath::from(path),
46            args,
47            upload: Mutex::new(None),
48        }
49    }
50}
51
52impl oio::MultipartWrite for ObjectStoreWriter {
53    /// Write the entire object in one go.
54    /// Used when the object is small enough to bypass multipart upload.
55    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
56        // Validate that actual body size matches expected size
57        let actual_size = body.len() as u64;
58        if actual_size != size {
59            return Err(Error::new(
60                ErrorKind::Unexpected,
61                format!("Expected size {size} but got {actual_size}"),
62            ));
63        }
64
65        let bytes = body.to_bytes();
66        let payload = PutPayload::from(bytes);
67        let mut opts = parse_op_write(&self.args)?;
68
69        // Add size metadata for tracking
70        opts.attributes.insert(
71            Attribute::Metadata("content-size".into()),
72            AttributeValue::from(size.to_string()),
73        );
74
75        let result = self
76            .store
77            .put_opts(&self.path, payload, opts)
78            .await
79            .map_err(parse_error)?;
80
81        // Build metadata from put result
82        let mut metadata = Metadata::new(EntryMode::FILE);
83        if let Some(etag) = &result.e_tag {
84            metadata.set_etag(etag);
85        }
86        if let Some(version) = &result.version {
87            metadata.set_version(version);
88        }
89
90        Ok(metadata)
91    }
92
93    // Generate a unique upload ID that we'll use to track this session
94    async fn initiate_part(&self) -> Result<String> {
95        // Start a new multipart upload using object_store
96        let opts = parse_op_write(&self.args)?;
97        let multipart_opts = format_put_multipart_options(opts);
98        let upload = self
99            .store
100            .put_multipart_opts(&self.path, multipart_opts)
101            .await
102            .map_err(parse_error)?;
103
104        // Store the multipart upload for later use
105        let mut guard = self.upload.lock().await;
106        if guard.is_some() {
107            return Err(Error::new(
108                ErrorKind::Unexpected,
109                "Upload already initiated, abort the previous upload first",
110            ));
111        }
112        *guard = Some(upload);
113
114        // object_store does not provide a way to get the upload id, so we use a fixed string
115        // as the upload id. it's ok because the upload id is already tracked inside the upload
116        // object.
117        Ok("".to_string())
118    }
119
120    /// Upload a single part of the multipart upload.
121    /// Part numbers must be sequential starting from 1.
122    /// Returns the ETag and part information for this uploaded part.
123    async fn write_part(
124        &self,
125        _upload_id: &str,
126        part_number: usize,
127        size: u64,
128        body: Buffer,
129    ) -> Result<MultipartPart> {
130        // Validate that actual body size matches expected size
131        let actual_size = body.len() as u64;
132        if actual_size != size {
133            return Err(Error::new(
134                ErrorKind::Unexpected,
135                format!("Expected size {size} but got {actual_size}"),
136            ));
137        }
138
139        // Convert Buffer to PutPayload
140        let bytes = body.to_bytes();
141
142        // Return empty string as ETag since it's not used by object_store
143        let etag = String::new();
144
145        let payload = PutPayload::from(bytes);
146
147        // Upload the part
148        let mut guard = self.upload.lock().await;
149        let upload = guard
150            .as_mut()
151            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
152        upload.put_part(payload).await.map_err(parse_error)?;
153
154        // Create MultipartPart with the proper ETag
155        let multipart_part = MultipartPart {
156            part_number,
157            etag,
158            checksum: None, // No checksum for now
159        };
160        Ok(multipart_part)
161    }
162
163    async fn complete_part(
164        &self,
165        _upload_id: &str,
166        parts: &[oio::MultipartPart],
167    ) -> Result<Metadata> {
168        // Validate that we have parts to complete
169        if parts.is_empty() {
170            return Err(Error::new(
171                ErrorKind::Unexpected,
172                "Cannot complete multipart upload with no parts",
173            ));
174        }
175
176        // Get the multipart upload for this upload_id
177        let mut guard = self.upload.lock().await;
178        let upload = guard
179            .as_mut()
180            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
181
182        // Complete the multipart upload
183        let result = upload.complete().await.map_err(parse_error)?;
184        *guard = None;
185
186        // Build metadata from the result
187        let metadata = format_put_result(result);
188        Ok(metadata)
189    }
190
191    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
192        // Get the multipart upload for this upload_id
193        let mut guard = self.upload.lock().await;
194        let upload = guard
195            .as_mut()
196            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
197
198        // Abort the multipart upload
199        upload.abort().await.map_err(parse_error)?;
200        *guard = None;
201
202        Ok(())
203    }
204}