opendal/services/compfs/
writer.rs1use std::io::Cursor;
19use std::sync::Arc;
20
21use compio::buf::buf_try;
22use compio::fs::File;
23use compio::io::AsyncWriteExt;
24
25use super::core::CompfsCore;
26use crate::raw::*;
27use crate::*;
28
29#[derive(Debug)]
30pub struct CompfsWriter {
31 core: Arc<CompfsCore>,
32 file: Option<Cursor<File>>,
33}
34
35impl CompfsWriter {
36 pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
37 Self {
38 core,
39 file: Some(file),
40 }
41 }
42}
43
44impl oio::Write for CompfsWriter {
45 async fn write(&mut self, bs: Buffer) -> Result<()> {
51 let Some(mut file) = self.file.clone() else {
52 return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
53 };
54
55 let pos = self
56 .core
57 .exec(move || async move {
58 for b in bs {
59 buf_try!(@try file.write_all(b).await);
60 }
61 Ok(file.position())
62 })
63 .await?;
64 self.file.as_mut().unwrap().set_position(pos);
65
66 Ok(())
67 }
68
69 async fn close(&mut self) -> Result<Metadata> {
70 let Some(f) = self.file.take() else {
71 return Err(Error::new(ErrorKind::Unexpected, "file has closed"));
72 };
73
74 self.core
75 .exec(move || async move {
76 f.get_ref().sync_all().await?;
77 f.into_inner().close().await
78 })
79 .await?;
80
81 Ok(Metadata::default())
82 }
83
84 async fn abort(&mut self) -> Result<()> {
85 Err(Error::new(
86 ErrorKind::Unsupported,
87 "cannot abort completion-based operations",
88 ))
89 }
90}