opendal/services/compfs/
writer.rs1use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::buf::buf_try;
22use compio::fs::File;
23use compio::io::AsyncWriteExt;
24
25use super::core::CompfsCore;
26use crate::raw::*;
27use crate::*;
28
29#[derive(Debug)]
30pub struct CompfsWriter {
31 core: Arc<CompfsCore>,
32 file: Option<Cursor<File>>,
33}
34
35impl CompfsWriter {
36 pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
37 Self {
38 core,
39 file: Some(file),
40 }
41 }
42}
43
44impl oio::Write for CompfsWriter {
45 async fn write(&mut self, bs: Buffer) -> Result<()> {
46 let Some(mut file) = self.file.clone() else {
47 return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
48 };
49
50 let pos = self
51 .core
52 .exec(move || async move {
53 buf_try!(@try file.write_vectored_all(bs).await);
54 Ok(file.position())
55 })
56 .await?;
57 self.file.as_mut().unwrap().set_position(pos);
58
59 Ok(())
60 }
61
62 async fn close(&mut self) -> Result<Metadata> {
63 let Some(f) = self.file.take() else {
64 return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
65 };
66
67 self.core
68 .exec(move || async move {
69 f.get_ref().sync_all().await?;
70 f.into_inner().close().await
71 })
72 .await?;
73
74 Ok(Metadata::default())
75 }
76
77 async fn abort(&mut self) -> Result<()> {
78 Err(Error::new(
79 ErrorKind::Unsupported,
80 "cannot abort completion-based operations",
81 ))
82 }
83}