Deploy R or Python models to production in Databricks using Orbital
Introduction
Bridging the gap between model development and production deployment can often be a complex and time-consuming endeavor. Data scientists and ML engineers frequently grapple with translating their carefully crafted models from familiar environments like Python’s Scikit-learn or R’s tidymodels into scalable, production-ready systems. This is where the Orbital package comes in.
This blog post will walk you through a workflow that leverages the strengths of Databricks as an integrated environment, along with Scikit-learn (for Python) and tidymodels (for R), all unified by the Orbital package. The Orbital package (Python version, R version) translates your fitted Scikit-learn or tidymodels models into SQL for a variety of backends. Within Databricks, the resulting SQL statements are not only saved directly into your Databricks Workspace for immediate execution but can also be scheduled as a Databricks Job. This approach streamlines your MLOps workflow, allowing you to move models from experimentation to scheduled production runs without altering your original code. The workflow is similar for both Python and R users, allowing for consistent and efficient MLOps practices regardless of your preferred language.
Use case
Using loan data, we want to fit a model that estimates an appropriate interest rate, and then use that model to find out if the interest rate for a given loan may have been too high. The loan data is in a table located in the Databricks Unity Catalog. The ultimate objective of the project is a daily check that surfaces loans that may have an issue.
Approach
“Fit small, predict big”
To make it as close to a 'real-life' scenario as possible, we will download a sample of the table into our R or Python session, fit a model using a Scikit-learn or tidymodels pipeline, and then use Orbital to translate the steps and estimates into a SQL statement. Finally, we will use that SQL statement as the base to compare the current interest rate against the prediction and download the loans with a large difference. Thanks to the integrated environment in Databricks, the resulting SQL statement will be saved in the Databricks Workspace and run on a schedule via a Databricks Job.
```mermaid
flowchart LR
  A[1-Full Table] -- Download --> B(2-Sample)
  B -- Scikit-learn or tidymodels fit --> C(3-Model)
  C -- Orbital parse --> D(4-SQL)
  D -- Automate --> E(5-Job)
  E -- Predict --> A
```
Use the tabsets to change languages
To make this easier to follow, we’ve organized the code examples into tabsets. If you’re interested in the Python code, click the “Python” tab; for the R code, click the “R” tab. The tabs switch between the code for each language.
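Before diving in, the end goal (step 5 in the diagram) can be sketched in miniature with the stdlib `sqlite3` module. The table and predictions here are made up for illustration; the real filter runs as SQL inside Databricks:

```python
import sqlite3

# Toy stand-in for the Unity Catalog table (hypothetical values):
# loan_id, actual interest_rate, and the model's predicted rate
rows = [
    (1, 26.77, 6.27),
    (2, 12.60, 11.80),
    (3, 30.79, 15.27),
]

toy_con = sqlite3.connect(":memory:")
toy_con.execute("CREATE TABLE loans (loan_id INT, interest_rate REAL, variable REAL)")
toy_con.executemany("INSERT INTO loans VALUES (?, ?, ?)", rows)

# Same shape as the final query: keep loans priced far above the prediction
flagged = toy_con.execute(
    "SELECT loan_id FROM loans "
    "WHERE interest_rate - variable > 15 AND variable > 0"
).fetchall()
print(flagged)  # [(1,), (3,)]
```

The rest of the post builds the real version of this: the prediction column is generated by Orbital from a fitted pipeline, and the query runs on a schedule in the warehouse.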
Download sample
- Load the necessary libraries. Make sure to have `databricks-sql-connector` installed in your environment; that is the source of `databricks`.

```python
from dotenv import load_dotenv
from databricks import sql
import pandas as pd
import os
```

- Load the credentials to be used via their respective environment variables.
```python
load_dotenv()
host = os.getenv("DATABRICKS_HOST")
token = os.getenv("DATABRICKS_TOKEN")
```

- For simplicity's sake, the table's catalog, schema, and HTTP path are assigned to variables.
```python
schema = "end-to-end"
catalog = "sol_eng_demo_nickp"
http_path = "/sql/1.0/warehouses/b71952ebceb705ce"
```

- Establish the database connection using the defined variables.

```python
con = sql.connect(host, http_path, token, catalog=catalog, schema=schema)
```

- Using `TABLESAMPLE`, download 100 rows. `REPEATABLE` is used for purposes of reproducibility.
```python
con_cursor = con.cursor()
con_cursor.execute(
    "select * from loans_full_schema TABLESAMPLE (100 ROWS) REPEATABLE (999);"
)
```

    <databricks.sql.client.Cursor at 0x116666ae0>
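The `description` attribute used in the next step is part of the Python DB-API specification, so the same name-extraction idiom works with any compliant driver. A self-contained illustration using the stdlib `sqlite3` module (a separate demo connection, not the Databricks one):

```python
import sqlite3

# In-memory database standing in for the warehouse connection
demo_con = sqlite3.connect(":memory:")
demo_cur = demo_con.cursor()
demo_cur.execute("CREATE TABLE loans (loan_id INTEGER, interest_rate REAL)")
demo_cur.execute("INSERT INTO loans VALUES (1, 9.5)")
demo_cur.execute("SELECT * FROM loans")

# Same idiom as below: the first element of each description tuple is the name
col_names = [desc[0] for desc in demo_cur.description]
print(col_names)  # ['loan_id', 'interest_rate']
```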
- Iterate through the field descriptions to extract their respective names.

```python
col_names = [desc[0] for desc in con_cursor.description]
```

- Convert the downloaded data into a Pandas data frame.

```python
res = con_cursor.fetchall()
full_df = pd.DataFrame(res, columns=col_names)
```

Fit locally
- Load the appropriate Scikit-learn modules.

```python
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
```

- Select the fields that will be used as predictors, and add them to a list called `pred_names`.

```python
pred_names = ["annual_income", "total_credit_lines", "loan_amount", "term"]
```

- Subset the data into a new variable (`predictors`).
```python
predictors = full_df[pred_names]
```

- Pull the interest rate field from the data into a new variable (`outcome`).

```python
outcome = full_df["interest_rate"]
```

- Split the rows into train and test.

```python
pred_train, pred_test, out_train, out_test = train_test_split(
    predictors, outcome, test_size=20, random_state=999
)
```

- Create the pipeline. Use `pred_names` to define the fields to run the scaler against.
```python
pipeline = Pipeline(
    [
        (
            "preprocess",
            ColumnTransformer(
                [("scaler", StandardScaler(with_std=False), pred_names)],
                remainder="passthrough",
            ),
        ),
        ("linear_regression", LinearRegression()),
    ]
)
```

- Fit the pipeline.
```python
pipeline.fit(pred_train, out_train)
```

    Pipeline(steps=[('preprocess',
                     ColumnTransformer(remainder='passthrough',
                                       transformers=[('scaler',
                                                      StandardScaler(with_std=False),
                                                      ['annual_income',
                                                       'total_credit_lines',
                                                       'loan_amount', 'term'])])),
                    ('linear_regression', LinearRegression())])
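A side note on `StandardScaler(with_std=False)`: it only subtracts the column means, and for a linear model, mean-centering the inputs is equivalent to shifting the intercept. This is part of why the fitted pipeline can later be flattened into a single SQL expression. A quick stdlib check of that identity, using made-up coefficients, means, and a sample row:

```python
# Hypothetical coefficients, means, intercept, and one row of predictors
coefs = [-3.0e-05, 0.0195, 2.98e-05, 0.197]
means = [77239.975, 23.225, 16201.5625, 45.9]
intercept = 12.646
row = [210000.0, 18.0, 5000.0, 36.0]

# Model on centered features: b0 + sum(b_i * (x_i - m_i))
centered = intercept + sum(b * (x - m) for b, x, m in zip(coefs, row, means))

# Same model on raw features, with the intercept absorbed into b0
adj_intercept = intercept - sum(b * m for b, m in zip(coefs, means))
raw = adj_intercept + sum(b * x for b, x in zip(coefs, row))

print(abs(centered - raw) < 1e-9)  # True
```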
Convert to SQL using Orbital
- Import Orbital.
```python
import orbitalml
import orbitalml.types
```

- Parse the pipeline with Orbital. At this stage, you define the predictor fields and their types, as well as any other fields that need to be included in the final result set.
```python
orbital_pipeline = orbitalml.parse_pipeline(
    pipeline,
    features={
        "annual_income": orbitalml.types.DoubleColumnType(),
        "total_credit_lines": orbitalml.types.DoubleColumnType(),
        "loan_amount": orbitalml.types.DoubleColumnType(),
        "term": orbitalml.types.DoubleColumnType(),
        "loan_id": orbitalml.types.Int32ColumnType(),
        "emp_title": orbitalml.types.StringColumnType(),
        "balance": orbitalml.types.DoubleColumnType(),
        "application_type": orbitalml.types.StringColumnType(),
        "interest_rate": orbitalml.types.DoubleColumnType(),
    },
)
```

- Convert the pipeline to SQL. By default, Orbital excludes the predictor fields from the finalized SQL statement, so `ResultsProjection()` is used to force the loan amount and loan term to be included in the statement.
```python
pred_sql = orbitalml.export_sql(
    table_name="loans_full_schema",
    pipeline=orbital_pipeline,
    projection=orbitalml.ResultsProjection(["loan_amount", "term"]),
    dialect="databricks",
)
```

- Preview the resulting SQL statement.

```python
pred_sql
```

    'SELECT `t0`.`loan_id` AS `loan_id`, `t0`.`emp_title` AS `emp_title`, `t0`.`balance` AS `balance`, `t0`.`application_type` AS `application_type`, `t0`.`interest_rate` AS `interest_rate`, (`t0`.`annual_income` - 77239.975) * -3.004111806545882e-05 + 12.646 + (`t0`.`total_credit_lines` - 23.225) * 0.01950461625046961 + (`t0`.`loan_amount` - 16201.5625) * 2.9834429401503845e-05 + (`t0`.`term` - 45.9) * 0.19726277170596157 AS `variable`, `t0`.`loan_amount` AS `loan_amount`, `t0`.`term` AS `term` FROM `loans_full_schema` AS `t0`'
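The `variable` expression is just the fitted linear formula, so it can be sanity-checked outside the warehouse. Plugging in the values for the first flagged loan in the results below (loan 9802: annual income 210000, 18 credit lines, a 5000 loan over a 36-month term) reproduces its predicted rate:

```python
# Coefficients and column means copied from the generated SQL statement
annual_income, total_credit_lines, loan_amount, term = 210000, 18, 5000, 36

variable = (
    (annual_income - 77239.975) * -3.004111806545882e-05
    + 12.646
    + (total_credit_lines - 23.225) * 0.01950461625046961
    + (loan_amount - 16201.5625) * 2.9834429401503845e-05
    + (term - 45.9) * 0.19726277170596157
)

print(round(variable, 2))  # 6.27, matching the `variable` column for loan 9802
```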
- Use the new SQL statement as the source to filter for any rate that is 15 points above the prediction.
```python
final_sql = f"select * from ({pred_sql}) where interest_rate - variable > 15 and variable > 0"
final_sql
```

    'select * from (SELECT `t0`.`loan_id` AS `loan_id`, `t0`.`emp_title` AS `emp_title`, `t0`.`balance` AS `balance`, `t0`.`application_type` AS `application_type`, `t0`.`interest_rate` AS `interest_rate`, (`t0`.`annual_income` - 77239.975) * -3.004111806545882e-05 + 12.646 + (`t0`.`total_credit_lines` - 23.225) * 0.01950461625046961 + (`t0`.`loan_amount` - 16201.5625) * 2.9834429401503845e-05 + (`t0`.`term` - 45.9) * 0.19726277170596157 AS `variable`, `t0`.`loan_amount` AS `loan_amount`, `t0`.`term` AS `term` FROM `loans_full_schema` AS `t0`) where interest_rate - variable > 15 and variable > 0'
- Execute the finalized SQL statement, and return it as a Pandas data frame.
```python
con_cursor = con.cursor()
con_cursor.execute(final_sql)
pred_cols = [desc[0] for desc in con_cursor.description]
res = con_cursor.fetchall()
pred_df = pd.DataFrame(res, columns=pred_cols)
pred_df
```
| | loan_id | emp_title | balance | application_type | interest_rate | variable | loan_amount | term |
|----|----|----|----|----|----|----|----|----|
| 0 | 9802 | operational risk manager | 0.00 | individual | 26.77 | 6.268735 | 5000 | 36 |
| 1 | 3935 | sr admin assistant | 19821.53 | individual | 30.79 | 15.271815 | 20600 | 60 |
| 2 | 5068 | assistant | 10672.97 | individual | 30.17 | 15.048231 | 11100 | 60 |
| 3 | 5967 | rn | 6768.73 | individual | 25.82 | 10.208089 | 7500 | 36 |
| 4 | 9338 | vice president of operations | 9774.95 | individual | 30.17 | 14.033024 | 10000 | 60 |
| 5 | 4535 | mechanical designer | 15896.16 | individual | 30.75 | 10.860353 | 16175 | 36 |
| 6 | 4833 | president of media division | 34119.79 | individual | 26.30 | 8.808102 | 35000 | 60 |
| 7 | 1649 | pilot | 29297.84 | individual | 28.72 | 12.486257 | 30000 | 60 |
| 8 | 9430 | sr project manager | 0.00 | individual | 23.88 | 7.005178 | 17000 | 36 |
| 9 | 5526 | teacher | 24253.68 | individual | 30.79 | 15.700122 | 25000 | 60 |
| 10 | 5657 | None | 19243.97 | individual | 30.79 | 15.673809 | 20000 | 60 |
| 11 | 8057 | account manager | 24250.66 | individual | 30.65 | 13.753280 | 25000 | 60 |
| 12 | 7471 | server engineer | 0.00 | individual | 30.79 | 14.910085 | 25000 | 60 |
| 13 | 7772 | insurance broker | 9192.11 | joint | 26.30 | 9.761687 | 9550 | 36 |
| 14 | 7668 | assistant vice president | 23589.47 | individual | 26.77 | 9.867967 | 25000 | 36 |
| 15 | 5340 | operations manager - core | 27585.12 | joint | 30.17 | 14.443606 | 28000 | 60 |
| 16 | 1241 | physician | 0.00 | individual | 26.77 | 7.631215 | 10000 | 36 |
| 17 | 2887 | cytotechnologist | 34398.52 | individual | 30.94 | 11.017999 | 35000 | 36 |
| 18 | 2732 | None | 1805.00 | individual | 25.82 | 10.406061 | 2000 | 36 |
| 19 | 1127 | director, support | 23579.25 | individual | 26.30 | 8.085005 | 25000 | 36 |
| 20 | 6933 | attorney/shareholder | 33643.67 | individual | 21.45 | 3.264169 | 35000 | 60 |
| 21 | 4453 | executive manger | 17849.23 | individual | 21.45 | 4.984362 | 19450 | 36 |
| 22 | 36 | None | 1318.63 | individual | 24.84 | 8.510812 | 1400 | 36 |
| 23 | 843 | registered nurse | 34386.09 | individual | 29.69 | 9.161281 | 35000 | 36 |
| 24 | 9569 | income developement specialist | 33635.31 | joint | 29.69 | 12.900652 | 35000 | 60 |
| 25 | 3351 | electrician | 34494.86 | joint | 23.87 | 8.651117 | 40000 | 36 |
| 26 | 8720 | nurse | 27230.80 | joint | 26.30 | 10.871734 | 30150 | 36 |
| 27 | 1159 | cip compliance specialist | 1081.34 | individual | 24.85 | 9.134140 | 1200 | 36 |
| 28 | 7972 | program analyst | 2042.13 | individual | 30.65 | 9.103450 | 2200 | 36 |
| 29 | 4390 | executive director | 18456.81 | individual | 25.82 | 10.264924 | 20000 | 36 |
| 30 | 5845 | supervisor | 14552.19 | individual | 30.79 | 15.526647 | 15000 | 60 |
| 31 | 6450 | rn | 24054.97 | individual | 30.79 | 14.395123 | 25000 | 60 |
| 32 | 6586 | business analyst | 0.00 | individual | 26.30 | 10.714821 | 5000 | 36 |
| 33 | 9765 | chief operating officer | 15164.37 | individual | 24.84 | 9.147561 | 16100 | 36 |
| 34 | 3514 | None | 5755.81 | individual | 26.77 | 11.663725 | 6100 | 36 |
| 35 | 7117 | administration | 23084.76 | individual | 26.30 | 10.359162 | 25000 | 36 |
| 36 | 7839 | benefits/worklife manager | 23185.38 | individual | 30.65 | 15.281168 | 24100 | 60 |
| 37 | 8509 | general manager | 14635.95 | individual | 19.03 | 2.791786 | 16000 | 36 |
| 38 | 814 | underwriter | 0.00 | individual | 30.17 | 12.653358 | 22000 | 60 |
| 39 | 1231 | general manager | 0.00 | individual | 26.30 | 9.049302 | 4375 | 36 |
| 40 | 6398 | conusultant | 29295.97 | individual | 28.72 | 12.989556 | 30000 | 60 |
| 41 | 2389 | warehouse manager | 8586.56 | joint | 26.77 | 11.301432 | 9100 | 36 |
Automate in Databricks
- `WorkspaceClient` is the main way to create and manage objects.

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
```

- The SQL tasks require a warehouse ID. This code pulls the ID from the first warehouse returned by the function that lists all of the sources.
```python
srcs = w.data_sources.list()
warehouse_id = srcs[0].warehouse_id
```

- Next, a new Query is created in the Databricks Workspace. The query object is built using `CreateQueryRequestQuery()`. This is where the `final_sql` value is passed. Additionally, the catalog and schema are defined via their respective arguments.
```python
from databricks.sdk.service import sql as sdk_sql

db_request_query = sdk_sql.CreateQueryRequestQuery(
    query_text=final_sql,
    catalog=catalog,
    schema=schema,
    display_name="Interest rate differences",
    warehouse_id=warehouse_id,
    description="Find differences in interest rate",
)
```

- `w.queries.create()` takes the request query object to create the new query. After executing the command, a new query named "Interest rate differences" will appear in the Queries section of the Databricks Web UI.
```python
new_query = w.queries.create(query=db_request_query)
```

- A Databricks Job can be used to orchestrate multiple tasks. The job being created for this example requires a single task. Tasks can be of many kinds, such as a Notebook, a Python script, or a SQL query. The following starts building the SQL task. It uses `new_query`'s ID to tie it to the query created in the previous step.
```python
from databricks.sdk.service import jobs as sdk_jobs

db_sql_task_query = sdk_jobs.SqlTaskQuery(query_id=new_query.id)
```

- To finish defining the SQL task object, `db_sql_task_query` is passed to `SqlTask()`.

```python
db_sql_task = sdk_jobs.SqlTask(query=db_sql_task_query, warehouse_id=warehouse_id)
```

- The SQL task object is used to create a formal Task. This is the point where the resulting task in the Databricks UI can be named and described.
```python
db_task = sdk_jobs.Task(
    sql_task=db_sql_task, description="Int rate diffs", task_key="run_sql"
)
```

- For this example, the Job needs a schedule, which requires a `CronSchedule` object. The schedule itself is defined with a Quartz CRON expression.
```python
db_schedule = sdk_jobs.CronSchedule(
    quartz_cron_expression="0 0 12 * * ?", timezone_id="CST"
)
```

- Finally, the Job is created using the task and schedule objects. After running the following command, a new Job named "Daily check interest differences" will appear in the Jobs section of the Databricks Web UI.

```python
new_job = w.jobs.create(
    name="Daily check interest differences", tasks=[db_task], schedule=db_schedule
)
```

Appendix
Data in example
The data used for this example was downloaded from OpenIntro. The page containing the description of the data and the download link is available here: loans_full_schema. The CSV file was manually uploaded to the Databricks Unity Catalog.
The loan_id field is not included in the data file. It was created using the following two SQL commands. The first adds the field:
```sql
ALTER TABLE sol_eng_demo_nickp.`end-to-end`.loans_full_schema
ADD COLUMN loan_id BIGINT;
```

This is the SQL command to populate the field with a sequential number:
```sql
WITH cte AS (
  SELECT
    loan_id,
    ROW_NUMBER() OVER (ORDER BY debt_to_income) AS new_loan_id
  FROM
    sol_eng_demo_nickp.`end-to-end`.loans_full_schema
)
MERGE INTO sol_eng_demo_nickp.`end-to-end`.loans_full_schema AS target
USING cte
ON target.debt_to_income = cte.debt_to_income
WHEN MATCHED THEN
  UPDATE SET target.loan_id = cte.new_loan_id;
```

Python environment
The following library requirements were used to run the example:
```
dotenv>=0.9.9
orbitalml>=0.2.0
pandas>=2.2.3
pip>=25.1.1
databricks-sql-connector
pyarrow
```

There is an issue with the onnx binary for Python 3.13, so Python 3.12 was used for this example.
Full code
```python
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from dotenv import load_dotenv
from databricks import sql
import orbitalml.types
import pandas as pd
import orbitalml
import os

load_dotenv()
host = os.getenv("DATABRICKS_HOST")
token = os.getenv("DATABRICKS_TOKEN")
schema = "end-to-end"
catalog = "sol_eng_demo_nickp"
http_path = "/sql/1.0/warehouses/b71952ebceb705ce"
con = sql.connect(host, http_path, token, catalog=catalog, schema=schema)
con_cursor = con.cursor()
con_cursor.execute(
    "select * from loans_full_schema TABLESAMPLE (100 ROWS) REPEATABLE (999);"
)
col_names = [desc[0] for desc in con_cursor.description]
res = con_cursor.fetchall()
full_df = pd.DataFrame(res, columns=col_names)
pred_names = ["annual_income", "total_credit_lines", "loan_amount", "term"]
predictors = full_df[pred_names]
outcome = full_df["interest_rate"]
pred_train, pred_test, out_train, out_test = train_test_split(
    predictors, outcome, test_size=20, random_state=999
)
pipeline = Pipeline(
    [
        (
            "preprocess",
            ColumnTransformer(
                [("scaler", StandardScaler(with_std=False), pred_names)],
                remainder="passthrough",
            ),
        ),
        ("linear_regression", LinearRegression()),
    ]
)
pipeline.fit(pred_train, out_train)
orbital_pipeline = orbitalml.parse_pipeline(
    pipeline,
    features={
        "annual_income": orbitalml.types.DoubleColumnType(),
        "total_credit_lines": orbitalml.types.DoubleColumnType(),
        "loan_amount": orbitalml.types.DoubleColumnType(),
        "term": orbitalml.types.DoubleColumnType(),
        "loan_id": orbitalml.types.Int32ColumnType(),
        "emp_title": orbitalml.types.StringColumnType(),
        "balance": orbitalml.types.DoubleColumnType(),
        "application_type": orbitalml.types.StringColumnType(),
        "interest_rate": orbitalml.types.DoubleColumnType(),
    },
)
pred_sql = orbitalml.export_sql(
    table_name="loans_full_schema",
    pipeline=orbital_pipeline,
    projection=orbitalml.ResultsProjection(["loan_amount", "term"]),
    dialect="databricks",
)
final_sql = (
    f"select * from ({pred_sql}) where interest_rate - variable > 15 and variable > 0"
)
con_cursor.execute(final_sql)
pred_cols = [desc[0] for desc in con_cursor.description]
res = con_cursor.fetchall()
pred_df = pd.DataFrame(res, columns=pred_cols)
pred_df
```

Download sample
- The `odbc::databricks()` function provides lots of convenient features that handle essentially everything needed to establish a connection to Databricks. The only thing to provide is the `httpPath`. The connection is established via `DBI`.

```r
library(DBI)

con <- dbConnect(
  drv = odbc::databricks(),
  httpPath = "/sql/1.0/warehouses/b71952ebceb705ce"
)
```

- Because they will be used in multiple locations in this example, the `catalog`, `schema`, and `table` names are loaded into variables.
```r
catalog <- "sol_eng_demo_nickp"
schema <- "end-to-end"
table <- "loans_full_schema"
```

- Sample the database table using the `TABLESAMPLE` SQL clause. `REPEATABLE` is used to aid with reproducibility. The `glue_sql()` function is used to compile the SQL statement.
```r
library(glue)

sample_sql <- glue_sql(
  "SELECT * ",
  "FROM ",
  "{`catalog`}.{`schema`}.{`table`} ",
  "TABLESAMPLE (100 ROWS) REPEATABLE (999)",
  .con = con
)
sample_sql
```

<SQL> SELECT * FROM `sol_eng_demo_nickp`.`end-to-end`.`loans_full_schema` TABLESAMPLE (100 ROWS) REPEATABLE (999)
- The sample is downloaded by executing the SQL statement via `dbGetQuery()`.

```r
sample_lending <- dbGetQuery(
  conn = con,
  statement = sample_sql
)
```

Fit locally
- Load tidymodels and set the seed.
```r
library(tidymodels)

set.seed(999)
```

- Currently, certain fields are downloaded from Databricks as 64-bit integers, which base R does not support natively. The easiest solution is to convert them to doubles. This only needs to be done in the local copy, since the goal is a SQL statement that will run inside Databricks. To make the data transformation easy, we will use dplyr.
```r
library(dplyr)

sample_lending <- sample_lending |>
  mutate(
    total_credit_lines = as.double(total_credit_lines),
    loan_amount = as.double(loan_amount),
    term = as.double(term)
  )
```

- Split the data into training and testing.
```r
split_lending <- initial_split(sample_lending)
lending_training <- training(split_lending)
```

- Create a `recipe` that defines the predictor and outcome fields and includes a normalization step. For this example, we will use the `annual_income`, `total_credit_lines`, `loan_amount`, and `term` fields as predictors.
```r
rec_lending <- recipe(
  interest_rate ~ annual_income + total_credit_lines + loan_amount + term,
  data = lending_training
) |>
  step_normalize(all_numeric_predictors())
```

- Define a linear regression model spec.

```r
lm_spec <- linear_reg()
```

- Create the workflow by combining the recipe and the defined model spec.

```r
wf_spec <- workflow(rec_lending, lm_spec)
```

- Fit the workflow using the training data and preview the result.

```r
wf_fit <- fit(wf_spec, lending_training)
wf_fit
```

══ Workflow [trained] ══════════════════════════════════════════════════════════
Preprocessor: Recipe
Model: linear_reg()
── Preprocessor ────────────────────────────────────────────────────────────────
1 Recipe Step
• step_normalize()
── Model ───────────────────────────────────────────────────────────────────────
Call:
stats::lm(formula = ..y ~ ., data = data)
Coefficients:
(Intercept) annual_income total_credit_lines loan_amount
12.1961 -0.3791 -0.2425 -0.9216
term
2.0547
Convert to SQL using Orbital
- Load and use `orbital` to read the fitted workflow. In Databricks, names with dots (".") are not acceptable, and `.pred` is the default name that `orbital` gives the prediction. To fix this, use the `prefix` argument to override it.

```r
library(orbital)

lending_orbital <- orbital(wf_fit, prefix = "pred")
lending_orbital
```

── orbital Object ──────────────────────────────────────────────────────────────
• annual_income = (annual_income - 71942.79) / 42292.87
• total_credit_lines = (total_credit_lines - 22.49333) / 11.66813
• loan_amount = (loan_amount - 14928.33) / 9958.628
• term = (term - 44.32) / 11.49872
• pred = 12.19613 + (annual_income * -0.3791282) + (total_credit_lines * ...
────────────────────────────────────────────────────────────────────────────────
5 equations in total.
- Load dbplyr and use `tbl()` to create a reference to the lending table in the R session. To pass the fully qualified name, we can use `glue` and `I()`.

```r
library(dbplyr)

tbl_lending <- tbl(con, I(glue("{catalog}.`{schema}`.{table}")))
tbl_lending
```

# Source: table<sol_eng_demo_nickp.`end-to-end`.loans_full_schema> [?? x 56]
# Database: Spark SQL 3.1.1[token@Spark SQL/hive_metastore]
emp_title emp_length state homeownership annual_income verified_income
<chr> <chr> <chr> <chr> <dbl> <chr>
1 "global config … 3 NJ MORTGAGE 90000 Verified
2 "warehouse offi… 10 HI RENT 40000 Not Verified
3 "assembly" 3 WI RENT 40000 Source Verified
4 "customer servi… 1 PA RENT 30000 Not Verified
5 "security super… 10 CA RENT 35000 Verified
6 <NA> NA KY OWN 34000 Not Verified
7 "hr " 10 MI MORTGAGE 35000 Source Verified
8 "police" 10 AZ MORTGAGE 110000 Source Verified
9 "parts" 10 NV MORTGAGE 65000 Source Verified
10 "4th person" 3 IL RENT 30000 Not Verified
# ℹ more rows
# ℹ 50 more variables: debt_to_income <chr>, annual_income_joint <chr>,
# verification_income_joint <chr>, debt_to_income_joint <chr>,
# delinq_2y <int64>, months_since_last_delinq <chr>,
# earliest_credit_line <int64>, inquiries_last_12m <int64>,
# total_credit_lines <int64>, open_credit_lines <int64>,
# total_credit_limit <int64>, total_credit_utilized <int64>, …
- To make the predictions part of a larger set of fields returned by the final query, the `orbital_inline()` function is used. It allows the prediction steps to be passed inside a dplyr `mutate()` call. `orbital_inline()` will modify the predictor fields based on the steps from the recipe, which in this example is the normalization step. A quick way to retain the original values, if they are to be part of the final result, is to simply create copies of them via `mutate()`. Finally, since it is not necessary to return all of the fields, we reduce them via a `select()` call.
```r
tbl_prep <- tbl_lending |>
  mutate(
    o_annual_income = annual_income,
    o_total_credit_lines = total_credit_lines,
    o_loan_amount = loan_amount,
    o_term = term
  ) |>
  mutate(!!!orbital_inline(lending_orbital)) |>
  select(
    pred, interest_rate, emp_title, balance, application_type,
    o_annual_income, o_total_credit_lines, o_loan_amount, o_term
  )
```

- Preview the top rows from the initial transformations.

```r
tbl_prep |>
  head()
```

# Source: SQL [?? x 9]
# Database: Spark SQL 3.1.1[token@Spark SQL/hive_metastore]
pred interest_rate emp_title balance application_type o_annual_income
<dbl> <dbl> <chr> <dbl> <chr> <dbl>
1 13.5 14.1 "global config e… 27016. individual 90000
2 11.8 12.6 "warehouse offic… 4651. individual 40000
3 12.0 17.1 "assembly" 1825. individual 40000
4 10.9 6.72 "customer servic… 18853. individual 30000
5 10.3 14.1 "security superv… 21430. joint 35000
6 11.8 6.72 <NA> 4257. individual 34000
# ℹ 3 more variables: o_total_credit_lines <int64>, o_loan_amount <int64>,
# o_term <int64>
- An additional step is added to keep only the rows with a current interest rate 15 points higher than the prediction.
```r
tbl_final <- tbl_prep |>
  filter(interest_rate - pred > 15, pred > 0)
```

- Preview the results.

```r
tbl_final
```

# Source: SQL [?? x 9]
# Database: Spark SQL 3.1.1[token@Spark SQL/hive_metastore]
pred interest_rate emp_title balance application_type o_annual_income
<dbl> <dbl> <chr> <dbl> <chr> <dbl>
1 10.5 26.8 operational ris… 0 individual 210000
2 14.0 30.8 sr admin assist… 19822. individual 95731
3 14.3 30.8 firefighter 24254. individual 50000
4 15.0 30.2 vice president … 9775. individual 118000
5 10.6 30.8 mechanical desi… 15896. individual 71500
6 10.8 26.3 president of me… 34120. individual 320000
7 12.5 28.7 pilot 29298. individual 190000
8 13.9 30.8 teacher 24254. individual 80000
9 14.0 30.8 <NA> 19244. individual 85000
10 13.1 30.6 account manager 24251. individual 150000
# ℹ more rows
# ℹ 3 more variables: o_total_credit_lines <int64>, o_loan_amount <int64>,
# o_term <int64>
- Preview the actual SQL that will be sent using `show_query()`.

```r
tbl_final |>
  show_query()
```

<SQL>
SELECT `q01`.*
FROM (
SELECT
(((12.1961333333333 + (`annual_income` * -0.379128214967168)) + (`total_credit_lines` * -0.242483849568646)) + (`loan_amount` * -0.921560182698075)) + (`term` * 2.05467819018834) AS `pred`,
`interest_rate`,
`emp_title`,
`balance`,
`application_type`,
`o_annual_income`,
`o_total_credit_lines`,
`o_loan_amount`,
`o_term`
FROM (
SELECT
`emp_title`,
`emp_length`,
`state`,
`homeownership`,
(`annual_income` - 71942.7866666667) / 42292.8687411871 AS `annual_income`,
`verified_income`,
`debt_to_income`,
`annual_income_joint`,
`verification_income_joint`,
`debt_to_income_joint`,
`delinq_2y`,
`months_since_last_delinq`,
`earliest_credit_line`,
`inquiries_last_12m`,
(`total_credit_lines` - 22.4933333333333) / 11.6681286085312 AS `total_credit_lines`,
`open_credit_lines`,
`total_credit_limit`,
`total_credit_utilized`,
`num_collections_last_12m`,
`num_historical_failed_to_pay`,
`months_since_90d_late`,
`current_accounts_delinq`,
`total_collection_amount_ever`,
`current_installment_accounts`,
`accounts_opened_24m`,
`months_since_last_credit_inquiry`,
`num_satisfactory_accounts`,
`num_accounts_120d_past_due`,
`num_accounts_30d_past_due`,
`num_active_debit_accounts`,
`total_debit_limit`,
`num_total_cc_accounts`,
`num_open_cc_accounts`,
`num_cc_carrying_balance`,
`num_mort_accounts`,
`account_never_delinq_percent`,
`tax_liens`,
`public_record_bankrupt`,
`loan_purpose`,
`application_type`,
(`loan_amount` - 14928.3333333333) / 9958.62753532771 AS `loan_amount`,
(`term` - 44.32) / 11.4987190825996 AS `term`,
`interest_rate`,
`installment`,
`grade`,
`sub_grade`,
`issue_month`,
`loan_status`,
`initial_listing_status`,
`disbursement_method`,
`balance`,
`paid_total`,
`paid_principal`,
`paid_interest`,
`paid_late_fees`,
`loan_id`,
`o_annual_income`,
`o_total_credit_lines`,
`o_loan_amount`,
`o_term`
FROM (
SELECT
`loans_full_schema`.*,
`annual_income` AS `o_annual_income`,
`total_credit_lines` AS `o_total_credit_lines`,
`loan_amount` AS `o_loan_amount`,
`term` AS `o_term`
FROM sol_eng_demo_nickp.`end-to-end`.loans_full_schema
) `q01`
) `q01`
) `q01`
WHERE ((`interest_rate` - `pred`) > 15.0) AND (`pred` > 0.0)
- `show_query()` is mostly geared towards printing nicely to the R console. To capture the SQL in a variable, use `remote_query()`.

```r
final_sql <- remote_query(tbl_final)
```

- To confirm that the SQL returned by `remote_query()` runs as-is, use `dbGetQuery()` to run the statement against the database.

```r
res <- dbGetQuery(con, final_sql)

head(res)
```

pred interest_rate emp_title balance application_type
1 10.48399 26.77 operational risk manager 0.00 individual
2 13.95838 30.79 sr admin assistant 19821.53 individual
3 14.25211 30.79 firefighter 24253.68 individual
4 15.00983 30.17 vice president of operations 9774.95 individual
5 10.58753 30.75 mechanical designer 15896.16 individual
6 10.78165 26.30 president of media division 34119.79 individual
o_annual_income o_total_credit_lines o_loan_amount o_term
1 210000 18 5000 36
2 95731 37 20600 60
3 50000 23 25000 60
4 118000 24 10000 60
5 71500 23 16175 36
6 320000 29 35000 60
Automate in Databricks
The brickster package provides an easy-to-use interface to Databricks. It allows us to create, set up, and manage assets within Databricks, such as jobs, files, and warehouses.

- The SQL tasks require a warehouse ID. `db_sql_warehouse_list()` retrieves all available warehouses and their details. The warehouse to be used is selected based on its name.
```r
library(brickster)

warehouses <- db_sql_warehouse_list()
wh_names <- as.character(lapply(warehouses, function(x) x$name))
warehouse <- unlist(warehouses[wh_names == "David's Serverless Warehouse"])
warehouse_id <- warehouse[["id"]]
```

- The first step is to create a named query in Databricks. This is where `final_sql` is used.
```r
db_request_query <- db_query_create(
  query_text = final_sql,
  catalog = catalog,
  schema = schema,
  display_name = "Interest rate differences",
  warehouse_id = warehouse_id,
  description = "Find differences in interest rate"
)
```

- A Databricks Job can be used to orchestrate multiple tasks. The job being created for this example requires a single task. Tasks can be of many kinds, such as a Notebook, a Python script, or a SQL query. The following starts building the SQL task. It uses `db_request_query`'s ID to tie it to the query created in the previous step.
```r
db_task_query <- sql_query_task(
  query_id = db_request_query$id,
  warehouse_id = warehouse_id
)
```

- The SQL task object is used to create a formal Task. This is the point where the resulting task in the Databricks UI can be named and described.
```r
db_task_job <- job_task(
  task = db_task_query,
  description = "Int rate diffs",
  task_key = "run_sql"
)
```

- For this example, the Job needs a schedule, which requires a cron schedule object. The schedule itself is defined with a Quartz CRON expression.
```r
db_schedule <- cron_schedule(
  quartz_cron_expression = "0 0 12 * * ?",
  timezone_id = "CST"
)
```

- Finally, the Job is created using the task and schedule objects. After running the following command, a new Job named "Daily check interest differences" will appear in the Jobs section of the Databricks Web UI.
```r
new_job <- db_jobs_create(
  name = "Daily check interest differences",
  tasks = job_tasks(db_task_job),
  schedule = db_schedule
)
```

Appendix
Data in example
The data used for this example was downloaded from OpenIntro. The page containing the description of the data and the download link is available here: loans_full_schema. The CSV file was manually uploaded to the Databricks Unity Catalog.
The loan_id field is not included in the data file. It was created using the following two SQL commands. The first adds the field:
```sql
ALTER TABLE sol_eng_demo_nickp.`end-to-end`.loans_full_schema
ADD COLUMN loan_id BIGINT;
```

This is the SQL command to populate the field with a sequential number:
```sql
WITH cte AS (
  SELECT
    loan_id,
    ROW_NUMBER() OVER (ORDER BY debt_to_income) AS new_loan_id
  FROM
    sol_eng_demo_nickp.`end-to-end`.loans_full_schema
)
MERGE INTO sol_eng_demo_nickp.`end-to-end`.loans_full_schema AS target
USING cte
ON target.debt_to_income = cte.debt_to_income
WHEN MATCHED THEN
  UPDATE SET target.loan_id = cte.new_loan_id;
```

Learn more about putting models in production with Posit
By integrating Scikit-learn or tidymodels and the powerful Orbital package, you can simplify your MLOps pipeline while writing models in your preferred framework. Together with Databricks, this unified approach dramatically accelerates the journey from model development to production.
Learn more: