opendal_core/services/hdfs/core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::Debug;
19use std::io;
20use std::io::SeekFrom;
21use std::sync::Arc;
22
23use crate::raw::*;
24use crate::*;
25
26/// HdfsCore contains code that directly interacts with HDFS.
27#[derive(Clone)]
28pub struct HdfsCore {
29    pub info: Arc<AccessorInfo>,
30    pub root: String,
31    pub atomic_write_dir: Option<String>,
32    pub client: Arc<hdrs::Client>,
33}
34
35impl Debug for HdfsCore {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("HdfsCore")
38            .field("root", &self.root)
39            .field("atomic_write_dir", &self.atomic_write_dir)
40            .finish_non_exhaustive()
41    }
42}
43
44impl HdfsCore {
45    pub fn hdfs_create_dir(&self, path: &str) -> Result<()> {
46        let p = build_rooted_abs_path(&self.root, path);
47        self.client.create_dir(&p).map_err(new_std_io_error)?;
48        Ok(())
49    }
50
51    pub fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
52        let p = build_rooted_abs_path(&self.root, path);
53        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
54
55        let mode = if meta.is_dir() {
56            EntryMode::DIR
57        } else if meta.is_file() {
58            EntryMode::FILE
59        } else {
60            EntryMode::Unknown
61        };
62        let mut m = Metadata::new(mode);
63        m.set_content_length(meta.len());
64        m.set_last_modified(Timestamp::try_from(meta.modified())?);
65
66        Ok(m)
67    }
68
69    pub async fn hdfs_read(&self, path: &str, args: &OpRead) -> Result<hdrs::AsyncFile> {
70        let p = build_rooted_abs_path(&self.root, path);
71
72        let client = self.client.clone();
73        let mut f = client
74            .open_file()
75            .read(true)
76            .async_open(&p)
77            .await
78            .map_err(new_std_io_error)?;
79
80        if args.range().offset() != 0 {
81            use futures::AsyncSeekExt;
82
83            f.seek(SeekFrom::Start(args.range().offset()))
84                .await
85                .map_err(new_std_io_error)?;
86        }
87
88        Ok(f)
89    }
90
91    pub async fn hdfs_write(
92        &self,
93        path: &str,
94        op: &OpWrite,
95    ) -> Result<(String, Option<String>, hdrs::AsyncFile, bool, u64)> {
96        let target_path = build_rooted_abs_path(&self.root, path);
97        let mut initial_size = 0;
98        let target_exists = match self.client.metadata(&target_path) {
99            Ok(meta) => {
100                initial_size = meta.len();
101                true
102            }
103            Err(err) => {
104                if err.kind() != io::ErrorKind::NotFound {
105                    return Err(new_std_io_error(err));
106                }
107                false
108            }
109        };
110
111        let should_append = op.append() && target_exists;
112        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
113            // If the target file exists, we should append to the end of it directly.
114            (!should_append).then_some(build_rooted_abs_path(
115                atomic_write_dir,
116                &build_tmp_path_of(path),
117            ))
118        });
119
120        if !target_exists {
121            let parent = get_parent(&target_path);
122            self.client.create_dir(parent).map_err(new_std_io_error)?;
123        }
124        if !should_append {
125            initial_size = 0;
126        }
127
128        let mut open_options = self.client.open_file();
129        open_options.create(true);
130        if should_append {
131            open_options.append(true);
132        } else {
133            open_options.write(true);
134        }
135
136        let f = open_options
137            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
138            .await
139            .map_err(new_std_io_error)?;
140
141        Ok((target_path, tmp_path, f, target_exists, initial_size))
142    }
143
144    pub fn hdfs_list(&self, path: &str) -> Result<Option<hdrs::Readdir>> {
145        let p = build_rooted_abs_path(&self.root, path);
146
147        match self.client.read_dir(&p) {
148            Ok(f) => Ok(Some(f)),
149            Err(e) => {
150                if e.kind() == io::ErrorKind::NotFound {
151                    Ok(None)
152                } else {
153                    Err(new_std_io_error(e))
154                }
155            }
156        }
157    }
158
159    pub fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
160        let from_path = build_rooted_abs_path(&self.root, from);
161        self.client.metadata(&from_path).map_err(new_std_io_error)?;
162
163        let to_path = build_rooted_abs_path(&self.root, to);
164        let result = self.client.metadata(&to_path);
165        match result {
166            Err(err) => {
167                // Early return if other error happened.
168                if err.kind() != io::ErrorKind::NotFound {
169                    return Err(new_std_io_error(err));
170                }
171
172                let parent = std::path::PathBuf::from(&to_path)
173                    .parent()
174                    .ok_or_else(|| {
175                        Error::new(
176                            ErrorKind::Unexpected,
177                            "path should have parent but not, it must be malformed",
178                        )
179                        .with_context("input", &to_path)
180                    })?
181                    .to_path_buf();
182
183                self.client
184                    .create_dir(&parent.to_string_lossy())
185                    .map_err(new_std_io_error)?;
186            }
187            Ok(metadata) => {
188                if metadata.is_file() {
189                    self.client
190                        .remove_file(&to_path)
191                        .map_err(new_std_io_error)?;
192                } else {
193                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
194                        .with_context("input", &to_path));
195                }
196            }
197        }
198
199        self.client
200            .rename_file(&from_path, &to_path)
201            .map_err(new_std_io_error)?;
202
203        Ok(())
204    }
205}