opendal/services/yandex_disk/
core.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Buf;
19use http::header;
20use http::request;
21use http::Request;
22use http::Response;
23use http::StatusCode;
24use serde::Deserialize;
25use std::fmt::Debug;
26use std::fmt::Formatter;
27use std::sync::Arc;
28
29use super::error::parse_error;
30use crate::raw::*;
31use crate::*;
32
33#[derive(Clone)]
34pub struct YandexDiskCore {
35    pub info: Arc<AccessorInfo>,
36    /// The root of this core.
37    pub root: String,
38    /// Yandex Disk oauth access_token.
39    pub access_token: String,
40}
41
42impl Debug for YandexDiskCore {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("Backend")
45            .field("root", &self.root)
46            .finish_non_exhaustive()
47    }
48}
49
50impl YandexDiskCore {
51    #[inline]
52    pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
53        self.info.http_client().send(req).await
54    }
55
56    #[inline]
57    pub fn sign(&self, req: request::Builder) -> request::Builder {
58        req.header(
59            header::AUTHORIZATION,
60            format!("OAuth {}", self.access_token),
61        )
62    }
63}
64
65impl YandexDiskCore {
66    /// Get upload url.
67    pub async fn get_upload_url(&self, path: &str) -> Result<String> {
68        let path = build_rooted_abs_path(&self.root, path);
69
70        let url = format!(
71            "https://cloud-api.yandex.net/v1/disk/resources/upload?path={}&overwrite=true",
72            percent_encode_path(&path)
73        );
74
75        let req = Request::get(url);
76
77        let req = self.sign(req);
78
79        // Set body
80        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
81
82        let resp = self.send(req).await?;
83
84        let status = resp.status();
85
86        match status {
87            StatusCode::OK => {
88                let bytes = resp.into_body();
89
90                let resp: GetUploadUrlResponse =
91                    serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
92
93                Ok(resp.href)
94            }
95            _ => Err(parse_error(resp)),
96        }
97    }
98
99    pub async fn get_download_url(&self, path: &str) -> Result<String> {
100        let path = build_rooted_abs_path(&self.root, path);
101
102        let url = format!(
103            "https://cloud-api.yandex.net/v1/disk/resources/download?path={}&overwrite=true",
104            percent_encode_path(&path)
105        );
106
107        let req = Request::get(url);
108
109        let req = self.sign(req);
110
111        // Set body
112        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
113
114        let resp = self.send(req).await?;
115
116        let status = resp.status();
117
118        match status {
119            StatusCode::OK => {
120                let bytes = resp.into_body();
121
122                let resp: GetUploadUrlResponse =
123                    serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
124
125                Ok(resp.href)
126            }
127            _ => Err(parse_error(resp)),
128        }
129    }
130
131    pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
132        let path = build_abs_path(&self.root, path);
133
134        let paths = path.split('/').collect::<Vec<&str>>();
135
136        for i in 0..paths.len() - 1 {
137            let path = paths[..i + 1].join("/");
138            let resp = self.create_dir(&path).await?;
139
140            let status = resp.status();
141
142            match status {
143                StatusCode::CREATED | StatusCode::CONFLICT => {}
144                _ => return Err(parse_error(resp)),
145            }
146        }
147        Ok(())
148    }
149
150    pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
151        let url = format!(
152            "https://cloud-api.yandex.net/v1/disk/resources?path=/{}",
153            percent_encode_path(path),
154        );
155
156        let req = Request::put(url);
157
158        let req = self.sign(req);
159
160        // Set body
161        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
162
163        self.send(req).await
164    }
165
166    pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
167        let from = build_rooted_abs_path(&self.root, from);
168        let to = build_rooted_abs_path(&self.root, to);
169
170        let url = format!(
171            "https://cloud-api.yandex.net/v1/disk/resources/copy?from={}&path={}&overwrite=true",
172            percent_encode_path(&from),
173            percent_encode_path(&to)
174        );
175
176        let req = Request::post(url);
177
178        let req = self.sign(req);
179
180        // Set body
181        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
182
183        self.send(req).await
184    }
185
186    pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
187        let from = build_rooted_abs_path(&self.root, from);
188        let to = build_rooted_abs_path(&self.root, to);
189
190        let url = format!(
191            "https://cloud-api.yandex.net/v1/disk/resources/move?from={}&path={}&overwrite=true",
192            percent_encode_path(&from),
193            percent_encode_path(&to)
194        );
195
196        let req = Request::post(url);
197
198        let req = self.sign(req);
199
200        // Set body
201        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
202
203        self.send(req).await
204    }
205
206    pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
207        let path = build_rooted_abs_path(&self.root, path);
208
209        let url = format!(
210            "https://cloud-api.yandex.net/v1/disk/resources?path={}&permanently=true",
211            percent_encode_path(&path),
212        );
213
214        let req = Request::delete(url);
215
216        let req = self.sign(req);
217
218        // Set body
219        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
220
221        self.send(req).await
222    }
223
224    pub async fn metainformation(
225        &self,
226        path: &str,
227        limit: Option<usize>,
228        offset: Option<String>,
229    ) -> Result<Response<Buffer>> {
230        let path = build_rooted_abs_path(&self.root, path);
231
232        let mut url = format!(
233            "https://cloud-api.yandex.net/v1/disk/resources?path={}",
234            percent_encode_path(&path),
235        );
236
237        if let Some(limit) = limit {
238            url = format!("{}&limit={}", url, limit);
239        }
240
241        if let Some(offset) = offset {
242            url = format!("{}&offset={}", url, offset);
243        }
244
245        let req = Request::get(url);
246
247        let req = self.sign(req);
248
249        // Set body
250        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
251
252        self.send(req).await
253    }
254}
255
256pub(super) fn parse_info(mf: MetainformationResponse) -> Result<Metadata> {
257    let mode = if mf.ty == "file" {
258        EntryMode::FILE
259    } else {
260        EntryMode::DIR
261    };
262
263    let mut m = Metadata::new(mode);
264
265    m.set_last_modified(parse_datetime_from_rfc3339(&mf.modified)?);
266
267    if let Some(md5) = mf.md5 {
268        m.set_content_md5(&md5);
269    }
270
271    if let Some(mime_type) = mf.mime_type {
272        m.set_content_type(&mime_type);
273    }
274
275    if let Some(size) = mf.size {
276        m.set_content_length(size);
277    }
278
279    Ok(m)
280}
281
282#[derive(Debug, Deserialize)]
283pub struct GetUploadUrlResponse {
284    pub href: String,
285}
286
287#[derive(Debug, Deserialize)]
288pub struct MetainformationResponse {
289    #[serde(rename = "type")]
290    pub ty: String,
291    pub path: String,
292    pub modified: String,
293    pub md5: Option<String>,
294    pub mime_type: Option<String>,
295    pub size: Option<u64>,
296    #[serde(rename = "_embedded")]
297    pub embedded: Option<Embedded>,
298}
299
300#[derive(Debug, Deserialize)]
301pub struct Embedded {
302    pub total: usize,
303    pub items: Vec<MetainformationResponse>,
304}