opendal/services/webhdfs/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use uuid::Uuid;
23
24use super::core::WebhdfsCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::services::webhdfs::message::FileStatusWrapper;
28use crate::*;
29
30pub type WebhdfsWriters =
31    TwoWays<oio::BlockWriter<WebhdfsWriter>, oio::AppendWriter<WebhdfsWriter>>;
32
33pub struct WebhdfsWriter {
34    core: Arc<WebhdfsCore>,
35
36    op: OpWrite,
37    path: String,
38}
39
40impl WebhdfsWriter {
41    pub fn new(core: Arc<WebhdfsCore>, op: OpWrite, path: String) -> Self {
42        WebhdfsWriter { core, op, path }
43    }
44}
45
46impl oio::BlockWrite for WebhdfsWriter {
47    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
48        let req = self
49            .core
50            .webhdfs_create_object_request(&self.path, Some(size), &self.op, body)
51            .await?;
52
53        let resp = self.core.info.http_client().send(req).await?;
54
55        let status = resp.status();
56        match status {
57            StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
58            _ => Err(parse_error(resp)),
59        }
60    }
61
62    async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
63        let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
64            return Err(Error::new(
65                ErrorKind::Unsupported,
66                "write multi is not supported when atomic is not set",
67            ));
68        };
69        let req = self
70            .core
71            .webhdfs_create_object_request(
72                &format!("{}{}", atomic_write_dir, block_id),
73                Some(size),
74                &self.op,
75                body,
76            )
77            .await?;
78
79        let resp = self.core.info.http_client().send(req).await?;
80
81        let status = resp.status();
82        match status {
83            StatusCode::CREATED | StatusCode::OK => Ok(()),
84            _ => Err(parse_error(resp)),
85        }
86    }
87
88    async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
89        let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
90            return Err(Error::new(
91                ErrorKind::Unsupported,
92                "write multi is not supported when atomic is not set",
93            ));
94        };
95        let first_block_id = format!("{}{}", atomic_write_dir, block_ids[0].clone());
96        if block_ids.len() >= 2 {
97            let sources: Vec<String> = block_ids[1..]
98                .iter()
99                .map(|s| format!("{}{}", atomic_write_dir, s))
100                .collect();
101            // concat blocks
102            let req = self.core.webhdfs_concat_request(&first_block_id, sources)?;
103
104            let resp = self.core.info.http_client().send(req).await?;
105
106            let status = resp.status();
107
108            if status != StatusCode::OK {
109                return Err(parse_error(resp));
110            }
111        }
112        // delete the path file
113        let resp = self.core.webhdfs_delete(&self.path).await?;
114        let status = resp.status();
115        if status != StatusCode::OK {
116            return Err(parse_error(resp));
117        }
118
119        // rename concat file to path
120        let resp = self
121            .core
122            .webhdfs_rename_object(&first_block_id, &self.path)
123            .await?;
124
125        let status = resp.status();
126
127        match status {
128            StatusCode::OK => Ok(Metadata::default()),
129            _ => Err(parse_error(resp)),
130        }
131    }
132
133    async fn abort_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
134        for block_id in block_ids {
135            let resp = self.core.webhdfs_delete(&block_id.to_string()).await?;
136            match resp.status() {
137                StatusCode::OK => {}
138                _ => return Err(parse_error(resp)),
139            }
140        }
141        Ok(())
142    }
143}
144
145impl oio::AppendWrite for WebhdfsWriter {
146    async fn offset(&self) -> Result<u64> {
147        let resp = self.core.webhdfs_get_file_status(&self.path).await?;
148        let status = resp.status();
149        match status {
150            StatusCode::OK => {
151                let bs = resp.into_body();
152                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
153                    .map_err(new_json_deserialize_error)?
154                    .file_status;
155
156                Ok(file_status.length)
157            }
158            StatusCode::NOT_FOUND => {
159                let req = self
160                    .core
161                    .webhdfs_create_object_request(&self.path, None, &self.op, Buffer::new())
162                    .await?;
163
164                let resp = self.core.info.http_client().send(req).await?;
165                let status = resp.status();
166
167                match status {
168                    StatusCode::CREATED | StatusCode::OK => Ok(0),
169                    _ => Err(parse_error(resp)),
170                }
171            }
172            _ => Err(parse_error(resp)),
173        }
174    }
175
176    async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
177        let location = self.core.webhdfs_init_append_request(&self.path).await?;
178        let req = self.core.webhdfs_append_request(&location, size, body)?;
179        let resp = self.core.info.http_client().send(req).await?;
180
181        let status = resp.status();
182        match status {
183            StatusCode::OK => Ok(Metadata::default()),
184            _ => Err(parse_error(resp)),
185        }
186    }
187}