Getting Started
Using the LabSDK, data scientists can build models (that can run in production) directly from the notebook.
When you're done, you can "export" your work like any other production asset. This way, you can focus on your model while Raptor takes care of the production concerns.
🧐 Getting started
In this quickstart tutorial, we'll build a model that predicts the probability of closing a deal.
Our CRM allows us to track every email communication and the history of previous deals for each customer. We'll use these data sources to predict whether the customer is ready for closure or not.
To do that, we're going to build a few features from the data:
emails_10h - the number of email exchanges over the last 10 hours
question_marks_10h+avg - the average number of question marks in the subject over the last 10 hours
deals_10h+sum - the sum of the deal amounts over the last 10 hours
emails_deals - the ratio between the emails in the last 10 hours (emails_10h) and the average of the deals in the last 10 hours (deals_10h[avg])
diff_with_previous_amount - the delta between the last amount and the one before
last_amount - our label
⚡️ Installing the SDK
Yalla, let's go! First, we install the LabSDK and import it.
!pip install raptor-labsdk pyarrow -U --quiet
from raptor import *
import pandas as pd
from datetime import datetime
from typing_extensions import TypedDict
✍️ Writing our first features
Our first feature calculates how many emails an account received over the last 10 hours.
To do that, we first define our data sources, and then we can start transforming our data.
@data_source(
    training_data=pd.read_parquet('https://gist.github.com/AlmogBaku/8be77c2236836177b8e54fa8217411f2/raw/emails.parquet'),  # This is the data as it looks in production
    keys=['id', 'account_id'],
    timestamp='event_at',
    production_config=StreamingConfig(kind='kafka'),  # This is optional, and will create the production data-source configuration for DevOps
)
class Email(TypedDict('Email', {'from': str})):
    event_at: datetime
    account_id: str
    subject: str
    to: str
@feature(keys='account_id', data_source=Email)
@aggregation(function=AggregationFunction.Count, over='10h', granularity='1h')
def emails_10h(this_row: Email, ctx: Context) -> int:
    """email over 10 hours"""
    return 1


@feature(keys='account_id', data_source=Email)
@aggregation(function=AggregationFunction.Avg, over='10h', granularity='1h')
def question_marks_10h(this_row: Email, ctx: Context) -> int:
    """question marks over 10 hours"""
    return this_row['subject'].count('?')
🍭 Cool tip
You can use the @runtime decorator to specify packages you want to install and use.
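For illustration only, here's a minimal sketch of how that might look. The parameter names (packages, env_name) and the example feature are our assumptions rather than the confirmed API, so check the LabSDK reference for the exact signature.
@runtime(
    packages=['emoji'],   # hypothetical: extra PyPI packages this feature's code needs at runtime
    env_name='default',   # hypothetical: the runtime environment to install them into
)
@feature(keys='account_id', data_source=Email)
@aggregation(function=AggregationFunction.Sum, over='10h', granularity='1h')
def emoji_subjects_10h(this_row: Email, ctx: Context) -> int:
    """emails with an emoji in the subject over the last 10 hours (illustrative sketch)"""
    import emoji  # available at runtime because @runtime requested it
    return 1 if emoji.emoji_count(this_row['subject']) > 0 else 0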
Let's create another feature that calculates various aggregations against the deal amount.
@data_source(
    training_data=pd.read_csv('https://gist.githubusercontent.com/AlmogBaku/8be77c2236836177b8e54fa8217411f2/raw/deals.csv'),
    keys=['id', 'account_id'],
    timestamp='event_at',
)
class Deal(TypedDict):
    id: int
    event_at: datetime
    account_id: str
    amount: float
@feature(keys='account_id', data_source=Deal)
@aggregation(
    function=[AggregationFunction.Sum, AggregationFunction.Avg, AggregationFunction.Max, AggregationFunction.Min],
    over='10h',
    granularity='1m',
)
def deals_10h(this_row: Deal, ctx: Context) -> float:
    """sum/avg/min/max of deal amount over 10 hours"""
    return this_row['amount']
Now we can create a derived feature that defines the ratio between these two features.
💡 Hint: Notice that when querying a feature that has aggregations, we need to specify which aggregation we want using a feature selector (e.g. deals_10h+avg).
@feature(keys='account_id', sourceless_markers_df=Deal.raptor_spec.local_df)
@freshness(max_age='-1', max_stale='-1')
def emails_deals(_, ctx: Context) -> float:
    """emails/deal[avg] rate over 10 hours"""
    e, _ = ctx.get_feature('emails_10h+count')
    d, _ = ctx.get_feature('deals_10h+avg')
    if e is None or d is None:
        return None
    return e / d
Finally, we'll create last_amount, which will retain one previous value. We'll use this feature as our label, and to calculate the delta from the previous amount.
@feature(keys='account_id', data_source=Deal)
@freshness(max_age='1h', max_stale='2h')
@keep_previous(versions=1, over='1h')
def last_amount(this_row: Deal, ctx: Context) -> float:
    return this_row['amount']


@feature(keys='account_id', sourceless_markers_df=Deal.raptor_spec.local_df)
@freshness(max_age='1h', max_stale='2h')
def diff_with_previous_amount(this_row: Deal, ctx: Context) -> float:
    lv, ts = ctx.get_feature('last_amount@-1')
    if lv is None:
        return 0
    return this_row['amount'] - lv
🧠 Training our model
After defining our features and writing our feature-engineering code, we can train our model.
@model(
    keys=['account_id'],
    input_features=[
        'emails_10h+count', 'deals_10h+sum', emails_deals, diff_with_previous_amount, 'question_marks_10h+avg',
    ],
    input_labels=[last_amount],
    model_framework='sklearn',
    model_server='sagemaker-ack',
)
@freshness(max_age='1h', max_stale='100h')
def deal_prediction(ctx: TrainingContext) -> float:
    from xgboost import XGBClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder

    df = ctx.features_and_labels()
    X = df[ctx.input_features]
    y = df[ctx.input_labels]

    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

    # Transform y_train to a 1D array
    y_train = y_train.values.ravel()

    # Encode the labels so XGBoost can treat them as classes
    le = LabelEncoder()
    y_train_encoded = le.fit_transform(y_train)

    # Initialize an XGBoost model and fit it to the encoded training data
    xgb_model = XGBClassifier()
    xgb_model.fit(X_train, y_train_encoded)

    return xgb_model
deal_prediction.export()
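If you're curious what export() just generated, here's a minimal sketch that lists the exported assets. It only assumes the default output directory is named out and sits next to this notebook, as the deployment step below suggests.
import pathlib

# List the files export() wrote (Makefile, manifests, model artifacts, ...)
for path in sorted(pathlib.Path('out').rglob('*')):
    if path.is_file():
        print(path)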
Deployment
You're officially done!
To deploy your model, instruct your DevOps team to deploy it with your existing CI/CD using the generated Makefile in the out dir (or manually using kubectl).
!make -C out/
Usage:
  make <target>

General
  help  Display this help.

Helpers
  show-envs  Show all environment variables that are available for configuring the generated YAML manifests

All
  all  Build docker images for all models, push them to the docker repository and deploy all data-sources, features and models to Kubernetes
  deploy  Deploy all data-sources, features and models to Kubernetes
  all-ecr  Build docker images for all models, create ECR repos if not exists, push the images to the docker repository and deploy all data-sources, features and models to Kubernetes
  deploy-ecr  Deploy all data-sources, features and models to Kubernetes

Data Sources
  deploy-dsrcs  Deploy all data-sources to Kubernetes
  deploy-dsrc-default-email  Deploy default.email to Kubernetes
  deploy-dsrc-default-deal  Deploy default.deal to Kubernetes

Features
  deploy-features  Deploy all features to Kubernetes
  deploy-feat-default-emails_10h  Deploy default.emails_10h to Kubernetes
  deploy-feat-default-deals_10h  Deploy default.deals_10h to Kubernetes
  deploy-feat-default-emails_deals  Deploy default.emails_deals to Kubernetes
  deploy-feat-default-diff_with_previous_amount  Deploy default.diff_with_previous_amount to Kubernetes
  deploy-feat-default-question_marks_10h  Deploy default.question_marks_10h to Kubernetes

Models (All)
  deploy-models  Deploy all models to Kubernetes
  docker-build-models  Build docker images for all models
  docker-push-models  Push docker images for all models
  create-model-ecr-repos  Create ECR repositories for all models if they don't exist
  docker-ecr-push-models  Push docker images for all models to ECR
  deploy-ecr-models  Deploy all models to Kubernetes

Models
  create-model-ecr-repo-default-deal_prediction  Create ECR repository for default.deal_prediction if it doesn't exist
  docker-ecr-push-model-default-deal_prediction  Push docker image for default.deal_prediction to ECR
  deploy-ecr-model-default-deal_prediction  Deploy default.deal_prediction to ECR
  docker-build-model-default-deal_prediction  Build docker image for default.deal_prediction
  docker-push-model-default-deal_prediction  Push default.deal_prediction docker image
  deploy-model-default-deal_prediction  Deploy default.deal_prediction to Kubernetes
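If you'd rather take the manual kubectl route mentioned above, a minimal sketch might look like this. It assumes the generated manifests under out/ are plain YAML files your cluster can apply as-is; your DevOps setup may differ.
# Hedged alternative to the Makefile: apply the generated manifests directly
!kubectl apply -R -f out/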
🪄 Ta-dam!
From now on, you'll have features and models running in production, recording their values for historical purposes (so you'll be able to retrain against production data).
Learn more about what else you can do with Raptor at the official docs.