opendal_core/services/koofr/
core.rs1use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use bytes::Buf;
23use bytes::Bytes;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use http::header;
28use http::request;
29use mea::mutex::Mutex;
30use mea::once::OnceCell;
31use serde::Deserialize;
32use serde_json::json;
33
34use super::error::parse_error;
35use crate::raw::*;
36use crate::*;
37
38#[derive(Clone)]
39pub struct KoofrCore {
40 pub info: Arc<AccessorInfo>,
41 pub root: String,
43 pub endpoint: String,
45 pub email: String,
47 pub password: String,
49
50 pub signer: Arc<Mutex<KoofrSigner>>,
52
53 pub mount_id: OnceCell<String>,
55}
56
57impl Debug for KoofrCore {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("KoofrCore")
60 .field("root", &self.root)
61 .field("endpoint", &self.endpoint)
62 .field("email", &self.email)
63 .finish_non_exhaustive()
64 }
65}
66
67impl KoofrCore {
68 #[inline]
69 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
70 self.info.http_client().send(req).await
71 }
72
73 pub async fn get_mount_id(&self) -> Result<&String> {
74 self.mount_id
75 .get_or_try_init(|| async {
76 let req = Request::get(format!("{}/api/v2/mounts", self.endpoint));
77
78 let req = self.sign(req).await?;
79
80 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
81
82 let resp = self.send(req).await?;
83
84 let status = resp.status();
85
86 if status != StatusCode::OK {
87 return Err(parse_error(resp));
88 }
89
90 let bs = resp.into_body();
91
92 let resp: MountsResponse =
93 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
94
95 for mount in resp.mounts {
96 if mount.is_primary {
97 return Ok(mount.id);
98 }
99 }
100
101 Err(Error::new(ErrorKind::Unexpected, "No primary mount found"))
102 })
103 .await
104 }
105
106 pub async fn sign(&self, req: request::Builder) -> Result<request::Builder> {
107 let mut signer = self.signer.lock().await;
108 if !signer.token.is_empty() {
109 return Ok(req.header(
110 header::AUTHORIZATION,
111 format!("Token token={}", signer.token),
112 ));
113 }
114
115 let url = format!("{}/token", self.endpoint);
116
117 let body = json!({
118 "email": self.email,
119 "password": self.password,
120 });
121
122 let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
123
124 let auth_req = Request::post(url)
125 .header(header::CONTENT_TYPE, "application/json")
126 .body(Buffer::from(Bytes::from(bs)))
127 .map_err(new_request_build_error)?;
128
129 let resp = self.info.http_client().send(auth_req).await?;
130
131 let status = resp.status();
132
133 if status != StatusCode::OK {
134 return Err(parse_error(resp));
135 }
136
137 let bs = resp.into_body();
138 let resp: TokenResponse =
139 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
140
141 signer.token = resp.token;
142
143 Ok(req.header(
144 header::AUTHORIZATION,
145 format!("Token token={}", signer.token),
146 ))
147 }
148}
149
150impl KoofrCore {
151 pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
152 let mut dirs = VecDeque::default();
153
154 let mut p = build_abs_path(&self.root, path);
155
156 while p != "/" {
157 let parent = get_parent(&p).to_string();
158
159 dirs.push_front(parent.clone());
160 p = parent;
161 }
162
163 for dir in dirs {
164 self.create_dir(&dir).await?;
165 }
166
167 Ok(())
168 }
169
170 pub async fn create_dir(&self, path: &str) -> Result<()> {
171 let resp = self.info(path).await?;
172
173 let status = resp.status();
174
175 match status {
176 StatusCode::NOT_FOUND => {
177 let name = get_basename(path).trim_end_matches('/');
178 let parent = get_parent(path);
179
180 let mount_id = self.get_mount_id().await?;
181
182 let url = format!(
183 "{}/api/v2/mounts/{}/files/folder?path={}",
184 self.endpoint,
185 mount_id,
186 percent_encode_path(parent)
187 );
188
189 let body = json!({
190 "name": name
191 });
192
193 let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
194
195 let req = Request::post(url);
196
197 let req = self.sign(req).await?;
198
199 let req = req
200 .header(header::CONTENT_TYPE, "application/json")
201 .extension(Operation::CreateDir)
202 .body(Buffer::from(Bytes::from(bs)))
203 .map_err(new_request_build_error)?;
204
205 let resp = self.info.http_client().send(req).await?;
206
207 let status = resp.status();
208
209 match status {
210 StatusCode::OK | StatusCode::CREATED | StatusCode::BAD_REQUEST => Ok(()),
213 _ => Err(parse_error(resp)),
214 }
215 }
216 StatusCode::OK => Ok(()),
217 _ => Err(parse_error(resp)),
218 }
219 }
220
221 pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
222 let mount_id = self.get_mount_id().await?;
223
224 let url = format!(
225 "{}/api/v2/mounts/{}/files/info?path={}",
226 self.endpoint,
227 mount_id,
228 percent_encode_path(path)
229 );
230
231 let req = Request::get(url);
232
233 let req = self.sign(req).await?;
234
235 let req = req
236 .extension(Operation::Stat)
237 .body(Buffer::new())
238 .map_err(new_request_build_error)?;
239
240 self.send(req).await
241 }
242
243 pub async fn get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
244 let path = build_rooted_abs_path(&self.root, path);
245
246 let mount_id = self.get_mount_id().await?;
247
248 let url = format!(
249 "{}/api/v2/mounts/{}/files/get?path={}",
250 self.endpoint,
251 mount_id,
252 percent_encode_path(&path)
253 );
254
255 let req = Request::get(url).header(header::RANGE, range.to_header());
256
257 let req = self.sign(req).await?;
258
259 let req = req
260 .extension(Operation::Read)
261 .body(Buffer::new())
262 .map_err(new_request_build_error)?;
263
264 self.info.http_client().fetch(req).await
265 }
266
267 pub async fn put(&self, path: &str, bs: Buffer) -> Result<Response<Buffer>> {
268 let path = build_rooted_abs_path(&self.root, path);
269
270 let filename = get_basename(&path);
271 let parent = get_parent(&path);
272
273 let mount_id = self.get_mount_id().await?;
274
275 let url = format!(
276 "{}/content/api/v2/mounts/{}/files/put?path={}&filename={}&info=true&overwriteIgnoreNonexisting=&autorename=false&overwrite=true",
277 self.endpoint,
278 mount_id,
279 percent_encode_path(parent),
280 percent_encode_path(filename)
281 );
282
283 let file_part = FormDataPart::new("file")
284 .header(
285 header::CONTENT_DISPOSITION,
286 format!("form-data; name=\"file\"; filename=\"{filename}\"")
287 .parse()
288 .unwrap(),
289 )
290 .content(bs);
291
292 let multipart = Multipart::new().part(file_part);
293
294 let req = Request::post(url);
295
296 let req = self.sign(req).await?;
297
298 let req = req.extension(Operation::Write);
299
300 let req = multipart.apply(req)?;
301
302 self.send(req).await
303 }
304
305 pub async fn remove(&self, path: &str) -> Result<Response<Buffer>> {
306 let path = build_rooted_abs_path(&self.root, path);
307
308 let mount_id = self.get_mount_id().await?;
309
310 let url = format!(
311 "{}/api/v2/mounts/{}/files/remove?path={}",
312 self.endpoint,
313 mount_id,
314 percent_encode_path(&path)
315 );
316
317 let req = Request::delete(url);
318
319 let req = self.sign(req).await?;
320
321 let req = req
322 .extension(Operation::Delete)
323 .body(Buffer::new())
324 .map_err(new_request_build_error)?;
325
326 self.send(req).await
327 }
328
329 pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
330 let from = build_rooted_abs_path(&self.root, from);
331 let to = build_rooted_abs_path(&self.root, to);
332
333 let mount_id = self.get_mount_id().await?;
334
335 let url = format!(
336 "{}/api/v2/mounts/{}/files/copy?path={}",
337 self.endpoint,
338 mount_id,
339 percent_encode_path(&from),
340 );
341
342 let body = json!({
343 "toMountId": mount_id,
344 "toPath": to,
345 });
346
347 let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
348
349 let req = Request::put(url);
350
351 let req = self.sign(req).await?;
352
353 let req = req
354 .header(header::CONTENT_TYPE, "application/json")
355 .extension(Operation::Copy)
356 .body(Buffer::from(Bytes::from(bs)))
357 .map_err(new_request_build_error)?;
358
359 self.send(req).await
360 }
361
362 pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
363 let from = build_rooted_abs_path(&self.root, from);
364 let to = build_rooted_abs_path(&self.root, to);
365
366 let mount_id = self.get_mount_id().await?;
367
368 let url = format!(
369 "{}/api/v2/mounts/{}/files/move?path={}",
370 self.endpoint,
371 mount_id,
372 percent_encode_path(&from),
373 );
374
375 let body = json!({
376 "toMountId": mount_id,
377 "toPath": to,
378 });
379
380 let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
381
382 let req = Request::put(url);
383
384 let req = self.sign(req).await?;
385
386 let req = req
387 .header(header::CONTENT_TYPE, "application/json")
388 .extension(Operation::Rename)
389 .body(Buffer::from(Bytes::from(bs)))
390 .map_err(new_request_build_error)?;
391
392 self.send(req).await
393 }
394
395 pub async fn list(&self, path: &str) -> Result<Response<Buffer>> {
396 let path = build_rooted_abs_path(&self.root, path);
397
398 let mount_id = self.get_mount_id().await?;
399
400 let url = format!(
401 "{}/api/v2/mounts/{}/files/list?path={}",
402 self.endpoint,
403 mount_id,
404 percent_encode_path(&path)
405 );
406
407 let req = Request::get(url);
408
409 let req = self.sign(req).await?;
410
411 let req = req
412 .extension(Operation::List)
413 .body(Buffer::new())
414 .map_err(new_request_build_error)?;
415
416 self.send(req).await
417 }
418}
419
420#[derive(Clone, Default)]
421pub struct KoofrSigner {
422 pub token: String,
423}
424
425#[derive(Debug, Deserialize)]
426pub struct TokenResponse {
427 pub token: String,
428}
429
430#[derive(Debug, Deserialize)]
431pub struct MountsResponse {
432 pub mounts: Vec<Mount>,
433}
434
435#[derive(Debug, Deserialize)]
436#[serde(rename_all = "camelCase")]
437pub struct Mount {
438 pub id: String,
439 pub is_primary: bool,
440}
441
442#[derive(Debug, Deserialize)]
443#[serde(rename_all = "camelCase")]
444pub struct ListResponse {
445 pub files: Vec<File>,
446}
447
448#[derive(Debug, Deserialize)]
449#[serde(rename_all = "camelCase")]
450pub struct File {
451 pub name: String,
452 #[serde(rename = "type")]
453 pub ty: String,
454 pub size: u64,
455 pub modified: i64,
456 pub content_type: String,
457}