opendal/services/hdfs/
writer.rs1use std::io::Write;
19use std::sync::Arc;
20
21use bytes::Buf;
22use futures::AsyncWriteExt;
23
24use crate::raw::*;
25use crate::*;
26
27pub struct HdfsWriter<F> {
28 target_path: String,
29 tmp_path: Option<String>,
30 f: Option<F>,
31 client: Arc<hdrs::Client>,
32 target_path_exists: bool,
33 size: u64,
34}
35
36unsafe impl<F> Sync for HdfsWriter<F> {}
40
41impl<F> HdfsWriter<F> {
42 pub fn new(
43 target_path: String,
44 tmp_path: Option<String>,
45 f: F,
46 client: Arc<hdrs::Client>,
47 target_path_exists: bool,
48 initial_size: u64,
49 ) -> Self {
50 Self {
51 target_path,
52 tmp_path,
53 f: Some(f),
54 client,
55 target_path_exists,
56 size: initial_size,
57 }
58 }
59}
60
61impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
62 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
63 let len = bs.len() as u64;
64 let f = self.f.as_mut().expect("HdfsWriter must be initialized");
65
66 while bs.has_remaining() {
67 let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
68 bs.advance(n);
69 }
70
71 self.size += len;
72 Ok(())
73 }
74
75 async fn close(&mut self) -> Result<Metadata> {
76 let f = self.f.as_mut().expect("HdfsWriter must be initialized");
77 f.close().await.map_err(new_std_io_error)?;
78
79 if let Some(tmp_path) = &self.tmp_path {
81 if self.target_path_exists {
83 self.client
84 .remove_file(&self.target_path)
85 .map_err(new_std_io_error)?;
86 }
87 self.client
88 .rename_file(tmp_path, &self.target_path)
89 .map_err(new_std_io_error)?
90 }
91
92 Ok(Metadata::default().with_content_length(self.size))
93 }
94
95 async fn abort(&mut self) -> Result<()> {
96 Err(Error::new(
97 ErrorKind::Unsupported,
98 "HdfsWriter doesn't support abort",
99 ))
100 }
101}
102
103impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
104 fn write(&mut self, mut bs: Buffer) -> Result<()> {
105 let len = bs.len() as u64;
106
107 let f = self.f.as_mut().expect("HdfsWriter must be initialized");
108 while bs.has_remaining() {
109 let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
110 bs.advance(n);
111 }
112
113 self.size += len;
114 Ok(())
115 }
116
117 fn close(&mut self) -> Result<Metadata> {
118 let f = self.f.as_mut().expect("HdfsWriter must be initialized");
119 f.flush().map_err(new_std_io_error)?;
120
121 if let Some(tmp_path) = &self.tmp_path {
122 if self.target_path_exists {
124 self.client
125 .remove_file(&self.target_path)
126 .map_err(new_std_io_error)?;
127 }
128 self.client
129 .rename_file(tmp_path, &self.target_path)
130 .map_err(new_std_io_error)?;
131 }
132
133 Ok(Metadata::default().with_content_length(self.size))
134 }
135}