opendal/services/webhdfs/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use http::StatusCode;
22use uuid::Uuid;
23
24use super::core::WebhdfsCore;
25use super::error::parse_error;
26use crate::raw::*;
27use crate::services::webhdfs::message::FileStatusWrapper;
28use crate::*;
29
30pub type WebhdfsWriters =
31 TwoWays<oio::BlockWriter<WebhdfsWriter>, oio::AppendWriter<WebhdfsWriter>>;
32
33pub struct WebhdfsWriter {
34 core: Arc<WebhdfsCore>,
35
36 op: OpWrite,
37 path: String,
38}
39
40impl WebhdfsWriter {
41 pub fn new(core: Arc<WebhdfsCore>, op: OpWrite, path: String) -> Self {
42 WebhdfsWriter { core, op, path }
43 }
44}
45
46impl oio::BlockWrite for WebhdfsWriter {
47 async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
48 let req = self
49 .core
50 .webhdfs_create_object_request(&self.path, Some(size), &self.op, body)
51 .await?;
52
53 let resp = self.core.info.http_client().send(req).await?;
54
55 let status = resp.status();
56 match status {
57 StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
58 _ => Err(parse_error(resp)),
59 }
60 }
61
62 async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> {
63 let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
64 return Err(Error::new(
65 ErrorKind::Unsupported,
66 "write multi is not supported when atomic is not set",
67 ));
68 };
69 let req = self
70 .core
71 .webhdfs_create_object_request(
72 &format!("{}{}", atomic_write_dir, block_id),
73 Some(size),
74 &self.op,
75 body,
76 )
77 .await?;
78
79 let resp = self.core.info.http_client().send(req).await?;
80
81 let status = resp.status();
82 match status {
83 StatusCode::CREATED | StatusCode::OK => Ok(()),
84 _ => Err(parse_error(resp)),
85 }
86 }
87
88 async fn complete_block(&self, block_ids: Vec<Uuid>) -> Result<Metadata> {
89 let Some(ref atomic_write_dir) = self.core.atomic_write_dir else {
90 return Err(Error::new(
91 ErrorKind::Unsupported,
92 "write multi is not supported when atomic is not set",
93 ));
94 };
95 let first_block_id = format!("{}{}", atomic_write_dir, block_ids[0].clone());
96 if block_ids.len() >= 2 {
97 let sources: Vec<String> = block_ids[1..]
98 .iter()
99 .map(|s| format!("{}{}", atomic_write_dir, s))
100 .collect();
101 let req = self.core.webhdfs_concat_request(&first_block_id, sources)?;
103
104 let resp = self.core.info.http_client().send(req).await?;
105
106 let status = resp.status();
107
108 if status != StatusCode::OK {
109 return Err(parse_error(resp));
110 }
111 }
112 let resp = self.core.webhdfs_delete(&self.path).await?;
114 let status = resp.status();
115 if status != StatusCode::OK {
116 return Err(parse_error(resp));
117 }
118
119 let resp = self
121 .core
122 .webhdfs_rename_object(&first_block_id, &self.path)
123 .await?;
124
125 let status = resp.status();
126
127 match status {
128 StatusCode::OK => Ok(Metadata::default()),
129 _ => Err(parse_error(resp)),
130 }
131 }
132
133 async fn abort_block(&self, block_ids: Vec<Uuid>) -> Result<()> {
134 for block_id in block_ids {
135 let resp = self.core.webhdfs_delete(&block_id.to_string()).await?;
136 match resp.status() {
137 StatusCode::OK => {}
138 _ => return Err(parse_error(resp)),
139 }
140 }
141 Ok(())
142 }
143}
144
145impl oio::AppendWrite for WebhdfsWriter {
146 async fn offset(&self) -> Result<u64> {
147 let resp = self.core.webhdfs_get_file_status(&self.path).await?;
148 let status = resp.status();
149 match status {
150 StatusCode::OK => {
151 let bs = resp.into_body();
152 let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
153 .map_err(new_json_deserialize_error)?
154 .file_status;
155
156 Ok(file_status.length)
157 }
158 StatusCode::NOT_FOUND => {
159 let req = self
160 .core
161 .webhdfs_create_object_request(&self.path, None, &self.op, Buffer::new())
162 .await?;
163
164 let resp = self.core.info.http_client().send(req).await?;
165 let status = resp.status();
166
167 match status {
168 StatusCode::CREATED | StatusCode::OK => Ok(0),
169 _ => Err(parse_error(resp)),
170 }
171 }
172 _ => Err(parse_error(resp)),
173 }
174 }
175
176 async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
177 let location = self.core.webhdfs_init_append_request(&self.path).await?;
178 let req = self.core.webhdfs_append_request(&location, size, body)?;
179 let resp = self.core.info.http_client().send(req).await?;
180
181 let status = resp.status();
182 match status {
183 StatusCode::OK => Ok(Metadata::default()),
184 _ => Err(parse_error(resp)),
185 }
186 }
187}