opendal/services/webhdfs/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use uuid::Uuid;
23
24use super::core::WebhdfsCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::services::webhdfs::message::FileStatusWrapper;
28use crate::*;
29
30pub type WebhdfsWriters =
31 TwoWays<oio::BlockWriter<WebhdfsWriter>, oio::AppendWriter<WebhdfsWriter>>;
32
33pub struct WebhdfsWriter {
34 core: Arc<WebhdfsCore>,
35
36 op: OpWrite,
37 path: String,
38}
39
40impl WebhdfsWriter {
41 pub fn new(core: Arc<WebhdfsCore>, op: OpWrite, path: String) -> Self {
42 WebhdfsWriter { core, op, path }
43 }
44}
45
46impl oio::BlockWrite for WebhdfsWriter {
47 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
48 let resp = self
49 .core
50 .webhdfs_create_object(&self.path, Some(size), &self.op, body)
51 .await?;
52
53 let status = resp.status();
54 match status {
55 StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
56 _ => Err(parse_error(resp)),
57 }
58 }
59
60 async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
61 let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
62 return Err(Error::new(
63 ErrorKind::Unsupported,
64 "write multi is not supported when atomic is not set",
65 ));
66 };
67 let resp = self
68 .core
69 .webhdfs_create_object(
70 &format!("{}{}", atomic_write_dir, block_id),
71 Some(size),
72 &self.op,
73 body,
74 )
75 .await?;
76
77 let status = resp.status();
78 match status {
79 StatusCode::CREATED | StatusCode::OK => Ok(()),
80 _ => Err(parse_error(resp)),
81 }
82 }
83
84 async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
85 let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
86 return Err(Error::new(
87 ErrorKind::Unsupported,
88 "write multi is not supported when atomic is not set",
89 ));
90 };
91 let first_block_id = format!("{}{}", atomic_write_dir, block_ids[0].clone());
92 if block_ids.len() >= 2 {
93 let sources: Vec<String> = block_ids[1..]
94 .iter()
95 .map(|s| format!("{}{}", atomic_write_dir, s))
96 .collect();
97 let resp = self.core.webhdfs_concat(&first_block_id, sources).await?;
99
100 let status = resp.status();
101 if status != StatusCode::OK {
102 return Err(parse_error(resp));
103 }
104 }
105 let resp = self.core.webhdfs_delete(&self.path).await?;
107
108 let status = resp.status();
109 if status != StatusCode::OK {
110 return Err(parse_error(resp));
111 }
112
113 let resp = self
115 .core
116 .webhdfs_rename_object(&first_block_id, &self.path)
117 .await?;
118
119 let status = resp.status();
120 match status {
121 StatusCode::OK => Ok(Metadata::default()),
122 _ => Err(parse_error(resp)),
123 }
124 }
125
126 async fn abort_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
127 for block_id in block_ids {
128 let resp = self.core.webhdfs_delete(&block_id.to_string()).await?;
129 match resp.status() {
130 StatusCode::OK => {}
131 _ => return Err(parse_error(resp)),
132 }
133 }
134 Ok(())
135 }
136}
137
138impl oio::AppendWrite for WebhdfsWriter {
139 async fn offset(&self) -> Result<u64> {
140 let resp = self.core.webhdfs_get_file_status(&self.path).await?;
141
142 let status = resp.status();
143 match status {
144 StatusCode::OK => {
145 let bs = resp.into_body();
146 let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
147 .map_err(new_json_deserialize_error)?
148 .file_status;
149
150 Ok(file_status.length)
151 }
152 StatusCode::NOT_FOUND => {
153 let resp = self
154 .core
155 .webhdfs_create_object(&self.path, None, &self.op, Buffer::new())
156 .await?;
157
158 let status = resp.status();
159 match status {
160 StatusCode::CREATED | StatusCode::OK => Ok(0),
161 _ => Err(parse_error(resp)),
162 }
163 }
164 _ => Err(parse_error(resp)),
165 }
166 }
167
168 async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
169 let resp = self.core.webhdfs_append(&self.path, size, body).await?;
170
171 let status = resp.status();
172 match status {
173 StatusCode::OK => Ok(Metadata::default()),
174 _ => Err(parse_error(resp)),
175 }
176 }
177}