opendal/services/hdfs/
writer.rs1use std::sync::Arc;
19
20use bytes::Buf;
21use futures::AsyncWriteExt;
22
23use crate::raw::*;
24use crate::*;
25
26pub struct HdfsWriter<F> {
27 target_path: String,
28 tmp_path: Option<String>,
29 f: Option<F>,
30 client: Arc<hdrs::Client>,
31 target_path_exists: bool,
32 size: u64,
33}
34
35unsafe impl<F> Sync for HdfsWriter<F> {}
39
40impl<F> HdfsWriter<F> {
41 pub fn new(
42 target_path: String,
43 tmp_path: Option<String>,
44 f: F,
45 client: Arc<hdrs::Client>,
46 target_path_exists: bool,
47 initial_size: u64,
48 ) -> Self {
49 Self {
50 target_path,
51 tmp_path,
52 f: Some(f),
53 client,
54 target_path_exists,
55 size: initial_size,
56 }
57 }
58}
59
60impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
61 async fn write(&mut self, mut bs: Buffer) -> Result<()> {
62 let len = bs.len() as u64;
63 let f = self.f.as_mut().expect("HdfsWriter must be initialized");
64
65 while bs.has_remaining() {
66 let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
67 bs.advance(n);
68 }
69
70 self.size += len;
71 Ok(())
72 }
73
74 async fn close(&mut self) -> Result<Metadata> {
75 let f = self.f.as_mut().expect("HdfsWriter must be initialized");
76 f.close().await.map_err(new_std_io_error)?;
77
78 if let Some(tmp_path) = &self.tmp_path {
80 if self.target_path_exists {
82 self.client
83 .remove_file(&self.target_path)
84 .map_err(new_std_io_error)?;
85 }
86 self.client
87 .rename_file(tmp_path, &self.target_path)
88 .map_err(new_std_io_error)?
89 }
90
91 Ok(Metadata::default().with_content_length(self.size))
92 }
93
94 async fn abort(&mut self) -> Result<()> {
95 Err(Error::new(
96 ErrorKind::Unsupported,
97 "HdfsWriter doesn't support abort",
98 ))
99 }
100}