opendal/services/koofr/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::VecDeque;
19use std::fmt::Debug;
20use std::fmt::Formatter;
21use std::sync::Arc;
22
23use bytes::Buf;
24use bytes::Bytes;
25use http::header;
26use http::request;
27use http::Request;
28use http::Response;
29use http::StatusCode;
30use serde::Deserialize;
31use serde_json::json;
32use tokio::sync::Mutex;
33use tokio::sync::OnceCell;
34
35use super::error::parse_error;
36use crate::raw::*;
37use crate::*;
38
39#[derive(Clone)]
40pub struct KoofrCore {
41    pub info: Arc<AccessorInfo>,
42    /// The root of this core.
43    pub root: String,
44    /// The endpoint of this backend.
45    pub endpoint: String,
46    /// Koofr email
47    pub email: String,
48    /// Koofr password
49    pub password: String,
50
51    /// signer of this backend.
52    pub signer: Arc<Mutex<KoofrSigner>>,
53
54    // Koofr mount_id.
55    pub mount_id: OnceCell<String>,
56}
57
58impl Debug for KoofrCore {
59    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("Backend")
61            .field("root", &self.root)
62            .field("endpoint", &self.endpoint)
63            .field("email", &self.email)
64            .finish_non_exhaustive()
65    }
66}
67
68impl KoofrCore {
69    #[inline]
70    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
71        self.info.http_client().send(req).await
72    }
73
74    pub async fn get_mount_id(&self) -> Result<&String> {
75        self.mount_id
76            .get_or_try_init(|| async {
77                let req = Request::get(format!("{}/api/v2/mounts", self.endpoint));
78
79                let req = self.sign(req).await?;
80
81                let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
82
83                let resp = self.send(req).await?;
84
85                let status = resp.status();
86
87                if status != StatusCode::OK {
88                    return Err(parse_error(resp));
89                }
90
91                let bs = resp.into_body();
92
93                let resp: MountsResponse =
94                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
95
96                for mount in resp.mounts {
97                    if mount.is_primary {
98                        return Ok(mount.id);
99                    }
100                }
101
102                Err(Error::new(ErrorKind::Unexpected, "No primary mount found"))
103            })
104            .await
105    }
106
107    pub async fn sign(&self, req: request::Builder) -> Result<request::Builder> {
108        let mut signer = self.signer.lock().await;
109        if !signer.token.is_empty() {
110            return Ok(req.header(
111                header::AUTHORIZATION,
112                format!("Token token={}", signer.token),
113            ));
114        }
115
116        let url = format!("{}/token", self.endpoint);
117
118        let body = json!({
119            "email": self.email,
120            "password": self.password,
121        });
122
123        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
124
125        let auth_req = Request::post(url)
126            .header(header::CONTENT_TYPE, "application/json")
127            .body(Buffer::from(Bytes::from(bs)))
128            .map_err(new_request_build_error)?;
129
130        let resp = self.info.http_client().send(auth_req).await?;
131
132        let status = resp.status();
133
134        if status != StatusCode::OK {
135            return Err(parse_error(resp));
136        }
137
138        let bs = resp.into_body();
139        let resp: TokenResponse =
140            serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
141
142        signer.token = resp.token;
143
144        Ok(req.header(
145            header::AUTHORIZATION,
146            format!("Token token={}", signer.token),
147        ))
148    }
149}
150
151impl KoofrCore {
152    pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
153        let mut dirs = VecDeque::default();
154
155        let mut p = build_abs_path(&self.root, path);
156
157        while p != "/" {
158            let parent = get_parent(&p).to_string();
159
160            dirs.push_front(parent.clone());
161            p = parent;
162        }
163
164        for dir in dirs {
165            self.create_dir(&dir).await?;
166        }
167
168        Ok(())
169    }
170
171    pub async fn create_dir(&self, path: &str) -> Result<()> {
172        let resp = self.info(path).await?;
173
174        let status = resp.status();
175
176        match status {
177            StatusCode::NOT_FOUND => {
178                let name = get_basename(path).trim_end_matches('/');
179                let parent = get_parent(path);
180
181                let mount_id = self.get_mount_id().await?;
182
183                let url = format!(
184                    "{}/api/v2/mounts/{}/files/folder?path={}",
185                    self.endpoint,
186                    mount_id,
187                    percent_encode_path(parent)
188                );
189
190                let body = json!({
191                    "name": name
192                });
193
194                let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
195
196                let req = Request::post(url);
197
198                let req = self.sign(req).await?;
199
200                let req = req
201                    .header(header::CONTENT_TYPE, "application/json")
202                    .extension(Operation::CreateDir)
203                    .body(Buffer::from(Bytes::from(bs)))
204                    .map_err(new_request_build_error)?;
205
206                let resp = self.info.http_client().send(req).await?;
207
208                let status = resp.status();
209
210                match status {
211                    // When the directory already exists, Koofr returns 400 Bad Request.
212                    // We should treat it as success.
213                    StatusCode::OK | StatusCode::CREATED | StatusCode::BAD_REQUEST => Ok(()),
214                    _ => Err(parse_error(resp)),
215                }
216            }
217            StatusCode::OK => Ok(()),
218            _ => Err(parse_error(resp)),
219        }
220    }
221
222    pub async fn info(&self, path: &str) -> Result<Response<Buffer>> {
223        let mount_id = self.get_mount_id().await?;
224
225        let url = format!(
226            "{}/api/v2/mounts/{}/files/info?path={}",
227            self.endpoint,
228            mount_id,
229            percent_encode_path(path)
230        );
231
232        let req = Request::get(url);
233
234        let req = self.sign(req).await?;
235
236        let req = req
237            .extension(Operation::Stat)
238            .body(Buffer::new())
239            .map_err(new_request_build_error)?;
240
241        self.send(req).await
242    }
243
244    pub async fn get(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
245        let path = build_rooted_abs_path(&self.root, path);
246
247        let mount_id = self.get_mount_id().await?;
248
249        let url = format!(
250            "{}/api/v2/mounts/{}/files/get?path={}",
251            self.endpoint,
252            mount_id,
253            percent_encode_path(&path)
254        );
255
256        let req = Request::get(url).header(header::RANGE, range.to_header());
257
258        let req = self.sign(req).await?;
259
260        let req = req
261            .extension(Operation::Read)
262            .body(Buffer::new())
263            .map_err(new_request_build_error)?;
264
265        self.info.http_client().fetch(req).await
266    }
267
268    pub async fn put(&self, path: &str, bs: Buffer) -> Result<Response<Buffer>> {
269        let path = build_rooted_abs_path(&self.root, path);
270
271        let filename = get_basename(&path);
272        let parent = get_parent(&path);
273
274        let mount_id = self.get_mount_id().await?;
275
276        let url = format!(
277            "{}/content/api/v2/mounts/{}/files/put?path={}&filename={}&info=true&overwriteIgnoreNonexisting=&autorename=false&overwrite=true",
278            self.endpoint,
279            mount_id,
280            percent_encode_path(parent),
281            percent_encode_path(filename)
282        );
283
284        let file_part = FormDataPart::new("file")
285            .header(
286                header::CONTENT_DISPOSITION,
287                format!("form-data; name=\"file\"; filename=\"{filename}\"")
288                    .parse()
289                    .unwrap(),
290            )
291            .content(bs);
292
293        let multipart = Multipart::new().part(file_part);
294
295        let req = Request::post(url);
296
297        let req = self.sign(req).await?;
298
299        let req = req.extension(Operation::Write);
300
301        let req = multipart.apply(req)?;
302
303        self.send(req).await
304    }
305
306    pub async fn remove(&self, path: &str) -> Result<Response<Buffer>> {
307        let path = build_rooted_abs_path(&self.root, path);
308
309        let mount_id = self.get_mount_id().await?;
310
311        let url = format!(
312            "{}/api/v2/mounts/{}/files/remove?path={}",
313            self.endpoint,
314            mount_id,
315            percent_encode_path(&path)
316        );
317
318        let req = Request::delete(url);
319
320        let req = self.sign(req).await?;
321
322        let req = req
323            .extension(Operation::Delete)
324            .body(Buffer::new())
325            .map_err(new_request_build_error)?;
326
327        self.send(req).await
328    }
329
330    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
331        let from = build_rooted_abs_path(&self.root, from);
332        let to = build_rooted_abs_path(&self.root, to);
333
334        let mount_id = self.get_mount_id().await?;
335
336        let url = format!(
337            "{}/api/v2/mounts/{}/files/copy?path={}",
338            self.endpoint,
339            mount_id,
340            percent_encode_path(&from),
341        );
342
343        let body = json!({
344            "toMountId": mount_id,
345            "toPath": to,
346        });
347
348        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
349
350        let req = Request::put(url);
351
352        let req = self.sign(req).await?;
353
354        let req = req
355            .header(header::CONTENT_TYPE, "application/json")
356            .extension(Operation::Copy)
357            .body(Buffer::from(Bytes::from(bs)))
358            .map_err(new_request_build_error)?;
359
360        self.send(req).await
361    }
362
363    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
364        let from = build_rooted_abs_path(&self.root, from);
365        let to = build_rooted_abs_path(&self.root, to);
366
367        let mount_id = self.get_mount_id().await?;
368
369        let url = format!(
370            "{}/api/v2/mounts/{}/files/move?path={}",
371            self.endpoint,
372            mount_id,
373            percent_encode_path(&from),
374        );
375
376        let body = json!({
377            "toMountId": mount_id,
378            "toPath": to,
379        });
380
381        let bs = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
382
383        let req = Request::put(url);
384
385        let req = self.sign(req).await?;
386
387        let req = req
388            .header(header::CONTENT_TYPE, "application/json")
389            .extension(Operation::Rename)
390            .body(Buffer::from(Bytes::from(bs)))
391            .map_err(new_request_build_error)?;
392
393        self.send(req).await
394    }
395
396    pub async fn list(&self, path: &str) -> Result<Response<Buffer>> {
397        let path = build_rooted_abs_path(&self.root, path);
398
399        let mount_id = self.get_mount_id().await?;
400
401        let url = format!(
402            "{}/api/v2/mounts/{}/files/list?path={}",
403            self.endpoint,
404            mount_id,
405            percent_encode_path(&path)
406        );
407
408        let req = Request::get(url);
409
410        let req = self.sign(req).await?;
411
412        let req = req
413            .extension(Operation::List)
414            .body(Buffer::new())
415            .map_err(new_request_build_error)?;
416
417        self.send(req).await
418    }
419}
420
421#[derive(Clone, Default)]
422pub struct KoofrSigner {
423    pub token: String,
424}
425
426#[derive(Debug, Deserialize)]
427pub struct TokenResponse {
428    pub token: String,
429}
430
431#[derive(Debug, Deserialize)]
432pub struct MountsResponse {
433    pub mounts: Vec<Mount>,
434}
435
436#[derive(Debug, Deserialize)]
437#[serde(rename_all = "camelCase")]
438pub struct Mount {
439    pub id: String,
440    pub is_primary: bool,
441}
442
443#[derive(Debug, Deserialize)]
444#[serde(rename_all = "camelCase")]
445pub struct ListResponse {
446    pub files: Vec<File>,
447}
448
449#[derive(Debug, Deserialize)]
450#[serde(rename_all = "camelCase")]
451pub struct File {
452    pub name: String,
453    #[serde(rename = "type")]
454    pub ty: String,
455    pub size: u64,
456    pub modified: i64,
457    pub content_type: String,
458}