opendal/services/hdfs/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use bytes::Buf;
use futures::AsyncWriteExt;

use crate::raw::*;
use crate::*;

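/// Writer for the HDFS service, backed by an async `hdrs` file handle.
///
/// When `tmp_path` is set, data is written to the temporary file first and
/// only renamed to `target_path` on `close`, so readers never observe a
/// partially written target file.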
pub struct HdfsWriter<F> {
    target_path: String,
    tmp_path: Option<String>,
    f: Option<F>,
    client: Arc<hdrs::Client>,
    target_path_exists: bool,
    size: u64,
}

/// # Safety
///
/// We only ever take a `&mut Self` reference to HdfsWriter.
unsafe impl<F> Sync for HdfsWriter<F> {}

impl<F> HdfsWriter<F> {
    pub fn new(
        target_path: String,
        tmp_path: Option<String>,
        f: F,
        client: Arc<hdrs::Client>,
        target_path_exists: bool,
        initial_size: u64,
    ) -> Self {
        Self {
            target_path,
            tmp_path,
            f: Some(f),
            client,
            target_path_exists,
            size: initial_size,
        }
    }
}

impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
        let len = bs.len() as u64;
        let f = self.f.as_mut().expect("HdfsWriter must be initialized");

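        // A single `write` call may accept only part of the buffer, so keep
        // writing until the whole chunk has been consumed.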
        while bs.has_remaining() {
            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
            bs.advance(n);
        }

        self.size += len;
        Ok(())
    }

    async fn close(&mut self) -> Result<Metadata> {
        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
        f.close().await.map_err(new_std_io_error)?;

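        // If a temporary path was used, commit the data by renaming the
        // temporary file to the target path.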
        // TODO: we need to make rename async.
        if let Some(tmp_path) = &self.tmp_path {
            // We must delete the target_path first, otherwise the rename_file
            // operation will fail.
            if self.target_path_exists {
                self.client
                    .remove_file(&self.target_path)
                    .map_err(new_std_io_error)?;
            }
            self.client
                .rename_file(tmp_path, &self.target_path)
                .map_err(new_std_io_error)?;
        }

        Ok(Metadata::default().with_content_length(self.size))
    }

    async fn abort(&mut self) -> Result<()> {
        Err(Error::new(
            ErrorKind::Unsupported,
            "HdfsWriter doesn't support abort",
        ))
    }
}