decorators

The LabSDK provides a set of decorators that can be used to configure the assets in a way that can be translated to an optimized production-ready solution by Raptor.

namespace

def namespace(namespace: str)

Register a namespace for the asset.

Arguments:

  • namespace (str): the name of the namespace to attach the asset to. Example:
@namespace('my_namespace')

runtime

def runtime(packages: Optional[List[str]], env_name: Optional[str])

Register the runtime environment for the asset.

Arguments:

  • packages (list of str): a list of pip-installable packages. You can specify a version using pip notation, e.g. 'numpy==1.19.5' or 'numpy>=1.19.5'.
  • env_name (str): the name of the runtime virtual environment. The environment should be pre-configured in the Raptor Core installation by your DevOps team. Defaults to the 'default' runtime if not specified.

Example:

@runtime(packages=['numpy==1.21.1', 'phonenumbers'], env_name='default')
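
In practice, runtime is stacked on top of a feature or model that imports the declared packages. The following is a minimal sketch, not a prescribed pattern: the Customer data source and its phone field are hypothetical, and the feature and freshness decorators are documented later on this page.

@runtime(packages=['phonenumbers'], env_name='default')
@feature(keys='customer_id', data_source=Customer)  # Customer is a hypothetical data source
@freshness(max_age='1h', max_stale='2h')
def normalized_phone(this_row: Customer, ctx: Context) -> str:
    # phonenumbers is importable at runtime because it was declared in the runtime decorator
    import phonenumbers
    parsed = phonenumbers.parse(this_row['phone'], 'US')
    return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)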

freshness

def freshness(max_age: Union[str, timedelta],
              max_stale: Optional[Union[str, timedelta]] = None,
              timeout: Optional[Union[str, timedelta]] = timedelta(seconds=1))

Set the freshness policy and timeout of a feature or model. It is required so that Raptor can match the production behaviour with the development behaviour. This decorator must be used in conjunction with a feature or model decorator.

Feature or Model values are considered fresh if they are younger than the max_age. If the value is older than max_age, we'll try to recompute it with a timeout of timeout. If we fail to recompute the value within timeout, we'll return the stale value as long as it is younger than max_stale.

Arguments:

  • max_age (timedelta or str of the form '2h 3m 4s'): the maximum age of a feature or model value. If the calculated value is older than max_age, we'll try to recompute the value.
  • max_stale (timedelta or str of the form '2h 3m 4s'): the time after which the feature or model is considered stale. If the value is older than max_stale, we'll return None. Defaults to max_age.
  • timeout (timedelta or str of the form '2h 3m 4s'): the maximum time allowed for the feature or model to be computed. Defaults to 1 second. Example:
@freshness(max_age='1h', max_stale='2h', timeout='10s')
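
Since the arguments also accept timedelta values, the same policy can be expressed without strings; a minimal, equivalent sketch:

from datetime import timedelta

@freshness(
    max_age=timedelta(hours=1),     # recompute values older than 1 hour
    max_stale=timedelta(hours=2),   # serve a stale value for up to 2 hours if recomputation fails
    timeout=timedelta(seconds=10),  # give up recomputation after 10 seconds
)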

labels

def labels(labels: Dict[str, str])

Register labels for the asset.

Arguments:

  • labels (Dict[str, str]): a dictionary of label keys and values. Example:
@labels({'owner': '@AlmogBaku', 'team': 'search'})

data_source

def data_source(training_data: DataFrame,
                keys: Optional[Union[str, List[str]]] = None,
                name: Optional[str] = None,
                timestamp: Optional[str] = None,
                production_config: Optional[SourceProductionConfig] = None)

Register a DataSource asset. The data source is a class that represents the schema of the data source in production.

It is used to validate the data source in production and to connect the data source to the feature and model assets.

Class signature:

This decorator should wrap a class that inherits from typing_extensions.TypedDict. The class body is optional and should reflect the schema of the data source.

Arguments:

  • training_data (DataFrame): DataFrame of training data. This should reflect the schema of the data source in production.
  • keys (str or list of str): list of columns that are keys.
  • name (str): name of the data source. Defaults to the class name.
  • timestamp (str): name of the timestamp column. If not specified, the timestamp is inferred from the training data.

Returns:

A wrapped class with additional methods and properties:

  • raptor_spec - the Raptor specification object.
  • manifest(to_file: bool = False) - a function that returns the data source manifest. If to_file is True, the manifest is written to a file.
  • export() - a function that exports the data source to the out directory.

Example:

@data_source(
    training_data=pd.read_csv('deals.csv'),
    keys=['id', 'account_id'],
    timestamp='event_at',
)
class Deal(typing_extensions.TypedDict):
    id: int
    event_at: pd.Timestamp
    account_id: str
    amount: float
    currency: str
    is_win: bool
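
Once registered, the wrapped class exposes the methods listed above. A minimal sketch of how they might be used (exact return types may differ):

print(Deal.manifest())       # the data source manifest
Deal.manifest(to_file=True)  # writes the manifest to a file instead
Deal.export()                # exports the data source to the out directory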

aggregation

def aggregation(function: Union[AggregationFunction, List[AggregationFunction], str, List[str]],
                over: Union[str, timedelta, None],
                granularity: Union[str, timedelta, None])

Registers aggregations for the Feature Definition.

Arguments:

  • function (AggregationFunction or List[AggregationFunction] or str or List[str]): one or more aggregation functions, given as AggregationFunction values or their string names.
  • over (timedelta or str of the form '2h 3m 4s'): the time period over which to aggregate.
  • granularity (timedelta or str of the form '2h 3m 4s'): the granularity of the aggregation (this overrides the freshness max_age). Example:
@aggregation(
    function=['sum', 'count', 'avg'],
    over='1d',
    granularity='1h',
)
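
In practice, aggregation is stacked on a feature definition. A minimal sketch reusing the Deal data source defined above; the feature and freshness decorators are documented later on this page:

@feature(keys='account_id', data_source=Deal)
@aggregation(function=['sum', 'count', 'avg'], over='1d', granularity='1h')
@freshness(max_age='1h', max_stale='2h')
def total_amount(this_row: Deal, ctx: Context) -> float:
    # the raw per-row value; Raptor applies the sum/count/avg windows on top of it
    return this_row['amount']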

keep_previous

def keep_previous(versions: int, over: Union[str, timedelta])

Keep previous versions of the feature.

Arguments:

  • versions (int): the number of versions to keep (excluding the current value).
  • over (timedelta or str of the form '2h 3m 4s'): the maximum time period to keep previous values in the history since the last update. Specify 0 to keep each value until the next update.

Example:

@keep_previous(versions=3, over='1d')
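
Like the other policy decorators, keep_previous is stacked on a feature definition. A minimal sketch reusing the Deal data source defined above; the feature decorator is documented next:

@feature(keys='account_id', data_source=Deal)
@keep_previous(versions=3, over='1d')
@freshness(max_age='1h', max_stale='2h')
def last_amount(this_row: Deal, ctx: Context) -> float:
    # the three most recent previous values are retained for up to a day after the last update
    return this_row['amount']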

feature

def feature(keys: Union[str, List[str]],
            name: Optional[str] = None,
            data_source: Optional[Union[str, object]] = None,
            sourceless_markers_df: Optional[DataFrame] = None)

Registers a Feature Definition within the LabSDK.

A feature definition is a Python handler function that processes a calculation request and calculates the feature value.

Feature signature:

The function signature of a feature definition must accept two arguments:

  1. this_row - a dictionary of the current row (this reflects the schema of the data source).
  2. ctx - the Context of the request. See Context for more details.

It must use a return type annotation to indicate the primitive type of the feature value.

Arguments:

  • keys (str or List[str]): a list of indexing keys, indicating the owner of the feature value.
  • name (str): the name of the feature. If not provided, the function name will be used.
  • data_source (str or DataSource object): the (fully qualified) name of the DataSource or a reference to the DataSource object.
  • sourceless_markers_df (DataFrame): a DataFrame with the timestamp and key markers for training sourceless features. It should contain a timestamp column and a column for each key.

Returns:

function: the wrapped function, with a few additional methods/properties:

  • raptor_spec - the Raptor specification of the feature.
  • replay() - a function that replays the feature calculation using the training data of the source.
  • manifest(to_file=False) - a function that returns the manifest of the feature.
  • export(with_dependent_source=True) - a function that exports the feature to the out directory.

Example:

@feature(keys='account_id', data_source=Deal)
@freshness(max_age='1h', max_stale='2h')
def last_amount(this_row: Deal, ctx: Context) -> float:
    return this_row['amount']
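
The returned function keeps the helpers documented above attached. A minimal sketch of how they might be used (exact return types may differ):

last_amount.replay()          # replay the feature calculation over the source's training data
print(last_amount.manifest()) # the feature manifest
last_amount.export()          # export the feature (and its dependent source) to the out directory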

model

def model(keys: Union[str, List[str]],
          input_features: Union[str, List[str], Callable, List[Callable]],
          input_labels: Union[str, List[str], Callable, List[Callable]],
          model_framework: Union[ModelFramework, str],
          model_server: Optional[Union[ModelServer, str]] = None,
          key_feature: Optional[Union[str, Callable]] = None,
          prediction_output_schema: Optional[TypedDict] = None,
          name: Optional[str] = None)

Register a Model Definition within the LabSDK.

Function Signature:

This decorator should wrap a training function that returns a trained model. The function signature of a model definition must accept TrainingContext as an argument.

Arguments:

  • keys (str or list of str): the keys of the model. The keys are required for fetching the features.
  • input_features (str or list of str or callable or list of callable): the features that are used as input to the model.
  • input_labels (str or list of str or callable or list of callable): the labels that are used as input to the model.
  • model_framework (ModelFramework or str): the model framework used to train the model.
  • model_server (ModelServer or str): the model server used to serve the model.
  • key_feature (str or callable): the feature that is used for joining the features together.
  • prediction_output_schema (TypedDict): the schema of the prediction output.
  • name (str): the name of the model. If not provided, the name will be the function name.

Returns:

function: a wrapped train() function that runs your training function with the TrainingContext provided. It also provides a few additional methods/properties on the returned function:

  • raptor_spec - the Raptor specification of the model.
  • train() - the training function.
  • features_and_labels() - a function that returns a DataFrame of the features and labels.
  • manifest(to_file=False) - a function that returns the manifest of the model.
  • export(with_dependent_features=True, with_dependent_sources=True) - a function that exports the model to the out directory.
  • keys - the keys of the model.
  • input_features - the input features of the model.
  • input_labels - the input labels of the model.

Example:

@model(
    keys=['customer_id'],
    input_features=['total_spend+sum'],
    input_labels=[amount],
    model_framework='sklearn',
    model_server='sagemaker-ack',
)
@freshness(max_age='1h', max_stale='100h')
def amount_prediction(ctx: TrainingContext):
    from sklearn.linear_model import LinearRegression

    df = ctx.features_and_labels()

    trainer = LinearRegression()
    trainer.fit(df[ctx.input_features], df[ctx.input_labels])

    return trainer
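
The trained model can then be produced and exported with the attached helpers. A minimal sketch based on the methods documented above:

# Run the training function with a TrainingContext built from the registered features and labels
trained_model = amount_prediction.train()

# Inspect the manifest and export the model, including its dependent features and sources, to the out directory
print(amount_prediction.manifest())
amount_prediction.export(with_dependent_features=True, with_dependent_sources=True)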