virtiofs_opendal/
virtiofs.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::io;
19use std::sync::Arc;
20use std::sync::RwLock;
21
22use log::warn;
23use opendal::Operator;
24use vhost::vhost_user::message::VhostUserProtocolFeatures;
25use vhost::vhost_user::message::VhostUserVirtioFeatures;
26use vhost::vhost_user::Backend;
27use vhost::vhost_user::Listener;
28use vhost_user_backend::VhostUserBackend;
29use vhost_user_backend::VhostUserDaemon;
30use vhost_user_backend::VringMutex;
31use vhost_user_backend::VringState;
32use vhost_user_backend::VringT;
33use virtio_bindings::bindings::virtio_config::VIRTIO_F_VERSION_1;
34use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
35use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_INDIRECT_DESC;
36use virtio_queue::DescriptorChain;
37use virtio_queue::QueueOwnedT;
38use vm_memory::GuestAddressSpace;
39use vm_memory::GuestMemoryAtomic;
40use vm_memory::GuestMemoryLoadGuard;
41use vm_memory::GuestMemoryMmap;
42use vmm_sys_util::epoll::EventSet;
43use vmm_sys_util::eventfd::EventFd;
44
45use crate::error::*;
46use crate::filesystem::Filesystem;
47use crate::virtiofs_util::Reader;
48use crate::virtiofs_util::Writer;
49
/// Marks an event from the high priority queue.
/// This value is also used as the index of that queue in the vrings slice.
const HIPRIO_QUEUE_EVENT: u16 = 0;
/// Marks an event from the request queue.
/// This value is also used as the index of that queue in the vrings slice.
const REQ_QUEUE_EVENT: u16 = 1;
/// The maximum queue size supported.
const QUEUE_SIZE: usize = 32768;
/// The number of request queues supported.
/// The virtiofs spec allows for multiple request queues, but we'll only support one.
const REQUEST_QUEUES: usize = 1;
/// In addition to request queues there is one high priority queue.
const NUM_QUEUES: usize = REQUEST_QUEUES + 1;
61
62/// VhostUserFsThread represents the actual worker process used to handle file system requests from VMs.
63struct VhostUserFsThread {
64    core: Filesystem,
65    mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
66    vu_req: Option<Backend>,
67    event_idx: bool,
68    kill_event_fd: EventFd,
69}
70
71impl VhostUserFsThread {
72    fn new(core: Filesystem) -> Result<VhostUserFsThread> {
73        let event_fd = EventFd::new(libc::EFD_NONBLOCK).map_err(|err| {
74            new_unexpected_error("failed to create kill eventfd", Some(err.into()))
75        })?;
76        Ok(VhostUserFsThread {
77            core,
78            mem: None,
79            vu_req: None,
80            event_idx: false,
81            kill_event_fd: event_fd,
82        })
83    }
84
85    /// This is used when the backend has processed a request and needs to notify the frontend.
86    fn return_descriptor(
87        vring_state: &mut VringState,
88        head_index: u16,
89        event_idx: bool,
90        len: usize,
91    ) {
92        if vring_state.add_used(head_index, len as u32).is_err() {
93            warn!("Failed to add used to used queue.");
94        };
95        // Check if the used queue needs to be signaled.
96        if event_idx {
97            match vring_state.needs_notification() {
98                Ok(needs_notification) => {
99                    if needs_notification && vring_state.signal_used_queue().is_err() {
100                        warn!("Failed to signal used queue.");
101                    }
102                }
103                Err(_) => {
104                    if vring_state.signal_used_queue().is_err() {
105                        warn!("Failed to signal used queue.");
106                    };
107                }
108            }
109        } else if vring_state.signal_used_queue().is_err() {
110            warn!("Failed to signal used queue.");
111        }
112    }
113
114    /// Process filesystem requests one at a time in a serialized manner.
115    fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) -> Result<()> {
116        let mut vring_state = match device_event {
117            HIPRIO_QUEUE_EVENT => vrings[0].get_mut(),
118            REQ_QUEUE_EVENT => vrings[1].get_mut(),
119            _ => return Err(new_unexpected_error("failed to handle unknown event", None)),
120        };
121        if self.event_idx {
122            // If EVENT_IDX is enabled, we could keep calling process_queue()
123            // until it stops finding new request on the queue.
124            loop {
125                if vring_state.disable_notification().is_err() {
126                    warn!("Failed to disable used queue notification.");
127                }
128                self.process_queue_serial(&mut vring_state)?;
129                if let Ok(has_more) = vring_state.enable_notification() {
130                    if !has_more {
131                        break;
132                    }
133                } else {
134                    warn!("Failed to enable used queue notification.");
135                }
136            }
137        } else {
138            // Without EVENT_IDX, a single call is enough.
139            self.process_queue_serial(&mut vring_state)?;
140        }
141        Ok(())
142    }
143
144    /// Forwards filesystem messages to specific functions and
145    /// returns the filesystem request execution result.
146    fn process_queue_serial(&self, vring_state: &mut VringState) -> Result<bool> {
147        let mut used_any = false;
148        let mem = match &self.mem {
149            Some(m) => m.memory(),
150            None => return Err(new_unexpected_error("no memory configured", None)),
151        };
152        let avail_chains: Vec<DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>> = vring_state
153            .get_queue_mut()
154            .iter(mem.clone())
155            .map_err(|_| new_unexpected_error("iterating through the queue failed", None))?
156            .collect();
157        for chain in avail_chains {
158            used_any = true;
159            let head_index = chain.head_index();
160            let reader = Reader::new(&mem, chain.clone())
161                .map_err(|_| new_unexpected_error("creating a queue reader failed", None))
162                .unwrap();
163            let writer = Writer::new(&mem, chain.clone())
164                .map_err(|_| new_unexpected_error("creating a queue writer failed", None))
165                .unwrap();
166            let len = self
167                .core
168                .handle_message(reader, writer)
169                .map_err(|_| new_unexpected_error("processing a queue request failed", None))
170                .unwrap();
171            VhostUserFsThread::return_descriptor(vring_state, head_index, self.event_idx, len);
172        }
173        Ok(used_any)
174    }
175}
176
177/// VhostUserFsBackend is a structure that implements the VhostUserBackend trait
178/// and implements concrete services for the vhost user backend server.
179struct VhostUserFsBackend {
180    thread: RwLock<VhostUserFsThread>,
181}
182
183impl VhostUserFsBackend {
184    fn new(core: Filesystem) -> Result<VhostUserFsBackend> {
185        let thread = RwLock::new(VhostUserFsThread::new(core)?);
186        Ok(VhostUserFsBackend { thread })
187    }
188
189    fn kill(&self) -> Result<()> {
190        self.thread
191            .read()
192            .unwrap()
193            .kill_event_fd
194            .write(1)
195            .map_err(|err| {
196                new_unexpected_error("failed to write to kill eventfd", Some(err.into()))
197            })
198    }
199}
200
201impl VhostUserBackend for VhostUserFsBackend {
202    type Bitmap = ();
203    type Vring = VringMutex;
204
205    /// Get number of queues supported.
206    fn num_queues(&self) -> usize {
207        NUM_QUEUES
208    }
209
210    /// Get maximum queue size supported.
211    fn max_queue_size(&self) -> usize {
212        QUEUE_SIZE
213    }
214
215    /// Get available virtio features.
216    fn features(&self) -> u64 {
217        // Align to the virtiofsd's features here.
218        (1 << VIRTIO_F_VERSION_1)
219            | (1 << VIRTIO_RING_F_INDIRECT_DESC)
220            | (1 << VIRTIO_RING_F_EVENT_IDX)
221            | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()
222    }
223
224    /// Get available vhost protocol features.
225    fn protocol_features(&self) -> VhostUserProtocolFeatures {
226        // Align to the virtiofsd's protocol features here.
227        VhostUserProtocolFeatures::MQ
228            | VhostUserProtocolFeatures::BACKEND_REQ
229            | VhostUserProtocolFeatures::BACKEND_SEND_FD
230            | VhostUserProtocolFeatures::REPLY_ACK
231            | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
232    }
233
234    /// Enable or disabled the virtio EVENT_IDX feature.
235    fn set_event_idx(&self, enabled: bool) {
236        self.thread.write().unwrap().event_idx = enabled;
237    }
238
239    /// Update guest memory regions.
240    fn update_memory(&self, mem: GuestMemoryAtomic<GuestMemoryMmap>) -> io::Result<()> {
241        self.thread.write().unwrap().mem = Some(mem);
242        Ok(())
243    }
244
245    /// Set handler for communicating with the frontend by the backend communication channel.
246    fn set_backend_req_fd(&self, vu_req: Backend) {
247        self.thread.write().unwrap().vu_req = Some(vu_req);
248    }
249
250    /// Provide an optional exit EventFd for the specified worker thread.
251    fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
252        Some(
253            self.thread
254                .read()
255                .unwrap()
256                .kill_event_fd
257                .try_clone()
258                .unwrap(),
259        )
260    }
261
262    /// Handle IO events for backend registered file descriptors.
263    fn handle_event(
264        &self,
265        device_event: u16,
266        evset: EventSet,
267        vrings: &[Self::Vring],
268        _thread_id: usize,
269    ) -> io::Result<()> {
270        if evset != EventSet::IN {
271            return Err(new_unexpected_error(
272                "failed to handle event other than input event",
273                None,
274            )
275            .into());
276        }
277        let thread = self.thread.read().unwrap();
278        thread
279            .handle_event_serial(device_event, vrings)
280            .map_err(|err| err.into())
281    }
282}
283
284/// VirtioFS is a structure that represents the virtiofs service.
285/// It is used to run the virtiofs service with the given operator and socket path.
286/// The operator is used to interact with the backend storage system.
287/// The socket path is used to communicate with the QEMU and VMs.
288pub struct VirtioFs {
289    socket_path: String,
290    filesystem_backend: Arc<VhostUserFsBackend>,
291}
292
293impl VirtioFs {
294    pub fn new(core: Operator, socket_path: &str) -> Result<VirtioFs> {
295        let filesystem_core = Filesystem::new(core);
296        let filesystem_backend = Arc::new(VhostUserFsBackend::new(filesystem_core).unwrap());
297        Ok(VirtioFs {
298            socket_path: socket_path.to_string(),
299            filesystem_backend,
300        })
301    }
302
303    // Run the virtiofs service.
304    pub fn run(&self) -> Result<()> {
305        let listener = Listener::new(&self.socket_path, true)
306            .map_err(|_| new_unexpected_error("failed to create listener", None))?;
307        let mut daemon = VhostUserDaemon::new(
308            String::from("virtiofs-backend"),
309            self.filesystem_backend.clone(),
310            GuestMemoryAtomic::new(GuestMemoryMmap::new()),
311        )
312        .unwrap();
313        if daemon.start(listener).is_err() {
314            return Err(new_unexpected_error("failed to start daemon", None));
315        }
316        if daemon.wait().is_err() {
317            return Err(new_unexpected_error("failed to wait daemon", None));
318        }
319        Ok(())
320    }
321
322    // Kill the virtiofs service.
323    pub fn kill(&self) -> Result<()> {
324        if self.filesystem_backend.kill().is_err() {
325            return Err(new_unexpected_error("failed to kill backend", None));
326        }
327        Ok(())
328    }
329}