opendal/services/yandex_disk/
core.rs1use bytes::Buf;
19use http::header;
20use http::request;
21use http::Request;
22use http::Response;
23use http::StatusCode;
24use serde::Deserialize;
25use std::fmt::Debug;
26use std::fmt::Formatter;
27use std::sync::Arc;
28
29use super::error::parse_error;
30use crate::raw::*;
31use crate::*;
32
/// Shared state used by every request made to the Yandex Disk REST API.
#[derive(Clone)]
pub struct YandexDiskCore {
    /// Accessor info; its `http_client()` performs the actual HTTP calls.
    pub info: Arc<AccessorInfo>,
    /// Root that caller-supplied paths are resolved against before they are
    /// placed into a request URL.
    pub root: String,
    /// Token sent in the `Authorization: OAuth …` header by `sign`.
    pub access_token: String,
}
41
42impl Debug for YandexDiskCore {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("Backend")
45 .field("root", &self.root)
46 .finish_non_exhaustive()
47 }
48}
49
50impl YandexDiskCore {
51 #[inline]
52 pub async fn send(&self, req: Request<Buffer>) -> Result<Response<Buffer>> {
53 self.info.http_client().send(req).await
54 }
55
56 #[inline]
57 pub fn sign(&self, req: request::Builder) -> request::Builder {
58 req.header(
59 header::AUTHORIZATION,
60 format!("OAuth {}", self.access_token),
61 )
62 }
63}
64
65impl YandexDiskCore {
66 async fn get_upload_url(&self, path: &str) -> Result<String> {
68 let path = build_rooted_abs_path(&self.root, path);
69
70 let url = format!(
71 "https://cloud-api.yandex.net/v1/disk/resources/upload?path={}&overwrite=true",
72 percent_encode_path(&path)
73 );
74
75 let req = Request::get(url);
76
77 let req = req.extension(Operation::Write);
78
79 let req = self.sign(req);
80
81 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
83
84 let resp = self.send(req).await?;
85
86 let status = resp.status();
87
88 match status {
89 StatusCode::OK => {
90 let bytes = resp.into_body();
91
92 let resp: GetUploadUrlResponse =
93 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
94
95 Ok(resp.href)
96 }
97 _ => Err(parse_error(resp)),
98 }
99 }
100
101 pub async fn upload(&self, path: &str, body: Buffer) -> Result<Response<Buffer>> {
102 let upload_url = self.get_upload_url(path).await?;
103 let req = Request::put(upload_url)
104 .extension(Operation::Write)
105 .body(body)
106 .map_err(new_request_build_error)?;
107
108 self.send(req).await
109 }
110
111 async fn get_download_url(&self, path: &str) -> Result<String> {
112 let path = build_rooted_abs_path(&self.root, path);
113
114 let url = format!(
115 "https://cloud-api.yandex.net/v1/disk/resources/download?path={}&overwrite=true",
116 percent_encode_path(&path)
117 );
118
119 let req = Request::get(url);
120
121 let req = req.extension(Operation::Read);
122
123 let req = self.sign(req);
124
125 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
127
128 let resp = self.send(req).await?;
129
130 let status = resp.status();
131
132 match status {
133 StatusCode::OK => {
134 let bytes = resp.into_body();
135
136 let resp: GetUploadUrlResponse =
137 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
138
139 Ok(resp.href)
140 }
141 _ => Err(parse_error(resp)),
142 }
143 }
144
145 pub async fn download(&self, path: &str, range: BytesRange) -> Result<Response<HttpBody>> {
146 let download_url = self.get_download_url(path).await?;
147 let req = Request::get(download_url)
148 .header(header::RANGE, range.to_header())
149 .extension(Operation::Read)
150 .body(Buffer::new())
151 .map_err(new_request_build_error)?;
152
153 self.info.http_client().fetch(req).await
154 }
155
156 pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
157 let path = build_abs_path(&self.root, path);
158
159 let paths = path.split('/').collect::<Vec<&str>>();
160
161 for i in 0..paths.len() - 1 {
162 let path = paths[..i + 1].join("/");
163 let resp = self.create_dir(&path).await?;
164
165 let status = resp.status();
166
167 match status {
168 StatusCode::CREATED | StatusCode::CONFLICT => {}
169 _ => return Err(parse_error(resp)),
170 }
171 }
172 Ok(())
173 }
174
175 pub async fn create_dir(&self, path: &str) -> Result<Response<Buffer>> {
176 let url = format!(
177 "https://cloud-api.yandex.net/v1/disk/resources?path=/{}",
178 percent_encode_path(path),
179 );
180
181 let req = Request::put(url);
182
183 let req = req.extension(Operation::CreateDir);
184
185 let req = self.sign(req);
186
187 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
189
190 self.send(req).await
191 }
192
193 pub async fn copy(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
194 let from = build_rooted_abs_path(&self.root, from);
195 let to = build_rooted_abs_path(&self.root, to);
196
197 let url = format!(
198 "https://cloud-api.yandex.net/v1/disk/resources/copy?from={}&path={}&overwrite=true",
199 percent_encode_path(&from),
200 percent_encode_path(&to)
201 );
202
203 let req = Request::post(url);
204
205 let req = req.extension(Operation::Copy);
206
207 let req = self.sign(req);
208
209 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
211
212 self.send(req).await
213 }
214
215 pub async fn move_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
216 let from = build_rooted_abs_path(&self.root, from);
217 let to = build_rooted_abs_path(&self.root, to);
218
219 let url = format!(
220 "https://cloud-api.yandex.net/v1/disk/resources/move?from={}&path={}&overwrite=true",
221 percent_encode_path(&from),
222 percent_encode_path(&to)
223 );
224
225 let req = Request::post(url);
226
227 let req = req.extension(Operation::Rename);
228
229 let req = self.sign(req);
230
231 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
233
234 self.send(req).await
235 }
236
237 pub async fn delete(&self, path: &str) -> Result<Response<Buffer>> {
238 let path = build_rooted_abs_path(&self.root, path);
239
240 let url = format!(
241 "https://cloud-api.yandex.net/v1/disk/resources?path={}&permanently=true",
242 percent_encode_path(&path),
243 );
244
245 let req = Request::delete(url);
246
247 let req = req.extension(Operation::Delete);
248
249 let req = self.sign(req);
250
251 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
253
254 self.send(req).await
255 }
256
257 pub async fn metainformation(
258 &self,
259 path: &str,
260 limit: Option<usize>,
261 offset: Option<String>,
262 ) -> Result<Response<Buffer>> {
263 let path = build_rooted_abs_path(&self.root, path);
264
265 let mut url = format!(
266 "https://cloud-api.yandex.net/v1/disk/resources?path={}",
267 percent_encode_path(&path),
268 );
269
270 if let Some(limit) = limit {
271 url = format!("{}&limit={}", url, limit);
272 }
273
274 if let Some(offset) = offset {
275 url = format!("{}&offset={}", url, offset);
276 }
277
278 let req = Request::get(url);
279
280 let req = req.extension(Operation::Stat);
281
282 let req = self.sign(req);
283
284 let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
286
287 self.send(req).await
288 }
289}
290
291pub(super) fn parse_info(mf: MetainformationResponse) -> Result<Metadata> {
292 let mode = if mf.ty == "file" {
293 EntryMode::FILE
294 } else {
295 EntryMode::DIR
296 };
297
298 let mut m = Metadata::new(mode);
299
300 m.set_last_modified(parse_datetime_from_rfc3339(&mf.modified)?);
301
302 if let Some(md5) = mf.md5 {
303 m.set_content_md5(&md5);
304 }
305
306 if let Some(mime_type) = mf.mime_type {
307 m.set_content_type(&mime_type);
308 }
309
310 if let Some(size) = mf.size {
311 m.set_content_length(size);
312 }
313
314 Ok(m)
315}
316
/// JSON body of a link response; only `href` is deserialized.
///
/// Despite the name, this is used for both the upload-link and the
/// download-link responses (`get_upload_url` and `get_download_url`).
#[derive(Debug, Deserialize)]
pub struct GetUploadUrlResponse {
    /// URL that the follow-up `PUT` (upload) or `GET` (download) is sent to.
    pub href: String,
}
321
/// JSON shape of a resource's metainformation.
#[derive(Debug, Deserialize)]
pub struct MetainformationResponse {
    /// Value of the JSON `type` key; `parse_info` treats `"file"` as a file
    /// and anything else as a directory.
    #[serde(rename = "type")]
    pub ty: String,
    /// Resource path as reported by the API.
    /// NOTE(review): exact format (prefix, encoding) is not visible here — confirm.
    pub path: String,
    /// Last-modified timestamp; `parse_info` parses it as RFC 3339.
    pub modified: String,
    /// MD5 of the content, when the API reports one.
    pub md5: Option<String>,
    /// MIME type of the content, when the API reports one.
    pub mime_type: Option<String>,
    /// Content size, when the API reports one.
    /// NOTE(review): presumably bytes — confirm against the API docs.
    pub size: Option<u64>,
    /// Value of the JSON `_embedded` key, when present.
    #[serde(rename = "_embedded")]
    pub embedded: Option<Embedded>,
}
334
/// Nested resource listing found under the `_embedded` key of a
/// `MetainformationResponse`.
/// NOTE(review): presumably the children of a directory — confirm against the API docs.
#[derive(Debug, Deserialize)]
pub struct Embedded {
    /// Value of the JSON `total` key.
    /// NOTE(review): presumably the overall number of nested resources, not
    /// just those in `items` — confirm.
    pub total: usize,
    /// Nested resources, each with the same shape as the parent record.
    pub items: Vec<MetainformationResponse>,
}