opendal/services/webhdfs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use uuid::Uuid;
23
24use super::core::WebhdfsCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::services::webhdfs::message::FileStatusWrapper;
28use crate::*;
29
/// Writer type exposed by the WebHDFS service.
///
/// Wraps the two write strategies implemented by [`WebhdfsWriter`] —
/// `oio::BlockWriter` and `oio::AppendWriter` — in a single `TwoWays` type.
pub type WebhdfsWriters =
    TwoWays<oio::BlockWriter<WebhdfsWriter>, oio::AppendWriter<WebhdfsWriter>>;
32
33pub struct WebhdfsWriter {
34    core: Arc<WebhdfsCore>,
35
36    op: OpWrite,
37    path: String,
38}
39
40impl WebhdfsWriter {
41    pub fn new(core: Arc<WebhdfsCore>, op: OpWrite, path: String) -> Self {
42        WebhdfsWriter { core, op, path }
43    }
44}
45
46impl oio::BlockWrite for WebhdfsWriter {
47    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
48        let resp = self
49            .core
50            .webhdfs_create_object(&self.path, Some(size), &self.op, body)
51            .await?;
52
53        let status = resp.status();
54        match status {
55            StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
56            _ => Err(parse_error(resp)),
57        }
58    }
59
60    async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
61        let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
62            return Err(Error::new(
63                ErrorKind::Unsupported,
64                "write multi is not supported when atomic is not set",
65            ));
66        };
67        let resp = self
68            .core
69            .webhdfs_create_object(
70                &format!("{}{}", atomic_write_dir, block_id),
71                Some(size),
72                &self.op,
73                body,
74            )
75            .await?;
76
77        let status = resp.status();
78        match status {
79            StatusCode::CREATED | StatusCode::OK => Ok(()),
80            _ => Err(parse_error(resp)),
81        }
82    }
83
84    async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
85        let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
86            return Err(Error::new(
87                ErrorKind::Unsupported,
88                "write multi is not supported when atomic is not set",
89            ));
90        };
91        let first_block_id = format!("{}{}", atomic_write_dir, block_ids[0].clone());
92        if block_ids.len() >= 2 {
93            let sources: Vec<String> = block_ids[1..]
94                .iter()
95                .map(|s| format!("{}{}", atomic_write_dir, s))
96                .collect();
97            // concat blocks
98            let resp = self.core.webhdfs_concat(&first_block_id, sources).await?;
99
100            let status = resp.status();
101            if status != StatusCode::OK {
102                return Err(parse_error(resp));
103            }
104        }
105        // delete the path file
106        let resp = self.core.webhdfs_delete(&self.path).await?;
107
108        let status = resp.status();
109        if status != StatusCode::OK {
110            return Err(parse_error(resp));
111        }
112
113        // rename concat file to path
114        let resp = self
115            .core
116            .webhdfs_rename_object(&first_block_id, &self.path)
117            .await?;
118
119        let status = resp.status();
120        match status {
121            StatusCode::OK => Ok(Metadata::default()),
122            _ => Err(parse_error(resp)),
123        }
124    }
125
126    async fn abort_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
127        for block_id in block_ids {
128            let resp = self.core.webhdfs_delete(&block_id.to_string()).await?;
129            match resp.status() {
130                StatusCode::OK => {}
131                _ => return Err(parse_error(resp)),
132            }
133        }
134        Ok(())
135    }
136}
137
138impl oio::AppendWrite for WebhdfsWriter {
139    async fn offset(&self) -> Result<u64> {
140        let resp = self.core.webhdfs_get_file_status(&self.path).await?;
141
142        let status = resp.status();
143        match status {
144            StatusCode::OK => {
145                let bs = resp.into_body();
146                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
147                    .map_err(new_json_deserialize_error)?
148                    .file_status;
149
150                Ok(file_status.length)
151            }
152            StatusCode::NOT_FOUND => {
153                let resp = self
154                    .core
155                    .webhdfs_create_object(&self.path, None, &self.op, Buffer::new())
156                    .await?;
157
158                let status = resp.status();
159                match status {
160                    StatusCode::CREATED | StatusCode::OK => Ok(0),
161                    _ => Err(parse_error(resp)),
162                }
163            }
164            _ => Err(parse_error(resp)),
165        }
166    }
167
168    async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
169        let resp = self.core.webhdfs_append(&self.path, size, body).await?;
170
171        let status = resp.status();
172        match status {
173            StatusCode::OK => Ok(Metadata::default()),
174            _ => Err(parse_error(resp)),
175        }
176    }
177}