opendal_core/services/onedrive/
core.rs1use std::fmt::Debug;
19use std::sync::Arc;
20
21use bytes::Buf;
22use bytes::Bytes;
23use http::Request;
24use http::Response;
25use http::StatusCode;
26use http::header;
27use mea::mutex::Mutex;
28
29use super::error::parse_error;
30use super::graph_model::*;
31use crate::raw::*;
32use crate::*;
33
34pub struct OneDriveCore {
35 pub info: Arc<AccessorInfo>,
36 pub root: String,
37 pub signer: Arc<Mutex<OneDriveSigner>>,
38}
39
40impl Debug for OneDriveCore {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("OneDriveCore")
43 .field("root", &self.root)
44 .finish_non_exhaustive()
45 }
46}
47
48const SPECIAL_POSIX_ENTRIES: [&str; 3] = [".", "/", ""];
50
51impl OneDriveCore {
53 pub(crate) const DRIVE_ROOT_URL: &'static str =
55 "https://graph.microsoft.com/v1.0/me/drive/root";
56
57 pub(crate) fn onedrive_item_url(&self, path: &str, build_absolute_path: bool) -> String {
59 if self.root == "/" && SPECIAL_POSIX_ENTRIES.contains(&path) {
62 Self::DRIVE_ROOT_URL.to_string()
63 } else {
64 let absolute_path = if build_absolute_path {
66 let rooted_path = build_rooted_abs_path(&self.root, path);
67 rooted_path
68 .strip_suffix('/')
69 .unwrap_or(rooted_path.as_str())
70 .to_string()
71 } else {
72 path.to_string()
73 };
74 format!(
75 "{}:{}",
76 Self::DRIVE_ROOT_URL,
77 percent_encode_path(&absolute_path),
78 )
79 }
80 }
81
82 pub(crate) async fn onedrive_get_stat_plain(&self, path: &str) -> Result<Response<Buffer>> {
86 let url: String = format!(
87 "{}?{}",
88 self.onedrive_item_url(path, true),
89 GENERAL_SELECT_PARAM
90 );
91 let request = Request::get(&url);
92
93 let mut request = request
94 .extension(Operation::Stat)
95 .body(Buffer::new())
96 .map_err(new_request_build_error)?;
97
98 self.sign(&mut request).await?;
99
100 self.info.http_client().send(request).await
101 }
102
103 pub(crate) async fn ensure_directory(&self, path: &str) -> Result<OneDriveItem> {
109 let response = self.onedrive_get_stat_plain(path).await?;
110 let item: OneDriveItem = match response.status() {
111 StatusCode::OK => {
112 let bytes = response.into_body();
113 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?
114 }
115 StatusCode::NOT_FOUND => {
116 let response = self.onedrive_create_dir(path).await?;
118 match response.status() {
119 StatusCode::CREATED | StatusCode::OK => {
120 let bytes = response.into_body();
121 serde_json::from_reader(bytes.reader())
122 .map_err(new_json_deserialize_error)?
123 }
124 _ => return Err(parse_error(response)),
125 }
126 }
127 _ => return Err(parse_error(response)),
128 };
129
130 Ok(item)
131 }
132
133 pub(crate) async fn sign<T>(&self, request: &mut Request<T>) -> Result<()> {
134 let mut signer = self.signer.lock().await;
135 signer.sign(request).await
136 }
137}
138
139const MAX_MONITOR_ATTEMPT: i32 = 3600;
142const MONITOR_WAIT_SECOND: u64 = 1;
143
144impl OneDriveCore {
152 pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result<Metadata> {
159 let mut url: String = self.onedrive_item_url(path, true);
160 if args.version().is_some() {
161 url += "?$expand=versions(";
162 url += VERSION_SELECT_PARAM;
163 url += ")";
164 }
165
166 let mut request = Request::get(&url);
167 if let Some(etag) = args.if_none_match() {
168 request = request.header(header::IF_NONE_MATCH, etag);
169 }
170
171 let mut request = request
172 .extension(Operation::Stat)
173 .body(Buffer::new())
174 .map_err(new_request_build_error)?;
175
176 self.sign(&mut request).await?;
177
178 let response = self.info.http_client().send(request).await?;
179 if !response.status().is_success() {
180 return Err(parse_error(response));
181 }
182
183 let bytes = response.into_body();
184 let decoded_response: OneDriveItem =
185 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
186
187 let entry_mode: EntryMode = match decoded_response.item_type {
188 ItemType::Folder { .. } => EntryMode::DIR,
189 ItemType::File { .. } => EntryMode::FILE,
190 };
191
192 let mut meta = Metadata::new(entry_mode)
193 .with_etag(decoded_response.e_tag)
194 .with_content_length(decoded_response.size.max(0) as u64);
195
196 if let Some(version) = args.version() {
197 for item_version in decoded_response.versions.as_deref().unwrap_or_default() {
198 if item_version.id == version {
199 meta.set_version(version);
200 break; }
202 }
203
204 if meta.version().is_none() {
205 return Err(Error::new(
206 ErrorKind::NotFound,
207 "cannot find this version of the item",
208 ));
209 }
210 }
211
212 let last_modified = decoded_response.last_modified_date_time;
213 let date_utc_last_modified = last_modified.parse::<Timestamp>()?;
214 meta.set_last_modified(date_utc_last_modified);
215
216 Ok(meta)
217 }
218
219 pub(crate) async fn onedrive_list_versions(
225 &self,
226 path: &str,
227 ) -> Result<Vec<OneDriveItemVersion>> {
228 let url: String = format!(
230 "{}:/versions?{}",
231 self.onedrive_item_url(path, true),
232 VERSION_SELECT_PARAM
233 );
234
235 let mut request = Request::get(url)
236 .extension(Operation::List)
237 .body(Buffer::new())
238 .map_err(new_request_build_error)?;
239
240 self.sign(&mut request).await?;
241
242 let response = self.info.http_client().send(request).await?;
243 let decoded_response: GraphApiOneDriveVersionsResponse =
244 serde_json::from_reader(response.into_body().reader())
245 .map_err(new_json_deserialize_error)?;
246 Ok(decoded_response.value)
247 }
248
249 pub(crate) async fn onedrive_get_next_list_page(&self, url: &str) -> Result<Response<Buffer>> {
250 let mut request = Request::get(url)
251 .extension(Operation::List)
252 .body(Buffer::new())
253 .map_err(new_request_build_error)?;
254
255 self.sign(&mut request).await?;
256
257 self.info.http_client().send(request).await
258 }
259
260 pub(crate) async fn onedrive_get_content(
270 &self,
271 path: &str,
272 args: &OpRead,
273 ) -> Result<Response<HttpBody>> {
274 let url: String = format!("{}:/content", self.onedrive_item_url(path, true));
276
277 let mut request = Request::get(&url).header(header::RANGE, args.range().to_header());
278 if let Some(etag) = args.if_none_match() {
279 request = request.header(header::IF_NONE_MATCH, etag);
280 }
281
282 let mut request = request
283 .extension(Operation::Read)
284 .body(Buffer::new())
285 .map_err(new_request_build_error)?;
286
287 self.sign(&mut request).await?;
288
289 self.info.http_client().fetch(request).await
290 }
291
292 pub async fn onedrive_upload_simple(
304 &self,
305 path: &str,
306 args: &OpWrite,
307 body: Buffer,
308 ) -> Result<Response<Buffer>> {
309 let url = format!(
310 "{}:/content?@microsoft.graph.conflictBehavior={}&{}",
311 self.onedrive_item_url(path, true),
312 REPLACE_EXISTING_ITEM_WHEN_CONFLICT,
313 GENERAL_SELECT_PARAM
314 );
315
316 let mut request = Request::put(&url)
323 .header(header::CONTENT_LENGTH, body.len())
324 .header(header::CONTENT_TYPE, "text/plain");
325
326 if let Some(if_match) = args.if_match() {
330 request = request.header(header::IF_MATCH, if_match);
331 }
332
333 let mut request = request
334 .extension(Operation::Write)
335 .body(body)
336 .map_err(new_request_build_error)?;
337
338 self.sign(&mut request).await?;
339
340 self.info.http_client().send(request).await
341 }
342
343 pub(crate) async fn onedrive_chunked_upload(
344 &self,
345 url: &str,
346 args: &OpWrite,
347 offset: usize,
348 chunk_end: usize,
349 total_len: usize,
350 body: Buffer,
351 ) -> Result<Response<Buffer>> {
352 let mut request = Request::put(url);
353
354 let range = format!("bytes {offset}-{chunk_end}/{total_len}");
355 request = request.header(header::CONTENT_RANGE, range);
356
357 let size = chunk_end - offset + 1;
358 request = request.header(header::CONTENT_LENGTH, size);
359
360 if let Some(mime) = args.content_type() {
361 request = request.header(header::CONTENT_TYPE, mime)
362 }
363
364 let request = request
365 .extension(Operation::Write)
366 .body(body)
367 .map_err(new_request_build_error)?;
368 self.info.http_client().send(request).await
371 }
372
373 pub(crate) async fn onedrive_create_upload_session(
379 &self,
380 path: &str,
381 args: &OpWrite,
382 ) -> Result<Response<Buffer>> {
383 let parent_path = get_parent(path);
384 let file_name = get_basename(path);
385 let url = format!(
386 "{}:/createUploadSession",
387 self.onedrive_item_url(parent_path, true),
388 );
389 let mut request = Request::post(url).header(header::CONTENT_TYPE, "application/json");
390
391 if let Some(if_match) = args.if_match() {
392 request = request.header(header::IF_MATCH, if_match);
393 }
394
395 let body = OneDriveUploadSessionCreationRequestBody::new(file_name.to_string());
396 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
397 let body = Buffer::from(Bytes::from(body_bytes));
398 let mut request = request
399 .extension(Operation::Write)
400 .body(body)
401 .map_err(new_request_build_error)?;
402
403 self.sign(&mut request).await?;
404
405 self.info.http_client().send(request).await
406 }
407
408 pub(crate) async fn onedrive_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
415 let parent_path = get_parent(path);
416 let basename = get_basename(path);
417 let folder_name = basename.strip_suffix('/').unwrap_or(basename);
418
419 let url = format!(
420 "{}:/children?{}",
421 self.onedrive_item_url(parent_path, true),
422 GENERAL_SELECT_PARAM
423 );
424
425 let payload = CreateDirPayload::new(folder_name.to_string());
426 let body_bytes = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
427 let body = Buffer::from(bytes::Bytes::from(body_bytes));
428
429 let mut request = Request::post(url)
430 .header(header::CONTENT_TYPE, "application/json")
431 .extension(Operation::CreateDir)
432 .body(body)
433 .map_err(new_request_build_error)?;
434
435 self.sign(&mut request).await?;
436
437 self.info.http_client().send(request).await
438 }
439
440 pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<Buffer>> {
444 let url = self.onedrive_item_url(path, true);
445
446 let mut request = Request::delete(&url)
447 .extension(Operation::Delete)
448 .body(Buffer::new())
449 .map_err(new_request_build_error)?;
450
451 self.sign(&mut request).await?;
452
453 self.info.http_client().send(request).await
454 }
455
456 pub(crate) async fn initialize_copy(&self, source: &str, destination: &str) -> Result<String> {
463 let response = self.onedrive_get_stat_plain(source).await?;
465 if !response.status().is_success() {
466 return Err(parse_error(response));
467 }
468
469 let destination_parent = get_parent(destination).to_string();
471 let basename = get_basename(destination);
472
473 let item = self.ensure_directory(&destination_parent).await?;
474 let body = OneDrivePatchRequestBody {
475 parent_reference: ParentReference {
476 path: "".to_string(), drive_id: item.parent_reference.drive_id,
478 id: item.id,
479 },
480 name: basename.to_string(),
481 };
482
483 let response = self.onedrive_get_stat_plain(destination).await?;
485 match response.status() {
486 StatusCode::OK => {
491 let response = self.onedrive_delete(destination).await?;
492 match response.status() {
493 StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
495 }
496 }
497 StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
499 }
500
501 let url: String = format!("{}:/copy", self.onedrive_item_url(source, true));
502
503 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
504 let buffer = Buffer::from(Bytes::from(body_bytes));
505 let mut request = Request::post(&url)
506 .header(header::CONTENT_TYPE, "application/json")
507 .extension(Operation::Copy)
508 .body(buffer)
509 .map_err(new_request_build_error)?;
510
511 self.sign(&mut request).await?;
512
513 let response = self.info.http_client().send(request).await?;
514 match response.status() {
515 StatusCode::ACCEPTED => parse_location(response.headers())?
516 .ok_or_else(|| {
517 Error::new(
518 ErrorKind::Unexpected,
519 "OneDrive didn't return a location URL",
520 )
521 })
522 .map(String::from),
523 _ => Err(parse_error(response)),
524 }
525 }
526
527 pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> {
528 for _attempt in 0..MAX_MONITOR_ATTEMPT {
529 let mut request = Request::get(monitor_url.to_string())
530 .header(header::CONTENT_TYPE, "application/json")
531 .extension(Operation::Copy)
532 .body(Buffer::new())
533 .map_err(new_request_build_error)?;
534
535 self.sign(&mut request).await?;
536
537 let response = self.info.http_client().send(request).await?;
538 let status: OneDriveMonitorStatus =
539 serde_json::from_reader(response.into_body().reader())
540 .map_err(new_json_deserialize_error)?;
541 if status.status == "completed" {
542 return Ok(());
543 }
544
545 tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await;
546 }
547
548 Err(Error::new(
549 ErrorKind::Unexpected,
550 "Exceed monitoring timeout",
551 ))
552 }
553
554 pub(crate) async fn onedrive_move(&self, source: &str, destination: &str) -> Result<()> {
555 let response = self.onedrive_get_stat_plain(source).await?;
557 if !response.status().is_success() {
558 return Err(Error::new(ErrorKind::NotFound, "source not found"));
559 }
560
561 let destination_parent = get_parent(destination).to_string();
563 let basename = get_basename(destination);
564
565 let item = self.ensure_directory(&destination_parent).await?;
566 let body = OneDrivePatchRequestBody {
567 parent_reference: ParentReference {
568 path: "".to_string(), drive_id: item.parent_reference.drive_id,
571 id: item.id,
572 },
573 name: basename.to_string(),
574 };
575 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
576 let buffer = Buffer::from(Bytes::from(body_bytes));
577 let url: String = format!(
578 "{}?@microsoft.graph.conflictBehavior={}&$select=id",
579 self.onedrive_item_url(source, true),
580 REPLACE_EXISTING_ITEM_WHEN_CONFLICT
581 );
582 let mut request = Request::patch(&url)
583 .header(header::CONTENT_TYPE, "application/json")
584 .extension(Operation::Rename)
585 .body(buffer)
586 .map_err(new_request_build_error)?;
587
588 self.sign(&mut request).await?;
589
590 let response = self.info.http_client().send(request).await?;
591 match response.status() {
592 StatusCode::OK => Ok(()),
594 _ => Err(parse_error(response)),
595 }
596 }
597}
598
599pub struct OneDriveSigner {
601 pub info: Arc<AccessorInfo>, pub client_id: String,
604 pub client_secret: String,
605 pub refresh_token: String,
606
607 pub access_token: String,
608 pub expires_in: Timestamp,
609}
610
611const ONEDRIVE_REFRESH_TOKEN: &str = "https://login.microsoftonline.com/common/oauth2/v2.0/token";
619
620impl OneDriveSigner {
621 pub fn new(info: Arc<AccessorInfo>) -> Self {
622 OneDriveSigner {
623 info,
624
625 client_id: "".to_string(),
626 client_secret: "".to_string(),
627 refresh_token: "".to_string(),
628 access_token: "".to_string(),
629 expires_in: Timestamp::MIN,
630 }
631 }
632
633 async fn refresh_tokens(&mut self) -> Result<()> {
634 let encoded_payload = format!(
636 "client_id={}&client_secret={}&scope=offline_access%20Files.ReadWrite&refresh_token={}&grant_type=refresh_token",
637 percent_encode_path(self.client_id.as_str()),
638 percent_encode_path(self.client_secret.as_str()),
639 percent_encode_path(self.refresh_token.as_str())
640 );
641 let request = Request::post(ONEDRIVE_REFRESH_TOKEN)
642 .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
643 .body(Buffer::from(encoded_payload))
644 .map_err(new_request_build_error)?;
645
646 let response = self.info.http_client().send(request).await?;
647 match response.status() {
648 StatusCode::OK => {
649 let resp_body = response.into_body();
650 let data: GraphOAuthRefreshTokenResponseBody =
651 serde_json::from_reader(resp_body.reader())
652 .map_err(new_json_deserialize_error)?;
653 self.access_token = data.access_token;
654 self.refresh_token = data.refresh_token;
655 self.expires_in = Timestamp::now() + Duration::from_secs(data.expires_in)
656 - Duration::from_secs(120); Ok(())
658 }
659 _ => Err(parse_error(response)),
660 }
661 }
662
663 pub async fn sign<T>(&mut self, request: &mut Request<T>) -> Result<()> {
665 if !self.access_token.is_empty() && self.expires_in > Timestamp::now() {
666 let value = format!("Bearer {}", self.access_token)
667 .parse()
668 .expect("access_token must be valid header value");
669
670 request.headers_mut().insert(header::AUTHORIZATION, value);
671 return Ok(());
672 }
673
674 self.refresh_tokens().await?;
675
676 let auth_header_content = format!("Bearer {}", self.access_token)
677 .parse()
678 .expect("Fetched access_token is invalid as a header value");
679
680 request
681 .headers_mut()
682 .insert(header::AUTHORIZATION, auth_header_content);
683
684 Ok(())
685 }
686}