opendal/services/aliyun_drive/
writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21
22use super::core::AliyunDriveCore;
23use super::core::CheckNameMode;
24use super::core::CreateResponse;
25use super::core::CreateType;
26use crate::raw::*;
27use crate::*;
28
29pub struct AliyunDriveWriter {
30    core: Arc<AliyunDriveCore>,
31
32    _op: OpWrite,
33    parent_file_id: String,
34    name: String,
35
36    file_id: Option<String>,
37    upload_id: Option<String>,
38    part_number: usize,
39}
40
41impl AliyunDriveWriter {
42    pub fn new(core: Arc<AliyunDriveCore>, parent_file_id: &str, name: &str, op: OpWrite) -> Self {
43        AliyunDriveWriter {
44            core,
45            _op: op,
46            parent_file_id: parent_file_id.to_string(),
47            name: name.to_string(),
48            file_id: None,
49            upload_id: None,
50            part_number: 1, // must start from 1
51        }
52    }
53}
54
55impl oio::Write for AliyunDriveWriter {
56    async fn write(&mut self, bs: Buffer) -> Result<()> {
57        let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) {
58            (Some(upload_id), Some(file_id)) => (upload_id, file_id),
59            _ => {
60                let res = self
61                    .core
62                    .create(
63                        Some(&self.parent_file_id),
64                        &self.name,
65                        CreateType::File,
66                        CheckNameMode::Refuse,
67                    )
68                    .await?;
69                let output: CreateResponse =
70                    serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
71                if output.exist.is_some_and(|x| x) {
72                    return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
73                }
74                self.upload_id = output.upload_id;
75                self.file_id = Some(output.file_id);
76                (
77                    self.upload_id.as_ref().expect("cannot find upload_id"),
78                    self.file_id.as_ref().expect("cannot find file_id"),
79                )
80            }
81        };
82
83        if let Err(err) = self
84            .core
85            .upload(file_id, upload_id, self.part_number, bs)
86            .await
87        {
88            if err.kind() != ErrorKind::AlreadyExists {
89                return Err(err);
90            }
91        };
92
93        self.part_number += 1;
94
95        Ok(())
96    }
97
98    async fn close(&mut self) -> Result<Metadata> {
99        let (Some(upload_id), Some(file_id)) = (self.upload_id.as_ref(), self.file_id.as_ref())
100        else {
101            return Ok(Metadata::default());
102        };
103
104        self.core.complete(file_id, upload_id).await?;
105        Ok(Metadata::default())
106    }
107
108    async fn abort(&mut self) -> Result<()> {
109        let Some(file_id) = self.file_id.as_ref() else {
110            return Ok(());
111        };
112        self.core.delete_path(file_id).await
113    }
114}