virtiofs_opendal/
virtiofs.rs1use std::io;
19use std::sync::Arc;
20use std::sync::RwLock;
21
22use log::warn;
23use opendal::Operator;
24use vhost::vhost_user::message::VhostUserProtocolFeatures;
25use vhost::vhost_user::message::VhostUserVirtioFeatures;
26use vhost::vhost_user::Backend;
27use vhost::vhost_user::Listener;
28use vhost_user_backend::VhostUserBackend;
29use vhost_user_backend::VhostUserDaemon;
30use vhost_user_backend::VringMutex;
31use vhost_user_backend::VringState;
32use vhost_user_backend::VringT;
33use virtio_bindings::bindings::virtio_config::VIRTIO_F_VERSION_1;
34use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
35use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_INDIRECT_DESC;
36use virtio_queue::DescriptorChain;
37use virtio_queue::QueueOwnedT;
38use vm_memory::GuestAddressSpace;
39use vm_memory::GuestMemoryAtomic;
40use vm_memory::GuestMemoryLoadGuard;
41use vm_memory::GuestMemoryMmap;
42use vmm_sys_util::epoll::EventSet;
43use vmm_sys_util::eventfd::EventFd;
44
45use crate::error::*;
46use crate::filesystem::Filesystem;
47use crate::virtiofs_util::Reader;
48use crate::virtiofs_util::Writer;
49
50const HIPRIO_QUEUE_EVENT: u16 = 0;
52const REQ_QUEUE_EVENT: u16 = 1;
54const QUEUE_SIZE: usize = 32768;
56const REQUEST_QUEUES: usize = 1;
59const NUM_QUEUES: usize = REQUEST_QUEUES + 1;
61
62struct VhostUserFsThread {
64 core: Filesystem,
65 mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
66 vu_req: Option<Backend>,
67 event_idx: bool,
68 kill_event_fd: EventFd,
69}
70
71impl VhostUserFsThread {
72 fn new(core: Filesystem) -> Result<VhostUserFsThread> {
73 let event_fd = EventFd::new(libc::EFD_NONBLOCK).map_err(|err| {
74 new_unexpected_error("failed to create kill eventfd", Some(err.into()))
75 })?;
76 Ok(VhostUserFsThread {
77 core,
78 mem: None,
79 vu_req: None,
80 event_idx: false,
81 kill_event_fd: event_fd,
82 })
83 }
84
85 fn return_descriptor(
87 vring_state: &mut VringState,
88 head_index: u16,
89 event_idx: bool,
90 len: usize,
91 ) {
92 if vring_state.add_used(head_index, len as u32).is_err() {
93 warn!("Failed to add used to used queue.");
94 };
95 if event_idx {
97 match vring_state.needs_notification() {
98 Ok(needs_notification) => {
99 if needs_notification && vring_state.signal_used_queue().is_err() {
100 warn!("Failed to signal used queue.");
101 }
102 }
103 Err(_) => {
104 if vring_state.signal_used_queue().is_err() {
105 warn!("Failed to signal used queue.");
106 };
107 }
108 }
109 } else if vring_state.signal_used_queue().is_err() {
110 warn!("Failed to signal used queue.");
111 }
112 }
113
114 fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) -> Result<()> {
116 let mut vring_state = match device_event {
117 HIPRIO_QUEUE_EVENT => vrings[0].get_mut(),
118 REQ_QUEUE_EVENT => vrings[1].get_mut(),
119 _ => return Err(new_unexpected_error("failed to handle unknown event", None)),
120 };
121 if self.event_idx {
122 loop {
125 if vring_state.disable_notification().is_err() {
126 warn!("Failed to disable used queue notification.");
127 }
128 self.process_queue_serial(&mut vring_state)?;
129 if let Ok(has_more) = vring_state.enable_notification() {
130 if !has_more {
131 break;
132 }
133 } else {
134 warn!("Failed to enable used queue notification.");
135 }
136 }
137 } else {
138 self.process_queue_serial(&mut vring_state)?;
140 }
141 Ok(())
142 }
143
144 fn process_queue_serial(&self, vring_state: &mut VringState) -> Result<bool> {
147 let mut used_any = false;
148 let mem = match &self.mem {
149 Some(m) => m.memory(),
150 None => return Err(new_unexpected_error("no memory configured", None)),
151 };
152 let avail_chains: Vec<DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>> = vring_state
153 .get_queue_mut()
154 .iter(mem.clone())
155 .map_err(|_| new_unexpected_error("iterating through the queue failed", None))?
156 .collect();
157 for chain in avail_chains {
158 used_any = true;
159 let head_index = chain.head_index();
160 let reader = Reader::new(&mem, chain.clone())
161 .map_err(|_| new_unexpected_error("creating a queue reader failed", None))
162 .unwrap();
163 let writer = Writer::new(&mem, chain.clone())
164 .map_err(|_| new_unexpected_error("creating a queue writer failed", None))
165 .unwrap();
166 let len = self
167 .core
168 .handle_message(reader, writer)
169 .map_err(|_| new_unexpected_error("processing a queue request failed", None))
170 .unwrap();
171 VhostUserFsThread::return_descriptor(vring_state, head_index, self.event_idx, len);
172 }
173 Ok(used_any)
174 }
175}
176
177struct VhostUserFsBackend {
180 thread: RwLock<VhostUserFsThread>,
181}
182
183impl VhostUserFsBackend {
184 fn new(core: Filesystem) -> Result<VhostUserFsBackend> {
185 let thread = RwLock::new(VhostUserFsThread::new(core)?);
186 Ok(VhostUserFsBackend { thread })
187 }
188
189 fn kill(&self) -> Result<()> {
190 self.thread
191 .read()
192 .unwrap()
193 .kill_event_fd
194 .write(1)
195 .map_err(|err| {
196 new_unexpected_error("failed to write to kill eventfd", Some(err.into()))
197 })
198 }
199}
200
201impl VhostUserBackend for VhostUserFsBackend {
202 type Bitmap = ();
203 type Vring = VringMutex;
204
205 fn num_queues(&self) -> usize {
207 NUM_QUEUES
208 }
209
210 fn max_queue_size(&self) -> usize {
212 QUEUE_SIZE
213 }
214
215 fn features(&self) -> u64 {
217 (1 << VIRTIO_F_VERSION_1)
219 | (1 << VIRTIO_RING_F_INDIRECT_DESC)
220 | (1 << VIRTIO_RING_F_EVENT_IDX)
221 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()
222 }
223
224 fn protocol_features(&self) -> VhostUserProtocolFeatures {
226 VhostUserProtocolFeatures::MQ
228 | VhostUserProtocolFeatures::BACKEND_REQ
229 | VhostUserProtocolFeatures::BACKEND_SEND_FD
230 | VhostUserProtocolFeatures::REPLY_ACK
231 | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
232 }
233
234 fn set_event_idx(&self, enabled: bool) {
236 self.thread.write().unwrap().event_idx = enabled;
237 }
238
239 fn update_memory(&self, mem: GuestMemoryAtomic<GuestMemoryMmap>) -> io::Result<()> {
241 self.thread.write().unwrap().mem = Some(mem);
242 Ok(())
243 }
244
245 fn set_backend_req_fd(&self, vu_req: Backend) {
247 self.thread.write().unwrap().vu_req = Some(vu_req);
248 }
249
250 fn exit_event(&self, _thread_index: usize) -> Option<EventFd> {
252 Some(
253 self.thread
254 .read()
255 .unwrap()
256 .kill_event_fd
257 .try_clone()
258 .unwrap(),
259 )
260 }
261
262 fn handle_event(
264 &self,
265 device_event: u16,
266 evset: EventSet,
267 vrings: &[Self::Vring],
268 _thread_id: usize,
269 ) -> io::Result<()> {
270 if evset != EventSet::IN {
271 return Err(new_unexpected_error(
272 "failed to handle event other than input event",
273 None,
274 )
275 .into());
276 }
277 let thread = self.thread.read().unwrap();
278 thread
279 .handle_event_serial(device_event, vrings)
280 .map_err(|err| err.into())
281 }
282}
283
284pub struct VirtioFs {
289 socket_path: String,
290 filesystem_backend: Arc<VhostUserFsBackend>,
291}
292
293impl VirtioFs {
294 pub fn new(core: Operator, socket_path: &str) -> Result<VirtioFs> {
295 let filesystem_core = Filesystem::new(core);
296 let filesystem_backend = Arc::new(VhostUserFsBackend::new(filesystem_core).unwrap());
297 Ok(VirtioFs {
298 socket_path: socket_path.to_string(),
299 filesystem_backend,
300 })
301 }
302
303 pub fn run(&self) -> Result<()> {
305 let listener = Listener::new(&self.socket_path, true)
306 .map_err(|_| new_unexpected_error("failed to create listener", None))?;
307 let mut daemon = VhostUserDaemon::new(
308 String::from("virtiofs-backend"),
309 self.filesystem_backend.clone(),
310 GuestMemoryAtomic::new(GuestMemoryMmap::new()),
311 )
312 .unwrap();
313 if daemon.start(listener).is_err() {
314 return Err(new_unexpected_error("failed to start daemon", None));
315 }
316 if daemon.wait().is_err() {
317 return Err(new_unexpected_error("failed to wait daemon", None));
318 }
319 Ok(())
320 }
321
322 pub fn kill(&self) -> Result<()> {
324 if self.filesystem_backend.kill().is_err() {
325 return Err(new_unexpected_error("failed to kill backend", None));
326 }
327 Ok(())
328 }
329}