opendal/services/hdfs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Write;
19use std::sync::Arc;
20
21use bytes::Buf;
22use futures::AsyncWriteExt;
23
24use crate::raw::*;
25use crate::*;
26
/// Writer state for a single HDFS file, generic over the file handle type `F`.
///
/// The same struct backs both the async writer (`F = hdrs::AsyncFile`) and the
/// blocking writer (`F = hdrs::File`).
pub struct HdfsWriter<F> {
    /// Final destination path; used as the rename target on close.
    target_path: String,
    /// When `Some`, close renames this path to `target_path`.
    tmp_path: Option<String>,
    /// Open file handle; the constructor always stores `Some`.
    f: Option<F>,
    /// HDFS client used for the remove/rename step on close.
    client: Arc<hdrs::Client>,
    /// When true (and `tmp_path` is set), close removes `target_path` before renaming.
    target_path_exists: bool,
    /// Running byte count: starts at the constructor's `initial_size` and grows by
    /// the length of every successfully written buffer; reported as the content
    /// length on close.
    size: u64,
}
35
/// # Safety
///
/// We will only take `&mut Self` reference for HdfsWriter.
// NOTE(review): this impl is unconditional in `F`, so its soundness rests entirely
// on the convention stated above — confirm no code path reaches `f` through `&Self`.
unsafe impl<F> Sync for HdfsWriter<F> {}
40
41impl<F> HdfsWriter<F> {
42    pub fn new(
43        target_path: String,
44        tmp_path: Option<String>,
45        f: F,
46        client: Arc<hdrs::Client>,
47        target_path_exists: bool,
48        initial_size: u64,
49    ) -> Self {
50        Self {
51            target_path,
52            tmp_path,
53            f: Some(f),
54            client,
55            target_path_exists,
56            size: initial_size,
57        }
58    }
59}
60
61impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
62    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
63        let len = bs.len() as u64;
64        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
65
66        while bs.has_remaining() {
67            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
68            bs.advance(n);
69        }
70
71        self.size += len;
72        Ok(())
73    }
74
75    async fn close(&mut self) -> Result<Metadata> {
76        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
77        f.close().await.map_err(new_std_io_error)?;
78
79        // TODO: we need to make rename async.
80        if let Some(tmp_path) = &self.tmp_path {
81            // we must delete the target_path, otherwise the rename_file operation will fail
82            if self.target_path_exists {
83                self.client
84                    .remove_file(&self.target_path)
85                    .map_err(new_std_io_error)?;
86            }
87            self.client
88                .rename_file(tmp_path, &self.target_path)
89                .map_err(new_std_io_error)?
90        }
91
92        Ok(Metadata::default().with_content_length(self.size))
93    }
94
95    async fn abort(&mut self) -> Result<()> {
96        Err(Error::new(
97            ErrorKind::Unsupported,
98            "HdfsWriter doesn't support abort",
99        ))
100    }
101}
102
103impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
104    fn write(&mut self, mut bs: Buffer) -> Result<()> {
105        let len = bs.len() as u64;
106
107        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
108        while bs.has_remaining() {
109            let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
110            bs.advance(n);
111        }
112
113        self.size += len;
114        Ok(())
115    }
116
117    fn close(&mut self) -> Result<Metadata> {
118        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
119        f.flush().map_err(new_std_io_error)?;
120
121        if let Some(tmp_path) = &self.tmp_path {
122            // we must delete the target_path, otherwise the rename_file operation will fail
123            if self.target_path_exists {
124                self.client
125                    .remove_file(&self.target_path)
126                    .map_err(new_std_io_error)?;
127            }
128            self.client
129                .rename_file(tmp_path, &self.target_path)
130                .map_err(new_std_io_error)?;
131        }
132
133        Ok(Metadata::default().with_content_length(self.size))
134    }
135}