opendal/raw/oio/write/
append_write.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19
20use crate::raw::*;
21use crate::*;
22
23/// AppendWrite is used to implement [`oio::Write`] based on append
24/// object. By implementing AppendWrite, services don't need to
25/// care about the details of buffering and uploading parts.
26///
27/// The layout after adopting [`AppendWrite`]:
28///
29/// - Services impl `AppendWrite`
30/// - `AppendWriter` impl `Write`
31/// - Expose `AppendWriter` as `Accessor::Writer`
32///
33/// ## Requirements
34///
35/// Services that implement `AppendWrite` must fulfill the following requirements:
36///
37/// - Must be a http service that could accept `AsyncBody`.
38/// - Provide a way to get the current offset of the append object.
39pub trait AppendWrite: Send + Sync + Unpin + 'static {
40    /// Get the current offset of the append object.
41    ///
42    /// Returns `0` if the object is not exist.
43    fn offset(&self) -> impl Future<Output = Result<u64>> + MaybeSend;
44
45    /// Append the data to the end of this object.
46    fn append(
47        &self,
48        offset: u64,
49        size: u64,
50        body: Buffer,
51    ) -> impl Future<Output = Result<Metadata>> + MaybeSend;
52}
53
54/// AppendWriter will implements [`oio::Write`] based on append object.
55///
56/// ## TODO
57///
58/// - Allow users to switch to un-buffered mode if users write 16MiB every time.
59pub struct AppendWriter<W: AppendWrite> {
60    inner: W,
61
62    offset: Option<u64>,
63
64    meta: Metadata,
65}
66
67/// # Safety
68///
69/// wasm32 is a special target that we only have one event-loop for this state.
70impl<W: AppendWrite> AppendWriter<W> {
71    /// Create a new AppendWriter.
72    pub fn new(inner: W) -> Self {
73        Self {
74            inner,
75            offset: None,
76            meta: Metadata::default(),
77        }
78    }
79}
80
81impl<W> oio::Write for AppendWriter<W>
82where
83    W: AppendWrite,
84{
85    async fn write(&mut self, bs: Buffer) -> Result<()> {
86        let offset = match self.offset {
87            Some(offset) => offset,
88            None => {
89                let offset = self.inner.offset().await?;
90                self.offset = Some(offset);
91                offset
92            }
93        };
94
95        let size = bs.len();
96        self.meta = self.inner.append(offset, size as u64, bs).await?;
97        // Update offset after succeed.
98        self.offset = Some(offset + size as u64);
99        Ok(())
100    }
101
102    async fn close(&mut self) -> Result<Metadata> {
103        self.meta
104            .set_content_length(self.offset.unwrap_or_default());
105        Ok(self.meta.clone())
106    }
107
108    async fn abort(&mut self) -> Result<()> {
109        Ok(())
110    }
111}