opendal_core/services/koofr/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use bytes::Buf;
23use bytes::Bytes;
24use http::Request;
25use http::Response;
26use http::StatusCode;
27use http::header;
28use http::request;
29use mea::mutex::Mutex;
30use mea::once::OnceCell;
31use serde::Deserialize;
32use serde_json::json;
33
34use super::error::parse_error;
35use crate::raw::*;
36use crate::*;
37
/// Shared state and request helpers for the Koofr service.
///
/// Bundles the connection settings and credentials with the lazily obtained
/// API token (`signer`) and primary mount id (`mount_id`).
#[derive(Clone)]
pub struct KoofrCore {
    /// Accessor info; its HTTP client is used to send every request.
    pub info: Arc<AccessorInfo>,
    /// The root of this core.
    pub root: String,
    /// The endpoint of this backend.
    pub endpoint: String,
    /// Koofr email
    pub email: String,
    /// Koofr password
    pub password: String,

    /// signer of this backend.
    pub signer: Arc<Mutex<KoofrSigner>>,

    /// Koofr mount_id, filled in on first use by `get_mount_id`.
    pub mount_id: OnceCell<String>,
}
56
57impl Debug for KoofrCore {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("KoofrCore")
60            .field("root", &self.root)
61            .field("endpoint", &self.endpoint)
62            .field("email", &self.email)
63            .finish_non_exhaustive()
64    }
65}
66
67impl KoofrCore {
68    #[inline]
69    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
70        self.info.http_client().send(req).await
71    }
72
73    pub async fn get_mount_id(&self) -> Result<&String> {
74        self.mount_id
75            .get_or_try_init(|| async {
76                let req = Request::get(format!("{}/api/v2/mounts", self.endpoint));
77
78                let req = self.sign(req).await?;
79
80                let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
81
82                let resp = self.send(req).await?;
83
84                let status = resp.status();
85
86                if status != StatusCode::OK {
87                    return Err(parse_error(resp));
88                }
89
90                let bs = resp.into_body();
91
92                let resp: MountsResponse =
93                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
94
95                for mount in resp.mounts {
96                    if mount.is_primary {
97                        return Ok(mount.id);
98                    }
99                }
100
101                Err(Error::new(ErrorKind::Unexpected, "No primary mount found"))
102            })
103            .await
104    }
105
106    pub async fn sign(&self, req: request::Builder) -> Result<request::Builder> {
107        let mut signer = self.signer.lock().await;
108        if !signer.token.is_empty() {
109            return Ok(req.header(
110                header::AUTHORIZATION,
111                format!("Token token={}", signer.token),
112            ));
113        }
114
115        let url = format!("{}/token", self.endpoint);
116
117        let body = json!({
118            "email": self.email,
119            "password": self.password,
120        });
121
122        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
123
124        let auth_req = Request::post(url)
125            .header(header::CONTENT_TYPE, "application/json")
126            .body(Buffer::from(Bytes::from(bs)))
127            .map_err(new_request_build_error)?;
128
129        let resp = self.info.http_client().send(auth_req).await?;
130
131        let status = resp.status();
132
133        if status != StatusCode::OK {
134            return Err(parse_error(resp));
135        }
136
137        let bs = resp.into_body();
138        let resp: TokenResponse =
139            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
140
141        signer.token = resp.token;
142
143        Ok(req.header(
144            header::AUTHORIZATION,
145            format!("Token token={}", signer.token),
146        ))
147    }
148}
149
150impl KoofrCore {
151    pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
152        let mut dirs = VecDeque::default();
153
154        let mut p = build_abs_path(&self.root, path);
155
156        while p != "/" {
157            let parent = get_parent(&p).to_string();
158
159            dirs.push_front(parent.clone());
160            p = parent;
161        }
162
163        for dir in dirs {
164            self.create_dir(&dir).await?;
165        }
166
167        Ok(())
168    }
169
170    pub async fn create_dir(&self, path: &str) -> Result<()> {
171        let resp = self.info(path).await?;
172
173        let status = resp.status();
174
175        match status {
176            StatusCode::NOT_FOUND => {
177                let name = get_basename(path).trim_end_matches('/');
178                let parent = get_parent(path);
179
180                let mount_id = self.get_mount_id().await?;
181
182                let url = format!(
183                    "{}/api/v2/mounts/{}/files/folder?path={}",
184                    self.endpoint,
185                    mount_id,
186                    percent_encode_path(parent)
187                );
188
189                let body = json!({
190                    "name": name
191                });
192
193                let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
194
195                let req = Request::post(url);
196
197                let req = self.sign(req).await?;
198
199                let req = req
200                    .header(header::CONTENT_TYPE, "application/json")
201                    .extension(Operation::CreateDir)
202                    .body(Buffer::from(Bytes::from(bs)))
203                    .map_err(new_request_build_error)?;
204
205                let resp = self.info.http_client().send(req).await?;
206
207                let status = resp.status();
208
209                match status {
210                    // When the directory already exists, Koofr returns 400 Bad Request.
211                    // We should treat it as success.
212                    StatusCode::OK | StatusCode::CREATED | StatusCode::BAD_REQUEST => Ok(()),
213                    _ => Err(parse_error(resp)),
214                }
215            }
216            StatusCode::OK => Ok(()),
217            _ => Err(parse_error(resp)),
218        }
219    }
220
221    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
222        let mount_id = self.get_mount_id().await?;
223
224        let url = format!(
225            "{}/api/v2/mounts/{}/files/info?path={}",
226            self.endpoint,
227            mount_id,
228            percent_encode_path(path)
229        );
230
231        let req = Request::get(url);
232
233        let req = self.sign(req).await?;
234
235        let req = req
236            .extension(Operation::Stat)
237            .body(Buffer::new())
238            .map_err(new_request_build_error)?;
239
240        self.send(req).await
241    }
242
243    pub async fn get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
244        let path = build_rooted_abs_path(&self.root, path);
245
246        let mount_id = self.get_mount_id().await?;
247
248        let url = format!(
249            "{}/api/v2/mounts/{}/files/get?path={}",
250            self.endpoint,
251            mount_id,
252            percent_encode_path(&path)
253        );
254
255        let req = Request::get(url).header(header::RANGE, range.to_header());
256
257        let req = self.sign(req).await?;
258
259        let req = req
260            .extension(Operation::Read)
261            .body(Buffer::new())
262            .map_err(new_request_build_error)?;
263
264        self.info.http_client().fetch(req).await
265    }
266
267    pub async fn put(&self, path: &str, bs: Buffer) -> Result<Response<Buffer>> {
268        let path = build_rooted_abs_path(&self.root, path);
269
270        let filename = get_basename(&path);
271        let parent = get_parent(&path);
272
273        let mount_id = self.get_mount_id().await?;
274
275        let url = format!(
276            "{}/content/api/v2/mounts/{}/files/put?path={}&filename={}&info=true&overwriteIgnoreNonexisting=&autorename=false&overwrite=true",
277            self.endpoint,
278            mount_id,
279            percent_encode_path(parent),
280            percent_encode_path(filename)
281        );
282
283        let file_part = FormDataPart::new("file")
284            .header(
285                header::CONTENT_DISPOSITION,
286                format!("form-data; name=\"file\"; filename=\"{filename}\"")
287                    .parse()
288                    .unwrap(),
289            )
290            .content(bs);
291
292        let multipart = Multipart::new().part(file_part);
293
294        let req = Request::post(url);
295
296        let req = self.sign(req).await?;
297
298        let req = req.extension(Operation::Write);
299
300        let req = multipart.apply(req)?;
301
302        self.send(req).await
303    }
304
305    pub async fn remove(&self, path: &str) -> Result<Response<Buffer>> {
306        let path = build_rooted_abs_path(&self.root, path);
307
308        let mount_id = self.get_mount_id().await?;
309
310        let url = format!(
311            "{}/api/v2/mounts/{}/files/remove?path={}",
312            self.endpoint,
313            mount_id,
314            percent_encode_path(&path)
315        );
316
317        let req = Request::delete(url);
318
319        let req = self.sign(req).await?;
320
321        let req = req
322            .extension(Operation::Delete)
323            .body(Buffer::new())
324            .map_err(new_request_build_error)?;
325
326        self.send(req).await
327    }
328
329    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
330        let from = build_rooted_abs_path(&self.root, from);
331        let to = build_rooted_abs_path(&self.root, to);
332
333        let mount_id = self.get_mount_id().await?;
334
335        let url = format!(
336            "{}/api/v2/mounts/{}/files/copy?path={}",
337            self.endpoint,
338            mount_id,
339            percent_encode_path(&from),
340        );
341
342        let body = json!({
343            "toMountId": mount_id,
344            "toPath": to,
345        });
346
347        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
348
349        let req = Request::put(url);
350
351        let req = self.sign(req).await?;
352
353        let req = req
354            .header(header::CONTENT_TYPE, "application/json")
355            .extension(Operation::Copy)
356            .body(Buffer::from(Bytes::from(bs)))
357            .map_err(new_request_build_error)?;
358
359        self.send(req).await
360    }
361
362    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
363        let from = build_rooted_abs_path(&self.root, from);
364        let to = build_rooted_abs_path(&self.root, to);
365
366        let mount_id = self.get_mount_id().await?;
367
368        let url = format!(
369            "{}/api/v2/mounts/{}/files/move?path={}",
370            self.endpoint,
371            mount_id,
372            percent_encode_path(&from),
373        );
374
375        let body = json!({
376            "toMountId": mount_id,
377            "toPath": to,
378        });
379
380        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
381
382        let req = Request::put(url);
383
384        let req = self.sign(req).await?;
385
386        let req = req
387            .header(header::CONTENT_TYPE, "application/json")
388            .extension(Operation::Rename)
389            .body(Buffer::from(Bytes::from(bs)))
390            .map_err(new_request_build_error)?;
391
392        self.send(req).await
393    }
394
395    pub async fn list(&self, path: &str) -> Result<Response<Buffer>> {
396        let path = build_rooted_abs_path(&self.root, path);
397
398        let mount_id = self.get_mount_id().await?;
399
400        let url = format!(
401            "{}/api/v2/mounts/{}/files/list?path={}",
402            self.endpoint,
403            mount_id,
404            percent_encode_path(&path)
405        );
406
407        let req = Request::get(url);
408
409        let req = self.sign(req).await?;
410
411        let req = req
412            .extension(Operation::List)
413            .body(Buffer::new())
414            .map_err(new_request_build_error)?;
415
416        self.send(req).await
417    }
418}
419
/// Holds the API token used to authorize requests.
#[derive(Clone, Default)]
pub struct KoofrSigner {
    /// Token sent in the `Authorization` header. While it is empty,
    /// `KoofrCore::sign` requests a new one before signing.
    pub token: String,
}
424
/// JSON body of a successful `POST {endpoint}/token` response.
#[derive(Debug, Deserialize)]
pub struct TokenResponse {
    /// The issued API token.
    pub token: String,
}
429
/// JSON body of a successful `GET {endpoint}/api/v2/mounts` response.
#[derive(Debug, Deserialize)]
pub struct MountsResponse {
    /// The mounts listed in the response.
    pub mounts: Vec<Mount>,
}
434
/// A single mount entry of a `MountsResponse`; JSON keys are camelCase.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Mount {
    /// Mount id, used in the `/api/v2/mounts/{id}/...` request URLs.
    pub id: String,
    /// Whether this is the primary mount (JSON key `isPrimary`).
    pub is_primary: bool,
}
441
/// A JSON document carrying a list of files; JSON keys are camelCase.
///
/// NOTE(review): presumably the body returned by the `files/list` endpoint —
/// the deserialization site is not in this file, confirm against the caller.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListResponse {
    /// The file entries in the response.
    pub files: Vec<File>,
}
447
/// A single file entry as deserialized from JSON; keys are camelCase.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct File {
    /// Entry name.
    pub name: String,
    /// Entry type, read from the JSON key `type`.
    /// NOTE(review): the set of possible values is not shown here — confirm.
    #[serde(rename = "type")]
    pub ty: String,
    /// Entry size. NOTE(review): presumably in bytes — confirm.
    pub size: u64,
    /// Modification time. NOTE(review): unit/epoch not shown here — confirm.
    pub modified: i64,
    /// Content type, read from the JSON key `contentType`.
    pub content_type: String,
}