virtiofs_opendal/
filesystem.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::ffi::CStr;
20use std::io::Read;
21use std::io::Write;
22use std::mem::size_of;
23use std::sync::Mutex;
24use std::time::Duration;
25
26use log::debug;
27use opendal::Buffer;
28use opendal::ErrorKind;
29use opendal::Operator;
30use sharded_slab::Slab;
31use tokio::runtime::Builder;
32use tokio::runtime::Runtime;
33use vm_memory::ByteValued;
34
35use crate::buffer::BufferWrapper;
36use crate::error::*;
37use crate::filesystem_message::*;
38use crate::virtiofs_util::Reader;
39use crate::virtiofs_util::Writer;
40
41/// Version number of this interface.
42const KERNEL_VERSION: u32 = 7;
43/// Minor version number of this interface.
44const KERNEL_MINOR_VERSION: u32 = 38;
45/// Minimum Minor version number supported.
46const MIN_KERNEL_MINOR_VERSION: u32 = 27;
47/// The length of the header part of the message.
48const BUFFER_HEADER_SIZE: u32 = 4096;
49/// The maximum length of the data part of the message, used for read/write data.
50const MAX_BUFFER_SIZE: u32 = 1 << 20;
51/// The default time to live of the attributes.
52const DEFAULT_TTL: Duration = Duration::from_secs(1);
53/// The default mode of the opened file.
54const DEFAULT_OPENED_FILE_MODE: u32 = 0o755;
55
56enum FileType {
57    Dir,
58    File,
59}
60
61struct InnerWriter {
62    writer: opendal::Writer,
63    written: u64,
64}
65
66#[derive(Clone)]
67struct OpenedFile {
68    path: String,
69    metadata: Attr,
70}
71
72impl OpenedFile {
73    fn new(file_type: FileType, path: &str, uid: u32, gid: u32) -> OpenedFile {
74        let mut attr: Attr = unsafe { std::mem::zeroed() };
75        attr.uid = uid;
76        attr.gid = gid;
77        match file_type {
78            FileType::Dir => {
79                attr.nlink = 2;
80                attr.mode = libc::S_IFDIR | DEFAULT_OPENED_FILE_MODE;
81            }
82            FileType::File => {
83                attr.nlink = 1;
84                attr.mode = libc::S_IFREG | DEFAULT_OPENED_FILE_MODE;
85            }
86        }
87        OpenedFile {
88            path: path.to_string(),
89            metadata: attr,
90        }
91    }
92}
93
94fn opendal_error2error(error: opendal::Error) -> Error {
95    match error.kind() {
96        ErrorKind::Unsupported => Error::from(libc::EOPNOTSUPP),
97        ErrorKind::IsADirectory => Error::from(libc::EISDIR),
98        ErrorKind::NotFound => Error::from(libc::ENOENT),
99        ErrorKind::PermissionDenied => Error::from(libc::EACCES),
100        ErrorKind::AlreadyExists => Error::from(libc::EEXIST),
101        ErrorKind::NotADirectory => Error::from(libc::ENOTDIR),
102        ErrorKind::RangeNotSatisfied => Error::from(libc::EINVAL),
103        ErrorKind::RateLimited => Error::from(libc::EBUSY),
104        _ => Error::from(libc::ENOENT),
105    }
106}
107
108fn opendal_metadata2opened_file(
109    path: &str,
110    metadata: &opendal::Metadata,
111    uid: u32,
112    gid: u32,
113) -> OpenedFile {
114    let file_type = match metadata.mode() {
115        opendal::EntryMode::DIR => FileType::Dir,
116        _ => FileType::File,
117    };
118    OpenedFile::new(file_type, path, uid, gid)
119}
120
121/// Filesystem is a filesystem implementation with opendal backend,
122/// and will decode and process messages from VMs.
123pub struct Filesystem {
124    rt: Runtime,
125    core: Operator,
126    uid: u32,
127    gid: u32,
128    opened_files: Slab<OpenedFile>,
129    // Since we need to manually manage the allocation of inodes,
130    // we record the inode of each opened file here.
131    opened_files_map: Mutex<HashMap<String, u64>>,
132    opened_files_writer: tokio::sync::Mutex<HashMap<String, InnerWriter>>,
133}
134
135impl Filesystem {
136    pub fn new(core: Operator) -> Filesystem {
137        let rt = Builder::new_multi_thread()
138            .worker_threads(4)
139            .enable_all()
140            .build()
141            .unwrap();
142
143        // Here we set the uid and gid to 1000, which is the default value.
144        Filesystem {
145            rt,
146            core,
147            uid: 1000,
148            gid: 1000,
149            opened_files: Slab::new(),
150            opened_files_map: Mutex::new(HashMap::new()),
151            opened_files_writer: tokio::sync::Mutex::new(HashMap::new()),
152        }
153    }
154
155    pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result<usize> {
156        let in_header: InHeader = r.read_obj().map_err(|e| {
157            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
158        })?;
159        if in_header.len > (MAX_BUFFER_SIZE + BUFFER_HEADER_SIZE) {
160            // The message is too long here.
161            return Filesystem::reply_error(in_header.unique, w);
162        }
163        if let Ok(opcode) = Opcode::try_from(in_header.opcode) {
164            match opcode {
165                Opcode::Init => self.init(in_header, r, w),
166                Opcode::Destroy => self.destroy(in_header, r, w),
167                Opcode::Lookup => self.lookup(in_header, r, w),
168                Opcode::Getattr => self.getattr(in_header, r, w),
169                Opcode::Setattr => self.setattr(in_header, r, w),
170                Opcode::Create => self.create(in_header, r, w),
171                Opcode::Unlink => self.unlink(in_header, r, w),
172                Opcode::Release => self.release(in_header, r, w),
173                Opcode::Flush => self.flush(in_header, r, w),
174                Opcode::Forget => self.forget(in_header, r),
175                Opcode::Open => self.open(in_header, r, w),
176                Opcode::Read => self.read(in_header, r, w),
177                Opcode::Write => self.write(in_header, r, w),
178            }
179        } else {
180            Filesystem::reply_error(in_header.unique, w)
181        }
182    }
183}
184
185impl Filesystem {
186    fn reply_ok<T: ByteValued>(
187        out: Option<T>,
188        data: Option<&[u8]>,
189        unique: u64,
190        mut w: Writer,
191    ) -> Result<usize> {
192        let mut len = size_of::<OutHeader>();
193        if out.is_some() {
194            len += size_of::<T>();
195        }
196        if let Some(data) = data {
197            len += data.len();
198        }
199        let header = OutHeader {
200            unique,
201            error: 0, // Return no error.
202            len: len as u32,
203        };
204        w.write_all(header.as_slice()).map_err(|e| {
205            new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
206        })?;
207        if let Some(out) = out {
208            w.write_all(out.as_slice()).map_err(|e| {
209                new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
210            })?;
211        }
212        if let Some(data) = data {
213            w.write_all(data).map_err(|e| {
214                new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
215            })?;
216        }
217        Ok(w.bytes_written())
218    }
219
220    fn reply_error(unique: u64, mut w: Writer) -> Result<usize> {
221        let header = OutHeader {
222            unique,
223            error: libc::EIO, // Here we simply return I/O error.
224            len: size_of::<OutHeader>() as u32,
225        };
226        w.write_all(header.as_slice()).map_err(|e| {
227            new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
228        })?;
229        Ok(w.bytes_written())
230    }
231
232    fn bytes_to_str(buf: &[u8]) -> Result<&str> {
233        Filesystem::bytes_to_cstr(buf)?.to_str().map_err(|e| {
234            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
235        })
236    }
237
238    fn bytes_to_cstr(buf: &[u8]) -> Result<&CStr> {
239        CStr::from_bytes_with_nul(buf).map_err(|e| {
240            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
241        })
242    }
243
244    fn check_flags(&self, flags: u32) -> Result<(bool, bool)> {
245        let is_trunc = flags & libc::O_TRUNC as u32 != 0 || flags & libc::O_CREAT as u32 != 0;
246        let is_append = flags & libc::O_APPEND as u32 != 0;
247
248        let mode = flags & libc::O_ACCMODE as u32;
249        let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32 || is_append;
250
251        let capability = self.core.info().full_capability();
252        if is_trunc && !capability.write {
253            Err(Error::from(libc::EACCES))?;
254        }
255        if is_append && !capability.write_can_append {
256            Err(Error::from(libc::EACCES))?;
257        }
258
259        Ok((is_write, is_append))
260    }
261}
262
263impl Filesystem {
264    fn init(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
265        let InitIn { major, minor, .. } = r.read_obj().map_err(|e| {
266            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
267        })?;
268
269        if major != KERNEL_VERSION || minor < MIN_KERNEL_MINOR_VERSION {
270            return Filesystem::reply_error(in_header.unique, w);
271        }
272
273        let mut attr = OpenedFile::new(FileType::Dir, "/", self.uid, self.gid);
274        attr.metadata.ino = 1;
275        // We need to allocate the inode 1 for the root directory. The double insertion
276        // here makes 1 the first inode and avoids extra alignment and processing elsewhere.
277        self.opened_files
278            .insert(attr.clone())
279            .expect("failed to allocate inode");
280        self.opened_files
281            .insert(attr.clone())
282            .expect("failed to allocate inode");
283        let mut opened_files_map = self.opened_files_map.lock().unwrap();
284        opened_files_map.insert("/".to_string(), 1);
285
286        let out = InitOut {
287            major: KERNEL_VERSION,
288            minor: KERNEL_MINOR_VERSION,
289            max_write: MAX_BUFFER_SIZE,
290            ..Default::default()
291        };
292        Filesystem::reply_ok(Some(out), None, in_header.unique, w)
293    }
294
295    fn destroy(&self, _in_header: InHeader, _r: Reader, _w: Writer) -> Result<usize> {
296        // do nothing for destroy.
297        Ok(0)
298    }
299
300    fn flush(&self, _in_header: InHeader, _r: Reader, _w: Writer) -> Result<usize> {
301        // do nothing for flush.
302        Ok(0)
303    }
304
305    fn forget(&self, _in_header: InHeader, _r: Reader) -> Result<usize> {
306        // do nothing for forget.
307        Ok(0)
308    }
309
310    fn lookup(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
311        let name_len = in_header.len as usize - size_of::<InHeader>();
312        let mut buf = vec![0; name_len];
313        r.read_exact(&mut buf).map_err(|e| {
314            new_unexpected_error("failed to decode protocol messages", Some(e.into()))
315        })?;
316        let name = match Filesystem::bytes_to_str(buf.as_ref()) {
317            Ok(name) => name,
318            Err(_) => return Filesystem::reply_error(in_header.unique, w),
319        };
320
321        debug!("lookup: parent inode={} name={}", in_header.nodeid, name);
322
323        let parent_path = match self
324            .opened_files
325            .get(in_header.nodeid as usize)
326            .map(|f| f.path.clone())
327        {
328            Some(path) => path,
329            None => return Filesystem::reply_error(in_header.unique, w),
330        };
331
332        let path = format!("{}/{}", parent_path, name);
333        let metadata = match self.rt.block_on(self.do_get_metadata(&path)) {
334            Ok(metadata) => metadata,
335            Err(_) => return Filesystem::reply_error(in_header.unique, w),
336        };
337
338        let out = EntryOut {
339            nodeid: metadata.metadata.ino,
340            entry_valid: DEFAULT_TTL.as_secs(),
341            attr_valid: DEFAULT_TTL.as_secs(),
342            entry_valid_nsec: DEFAULT_TTL.subsec_nanos(),
343            attr_valid_nsec: DEFAULT_TTL.subsec_nanos(),
344            attr: metadata.metadata,
345            ..Default::default()
346        };
347        Filesystem::reply_ok(Some(out), None, in_header.unique, w)
348    }
349
350    fn getattr(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result<usize> {
351        debug!("getattr: inode={}", in_header.nodeid);
352
353        let path = match self
354            .opened_files
355            .get(in_header.nodeid as usize)
356            .map(|f| f.path.clone())
357        {
358            Some(path) => path,
359            None => return Filesystem::reply_error(in_header.unique, w),
360        };
361
362        let metadata = match self.rt.block_on(self.do_get_metadata(&path)) {
363            Ok(metadata) => metadata,
364            Err(_) => return Filesystem::reply_error(in_header.unique, w),
365        };
366
367        let out = AttrOut {
368            attr_valid: DEFAULT_TTL.as_secs(),
369            attr_valid_nsec: DEFAULT_TTL.subsec_nanos(),
370            attr: metadata.metadata,
371            ..Default::default()
372        };
373        Filesystem::reply_ok(Some(out), None, in_header.unique, w)
374    }
375
376    fn setattr(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result<usize> {
377        debug!("setattr: inode={}", in_header.nodeid);
378
379        // do nothing for setattr.
380        self.getattr(in_header, _r, w)
381    }
382
383    fn create(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
384        let CreateIn { flags, .. } = r.read_obj().map_err(|e| {
385            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
386        })?;
387
388        let name_len = in_header.len as usize - size_of::<InHeader>() - size_of::<CreateIn>();
389        let mut buf = vec![0; name_len];
390        r.read_exact(&mut buf).map_err(|e| {
391            new_unexpected_error("failed to decode protocol messages", Some(e.into()))
392        })?;
393        let name = match Filesystem::bytes_to_str(buf.as_ref()) {
394            Ok(name) => name,
395            Err(_) => return Filesystem::reply_error(in_header.unique, w),
396        };
397
398        debug!("create: parent inode={} name={}", in_header.nodeid, name);
399
400        let parent_path = match self
401            .opened_files
402            .get(in_header.nodeid as usize)
403            .map(|f| f.path.clone())
404        {
405            Some(path) => path,
406            None => return Filesystem::reply_error(in_header.unique, w),
407        };
408
409        let path = format!("{}/{}", parent_path, name);
410        let mut attr = OpenedFile::new(FileType::File, &path, self.uid, self.gid);
411        let inode = self
412            .opened_files
413            .insert(attr.clone())
414            .expect("failed to allocate inode");
415        attr.metadata.ino = inode as u64;
416        let mut opened_files_map = self.opened_files_map.lock().unwrap();
417        opened_files_map.insert(path.to_string(), inode as u64);
418
419        match self.rt.block_on(self.do_set_writer(&path, flags)) {
420            Ok(writer) => writer,
421            Err(_) => return Filesystem::reply_error(in_header.unique, w),
422        };
423
424        let entry_out = EntryOut {
425            nodeid: attr.metadata.ino,
426            entry_valid: DEFAULT_TTL.as_secs(),
427            attr_valid: DEFAULT_TTL.as_secs(),
428            entry_valid_nsec: DEFAULT_TTL.subsec_nanos(),
429            attr_valid_nsec: DEFAULT_TTL.subsec_nanos(),
430            attr: attr.metadata,
431            ..Default::default()
432        };
433        let open_out = OpenOut {
434            ..Default::default()
435        };
436        Filesystem::reply_ok(
437            Some(entry_out),
438            Some(open_out.as_slice()),
439            in_header.unique,
440            w,
441        )
442    }
443
444    fn unlink(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
445        let name_len = in_header.len as usize - size_of::<InHeader>();
446        let mut buf = vec![0; name_len];
447        r.read_exact(&mut buf).map_err(|e| {
448            new_unexpected_error("failed to decode protocol messages", Some(e.into()))
449        })?;
450        let name = match Filesystem::bytes_to_str(buf.as_ref()) {
451            Ok(name) => name,
452            Err(_) => return Filesystem::reply_error(in_header.unique, w),
453        };
454
455        debug!("unlink: parent inode={} name={}", in_header.nodeid, name);
456
457        let parent_path = match self
458            .opened_files
459            .get(in_header.nodeid as usize)
460            .map(|f| f.path.clone())
461        {
462            Some(path) => path,
463            None => return Filesystem::reply_error(in_header.unique, w),
464        };
465
466        let path = format!("{}/{}", parent_path, name);
467        if self.rt.block_on(self.do_delete(&path)).is_err() {
468            return Filesystem::reply_error(in_header.unique, w);
469        }
470
471        let mut opened_files_map = self.opened_files_map.lock().unwrap();
472        opened_files_map.remove(&path);
473
474        Filesystem::reply_ok(None::<u8>, None, in_header.unique, w)
475    }
476
477    fn release(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result<usize> {
478        debug!("release: inode={}", in_header.nodeid);
479
480        let path = match self
481            .opened_files
482            .get(in_header.nodeid as usize)
483            .map(|f| f.path.clone())
484        {
485            Some(path) => path,
486            None => return Filesystem::reply_error(in_header.unique, w),
487        };
488
489        if self.rt.block_on(self.do_release_writer(&path)).is_err() {
490            return Filesystem::reply_error(in_header.unique, w);
491        }
492
493        Filesystem::reply_ok(None::<u8>, None, in_header.unique, w)
494    }
495
496    fn open(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
497        debug!("open: inode={}", in_header.nodeid);
498
499        let OpenIn { flags, .. } = r.read_obj().map_err(|e| {
500            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
501        })?;
502
503        let path = match self
504            .opened_files
505            .get(in_header.nodeid as usize)
506            .map(|f| f.path.clone())
507        {
508            Some(path) => path,
509            None => return Filesystem::reply_error(in_header.unique, w),
510        };
511
512        match self.rt.block_on(self.do_set_writer(&path, flags)) {
513            Ok(writer) => writer,
514            Err(_) => return Filesystem::reply_error(in_header.unique, w),
515        };
516
517        let out = OpenOut {
518            ..Default::default()
519        };
520        Filesystem::reply_ok(Some(out), None, in_header.unique, w)
521    }
522
523    fn read(&self, in_header: InHeader, mut r: Reader, mut w: Writer) -> Result<usize> {
524        let path = match self
525            .opened_files
526            .get(in_header.nodeid as usize)
527            .map(|f| f.path.clone())
528        {
529            Some(path) => path,
530            None => return Filesystem::reply_error(in_header.unique, w),
531        };
532
533        let ReadIn { offset, size, .. } = r.read_obj().map_err(|e| {
534            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
535        })?;
536
537        debug!(
538            "read: inode={} offset={} size={}",
539            in_header.nodeid, offset, size
540        );
541
542        let data = match self.rt.block_on(self.do_read(&path, offset)) {
543            Ok(data) => data,
544            Err(_) => return Filesystem::reply_error(in_header.unique, w),
545        };
546        let len = data.len();
547        let buffer = BufferWrapper::new(data);
548
549        let mut data_writer = w.split_at(size_of::<OutHeader>()).unwrap();
550        data_writer.write_from_at(&buffer, len).map_err(|e| {
551            new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
552        })?;
553
554        let out = OutHeader {
555            len: (size_of::<OutHeader>() + len) as u32,
556            error: 0,
557            unique: in_header.unique,
558        };
559        w.write_all(out.as_slice()).map_err(|e| {
560            new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
561        })?;
562        Ok(out.len as usize)
563    }
564
565    fn write(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
566        debug!("write: inode={}", in_header.nodeid);
567
568        let path = match self
569            .opened_files
570            .get(in_header.nodeid as usize)
571            .map(|f| f.path.clone())
572        {
573            Some(path) => path,
574            None => return Filesystem::reply_error(in_header.unique, w),
575        };
576
577        let WriteIn { offset, size, .. } = r.read_obj().map_err(|e| {
578            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
579        })?;
580
581        let buffer = BufferWrapper::new(Buffer::new());
582        r.read_to_at(&buffer, size as usize).map_err(|e| {
583            new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
584        })?;
585        let buffer = buffer.get_buffer();
586
587        match self.rt.block_on(self.do_write(&path, offset, buffer)) {
588            Ok(writer) => writer,
589            Err(_) => return Filesystem::reply_error(in_header.unique, w),
590        };
591
592        let out = WriteOut {
593            size,
594            ..Default::default()
595        };
596        Filesystem::reply_ok(Some(out), None, in_header.unique, w)
597    }
598}
599
600impl Filesystem {
601    async fn do_get_metadata(&self, path: &str) -> Result<OpenedFile> {
602        let metadata = self.core.stat(path).await.map_err(opendal_error2error)?;
603        let mut attr = opendal_metadata2opened_file(path, &metadata, self.uid, self.gid);
604        attr.metadata.size = metadata.content_length();
605        let mut opened_files_map = self.opened_files_map.lock().unwrap();
606        if let Some(inode) = opened_files_map.get(path) {
607            attr.metadata.ino = *inode;
608        } else {
609            let inode = self
610                .opened_files
611                .insert(attr.clone())
612                .expect("failed to allocate inode");
613            attr.metadata.ino = inode as u64;
614            opened_files_map.insert(path.to_string(), inode as u64);
615        }
616
617        Ok(attr)
618    }
619
620    async fn do_set_writer(&self, path: &str, flags: u32) -> Result<()> {
621        let (is_write, is_append) = self.check_flags(flags)?;
622        if !is_write {
623            return Ok(());
624        }
625
626        let writer = self
627            .core
628            .writer_with(path)
629            .append(is_append)
630            .await
631            .map_err(opendal_error2error)?;
632        let written = if is_append {
633            self.core
634                .stat(path)
635                .await
636                .map_err(opendal_error2error)?
637                .content_length()
638        } else {
639            0
640        };
641
642        let inner_writer = InnerWriter { writer, written };
643        let mut opened_file_writer = self.opened_files_writer.lock().await;
644        opened_file_writer.insert(path.to_string(), inner_writer);
645
646        Ok(())
647    }
648
649    async fn do_release_writer(&self, path: &str) -> Result<()> {
650        let mut opened_file_writer = self.opened_files_writer.lock().await;
651        let inner_writer = opened_file_writer
652            .get_mut(path)
653            .ok_or(Error::from(libc::EINVAL))?;
654        inner_writer
655            .writer
656            .close()
657            .await
658            .map_err(opendal_error2error)?;
659        opened_file_writer.remove(path);
660
661        Ok(())
662    }
663
664    async fn do_delete(&self, path: &str) -> Result<()> {
665        self.core.delete(path).await.map_err(opendal_error2error)?;
666
667        Ok(())
668    }
669
670    async fn do_read(&self, path: &str, offset: u64) -> Result<Buffer> {
671        let data = self
672            .core
673            .read_with(path)
674            .range(offset..)
675            .await
676            .map_err(opendal_error2error)?;
677
678        Ok(data)
679    }
680
681    async fn do_write(&self, path: &str, offset: u64, data: Buffer) -> Result<usize> {
682        let len = data.len();
683        let mut opened_file_writer = self.opened_files_writer.lock().await;
684        let inner_writer = opened_file_writer
685            .get_mut(path)
686            .ok_or(Error::from(libc::EINVAL))?;
687        if offset != inner_writer.written {
688            return Err(Error::from(libc::EINVAL));
689        }
690        inner_writer
691            .writer
692            .write_from(data)
693            .await
694            .map_err(opendal_error2error)?;
695        inner_writer.written += len as u64;
696
697        Ok(len)
698    }
699}