opendal/services/yandex_disk/
backend.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use http::Response;
23use http::StatusCode;
24use log::debug;
25
26use super::YANDEX_DISK_SCHEME;
27use super::config::YandexDiskConfig;
28use super::core::*;
29use super::deleter::YandexDiskDeleter;
30use super::error::parse_error;
31use super::lister::YandexDiskLister;
32use super::writer::YandexDiskWriter;
33use super::writer::YandexDiskWriters;
34use crate::raw::*;
35use crate::*;
36
37#[doc = include_str!("docs.md")]
39#[derive(Default)]
40pub struct YandexDiskBuilder {
41 pub(super) config: YandexDiskConfig,
42
43 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
44 pub(super) http_client: Option<HttpClient>,
45}
46
47impl Debug for YandexDiskBuilder {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("YandexDiskBuilder")
50 .field("config", &self.config)
51 .finish_non_exhaustive()
52 }
53}
54
55impl YandexDiskBuilder {
56 pub fn root(mut self, root: &str) -> Self {
60 self.config.root = if root.is_empty() {
61 None
62 } else {
63 Some(root.to_string())
64 };
65
66 self
67 }
68
69 pub fn access_token(mut self, access_token: &str) -> Self {
74 self.config.access_token = access_token.to_string();
75
76 self
77 }
78
79 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
86 #[allow(deprecated)]
87 pub fn http_client(mut self, client: HttpClient) -> Self {
88 self.http_client = Some(client);
89 self
90 }
91}
92
93impl Builder for YandexDiskBuilder {
94 type Config = YandexDiskConfig;
95
96 fn build(self) -> Result<impl Access> {
98 debug!("backend build started: {:?}", &self);
99
100 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
101 debug!("backend use root {}", &root);
102
103 if self.config.access_token.is_empty() {
105 return Err(
106 Error::new(ErrorKind::ConfigInvalid, "access_token is empty")
107 .with_operation("Builder::build")
108 .with_context("service", YANDEX_DISK_SCHEME),
109 );
110 }
111
112 Ok(YandexDiskBackend {
113 core: Arc::new(YandexDiskCore {
114 info: {
115 let am = AccessorInfo::default();
116 am.set_scheme(YANDEX_DISK_SCHEME)
117 .set_root(&root)
118 .set_native_capability(Capability {
119 stat: true,
120
121 create_dir: true,
122
123 read: true,
124
125 write: true,
126 write_can_empty: true,
127
128 delete: true,
129 rename: true,
130 copy: true,
131
132 list: true,
133 list_with_limit: true,
134
135 shared: true,
136
137 ..Default::default()
138 });
139
140 #[allow(deprecated)]
142 if let Some(client) = self.http_client {
143 am.update_http_client(|_| client);
144 }
145
146 am.into()
147 },
148 root,
149 access_token: self.config.access_token.clone(),
150 }),
151 })
152 }
153}
154
155#[derive(Debug, Clone)]
157pub struct YandexDiskBackend {
158 core: Arc<YandexDiskCore>,
159}
160
161impl Access for YandexDiskBackend {
162 type Reader = HttpBody;
163 type Writer = YandexDiskWriters;
164 type Lister = oio::PageLister<YandexDiskLister>;
165 type Deleter = oio::OneShotDeleter<YandexDiskDeleter>;
166
167 fn info(&self) -> Arc<AccessorInfo> {
168 self.core.info.clone()
169 }
170
171 async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
172 self.core.ensure_dir_exists(path).await?;
173
174 Ok(RpCreateDir::default())
175 }
176
177 async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
178 self.core.ensure_dir_exists(to).await?;
179
180 let resp = self.core.move_object(from, to).await?;
181
182 let status = resp.status();
183
184 match status {
185 StatusCode::OK | StatusCode::CREATED => Ok(RpRename::default()),
186 _ => Err(parse_error(resp)),
187 }
188 }
189
190 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
191 self.core.ensure_dir_exists(to).await?;
192
193 let resp = self.core.copy(from, to).await?;
194
195 let status = resp.status();
196
197 match status {
198 StatusCode::OK | StatusCode::CREATED => Ok(RpCopy::default()),
199 _ => Err(parse_error(resp)),
200 }
201 }
202
203 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
204 let resp = self.core.download(path, args.range()).await?;
205
206 let status = resp.status();
207 match status {
208 StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
209 _ => {
210 let (part, mut body) = resp.into_parts();
211 let buf = body.to_buffer().await?;
212 Err(parse_error(Response::from_parts(part, buf)))
213 }
214 }
215 }
216
217 async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
218 let resp = self.core.metainformation(path, None, None).await?;
219
220 let status = resp.status();
221
222 match status {
223 StatusCode::OK => {
224 let bs = resp.into_body();
225
226 let mf: MetainformationResponse =
227 serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
228
229 parse_info(mf).map(RpStat::new)
230 }
231 _ => Err(parse_error(resp)),
232 }
233 }
234
235 async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
236 let writer = YandexDiskWriter::new(self.core.clone(), path.to_string());
237
238 let w = oio::OneShotWriter::new(writer);
239
240 Ok((RpWrite::default(), w))
241 }
242
243 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
244 Ok((
245 RpDelete::default(),
246 oio::OneShotDeleter::new(YandexDiskDeleter::new(self.core.clone())),
247 ))
248 }
249
250 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
251 let l = YandexDiskLister::new(self.core.clone(), path, args.limit());
252 Ok((RpList::default(), oio::PageLister::new(l)))
253 }
254}