opendal/services/dbfs/backend.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::Debug;
19use std::fmt::Formatter;
20use std::sync::Arc;
21
22use bytes::Buf;
23use http::StatusCode;
24use log::debug;
25use serde::Deserialize;
26
27use super::core::DbfsCore;
28use super::delete::DbfsDeleter;
29use super::error::parse_error;
30use super::lister::DbfsLister;
31use super::writer::DbfsWriter;
32use crate::raw::*;
33use crate::services::DbfsConfig;
34use crate::*;
35
36impl Configurator for DbfsConfig {
37    type Builder = DbfsBuilder;
38    fn into_builder(self) -> Self::Builder {
39        DbfsBuilder { config: self }
40    }
41}
42
43/// [Dbfs](https://docs.databricks.com/api/azure/workspace/dbfs)'s REST API support.
44#[doc = include_str!("docs.md")]
45#[derive(Default, Clone)]
46pub struct DbfsBuilder {
47    config: DbfsConfig,
48}
49
50impl Debug for DbfsBuilder {
51    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52        let mut ds = f.debug_struct("DbfsBuilder");
53
54        ds.field("config", &self.config);
55
56        ds.finish()
57    }
58}
59
60impl DbfsBuilder {
61    /// Set root of this backend.
62    ///
63    /// All operations will happen under this root.
64    pub fn root(mut self, root: &str) -> Self {
65        self.config.root = if root.is_empty() {
66            None
67        } else {
68            Some(root.to_string())
69        };
70
71        self
72    }
73
74    /// Set endpoint of this backend.
75    ///
76    /// Endpoint must be full uri, e.g.
77    ///
78    /// - Azure: `https://adb-1234567890123456.78.azuredatabricks.net`
79    /// - Aws: `https://dbc-123a5678-90bc.cloud.databricks.com`
80    pub fn endpoint(mut self, endpoint: &str) -> Self {
81        self.config.endpoint = if endpoint.is_empty() {
82            None
83        } else {
84            Some(endpoint.trim_end_matches('/').to_string())
85        };
86        self
87    }
88
89    /// Set the token of this backend.
90    pub fn token(mut self, token: &str) -> Self {
91        if !token.is_empty() {
92            self.config.token = Some(token.to_string());
93        }
94        self
95    }
96}
97
98impl Builder for DbfsBuilder {
99    const SCHEME: Scheme = Scheme::Dbfs;
100    type Config = DbfsConfig;
101
102    /// Build a DbfsBackend.
103    fn build(self) -> Result<impl Access> {
104        debug!("backend build started: {:?}", &self);
105
106        let root = normalize_root(&self.config.root.unwrap_or_default());
107        debug!("backend use root {}", root);
108
109        let endpoint = match &self.config.endpoint {
110            Some(endpoint) => Ok(endpoint.clone()),
111            None => Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is empty")
112                .with_operation("Builder::build")
113                .with_context("service", Scheme::Dbfs)),
114        }?;
115        debug!("backend use endpoint: {}", &endpoint);
116
117        let token = match self.config.token {
118            Some(token) => token,
119            None => {
120                return Err(Error::new(
121                    ErrorKind::ConfigInvalid,
122                    "missing token for Dbfs",
123                ));
124            }
125        };
126
127        let client = HttpClient::new()?;
128        Ok(DbfsBackend {
129            core: Arc::new(DbfsCore {
130                root,
131                endpoint: endpoint.to_string(),
132                token,
133                client,
134            }),
135        })
136    }
137}
138
139/// Backend for DBFS service
140#[derive(Debug, Clone)]
141pub struct DbfsBackend {
142    core: Arc<DbfsCore>,
143}
144
145impl Access for DbfsBackend {
146    type Reader = ();
147    type Writer = oio::OneShotWriter<DbfsWriter>;
148    type Lister = oio::PageLister<DbfsLister>;
149    type Deleter = oio::OneShotDeleter<DbfsDeleter>;
150    type BlockingReader = ();
151    type BlockingWriter = ();
152    type BlockingLister = ();
153    type BlockingDeleter = ();
154
155    fn info(&self) -> Arc<AccessorInfo> {
156        let am = AccessorInfo::default();
157        am.set_scheme(Scheme::Dbfs)
158            .set_root(&self.core.root)
159            .set_native_capability(Capability {
160                stat: true,
161                stat_has_cache_control: true,
162                stat_has_content_length: true,
163                stat_has_content_type: true,
164                stat_has_content_encoding: true,
165                stat_has_content_range: true,
166                stat_has_etag: true,
167                stat_has_content_md5: true,
168                stat_has_last_modified: true,
169                stat_has_content_disposition: true,
170
171                write: true,
172                create_dir: true,
173                delete: true,
174                rename: true,
175
176                list: true,
177                list_has_last_modified: true,
178                list_has_content_length: true,
179
180                shared: true,
181
182                ..Default::default()
183            });
184        am.into()
185    }
186
187    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
188        let resp = self.core.dbfs_create_dir(path).await?;
189
190        let status = resp.status();
191
192        match status {
193            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
194            _ => Err(parse_error(resp)),
195        }
196    }
197
198    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
199        // Stat root always returns a DIR.
200        if path == "/" {
201            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
202        }
203
204        let resp = self.core.dbfs_get_status(path).await?;
205
206        let status = resp.status();
207
208        match status {
209            StatusCode::OK => {
210                let mut meta = parse_into_metadata(path, resp.headers())?;
211                let bs = resp.into_body();
212                let decoded_response: DbfsStatus =
213                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
214                meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
215                    decoded_response.modification_time,
216                )?);
217                match decoded_response.is_dir {
218                    true => meta.set_mode(EntryMode::DIR),
219                    false => {
220                        meta.set_mode(EntryMode::FILE);
221                        meta.set_content_length(decoded_response.file_size as u64)
222                    }
223                };
224                Ok(RpStat::new(meta))
225            }
226            StatusCode::NOT_FOUND if path.ends_with('/') => {
227                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
228            }
229            _ => Err(parse_error(resp)),
230        }
231    }
232
233    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
234        Ok((
235            RpWrite::default(),
236            oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())),
237        ))
238    }
239
240    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
241        Ok((
242            RpDelete::default(),
243            oio::OneShotDeleter::new(DbfsDeleter::new(self.core.clone())),
244        ))
245    }
246
247    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
248        let l = DbfsLister::new(self.core.clone(), path.to_string());
249
250        Ok((RpList::default(), oio::PageLister::new(l)))
251    }
252
253    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
254        self.core.dbfs_ensure_parent_path(to).await?;
255
256        let resp = self.core.dbfs_rename(from, to).await?;
257
258        let status = resp.status();
259
260        match status {
261            StatusCode::OK => Ok(RpRename::default()),
262            _ => Err(parse_error(resp)),
263        }
264    }
265}
266
267#[derive(Deserialize)]
268struct DbfsStatus {
269    // Not used fields.
270    // path: String,
271    is_dir: bool,
272    file_size: i64,
273    modification_time: i64,
274}