Commercial enterprise offerings

Deploy R or Python models to production in Databricks using Orbital

Written by Edgar Ruiz
2023-01-13
Header image: logos for R, Python, Orbital, and Databricks.

Introduction

Bridging the gap between model development and production deployment can often be a complex and time-consuming endeavor. Data scientists and ML engineers frequently grapple with translating their carefully crafted models from familiar environments like Python’s Scikit-learn or R’s tidymodels into scalable, production-ready systems. This is where the Orbital package comes in.

This blog post walks you through a workflow that leverages the strengths of Databricks as an integrated environment, along with Scikit-learn (for Python) and tidymodels (for R), all unified by the Orbital package. The Orbital package (Python version, R version) translates fitted Scikit-learn or tidymodels models into SQL for a variety of database backends. Within Databricks, the resulting SQL statements can be saved directly into your Databricks Workspace for immediate execution, and can also be scheduled as a Databricks Job. This approach streamlines your MLOps workflow, allowing you to transition models from experimentation to scheduled production runs without altering your original code. The workflow is similar for both Python and R users, allowing for consistent and efficient MLOps practices regardless of your preferred language.

Use case

Using loan data, we want to fit a model that estimates an appropriate interest rate, and then use that model to find out whether the rate for a given loan may have been too high. The loan data is in a table located in the Databricks Unity Catalog. The ultimate objective of the project is a daily check for loans that may have an issue.

Approach

“Fit small, predict big”

To make it as close to a ‘real-life’ scenario as possible, we will download a sample of the table into our R or Python session, fit a model using a Scikit-learn or tidymodels pipeline, and then use Orbital to translate the steps and estimates into a SQL statement. Finally, we will use that SQL statement as the base to compare the current interest rate against the prediction and download the loans with a large difference. Thanks to the integrated environment in Databricks, the resulting SQL statement will be saved in the Databricks Workspace and run on a schedule via a Databricks Job.

flowchart LR
  A[1-Full Table] --Download--> B(2-Sample) 
  B--Scikit-learn or tidymodels fit-->C(3-Model)
  C--Orbital parse-->D(4-SQL)
  D--Automate-->E(5-Job)
  E--Predict-->A

Figure 1: Diagram of the approach used for this use case.
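The translation step at the heart of this workflow can be sketched in a few lines of pure Python. This is not Orbital's implementation, just the concept: a fitted linear model reduces to an intercept, per-column coefficients, and the centering means, all of which can be rendered as SQL arithmetic that the database runs over the full table (the numbers below are placeholders, not the fitted model):

```python
# Toy sketch of the "fit small, predict big" idea (NOT Orbital's code):
# a fitted linear model is just an intercept, coefficients, and means,
# so it can be emitted as plain SQL arithmetic.

def model_to_sql(intercept, coefs, means, table):
    """Render intercept + sum(coef * (col - mean)) as a SELECT statement."""
    terms = [f"(`{col}` - {means[col]}) * {coefs[col]}" for col in coefs]
    expr = " + ".join([str(intercept)] + terms)
    return f"SELECT {expr} AS variable FROM `{table}`"

# Placeholder coefficients for illustration only.
sql = model_to_sql(
    intercept=12.6,
    coefs={"loan_amount": 3e-05, "term": 0.197},
    means={"loan_amount": 16201.6, "term": 45.9},
    table="loans_full_schema",
)
print(sql)
```

The real Orbital packages do considerably more (preprocessing steps, multiple model types, dialect handling), but the emitted SQL later in this post has exactly this mean-centered linear shape.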

Use the tabsets to change languages

To make this easier to follow, we’ve organized the code examples into tabsets. If you’re interested in the Python code, click the “Python” tab; for the R code, click the “R” tab.

Download sample

  1. Load the necessary libraries. Make sure databricks-sql-connector is installed in your environment; it is the package that provides the databricks module.
from dotenv import load_dotenv
from databricks import sql
import pandas as pd
import os
  1. Load the credentials to be used via their respective environment variables.
load_dotenv()
host = os.getenv("DATABRICKS_HOST")
token = os.getenv("DATABRICKS_TOKEN")
  1. For simplicity’s sake, the table’s catalog, schema, and HTTP path are stored in separate variables.
schema = "end-to-end"
catalog = "sol_eng_demo_nickp"
http_path = "/sql/1.0/warehouses/b71952ebceb705ce"
  1. Establish the database connection using the defined variables.
con = sql.connect(host, http_path, token, catalog = catalog, schema = schema)
  1. Using TABLESAMPLE, download 100 rows. REPEATABLE is used for purposes of reproducibility.
con_cursor = con.cursor()
con_cursor.execute(
"select * from loans_full_schema TABLESAMPLE (100 ROWS) REPEATABLE (999);"
)
<databricks.sql.client.Cursor at 0x116666ae0>
  1. Iterate through the field descriptions to extract their respective names.
col_names = [desc[0] for desc in con_cursor.description]
  1. Convert the downloaded data into Pandas.
res = con_cursor.fetchall()
full_df = pd.DataFrame(res, columns=col_names)
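The column-name extraction above relies on cursor.description, which is defined by the Python DB-API 2.0 specification (PEP 249), so the same pattern works against any compliant driver. A minimal sketch with the stdlib sqlite3 module standing in for databricks-sql-connector:

```python
# cursor.description is part of the DB-API 2.0 spec (PEP 249), so the
# name-extraction pattern is driver-agnostic; sqlite3 stands in here
# for databricks-sql-connector.
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE loans (loan_id INTEGER, interest_rate REAL)")
cur.execute("INSERT INTO loans VALUES (1, 12.5), (2, 30.8)")
cur.execute("SELECT * FROM loans")

# Each entry of cursor.description is a 7-item sequence; item 0 is the
# column name, which is all we need to label the fetched rows.
col_names = [desc[0] for desc in cur.description]
rows = cur.fetchall()
```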

Fit locally

  1. Load the appropriate Scikit-learn modules.
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
  1. Select the fields that will be used for predictors, and add them to a list called pred_names.
pred_names = ["annual_income", "total_credit_lines", "loan_amount",  "term"]
  1. Subset the data into a new variable (predictors).
predictors = full_df[pred_names]
  1. Pull the interest rate field from the data into a new variable (outcome).
outcome = full_df["interest_rate"]
  1. Split the rows into train and test.
pred_train, pred_test, out_train, out_test = train_test_split(
predictors, outcome, test_size=20, random_state=999
)
  1. Create the pipeline. Use pred_names to define the fields to run the scaler against.
pipeline = Pipeline(
    [
        (
            "preprocess",
            ColumnTransformer(
                [("scaler", StandardScaler(with_std=False), pred_names)],
                remainder="passthrough",
            ),
        ),
        ("linear_regression", LinearRegression()),
    ]
)
  1. Fit the pipeline.
pipeline.fit(pred_train, out_train)
Pipeline(steps=[('preprocess',
                     ColumnTransformer(remainder='passthrough',
                                       transformers=[('scaler',
                                                      StandardScaler(with_std=False),
                                                      ['annual_income',
                                                       'total_credit_lines',
                                                       'loan_amount', 'term'])])),
                    ('linear_regression', LinearRegression())])

Convert to SQL using Orbital

  1. Import Orbital.
import orbitalml
import orbitalml.types
  1. Parse the pipeline with Orbital. At this stage, you define the predictor fields and their types, as well as any other fields that need to be included in the final result set.
orbital_pipeline = orbitalml.parse_pipeline(
    pipeline,
    features={
        "annual_income": orbitalml.types.DoubleColumnType(),
        "total_credit_lines": orbitalml.types.DoubleColumnType(),
        "loan_amount": orbitalml.types.DoubleColumnType(),
        "term": orbitalml.types.DoubleColumnType(),
        "loan_id": orbitalml.types.Int32ColumnType(),
        "emp_title": orbitalml.types.StringColumnType(),
        "loan_amount": orbitalml.types.DoubleColumnType(),
        "balance": orbitalml.types.DoubleColumnType(),
        "application_type": orbitalml.types.StringColumnType(),
        "interest_rate": orbitalml.types.DoubleColumnType(),
    },
)
  1. Convert the pipeline to SQL. By default, Orbital will exclude the predictor fields from the finalized SQL statement, so ResultsProjection() is used to force the loan amount and loan term to be included in the statement.
pred_sql = orbitalml.export_sql(
    table_name="loans_full_schema",
    pipeline=orbital_pipeline,
    projection=orbitalml.ResultsProjection(["loan_amount", "term"]),
    dialect="databricks",
)
  1. Preview the resulting SQL statement.
pred_sql
'SELECT `t0`.`loan_id` AS `loan_id`, `t0`.`emp_title` AS `emp_title`, `t0`.`balance` AS `balance`, `t0`.`application_type` AS `application_type`, `t0`.`interest_rate` AS `interest_rate`, (`t0`.`annual_income` - 77239.975) * -3.004111806545882e-05 + 12.646 + (`t0`.`total_credit_lines` - 23.225) * 0.01950461625046961 + (`t0`.`loan_amount` - 16201.5625) * 2.9834429401503845e-05 + (`t0`.`term` - 45.9) * 0.19726277170596157 AS `variable`, `t0`.`loan_amount` AS `loan_amount`, `t0`.`term` AS `term` FROM `loans_full_schema` AS `t0`'
  1. Use the new SQL statement as the source to filter for any rate that is 15 points above the prediction.
final_sql = f"select * from ({pred_sql}) where interest_rate - variable > 15 and variable > 0"

final_sql
'select * from (SELECT `t0`.`loan_id` AS `loan_id`, `t0`.`emp_title` AS `emp_title`, `t0`.`balance` AS `balance`, `t0`.`application_type` AS `application_type`, `t0`.`interest_rate` AS `interest_rate`, (`t0`.`annual_income` - 77239.975) * -3.004111806545882e-05 + 12.646 + (`t0`.`total_credit_lines` - 23.225) * 0.01950461625046961 + (`t0`.`loan_amount` - 16201.5625) * 2.9834429401503845e-05 + (`t0`.`term` - 45.9) * 0.19726277170596157 AS `variable`, `t0`.`loan_amount` AS `loan_amount`, `t0`.`term` AS `term` FROM `loans_full_schema` AS `t0`) where interest_rate - variable > 15 and variable > 0'
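This wrap-and-filter pattern can be exercised end to end against any SQL engine. The sketch below uses the stdlib sqlite3 module with made-up coefficients (not the fitted model) to show the prediction expression running inside the database, with the outer query keeping only the large gaps:

```python
# Sketch of the wrap-and-filter step using sqlite3 and placeholder
# numbers; the real workflow runs the Orbital-generated SQL in Databricks.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE loans (loan_id INTEGER, loan_amount REAL, interest_rate REAL)"
)
con.executemany(
    "INSERT INTO loans VALUES (?, ?, ?)",
    [(1, 5000.0, 26.8), (2, 20000.0, 13.1), (3, 11000.0, 30.2)],
)

# Inner query: a hypothetical prediction expression inlined as SQL.
pred_sql = (
    "SELECT loan_id, interest_rate, "
    "10.0 + (loan_amount - 12000.0) * 0.0001 AS variable "
    "FROM loans"
)

# Outer query: the same shape as final_sql above.
final_sql = (
    f"select * from ({pred_sql}) "
    "where interest_rate - variable > 15 and variable > 0"
)

# Only the loans whose rate far exceeds the prediction come back.
flagged = con.execute(final_sql).fetchall()
```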
  1. Execute the finalized SQL statement, and return it as a Pandas data frame.
con_cursor = con.cursor()
con_cursor.execute(final_sql)
pred_cols = [desc[0] for desc in con_cursor.description]
res = con_cursor.fetchall()
pred_df = pd.DataFrame(res, columns=pred_cols)
pred_df

|  | loan_id | emp_title | balance | application_type | interest_rate | variable | loan_amount | term |
|----|----|----|----|----|----|----|----|----|
| 0 | 9802 | operational risk manager | 0.00 | individual | 26.77 | 6.268735 | 5000 | 36 |
| 1 | 3935 | sr admin assistant | 19821.53 | individual | 30.79 | 15.271815 | 20600 | 60 |
| 2 | 5068 | assistant | 10672.97 | individual | 30.17 | 15.048231 | 11100 | 60 |
| 3 | 5967 | rn | 6768.73 | individual | 25.82 | 10.208089 | 7500 | 36 |
| 4 | 9338 | vice president of operations | 9774.95 | individual | 30.17 | 14.033024 | 10000 | 60 |
| 5 | 4535 | mechanical designer | 15896.16 | individual | 30.75 | 10.860353 | 16175 | 36 |
| 6 | 4833 | president of media division | 34119.79 | individual | 26.30 | 8.808102 | 35000 | 60 |
| 7 | 1649 | pilot | 29297.84 | individual | 28.72 | 12.486257 | 30000 | 60 |
| 8 | 9430 | sr project manager | 0.00 | individual | 23.88 | 7.005178 | 17000 | 36 |
| 9 | 5526 | teacher | 24253.68 | individual | 30.79 | 15.700122 | 25000 | 60 |
| 10 | 5657 | None | 19243.97 | individual | 30.79 | 15.673809 | 20000 | 60 |
| 11 | 8057 | account manager | 24250.66 | individual | 30.65 | 13.753280 | 25000 | 60 |
| 12 | 7471 | server engineer | 0.00 | individual | 30.79 | 14.910085 | 25000 | 60 |
| 13 | 7772 | insurance broker | 9192.11 | joint | 26.30 | 9.761687 | 9550 | 36 |
| 14 | 7668 | assistant vice president | 23589.47 | individual | 26.77 | 9.867967 | 25000 | 36 |
| 15 | 5340 | operations manager - core | 27585.12 | joint | 30.17 | 14.443606 | 28000 | 60 |
| 16 | 1241 | physician | 0.00 | individual | 26.77 | 7.631215 | 10000 | 36 |
| 17 | 2887 | cytotechnologist | 34398.52 | individual | 30.94 | 11.017999 | 35000 | 36 |
| 18 | 2732 | None | 1805.00 | individual | 25.82 | 10.406061 | 2000 | 36 |
| 19 | 1127 | director, support | 23579.25 | individual | 26.30 | 8.085005 | 25000 | 36 |
| 20 | 6933 | attorney/shareholder | 33643.67 | individual | 21.45 | 3.264169 | 35000 | 60 |
| 21 | 4453 | executive manger | 17849.23 | individual | 21.45 | 4.984362 | 19450 | 36 |
| 22 | 36 | None | 1318.63 | individual | 24.84 | 8.510812 | 1400 | 36 |
| 23 | 843 | registered nurse | 34386.09 | individual | 29.69 | 9.161281 | 35000 | 36 |
| 24 | 9569 | income developement specialist | 33635.31 | joint | 29.69 | 12.900652 | 35000 | 60 |
| 25 | 3351 | electrician | 34494.86 | joint | 23.87 | 8.651117 | 40000 | 36 |
| 26 | 8720 | nurse | 27230.80 | joint | 26.30 | 10.871734 | 30150 | 36 |
| 27 | 1159 | cip compliance specialist | 1081.34 | individual | 24.85 | 9.134140 | 1200 | 36 |
| 28 | 7972 | program analyst | 2042.13 | individual | 30.65 | 9.103450 | 2200 | 36 |
| 29 | 4390 | executive director | 18456.81 | individual | 25.82 | 10.264924 | 20000 | 36 |
| 30 | 5845 | supervisor | 14552.19 | individual | 30.79 | 15.526647 | 15000 | 60 |
| 31 | 6450 | rn | 24054.97 | individual | 30.79 | 14.395123 | 25000 | 60 |
| 32 | 6586 | business analyst | 0.00 | individual | 26.30 | 10.714821 | 5000 | 36 |
| 33 | 9765 | chief operating officer | 15164.37 | individual | 24.84 | 9.147561 | 16100 | 36 |
| 34 | 3514 | None | 5755.81 | individual | 26.77 | 11.663725 | 6100 | 36 |
| 35 | 7117 | administration | 23084.76 | individual | 26.30 | 10.359162 | 25000 | 36 |
| 36 | 7839 | benefits/worklife manager | 23185.38 | individual | 30.65 | 15.281168 | 24100 | 60 |
| 37 | 8509 | general manager | 14635.95 | individual | 19.03 | 2.791786 | 16000 | 36 |
| 38 | 814 | underwriter | 0.00 | individual | 30.17 | 12.653358 | 22000 | 60 |
| 39 | 1231 | general manager | 0.00 | individual | 26.30 | 9.049302 | 4375 | 36 |
| 40 | 6398 | conusultant | 29295.97 | individual | 28.72 | 12.989556 | 30000 | 60 |
| 41 | 2389 | warehouse manager | 8586.56 | joint | 26.77 | 11.301432 | 9100 | 36 |

Automate in Databricks

  1. WorkspaceClient is the Databricks SDK’s main entry point for creating and managing workspace objects.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
  1. SQL tasks require a warehouse ID. This code pulls the ID from the first warehouse returned by w.data_sources.list().
srcs = w.data_sources.list()
warehouse_id = srcs[0].warehouse_id
  1. First, a new Query is created in the Databricks Workspace. The query object is built using CreateQueryRequestQuery(); this is where the final_sql value is passed, and where the catalog and schema are defined via their respective arguments.
from databricks.sdk.service import sql as sdk_sql

db_request_query = sdk_sql.CreateQueryRequestQuery(
    query_text=final_sql,
    catalog=catalog,
    schema=schema,
    display_name="Interest rate differences",
    warehouse_id=warehouse_id,
    description="Find differences in interest rate",
)
  1. w.queries.create() takes the request query object to create the new query. After executing the command, a new query named “Interest rate differences” will appear in the Queries section of the Databricks Web UI.
new_query = w.queries.create(query=db_request_query)
  1. A Databricks Job can be used to orchestrate multiple tasks. The job created for this example requires a single task. Tasks come in a variety of kinds, such as notebooks, Python scripts, and SQL queries. The following starts building the SQL task; it uses new_query’s ID to tie it to the query created in the previous step.
from databricks.sdk.service import jobs as sdk_jobs

db_sql_task_query = sdk_jobs.SqlTaskQuery(query_id=new_query.id)
  1. To finish defining the SQL task object, db_sql_task_query is passed to SqlTask().
db_sql_task = sdk_jobs.SqlTask(query=db_sql_task_query, warehouse_id=warehouse_id)
  1. The SQL task object is used to create a formal Task. This is the point where the resulting task in the Databricks UI can be named and described.
db_task = sdk_jobs.Task(
    sql_task=db_sql_task, description="Int rate diffs", task_key="run_sql"
)
  1. For this example, the Job needs a schedule, which is defined with a CronSchedule object that takes a Quartz cron expression. The expression “0 0 12 * * ?” runs the job daily at noon.
db_schedule = sdk_jobs.CronSchedule(
    quartz_cron_expression="0 0 12 * * ?", timezone_id="CST"
)
  1. Finally, the Job is created using the task and schedule objects. After running the following command, a new Job named “Daily check interest differences” will appear in the Jobs section of the Databricks Web UI.
new_job = w.jobs.create(
    name="Daily check interest differences", tasks=[db_task], schedule=db_schedule
)

Appendix

Data in example

The data used for this example was downloaded from OpenIntro. The page containing the description of the data and the download link is available here: loans_full_schema. The CSV file was manually uploaded to the Databricks Unity Catalog.

The loan_id field is not included in the data file. It was created using the following two SQL commands. This one creates the column:

ALTER TABLE sol_eng_demo_nickp.`end-to-end`.loans_full_schema 
ADD COLUMN loan_id BIGINT;

This SQL command populates the new column with a sequential number:

WITH cte AS (
  SELECT
    loan_id,
    ROW_NUMBER() OVER (ORDER BY debt_to_income) AS new_loan_id
  FROM
    sol_eng_demo_nickp.`end-to-end`.loans_full_schema
)
MERGE INTO sol_eng_demo_nickp.`end-to-end`.loans_full_schema AS target
USING cte
ON target.debt_to_income = cte.debt_to_income
WHEN MATCHED THEN
  UPDATE SET target.loan_id = cte.new_loan_id;
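The numbering half of this recipe uses the standard ROW_NUMBER() window function, which can be sketched with the stdlib sqlite3 module (sqlite3 does not support MERGE, so the Databricks-side write-back is not reproduced here):

```python
# Sketch of the ROW_NUMBER() numbering step; the MERGE that writes the
# IDs back to the table is Databricks/Delta-side and is not shown.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE loans_full_schema (debt_to_income REAL)")
con.executemany(
    "INSERT INTO loans_full_schema VALUES (?)",
    [(0.31,), (0.12,), (0.24,)],
)

# ROW_NUMBER() assigns a sequential ID following the ORDER BY clause.
rows = con.execute(
    """
    SELECT debt_to_income,
           ROW_NUMBER() OVER (ORDER BY debt_to_income) AS new_loan_id
    FROM loans_full_schema
    ORDER BY new_loan_id
    """
).fetchall()
```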

Python environment

The following library requirements were used to run the example:

dotenv>=0.9.9
orbitalml>=0.2.0
pandas>=2.2.3
pip>=25.1.1
databricks-sql-connector
pyarrow

There is an issue with the onnx binary for Python 3.13, so Python 3.12 was used for this example.

Full code

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from dotenv import load_dotenv
from databricks import sql
import orbitalml.types
import pandas as pd
import orbitalml
import os

load_dotenv()
host = os.getenv("DATABRICKS_HOST")
token = os.getenv("DATABRICKS_TOKEN")
schema = "end-to-end"
catalog = "sol_eng_demo_nickp"
http_path = "/sql/1.0/warehouses/b71952ebceb705ce"
con = sql.connect(host, http_path, token, catalog=catalog, schema=schema)
con_cursor = con.cursor()
con_cursor.execute(
    "select * from loans_full_schema TABLESAMPLE (100 ROWS) REPEATABLE (999);"
)
col_names = [desc[0] for desc in con_cursor.description]
res = con_cursor.fetchall()
full_df = pd.DataFrame(res, columns=col_names)

pred_names = ["annual_income", "total_credit_lines", "loan_amount", "term"]
predictors = full_df[pred_names]
outcome = full_df["interest_rate"]
pred_train, pred_test, out_train, out_test = train_test_split(
    predictors, outcome, test_size=20, random_state=999
)
pipeline = Pipeline(
    [
        (
            "preprocess",
            ColumnTransformer(
                [("scaler", StandardScaler(with_std=False), pred_names)],
                remainder="passthrough",
            ),
        ),
        ("linear_regression", LinearRegression()),
    ]
)
pipeline.fit(pred_train, out_train)

orbital_pipeline = orbitalml.parse_pipeline(
    pipeline,
    features={
        "annual_income": orbitalml.types.DoubleColumnType(),
        "total_credit_lines": orbitalml.types.DoubleColumnType(),
        "loan_amount": orbitalml.types.DoubleColumnType(),
        "term": orbitalml.types.DoubleColumnType(),
        "loan_id": orbitalml.types.Int32ColumnType(),
        "emp_title": orbitalml.types.StringColumnType(),
        "loan_amount": orbitalml.types.DoubleColumnType(),
        "balance": orbitalml.types.DoubleColumnType(),
        "application_type": orbitalml.types.StringColumnType(),
        "interest_rate": orbitalml.types.DoubleColumnType(),
    },
)
pred_sql = orbitalml.export_sql(
    table_name="loans_full_schema",
    pipeline=orbital_pipeline,
    projection=orbitalml.ResultsProjection(["loan_amount", "term"]),
    dialect="databricks",
)

final_sql = (
    f"select * from ({pred_sql}) where interest_rate - variable > 15 and variable > 0"
)
con_cursor.execute(final_sql)
pred_cols = [desc[0] for desc in con_cursor.description]
res = con_cursor.fetchall()
pred_df = pd.DataFrame(res, columns=pred_cols)
pred_df

Download sample

  1. The odbc::databricks() function provides convenient defaults that handle essentially everything needed to establish a connection to Databricks; the only argument you must supply is httpPath. The connection is established via DBI.
library(DBI)

con <- dbConnect(
  drv = odbc::databricks(),
  httpPath = "/sql/1.0/warehouses/b71952ebceb705ce"
)
  1. Because they will be used in multiple places in this example, the catalog, schema, and table name are stored in variables.
catalog <- "sol_eng_demo_nickp"
schema <- "end-to-end"
table <- "loans_full_schema"
  1. Sample the database table using the TABLESAMPLE SQL function. REPEATABLE is used to aid with reproducibility. The glue_sql() function is used to compile the SQL statement.
library(glue)

sample_sql <- glue_sql(
  "SELECT * ",
  "FROM ",
  "{`catalog`}.{`schema`}.{`table`} ",
  "TABLESAMPLE (100 ROWS) REPEATABLE (999)",
  .con = con
)

sample_sql
<SQL> SELECT * FROM `sol_eng_demo_nickp`.`end-to-end`.`loans_full_schema` TABLESAMPLE (100 ROWS) REPEATABLE (999)
  1. The sample is downloaded by executing the SQL statement via dbGetQuery().
sample_lending <- dbGetQuery(
  conn = con,
  statement = sample_sql
)

Fit locally

  1. Load tidymodels and set the seed.
library(tidymodels)
set.seed(999)
  1. Currently, certain fields are downloaded from Databricks as 64-bit integers, which base R does not support natively. The easiest solution is to convert them to double. This only needs to be done in the local copy, since the goal is a SQL statement that runs inside Databricks. To make the data transformation easy, we will use dplyr.
library(dplyr)

sample_lending <- sample_lending |>
  mutate(
    total_credit_lines = as.double(total_credit_lines),
    loan_amount = as.double(loan_amount),
    term = as.double(term)
  )
  1. Split the data into training and testing.
split_lending <- initial_split(sample_lending)

lending_training <- training(split_lending)
  1. Create a recipe that defines the predictors and outcome fields and includes a normalization step. For this example, we will use the annual_income, total_credit_lines, loan_amount, and term fields for predictors.
rec_lending <- recipe(
  interest_rate ~ annual_income + total_credit_lines + loan_amount + term,
  data = lending_training
) |>
  step_normalize(all_numeric_predictors())
  1. Define a linear regression model spec.
lm_spec <- linear_reg()
  1. Create the workflow by combining the recipe and the defined model spec.
wf_spec <- workflow(rec_lending, lm_spec)
  1. Fit the workflow using the training data and preview.
wf_fit <- fit(wf_spec, lending_training)

wf_fit
    ══ Workflow [trained] ══════════════════════════════════════════════════════════
    Preprocessor: Recipe
    Model: linear_reg()

    ── Preprocessor ────────────────────────────────────────────────────────────────
    1 Recipe Step

    • step_normalize()

    ── Model ───────────────────────────────────────────────────────────────────────

    Call:
    stats::lm(formula = ..y ~ ., data = data)

    Coefficients:
           (Intercept)       annual_income  total_credit_lines         loan_amount  
               12.1961             -0.3791             -0.2425             -0.9216  
                  term  
                2.0547  

Convert to SQL using Orbital

  1. Load and use orbital to read the fitted workflow. In Databricks, column names containing dots (“.”) are not acceptable, and .pred is the default name that orbital gives the prediction, so use the prefix argument to override it.
library(orbital)

lending_orbital <- orbital(wf_fit, prefix = "pred")

lending_orbital
    ── orbital Object ──────────────────────────────────────────────────────────────
    • annual_income = (annual_income - 71942.79) / 42292.87
    • total_credit_lines = (total_credit_lines - 22.49333) / 11.66813
    • loan_amount = (loan_amount - 14928.33) / 9958.628
    • term = (term - 44.32) / 11.49872
    • pred = 12.19613 + (annual_income * -0.3791282) + (total_credit_lines * ...
    ────────────────────────────────────────────────────────────────────────────────
    5 equations in total.
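The five equations above compose into a single linear expression in the raw columns, which is exactly the folded form that appears later in show_query(): each normalized slope coef/sd becomes the coefficient of the raw column. A quick numeric check (sketched in Python for brevity, using the truncated values printed above and an illustrative row, not real data):

```python
# Verify that normalize-then-predict folds into one linear expression:
# coef * (x - mean)/sd  ==  (coef/sd) * (x - mean), summed per column.
# Means/sds/coefficients are the truncated values printed by orbital.
means = {"annual_income": 71942.79, "total_credit_lines": 22.49333,
         "loan_amount": 14928.33, "term": 44.32}
sds = {"annual_income": 42292.87, "total_credit_lines": 11.66813,
       "loan_amount": 9958.628, "term": 11.49872}
coefs = {"annual_income": -0.3791282, "total_credit_lines": -0.2425,
         "loan_amount": -0.9216, "term": 2.0547}
intercept = 12.19613

# Illustrative row (hypothetical values).
row = {"annual_income": 90000.0, "total_credit_lines": 18.0,
       "loan_amount": 5000.0, "term": 36.0}

# Step-by-step: normalize each column, then apply the regression.
stepwise = intercept + sum(
    coefs[k] * (row[k] - means[k]) / sds[k] for k in coefs
)

# Folded: a single expression in the raw columns, as in show_query().
folded = intercept + sum(
    (coefs[k] / sds[k]) * (row[k] - means[k]) for k in coefs
)

assert abs(stepwise - folded) < 1e-9
```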
  1. Load dbplyr and use tbl to create a reference to the lending table in the R session. To pass the fully qualified name, we can use glue and I().
library(dbplyr)

tbl_lending <- tbl(con, I(glue("{catalog}.`{schema}`.{table}")))

tbl_lending
    # Source:   table<sol_eng_demo_nickp.`end-to-end`.loans_full_schema> [?? x 56]
    # Database: Spark SQL 3.1.1[token@Spark SQL/hive_metastore]
       emp_title        emp_length state homeownership annual_income verified_income
       <chr>            <chr>      <chr> <chr>                 <dbl> <chr>          
     1 "global config … 3          NJ    MORTGAGE              90000 Verified       
     2 "warehouse offi… 10         HI    RENT                  40000 Not Verified   
     3 "assembly"       3          WI    RENT                  40000 Source Verified
     4 "customer servi… 1          PA    RENT                  30000 Not Verified   
     5 "security super… 10         CA    RENT                  35000 Verified       
     6  <NA>            NA         KY    OWN                   34000 Not Verified   
     7 "hr "            10         MI    MORTGAGE              35000 Source Verified
     8 "police"         10         AZ    MORTGAGE             110000 Source Verified
     9 "parts"          10         NV    MORTGAGE              65000 Source Verified
    10 "4th person"     3          IL    RENT                  30000 Not Verified   
    # ℹ more rows
    # ℹ 50 more variables: debt_to_income <chr>, annual_income_joint <chr>,
    #   verification_income_joint <chr>, debt_to_income_joint <chr>,
    #   delinq_2y <int64>, months_since_last_delinq <chr>,
    #   earliest_credit_line <int64>, inquiries_last_12m <int64>,
    #   total_credit_lines <int64>, open_credit_lines <int64>,
    #   total_credit_limit <int64>, total_credit_utilized <int64>, …
  1. In order to make the predictions part of a larger set of fields returned by the final query, the orbital_inline() function is used. This allows it to be passed inside a dplyr mutate() call. orbital_inline() will modify the predictor fields based on the steps from the recipe, which, in this example’s case, is the normalization step. A quick way to retain the original values, if they are to be part of the final result, is to simply create copies of them via mutate(). Finally, since it is not necessary to return all of the fields, we reduce them via a select() call.
tbl_prep <- tbl_lending |>
  mutate(
    o_annual_income = annual_income,
    o_total_credit_lines = total_credit_lines,
    o_loan_amount = loan_amount,
    o_term = term
  ) |>
  mutate(!!!orbital_inline(lending_orbital)) |>
  select(
    pred, interest_rate, emp_title, balance, application_type,
    o_annual_income, o_total_credit_lines, o_loan_amount, o_term
  )
  1. Preview the top rows from the initial transformations.
tbl_prep |>
  head()
    # Source:   SQL [?? x 9]
    # Database: Spark SQL 3.1.1[token@Spark SQL/hive_metastore]
       pred interest_rate emp_title         balance application_type o_annual_income
      <dbl>         <dbl> <chr>               <dbl> <chr>                      <dbl>
    1  13.5         14.1  "global config e…  27016. individual                 90000
    2  11.8         12.6  "warehouse offic…   4651. individual                 40000
    3  12.0         17.1  "assembly"          1825. individual                 40000
    4  10.9          6.72 "customer servic…  18853. individual                 30000
    5  10.3         14.1  "security superv…  21430. joint                      35000
    6  11.8          6.72  <NA>               4257. individual                 34000
    # ℹ 3 more variables: o_total_credit_lines <int64>, o_loan_amount <int64>,
    #   o_term <int64>
  1. An additional step is added to keep only the rows with a current interest rate 15 points higher than the prediction.
tbl_final <- tbl_prep |>
  filter(interest_rate - pred > 15, pred > 0)
  1. Preview the results.
tbl_final
    # Source:   SQL [?? x 9]
    # Database: Spark SQL 3.1.1[token@Spark SQL/hive_metastore]
        pred interest_rate emp_title        balance application_type o_annual_income
       <dbl>         <dbl> <chr>              <dbl> <chr>                      <dbl>
     1  10.5          26.8 operational ris…      0  individual                210000
     2  14.0          30.8 sr admin assist…  19822. individual                 95731
     3  14.3          30.8 firefighter       24254. individual                 50000
     4  15.0          30.2 vice president …   9775. individual                118000
     5  10.6          30.8 mechanical desi…  15896. individual                 71500
     6  10.8          26.3 president of me…  34120. individual                320000
     7  12.5          28.7 pilot             29298. individual                190000
     8  13.9          30.8 teacher           24254. individual                 80000
     9  14.0          30.8 <NA>              19244. individual                 85000
    10  13.1          30.6 account manager   24251. individual                150000
    # ℹ more rows
    # ℹ 3 more variables: o_total_credit_lines <int64>, o_loan_amount <int64>,
    #   o_term <int64>
  1. Preview the actual SQL that will be sent using show_query().
tbl_final |>
  show_query()
    <SQL>
    SELECT `q01`.*
    FROM (
      SELECT
        (((12.1961333333333 + (`annual_income` * -0.379128214967168)) + (`total_credit_lines` * -0.242483849568646)) + (`loan_amount` * -0.921560182698075)) + (`term` * 2.05467819018834) AS `pred`,
        `interest_rate`,
        `emp_title`,
        `balance`,
        `application_type`,
        `o_annual_income`,
        `o_total_credit_lines`,
        `o_loan_amount`,
        `o_term`
      FROM (
        SELECT
          `emp_title`,
          `emp_length`,
          `state`,
          `homeownership`,
          (`annual_income` - 71942.7866666667) / 42292.8687411871 AS `annual_income`,
          `verified_income`,
          `debt_to_income`,
          `annual_income_joint`,
          `verification_income_joint`,
          `debt_to_income_joint`,
          `delinq_2y`,
          `months_since_last_delinq`,
          `earliest_credit_line`,
          `inquiries_last_12m`,
          (`total_credit_lines` - 22.4933333333333) / 11.6681286085312 AS `total_credit_lines`,
          `open_credit_lines`,
          `total_credit_limit`,
          `total_credit_utilized`,
          `num_collections_last_12m`,
          `num_historical_failed_to_pay`,
          `months_since_90d_late`,
          `current_accounts_delinq`,
          `total_collection_amount_ever`,
          `current_installment_accounts`,
          `accounts_opened_24m`,
          `months_since_last_credit_inquiry`,
          `num_satisfactory_accounts`,
          `num_accounts_120d_past_due`,
          `num_accounts_30d_past_due`,
          `num_active_debit_accounts`,
          `total_debit_limit`,
          `num_total_cc_accounts`,
          `num_open_cc_accounts`,
          `num_cc_carrying_balance`,
          `num_mort_accounts`,
          `account_never_delinq_percent`,
          `tax_liens`,
          `public_record_bankrupt`,
          `loan_purpose`,
          `application_type`,
          (`loan_amount` - 14928.3333333333) / 9958.62753532771 AS `loan_amount`,
          (`term` - 44.32) / 11.4987190825996 AS `term`,
          `interest_rate`,
          `installment`,
          `grade`,
          `sub_grade`,
          `issue_month`,
          `loan_status`,
          `initial_listing_status`,
          `disbursement_method`,
          `balance`,
          `paid_total`,
          `paid_principal`,
          `paid_interest`,
          `paid_late_fees`,
          `loan_id`,
          `o_annual_income`,
          `o_total_credit_lines`,
          `o_loan_amount`,
          `o_term`
        FROM (
          SELECT
            `loans_full_schema`.*,
            `annual_income` AS `o_annual_income`,
            `total_credit_lines` AS `o_total_credit_lines`,
            `loan_amount` AS `o_loan_amount`,
            `term` AS `o_term`
          FROM sol_eng_demo_nickp.`end-to-end`.loans_full_schema
        ) `q01`
      ) `q01`
    ) `q01`
    WHERE ((`interest_rate` - `pred`) > 15.0) AND (`pred` > 0.0)
  1. show_query() is mostly geared towards printing nicely to the R console. To capture the SQL in a variable, use remote_query().
final_sql <- remote_query(tbl_final)
  1. To confirm that the SQL returned by remote_query() runs as expected, use dbGetQuery() to execute the statement against the database.
res <- dbGetQuery(con, final_sql)
head(res)
          pred interest_rate                    emp_title  balance application_type
    1 10.48399         26.77     operational risk manager     0.00       individual
    2 13.95838         30.79           sr admin assistant 19821.53       individual
    3 14.25211         30.79                  firefighter 24253.68       individual
    4 15.00983         30.17 vice president of operations  9774.95       individual
    5 10.58753         30.75          mechanical designer 15896.16       individual
    6 10.78165         26.30  president of media division 34119.79       individual
      o_annual_income o_total_credit_lines o_loan_amount o_term
    1          210000                   18          5000     36
    2           95731                   37         20600     60
    3           50000                   23         25000     60
    4          118000                   24         10000     60
    5           71500                   23         16175     36
    6          320000                   29         35000     60
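Every row the query returns should sit more than 15 points above the predicted rate, since that is the WHERE condition in the generated SQL. A quick sanity check, using a small mock of the res data frame (the real one requires the Databricks connection):

```r
# Mock of the first rows returned by dbGetQuery() above.
res <- data.frame(
  pred          = c(10.48399, 13.95838, 14.25211),
  interest_rate = c(26.77, 30.79, 30.79)
)

# The WHERE clause filters on (interest_rate - pred) > 15, so every
# returned loan should exceed that threshold.
res$rate_diff <- res$interest_rate - res$pred
stopifnot(all(res$rate_diff > 15))
```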

Automate in Databricks

The brickster package provides an easy-to-use R interface to Databricks. It allows us to create, set up, and manage assets within Databricks, such as jobs, files, and warehouses.
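brickster is available on CRAN and authenticates to the workspace through environment variables. A minimal setup sketch (the host and token values below are placeholders, and the variable names assume brickster's default configuration; they can also be set in .Renviron):

```r
# install.packages("brickster")  # one-time install from CRAN

# brickster reads the workspace URL and a personal access token from
# environment variables:
Sys.setenv(
  DATABRICKS_HOST  = "https://<your-workspace>.cloud.databricks.com",
  DATABRICKS_TOKEN = "<personal-access-token>"
)
```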

  1. The SQL tasks require a warehouse ID. db_sql_warehouse_list() retrieves all available warehouses and their details. The warehouse to be used is selected based on its name.
library(brickster)

warehouses <- db_sql_warehouse_list()

wh_names <- as.character(lapply(warehouses, function(x) x$name))

warehouse <- unlist(warehouses[wh_names == "David's Serverless Warehouse"])

warehouse_id <- warehouse[["id"]]
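The name-based lookup above can also be written with Filter(). A small sketch using a mock of the list that db_sql_warehouse_list() returns (the names and ids below are made up for illustration):

```r
# Mock of db_sql_warehouse_list() output: each element describes one
# warehouse and carries at least a name and an id.
warehouses <- list(
  list(name = "Starter Warehouse", id = "abc123"),
  list(name = "David's Serverless Warehouse", id = "def456")
)

# Keep only the warehouse whose name matches, then pull out its id.
matched <- Filter(function(x) x$name == "David's Serverless Warehouse", warehouses)
warehouse_id <- matched[[1]]$id
```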
  1. The first step is to create a named query in Databricks. This is where final_sql is used.
db_request_query <- db_query_create(
  query_text = final_sql,
  catalog = catalog,
  schema = schema,
  display_name = "Interest rate differences",
  warehouse_id = warehouse_id,
  description = "Find differences in interest rate"
)
  1. A Databricks Job can orchestrate multiple tasks; the job created for this example needs only one. Tasks come in several kinds, such as a notebook, a Python script, or a SQL query. The following starts building the SQL task, using db_request_query's ID to tie it to the query created in the previous step.
db_task_query <- sql_query_task(
  query_id = db_request_query$id,
  warehouse_id = warehouse_id
)
  1. The SQL task object is used to create a formal Task. This is the point where the resulting task in the Databricks UI can be named and described.
db_task_job <- job_task(
  task = db_task_query,
  description = "Int rate diffs",
  task_key = "run_sql"
)
  1. For this example, the Job needs a schedule, which is defined through a CronSchedule object built from a Quartz CRON expression. The expression's fields are seconds, minutes, hours, day of month, month, and day of week, so "0 0 12 * * ?" fires every day at noon.
db_schedule <- cron_schedule(
  quartz_cron_expression = "0 0 12 * * ?",
  timezone_id = "CST"
)
  1. Finally, the Job is created using the task and schedule objects. After running the following command, a new Job named “Daily check interest differences” will appear in the Jobs section of the Databricks Web UI.
new_job <- db_jobs_create(
  name = "Daily check interest differences",
  tasks = job_tasks(db_task_job),
  schedule = db_schedule
)
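Rather than waiting for the noon schedule, the new job can be triggered immediately to confirm it runs end to end. A hedged sketch, assuming brickster's db_jobs_run_now() and the job_id field returned by db_jobs_create() (check your installed brickster version for the exact names); it requires a live, authenticated Databricks connection:

```r
library(brickster)

# Kick off an on-demand run of the newly created job; the returned object
# carries a run ID that can be used to follow the run in the Jobs UI.
run <- db_jobs_run_now(job_id = new_job$job_id)
```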

Appendix

Data in example

The data used for this example was downloaded from OpenIntro. The page containing the description of the data and the download link is available here: loans_full_schema. The CSV file was manually uploaded to the Databricks Unity Catalog.

The loan_id field is not included in the data file. It was created using the following two SQL commands. The first adds the column:

ALTER TABLE sol_eng_demo_nickp.`end-to-end`.loans_full_schema 
ADD COLUMN loan_id BIGINT;

The second populates the new column with a sequential number. Note that the MERGE matches on debt_to_income, so this approach assumes those values are unique; duplicated values would cause the MERGE to match multiple rows.

WITH cte AS (
  SELECT
    loan_id,
    ROW_NUMBER() OVER (ORDER BY debt_to_income) AS new_loan_id
  FROM
    sol_eng_demo_nickp.`end-to-end`.loans_full_schema
)
MERGE INTO sol_eng_demo_nickp.`end-to-end`.loans_full_schema AS target
USING cte
ON target.debt_to_income = cte.debt_to_income
WHEN MATCHED THEN
  UPDATE SET target.loan_id = cte.new_loan_id;

Learn more about putting models in production with Posit

By pairing Scikit-learn or tidymodels with the Orbital package, you can simplify your MLOps pipeline while writing models in your preferred framework. Together with Databricks, this unified approach dramatically accelerates the journey from model development to production.



Edgar Ruiz

Senior Software Engineer at Posit PBC
Edgar is a Senior Software Engineer at Posit PBC, in charge of the {sparklyr} package. He is the author of other packages such as {tidypredict}, {mall}, and {chattr}. Edgar is also a speaker and writer who focuses on the success of data scientists by promoting open-source tools.