opendal/services/dbfs/backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use bytes::Buf;
use http::StatusCode;
use log::debug;
use serde::Deserialize;

use super::core::DbfsCore;
use super::delete::DbfsDeleter;
use super::error::parse_error;
use super::lister::DbfsLister;
use super::writer::DbfsWriter;
use super::DEFAULT_SCHEME;
use crate::raw::*;
use crate::services::DbfsConfig;
use crate::*;

impl Configurator for DbfsConfig {
    type Builder = DbfsBuilder;
    fn into_builder(self) -> Self::Builder {
        DbfsBuilder { config: self }
    }
}

/// [Dbfs](https://docs.databricks.com/api/azure/workspace/dbfs)'s REST API support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
pub struct DbfsBuilder {
    config: DbfsConfig,
}

impl Debug for DbfsBuilder {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut ds = f.debug_struct("DbfsBuilder");

        ds.field("config", &self.config);

        ds.finish()
    }
}

impl DbfsBuilder {
    /// Set root of this backend.
    ///
    /// All operations will happen under this root.
    pub fn root(mut self, root: &str) -> Self {
        self.config.root = if root.is_empty() {
            None
        } else {
            Some(root.to_string())
        };

        self
    }

    /// Set endpoint of this backend.
    ///
    /// Endpoint must be a full URI, e.g.
    ///
    /// - Azure: `https://adb-1234567890123456.78.azuredatabricks.net`
    /// - AWS: `https://dbc-123a5678-90bc.cloud.databricks.com`
    pub fn endpoint(mut self, endpoint: &str) -> Self {
        self.config.endpoint = if endpoint.is_empty() {
            None
        } else {
            Some(endpoint.trim_end_matches('/').to_string())
        };
        self
    }

    /// Set the token of this backend.
    pub fn token(mut self, token: &str) -> Self {
        if !token.is_empty() {
            self.config.token = Some(token.to_string());
        }
        self
    }
}

impl Builder for DbfsBuilder {
    type Config = DbfsConfig;

    /// Build a DbfsBackend.
    fn build(self) -> Result<impl Access> {
        debug!("backend build started: {:?}", &self);

        let root = normalize_root(&self.config.root.unwrap_or_default());
        debug!("backend use root {root}");

        let endpoint = match &self.config.endpoint {
            Some(endpoint) => Ok(endpoint.clone()),
            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
                .with_operation("Builder::build")
                .with_context("service", Scheme::Dbfs)),
        }?;
        debug!("backend use endpoint: {}", &endpoint);

        let token = match self.config.token {
            Some(token) => token,
            None => {
                return Err(Error::new(
                    ErrorKind::ConfigInvalid,
                    "missing token for Dbfs",
                ));
            }
        };

        let client = HttpClient::new()?;
        Ok(DbfsBackend {
            core: Arc::new(DbfsCore {
                root,
                endpoint: endpoint.to_string(),
                token,
                client,
            }),
        })
    }
}
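
// A minimal usage sketch (not part of this module; the endpoint and token below
// are placeholders, and `Operator::new(builder)?.finish()` is the usual OpenDAL
// way to turn a builder into an operator):
//
//     let builder = DbfsBuilder::default()
//         .root("/opendal")
//         .endpoint("https://adb-1234567890123456.78.azuredatabricks.net")
//         .token("<personal-access-token>");
//     let op = Operator::new(builder)?.finish();
//
// Building without an endpoint or token fails fast with `ErrorKind::ConfigInvalid`,
// as `build()` above shows.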

/// Backend for DBFS service
#[derive(Debug, Clone)]
pub struct DbfsBackend {
    core: Arc<DbfsCore>,
}

impl Access for DbfsBackend {
    type Reader = ();
    type Writer = oio::OneShotWriter<DbfsWriter>;
    type Lister = oio::PageLister<DbfsLister>;
    type Deleter = oio::OneShotDeleter<DbfsDeleter>;

    fn info(&self) -> Arc<AccessorInfo> {
        let am = AccessorInfo::default();
        am.set_scheme(DEFAULT_SCHEME)
            .set_root(&self.core.root)
            .set_native_capability(Capability {
                stat: true,

                write: true,
                create_dir: true,
                delete: true,
                rename: true,

                list: true,

                shared: true,

                ..Default::default()
            });
        am.into()
    }

    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
        let resp = self.core.dbfs_create_dir(path).await?;

        let status = resp.status();

        match status {
            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
            _ => Err(parse_error(resp)),
        }
    }

    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
        // Stat root always returns a DIR.
        if path == "/" {
            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
        }

        let resp = self.core.dbfs_get_status(path).await?;

        let status = resp.status();

        match status {
            StatusCode::OK => {
                let mut meta = parse_into_metadata(path, resp.headers())?;
                let bs = resp.into_body();
                let decoded_response: DbfsStatus =
                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
                meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
                    decoded_response.modification_time,
                )?);
                match decoded_response.is_dir {
                    true => meta.set_mode(EntryMode::DIR),
                    false => {
                        meta.set_mode(EntryMode::FILE);
                        meta.set_content_length(decoded_response.file_size as u64)
                    }
                };
                Ok(RpStat::new(meta))
            }
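            // A path ending with `/` is treated as an existing directory even
            // when DBFS reports it as not found.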
            StatusCode::NOT_FOUND if path.ends_with('/') => {
                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
            }
            _ => Err(parse_error(resp)),
        }
    }

    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        Ok((
            RpWrite::default(),
            oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
        ))
    }

    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        Ok((
            RpDelete::default(),
            oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
        ))
    }

    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
        let l = DbfsLister::new(self.core.clone(), path.to_string());

        Ok((RpList::default(), oio::PageLister::new(l)))
    }

    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
        self.core.dbfs_ensure_parent_path(to).await?;

        let resp = self.core.dbfs_rename(from, to).await?;

        let status = resp.status();

        match status {
            StatusCode::OK => Ok(RpRename::default()),
            _ => Err(parse_error(resp)),
        }
    }
}

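// For reference, a successful `get-status` response from the DBFS REST API is
// expected to look roughly like the sketch below (values are illustrative; the
// API also returns `path`, which is intentionally not deserialized here):
//
//     {
//         "path": "/mnt/data/report.csv",
//         "is_dir": false,
//         "file_size": 261,
//         "modification_time": 1609459200000
//     }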
#[derive(Deserialize)]
struct DbfsStatus {
    // Unused fields:
    // path: String,
    is_dir: bool,
    file_size: i64,
    modification_time: i64,
}