opendal/services/onedrive/
core.rs1use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21use std::time::Duration;
22
23use bytes::Buf;
24use bytes::Bytes;
25use chrono::DateTime;
26use chrono::Utc;
27use http::header;
28use http::Request;
29use http::Response;
30
31use http::StatusCode;
32use tokio::sync::Mutex;
33
34use super::error::parse_error;
35use super::graph_model::*;
36use crate::raw::*;
37use crate::*;
38
39pub struct OneDriveCore {
40 pub info: Arc<AccessorInfo>,
41 pub root: String,
42 pub signer: Arc<Mutex<OneDriveSigner>>,
43}
44
45impl Debug for OneDriveCore {
46 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("OneDriveCore")
48 .field("root", &self.root)
49 .finish_non_exhaustive()
50 }
51}
52
/// Path spellings that `onedrive_item_url` treats as "the drive root itself"
/// when the configured root is `/`.
const SPECIAL_POSIX_ENTRIES: [&str; 3] = [
    ".", // current directory
    "/", // explicit root
    "",  // empty path
];
55
56impl OneDriveCore {
/// Graph API URL of the signed-in user's drive root. Item URLs are built by
/// appending `:<path>` to this value.
pub(crate) const DRIVE_ROOT_URL: &str =
    "https://graph.microsoft.com/v1.0/me/drive/root";
60
61 pub(crate) fn onedrive_item_url(&self, path: &str, build_absolute_path: bool) -> String {
63 if self.root == "/" && SPECIAL_POSIX_ENTRIES.contains(&path) {
66 Self::DRIVE_ROOT_URL.to_string()
67 } else {
68 let absolute_path = if build_absolute_path {
70 let rooted_path = build_rooted_abs_path(&self.root, path);
71 rooted_path
72 .strip_suffix('/')
73 .unwrap_or(rooted_path.as_str())
74 .to_string()
75 } else {
76 path.to_string()
77 };
78 format!(
79 "{}:{}",
80 Self::DRIVE_ROOT_URL,
81 percent_encode_path(&absolute_path),
82 )
83 }
84 }
85
86 pub(crate) async fn onedrive_get_stat_plain(&self, path: &str) -> Result<Response<Buffer>> {
90 let url: String = format!(
91 "{}?{}",
92 self.onedrive_item_url(path, true),
93 GENERAL_SELECT_PARAM
94 );
95 let request = Request::get(&url);
96
97 let mut request = request
98 .body(Buffer::new())
99 .map_err(new_request_build_error)?;
100
101 self.sign(&mut request).await?;
102
103 self.info.http_client().send(request).await
104 }
105
106 pub(crate) async fn ensure_directory(&self, path: &str) -> Result<OneDriveItem> {
112 let response = self.onedrive_get_stat_plain(path).await?;
113 let item: OneDriveItem = match response.status() {
114 StatusCode::OK => {
115 let bytes = response.into_body();
116 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?
117 }
118 StatusCode::NOT_FOUND => {
119 let response = self.onedrive_create_dir(path).await?;
121 match response.status() {
122 StatusCode::CREATED | StatusCode::OK => {
123 let bytes = response.into_body();
124 serde_json::from_reader(bytes.reader())
125 .map_err(new_json_deserialize_error)?
126 }
127 _ => return Err(parse_error(response)),
128 }
129 }
130 _ => return Err(parse_error(response)),
131 };
132
133 Ok(item)
134 }
135
136 pub(crate) async fn sign<T>(&self, request: &mut Request<T>) -> Result<()> {
137 let mut signer = self.signer.lock().await;
138 signer.sign(request).await
139 }
140}
141
/// Maximum number of times `wait_until_complete` polls a monitor URL.
/// With a one-second pause between polls this is roughly one hour.
const MAX_MONITOR_ATTEMPT: i32 = 3_600;
/// Pause between two polls of a monitor URL, in seconds.
const MONITOR_WAIT_SECOND: u64 = 1;
146
147impl OneDriveCore {
155 pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result<Metadata> {
162 let mut url: String = self.onedrive_item_url(path, true);
163 if args.version().is_some() {
164 url += "?$expand=versions(";
165 url += VERSION_SELECT_PARAM;
166 url += ")";
167 }
168
169 let mut request = Request::get(&url);
170 if let Some(etag) = args.if_none_match() {
171 request = request.header(header::IF_NONE_MATCH, etag);
172 }
173
174 let mut request = request
175 .body(Buffer::new())
176 .map_err(new_request_build_error)?;
177
178 self.sign(&mut request).await?;
179
180 let response = self.info.http_client().send(request).await?;
181 if !response.status().is_success() {
182 return Err(parse_error(response));
183 }
184
185 let bytes = response.into_body();
186 let decoded_response: OneDriveItem =
187 serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?;
188
189 let entry_mode: EntryMode = match decoded_response.item_type {
190 ItemType::Folder { .. } => EntryMode::DIR,
191 ItemType::File { .. } => EntryMode::FILE,
192 };
193
194 let mut meta = Metadata::new(entry_mode)
195 .with_etag(decoded_response.e_tag)
196 .with_content_length(decoded_response.size.max(0) as u64);
197
198 if let Some(version) = args.version() {
199 for item_version in decoded_response.versions.as_deref().unwrap_or_default() {
200 if item_version.id == version {
201 meta.set_version(version);
202 break; }
204 }
205
206 if meta.version().is_none() {
207 return Err(Error::new(
208 ErrorKind::NotFound,
209 "cannot find this version of the item",
210 ));
211 }
212 }
213
214 let last_modified = decoded_response.last_modified_date_time;
215 let date_utc_last_modified = parse_datetime_from_rfc3339(&last_modified)?;
216 meta.set_last_modified(date_utc_last_modified);
217
218 Ok(meta)
219 }
220
221 pub(crate) async fn onedrive_list_versions(
227 &self,
228 path: &str,
229 ) -> Result<Vec<OneDriveItemVersion>> {
230 let url: String = format!(
232 "{}:/versions?{}",
233 self.onedrive_item_url(path, true),
234 VERSION_SELECT_PARAM
235 );
236
237 let mut request = Request::get(url)
238 .body(Buffer::new())
239 .map_err(new_request_build_error)?;
240
241 self.sign(&mut request).await?;
242
243 let response = self.info.http_client().send(request).await?;
244 let decoded_response: GraphApiOneDriveVersionsResponse =
245 serde_json::from_reader(response.into_body().reader())
246 .map_err(new_json_deserialize_error)?;
247 Ok(decoded_response.value)
248 }
249
250 pub(crate) async fn onedrive_get_next_list_page(&self, url: &str) -> Result<Response<Buffer>> {
251 let mut request = Request::get(url)
252 .body(Buffer::new())
253 .map_err(new_request_build_error)?;
254
255 self.sign(&mut request).await?;
256
257 self.info.http_client().send(request).await
258 }
259
260 pub(crate) async fn onedrive_get_content(
270 &self,
271 path: &str,
272 args: &OpRead,
273 ) -> Result<Response<HttpBody>> {
274 let url: String = format!("{}:/content", self.onedrive_item_url(path, true));
276
277 let mut request = Request::get(&url).header(header::RANGE, args.range().to_header());
278 if let Some(etag) = args.if_none_match() {
279 request = request.header(header::IF_NONE_MATCH, etag);
280 }
281
282 let mut request = request
283 .body(Buffer::new())
284 .map_err(new_request_build_error)?;
285
286 self.sign(&mut request).await?;
287
288 self.info.http_client().fetch(request).await
289 }
290
291 pub async fn onedrive_upload_simple(
303 &self,
304 path: &str,
305 args: &OpWrite,
306 body: Buffer,
307 ) -> Result<Response<Buffer>> {
308 let url = format!(
309 "{}:/content?@microsoft.graph.conflictBehavior={}&{}",
310 self.onedrive_item_url(path, true),
311 REPLACE_EXISTING_ITEM_WHEN_CONFLICT,
312 GENERAL_SELECT_PARAM
313 );
314
315 let mut request = Request::put(&url)
322 .header(header::CONTENT_LENGTH, body.len())
323 .header(header::CONTENT_TYPE, "text/plain");
324
325 if let Some(if_match) = args.if_match() {
329 request = request.header(header::IF_MATCH, if_match);
330 }
331
332 let mut request = request.body(body).map_err(new_request_build_error)?;
333
334 self.sign(&mut request).await?;
335
336 self.info.http_client().send(request).await
337 }
338
339 pub(crate) async fn onedrive_chunked_upload(
340 &self,
341 url: &str,
342 args: &OpWrite,
343 offset: usize,
344 chunk_end: usize,
345 total_len: usize,
346 body: Buffer,
347 ) -> Result<Response<Buffer>> {
348 let mut request = Request::put(url);
349
350 let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len);
351 request = request.header(header::CONTENT_RANGE, range);
352
353 let size = chunk_end - offset + 1;
354 request = request.header(header::CONTENT_LENGTH, size);
355
356 if let Some(mime) = args.content_type() {
357 request = request.header(header::CONTENT_TYPE, mime)
358 }
359
360 let request = request.body(body).map_err(new_request_build_error)?;
361 self.info.http_client().send(request).await
364 }
365
366 pub(crate) async fn onedrive_create_upload_session(
372 &self,
373 path: &str,
374 args: &OpWrite,
375 ) -> Result<Response<Buffer>> {
376 let parent_path = get_parent(path);
377 let file_name = get_basename(path);
378 let url = format!(
379 "{}:/createUploadSession",
380 self.onedrive_item_url(parent_path, true),
381 );
382 let mut request = Request::post(url).header(header::CONTENT_TYPE, "application/json");
383
384 if let Some(if_match) = args.if_match() {
385 request = request.header(header::IF_MATCH, if_match);
386 }
387
388 let body = OneDriveUploadSessionCreationRequestBody::new(file_name.to_string());
389 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
390 let body = Buffer::from(Bytes::from(body_bytes));
391 let mut request = request.body(body).map_err(new_request_build_error)?;
392
393 self.sign(&mut request).await?;
394
395 self.info.http_client().send(request).await
396 }
397
398 pub(crate) async fn onedrive_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
405 let parent_path = get_parent(path);
406 let basename = get_basename(path);
407 let folder_name = basename.strip_suffix('/').unwrap_or(basename);
408
409 let url = format!(
410 "{}:/children?{}",
411 self.onedrive_item_url(parent_path, true),
412 GENERAL_SELECT_PARAM
413 );
414
415 let payload = CreateDirPayload::new(folder_name.to_string());
416 let body_bytes = serde_json::to_vec(&payload).map_err(new_json_serialize_error)?;
417 let body = Buffer::from(bytes::Bytes::from(body_bytes));
418
419 let mut request = Request::post(url)
420 .header(header::CONTENT_TYPE, "application/json")
421 .body(body)
422 .map_err(new_request_build_error)?;
423
424 self.sign(&mut request).await?;
425
426 self.info.http_client().send(request).await
427 }
428
429 pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<Buffer>> {
433 let url = self.onedrive_item_url(path, true);
434
435 let mut request = Request::delete(&url)
436 .body(Buffer::new())
437 .map_err(new_request_build_error)?;
438
439 self.sign(&mut request).await?;
440
441 self.info.http_client().send(request).await
442 }
443
444 pub(crate) async fn initialize_copy(&self, source: &str, destination: &str) -> Result<String> {
451 let response = self.onedrive_get_stat_plain(source).await?;
453 if !response.status().is_success() {
454 return Err(parse_error(response));
455 }
456
457 let destination_parent = get_parent(destination).to_string();
459 let basename = get_basename(destination);
460
461 let item = self.ensure_directory(&destination_parent).await?;
462 let body = OneDrivePatchRequestBody {
463 parent_reference: ParentReference {
464 path: "".to_string(), drive_id: item.parent_reference.drive_id,
466 id: item.id,
467 },
468 name: basename.to_string(),
469 };
470
471 let response = self.onedrive_get_stat_plain(destination).await?;
473 match response.status() {
474 StatusCode::OK => {
479 let response = self.onedrive_delete(destination).await?;
480 match response.status() {
481 StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
483 }
484 }
485 StatusCode::NOT_FOUND => {} _ => return Err(parse_error(response)),
487 }
488
489 let url: String = format!("{}:/copy", self.onedrive_item_url(source, true));
490
491 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
492 let buffer = Buffer::from(Bytes::from(body_bytes));
493 let mut request = Request::post(&url)
494 .header(header::CONTENT_TYPE, "application/json")
495 .body(buffer)
496 .map_err(new_request_build_error)?;
497
498 self.sign(&mut request).await?;
499
500 let response = self.info.http_client().send(request).await?;
501 match response.status() {
502 StatusCode::ACCEPTED => parse_location(response.headers())?
503 .ok_or_else(|| {
504 Error::new(
505 ErrorKind::Unexpected,
506 "OneDrive didn't return a location URL",
507 )
508 })
509 .map(String::from),
510 _ => Err(parse_error(response)),
511 }
512 }
513
514 pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> {
515 for _attempt in 0..MAX_MONITOR_ATTEMPT {
516 let mut request = Request::get(monitor_url.to_string())
517 .header(header::CONTENT_TYPE, "application/json")
518 .body(Buffer::new())
519 .map_err(new_request_build_error)?;
520
521 self.sign(&mut request).await?;
522
523 let response = self.info.http_client().send(request).await?;
524 let status: OneDriveMonitorStatus =
525 serde_json::from_reader(response.into_body().reader())
526 .map_err(new_json_deserialize_error)?;
527 if status.status == "completed" {
528 return Ok(());
529 }
530
531 tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await;
532 }
533
534 Err(Error::new(
535 ErrorKind::Unexpected,
536 "Exceed monitoring timeout",
537 ))
538 }
539
540 pub(crate) async fn onedrive_move(&self, source: &str, destination: &str) -> Result<()> {
541 let response = self.onedrive_get_stat_plain(source).await?;
543 if !response.status().is_success() {
544 return Err(Error::new(ErrorKind::NotFound, "source not found"));
545 }
546
547 let destination_parent = get_parent(destination).to_string();
549 let basename = get_basename(destination);
550
551 let item = self.ensure_directory(&destination_parent).await?;
552 let body = OneDrivePatchRequestBody {
553 parent_reference: ParentReference {
554 path: "".to_string(), drive_id: item.parent_reference.drive_id,
557 id: item.id,
558 },
559 name: basename.to_string(),
560 };
561 let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
562 let buffer = Buffer::from(Bytes::from(body_bytes));
563 let url: String = format!(
564 "{}?@microsoft.graph.conflictBehavior={}&$select=id",
565 self.onedrive_item_url(source, true),
566 REPLACE_EXISTING_ITEM_WHEN_CONFLICT
567 );
568 let mut request = Request::patch(&url)
569 .header(header::CONTENT_TYPE, "application/json")
570 .body(buffer)
571 .map_err(new_request_build_error)?;
572
573 self.sign(&mut request).await?;
574
575 let response = self.info.http_client().send(request).await?;
576 match response.status() {
577 StatusCode::OK => Ok(()),
579 _ => Err(parse_error(response)),
580 }
581 }
582}
583
584pub struct OneDriveSigner {
586 pub info: Arc<AccessorInfo>, pub client_id: String,
589 pub client_secret: String,
590 pub refresh_token: String,
591
592 pub access_token: String,
593 pub expires_in: DateTime<Utc>,
594}
595
/// OAuth 2.0 token endpoint that `OneDriveSigner::refresh_tokens` posts to.
const ONEDRIVE_REFRESH_TOKEN: &str =
    "https://login.microsoftonline.com/common/oauth2/v2.0/token";
604
605impl OneDriveSigner {
606 pub fn new(info: Arc<AccessorInfo>) -> Self {
607 OneDriveSigner {
608 info,
609
610 client_id: "".to_string(),
611 client_secret: "".to_string(),
612 refresh_token: "".to_string(),
613 access_token: "".to_string(),
614 expires_in: DateTime::<Utc>::MIN_UTC,
615 }
616 }
617
618 async fn refresh_tokens(&mut self) -> Result<()> {
619 let encoded_payload = format!(
621 "client_id={}&client_secret={}&scope=offline_access%20Files.ReadWrite&refresh_token={}&grant_type=refresh_token",
622 percent_encode_path(self.client_id.as_str()),
623 percent_encode_path(self.client_secret.as_str()),
624 percent_encode_path(self.refresh_token.as_str())
625 );
626 let request = Request::post(ONEDRIVE_REFRESH_TOKEN)
627 .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
628 .body(Buffer::from(encoded_payload))
629 .map_err(new_request_build_error)?;
630
631 let response = self.info.http_client().send(request).await?;
632 match response.status() {
633 StatusCode::OK => {
634 let resp_body = response.into_body();
635 let data: GraphOAuthRefreshTokenResponseBody =
636 serde_json::from_reader(resp_body.reader())
637 .map_err(new_json_deserialize_error)?;
638 self.access_token = data.access_token;
639 self.refresh_token = data.refresh_token;
640 self.expires_in = Utc::now()
641 + chrono::TimeDelta::try_seconds(data.expires_in)
642 .expect("expires_in must be valid seconds")
643 - chrono::TimeDelta::minutes(2); Ok(())
645 }
646 _ => Err(parse_error(response)),
647 }
648 }
649
650 pub async fn sign<T>(&mut self, request: &mut Request<T>) -> Result<()> {
652 if !self.access_token.is_empty() && self.expires_in > Utc::now() {
653 let value = format!("Bearer {}", self.access_token)
654 .parse()
655 .expect("access_token must be valid header value");
656
657 request.headers_mut().insert(header::AUTHORIZATION, value);
658 return Ok(());
659 }
660
661 self.refresh_tokens().await?;
662
663 let auth_header_content = format!("Bearer {}", self.access_token)
664 .parse()
665 .expect("Fetched access_token is invalid as a header value");
666
667 request
668 .headers_mut()
669 .insert(header::AUTHORIZATION, auth_header_content);
670
671 Ok(())
672 }
673}