opendal/services/koofr/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use bytes::Buf;
24use bytes::Bytes;
25use http::header;
26use http::request;
27use http::Request;
28use http::Response;
29use http::StatusCode;
30use serde::Deserialize;
31use serde_json::json;
32use tokio::sync::Mutex;
33use tokio::sync::OnceCell;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
/// Shared state of the Koofr service: connection settings, credentials and
/// the lazily obtained token / mount id used to build API requests.
#[derive(Clone)]
pub struct KoofrCore {
    /// Accessor info; the HTTP client used for every request is obtained
    /// from it via `http_client()`.
    pub info: Arc<AccessorInfo>,
    /// The root of this core.
    pub root: String,
    /// The endpoint of this backend.
    pub endpoint: String,
    /// Koofr email
    pub email: String,
    /// Koofr password
    ///
    /// Sent together with `email` when `sign` requests a token. It is not
    /// included in the `Debug` output of this struct.
    pub password: String,

    /// signer of this backend.
    ///
    /// Holds the token; guarded by an async mutex that `sign` locks.
    pub signer: Arc<Mutex<KoofrSigner>>,

    /// Koofr mount_id.
    ///
    /// Initialized on the first successful `get_mount_id` call and reused
    /// afterwards.
    pub mount_id: OnceCell<String>,
}
57
58impl Debug for KoofrCore {
59    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("Backend")
61            .field("root", &self.root)
62            .field("endpoint", &self.endpoint)
63            .field("email", &self.email)
64            .finish_non_exhaustive()
65    }
66}
67
68impl KoofrCore {
69    #[inline]
70    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
71        self.info.http_client().send(req).await
72    }
73
74    pub async fn get_mount_id(&self) -> Result<&String> {
75        self.mount_id
76            .get_or_try_init(|| async {
77                let req = Request::get(format!("{}/api/v2/mounts", self.endpoint));
78
79                let req = self.sign(req).await?;
80
81                let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
82
83                let resp = self.send(req).await?;
84
85                let status = resp.status();
86
87                if status != StatusCode::OK {
88                    return Err(parse_error(resp));
89                }
90
91                let bs = resp.into_body();
92
93                let resp: MountsResponse =
94                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
95
96                for mount in resp.mounts {
97                    if mount.is_primary {
98                        return Ok(mount.id);
99                    }
100                }
101
102                Err(Error::new(ErrorKind::Unexpected, "No primary mount found"))
103            })
104            .await
105    }
106
107    pub async fn sign(&self, req: request::Builder) -> Result<request::Builder> {
108        let mut signer = self.signer.lock().await;
109        if !signer.token.is_empty() {
110            return Ok(req.header(
111                header::AUTHORIZATION,
112                format!("Token token={}", signer.token),
113            ));
114        }
115
116        let url = format!("{}/token", self.endpoint);
117
118        let body = json!({
119            "email": self.email,
120            "password": self.password,
121        });
122
123        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
124
125        let auth_req = Request::post(url)
126            .header(header::CONTENT_TYPE, "application/json")
127            .body(Buffer::from(Bytes::from(bs)))
128            .map_err(new_request_build_error)?;
129
130        let resp = self.info.http_client().send(auth_req).await?;
131
132        let status = resp.status();
133
134        if status != StatusCode::OK {
135            return Err(parse_error(resp));
136        }
137
138        let bs = resp.into_body();
139        let resp: TokenResponse =
140            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
141
142        signer.token = resp.token;
143
144        Ok(req.header(
145            header::AUTHORIZATION,
146            format!("Token token={}", signer.token),
147        ))
148    }
149}
150
151impl KoofrCore {
152    pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
153        let mut dirs = VecDeque::default();
154
155        let mut p = build_abs_path(&self.root, path);
156
157        while p != "/" {
158            let parent = get_parent(&p).to_string();
159
160            dirs.push_front(parent.clone());
161            p = parent;
162        }
163
164        for dir in dirs {
165            self.create_dir(&dir).await?;
166        }
167
168        Ok(())
169    }
170
171    pub async fn create_dir(&self, path: &str) -> Result<()> {
172        let resp = self.info(path).await?;
173
174        let status = resp.status();
175
176        match status {
177            StatusCode::NOT_FOUND => {
178                let name = get_basename(path).trim_end_matches('/');
179                let parent = get_parent(path);
180
181                let mount_id = self.get_mount_id().await?;
182
183                let url = format!(
184                    "{}/api/v2/mounts/{}/files/folder?path={}",
185                    self.endpoint,
186                    mount_id,
187                    percent_encode_path(parent)
188                );
189
190                let body = json!({
191                    "name": name
192                });
193
194                let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
195
196                let req = Request::post(url);
197
198                let req = self.sign(req).await?;
199
200                let req = req
201                    .header(header::CONTENT_TYPE, "application/json")
202                    .body(Buffer::from(Bytes::from(bs)))
203                    .map_err(new_request_build_error)?;
204
205                let resp = self.info.http_client().send(req).await?;
206
207                let status = resp.status();
208
209                match status {
210                    // When the directory already exists, Koofr returns 400 Bad Request.
211                    // We should treat it as success.
212                    StatusCode::OK | StatusCode::CREATED | StatusCode::BAD_REQUEST => Ok(()),
213                    _ => Err(parse_error(resp)),
214                }
215            }
216            StatusCode::OK => Ok(()),
217            _ => Err(parse_error(resp)),
218        }
219    }
220
221    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
222        let mount_id = self.get_mount_id().await?;
223
224        let url = format!(
225            "{}/api/v2/mounts/{}/files/info?path={}",
226            self.endpoint,
227            mount_id,
228            percent_encode_path(path)
229        );
230
231        let req = Request::get(url);
232
233        let req = self.sign(req).await?;
234
235        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
236
237        self.send(req).await
238    }
239
240    pub async fn get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
241        let path = build_rooted_abs_path(&self.root, path);
242
243        let mount_id = self.get_mount_id().await?;
244
245        let url = format!(
246            "{}/api/v2/mounts/{}/files/get?path={}",
247            self.endpoint,
248            mount_id,
249            percent_encode_path(&path)
250        );
251
252        let req = Request::get(url).header(header::RANGE, range.to_header());
253
254        let req = self.sign(req).await?;
255
256        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
257
258        self.info.http_client().fetch(req).await
259    }
260
261    pub async fn put(&self, path: &str, bs: Buffer) -> Result<Response<Buffer>> {
262        let path = build_rooted_abs_path(&self.root, path);
263
264        let filename = get_basename(&path);
265        let parent = get_parent(&path);
266
267        let mount_id = self.get_mount_id().await?;
268
269        let url = format!(
270            "{}/content/api/v2/mounts/{}/files/put?path={}&filename={}&info=true&overwriteIgnoreNonexisting=&autorename=false&overwrite=true",
271            self.endpoint,
272            mount_id,
273            percent_encode_path(parent),
274            percent_encode_path(filename)
275        );
276
277        let file_part = FormDataPart::new("file")
278            .header(
279                header::CONTENT_DISPOSITION,
280                format!("form-data; name=\"file\"; filename=\"{filename}\"")
281                    .parse()
282                    .unwrap(),
283            )
284            .content(bs);
285
286        let multipart = Multipart::new().part(file_part);
287
288        let req = Request::post(url);
289
290        let req = self.sign(req).await?;
291
292        let req = multipart.apply(req)?;
293
294        self.send(req).await
295    }
296
297    pub async fn remove(&self, path: &str) -> Result<Response<Buffer>> {
298        let path = build_rooted_abs_path(&self.root, path);
299
300        let mount_id = self.get_mount_id().await?;
301
302        let url = format!(
303            "{}/api/v2/mounts/{}/files/remove?path={}",
304            self.endpoint,
305            mount_id,
306            percent_encode_path(&path)
307        );
308
309        let req = Request::delete(url);
310
311        let req = self.sign(req).await?;
312
313        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
314
315        self.send(req).await
316    }
317
318    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
319        let from = build_rooted_abs_path(&self.root, from);
320        let to = build_rooted_abs_path(&self.root, to);
321
322        let mount_id = self.get_mount_id().await?;
323
324        let url = format!(
325            "{}/api/v2/mounts/{}/files/copy?path={}",
326            self.endpoint,
327            mount_id,
328            percent_encode_path(&from),
329        );
330
331        let body = json!({
332            "toMountId": mount_id,
333            "toPath": to,
334        });
335
336        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
337
338        let req = Request::put(url);
339
340        let req = self.sign(req).await?;
341
342        let req = req
343            .header(header::CONTENT_TYPE, "application/json")
344            .body(Buffer::from(Bytes::from(bs)))
345            .map_err(new_request_build_error)?;
346
347        self.send(req).await
348    }
349
350    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
351        let from = build_rooted_abs_path(&self.root, from);
352        let to = build_rooted_abs_path(&self.root, to);
353
354        let mount_id = self.get_mount_id().await?;
355
356        let url = format!(
357            "{}/api/v2/mounts/{}/files/move?path={}",
358            self.endpoint,
359            mount_id,
360            percent_encode_path(&from),
361        );
362
363        let body = json!({
364            "toMountId": mount_id,
365            "toPath": to,
366        });
367
368        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
369
370        let req = Request::put(url);
371
372        let req = self.sign(req).await?;
373
374        let req = req
375            .header(header::CONTENT_TYPE, "application/json")
376            .body(Buffer::from(Bytes::from(bs)))
377            .map_err(new_request_build_error)?;
378
379        self.send(req).await
380    }
381
382    pub async fn list(&self, path: &str) -> Result<Response<Buffer>> {
383        let path = build_rooted_abs_path(&self.root, path);
384
385        let mount_id = self.get_mount_id().await?;
386
387        let url = format!(
388            "{}/api/v2/mounts/{}/files/list?path={}",
389            self.endpoint,
390            mount_id,
391            percent_encode_path(&path)
392        );
393
394        let req = Request::get(url);
395
396        let req = self.sign(req).await?;
397
398        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
399
400        self.send(req).await
401    }
402}
403
/// Holds the token that `KoofrCore::sign` puts into the `Authorization`
/// header.
#[derive(Clone, Default)]
pub struct KoofrSigner {
    /// Current token. An empty string means no token has been obtained yet,
    /// which makes `KoofrCore::sign` request one.
    pub token: String,
}
408
/// JSON body returned by the `{endpoint}/token` request made in
/// `KoofrCore::sign`.
#[derive(Debug, Deserialize)]
pub struct TokenResponse {
    /// Token to be sent in the `Authorization` header of later requests.
    pub token: String,
}
413
/// JSON body returned by the `{endpoint}/api/v2/mounts` request made in
/// `KoofrCore::get_mount_id`.
#[derive(Debug, Deserialize)]
pub struct MountsResponse {
    /// Mounts reported by the service; the primary one is searched for here.
    pub mounts: Vec<Mount>,
}
418
/// A single mount entry of a [`MountsResponse`].
///
/// Field names are read from camelCase JSON keys (`isPrimary`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Mount {
    /// Identifier of the mount, used in the `/api/v2/mounts/{id}/...` URLs.
    pub id: String,
    /// Whether this mount is the primary one.
    pub is_primary: bool,
}
425
/// A JSON document carrying a `files` array.
///
/// NOTE(review): presumably the body of the `files/list` response requested
/// by `KoofrCore::list`; it is not deserialized in this file — confirm
/// against the caller.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListResponse {
    /// Entries contained in the response.
    pub files: Vec<File>,
}
431
/// A single entry of a [`ListResponse`].
///
/// Field names are read from camelCase JSON keys (`contentType`), except
/// `ty`, which is read from the `type` key.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct File {
    /// Name of the entry.
    pub name: String,
    /// Entry type as reported under the JSON key `type`.
    // NOTE(review): the set of possible values is not visible here — confirm
    // against the Koofr API and the caller.
    #[serde(rename = "type")]
    pub ty: String,
    /// Size of the entry.
    // NOTE(review): presumably in bytes — confirm.
    pub size: u64,
    /// Modification time of the entry.
    // NOTE(review): the unit is not visible here (presumably a Unix
    // timestamp, possibly in milliseconds) — confirm.
    pub modified: i64,
    /// Content type of the entry, read from the JSON key `contentType`.
    pub content_type: String,
}