Databricks ist wohl ein idealer Kandidat für die Verwaltung von Daten, die Entwicklung von Modellen für Machine Learning und die Durchführung von Experimenten. Der interaktive und kollaborative Charakter der Plattform erleichtert es Ingenieuren und Wissenschaftlern, zusammenzuarbeiten und Lösungen zu liefern sowie Erkenntnisse aus komplexen Datensammlungen zu gewinnen. Modelle für Machine Learning müssen ständig aktualisiert werden und sich an neue Daten anpassen. Außerdem ist es wichtig, die historische Leistung solcher Modelle im Auge zu behalten und sogar auf frühere Versionen zurückgreifen zu können. In diesem Artikel werden wir untersuchen, wie MLflow uns dabei helfen kann, einige häufige Aufgaben, mit denen Datenpraktiker täglich konfrontiert sind, zu optimieren und zu automatisieren, und warum Databricks in Kombination mit MLflow eine großartige Kombination zur Lösung von Problemen im Bereich MLOps ist.
Was ist MLflow
MLflow ist eine Open-Source-Plattform, die dabei hilft, den gesamten Lernprozess des Machine Learnings zu optimieren, zu verwalten und die Tools für Experimente, Reproduzierbarkeit und Einsatz bereitstellt. Zu den wichtigsten Funktionen gehören:
- Experimentverfolgung, bei der Parameter, Metriken und Artefakte aus Experimenten protokolliert werden, sodass sich die Ergebnisse verschiedener Modelle und Experimente leicht vergleichen lassen.
- Modellverwaltung, bei der die Registrierung, Versionierung und Verwaltung des Lebenszyklus der Modelle für Machine Learning von der Entwicklung bis zur Produktion unterstützt wird.
- Modelle können in verschiedenen Umgebungen wie Cloud, lokal oder Docker bereitgestellt werden, wobei mehrere Frameworks unterstützt werden.
- Nahtlose Integration mit beliebten ML-Bibliotheken wie TensorFlow, PyTorch und scikit-learn, was eine flexible Workflow-Integration ermöglicht.
Databricks & MLflow End-to-End-Beispiel
Wir werden ein Python-Notebook entwickeln und einen typischen Lebenszyklus eines maschinellen Lernprojekts mit Hilfe von Databricks und MLflow vorstellen. Das Endziel ist die Erstellung eines Modells, das die Qualität von Wein auf der Grundlage verschiedener Eigenschaften wie Säuregehalt, pH-Wert, Alkoholgehalt usw. vorhersagt.
Zunächst laden wir die Daten aus dem Unity-Katalog als Pandas-Datenrahmen:
import pandas as pd
CATALOG_NAME = "nextlytics"
SCHEMA_NAME = "nextlytics-demo"
white_wine = spark.table(f"{CATALOG_NAME}.`{SCHEMA_NAME}`.winequality_red").toPandas()
red_wine = spark.table(f"{CATALOG_NAME}.`{SCHEMA_NAME}`.winequality_white").toPandas()
red_wine['is_red'] = 1
white_wine['is_red'] = 0
data = pd.concat([red_wine, white_wine], axis=0)
# Remove spaces from column names
data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)
# Consider a wine’s quality as high, if it’s 7 or above
high_quality = (data.quality >= 7).astype(int)
data.quality = high_quality
Der endgültige Datensatz sieht wie folgt aus:
Wir können den Datensatz nun in Untergruppen für Training, Validierung und Tests aufteilen. 60 % der Gesamtdaten werden für das Training verwendet, 20 % für die Validierung und 20 % für Tests.
from sklearn.model_selection import train_test_split
X = data.drop(["quality"], axis=1)
y = data.quality
# Split out the training data
X_train, X_rem, y_train, y_rem = train_test_split(
X, y, train_size=0.6, random_state=123
)
# Split the remaining data equally into validation and test
X_val, X_test, y_val, y_test = train_test_split(
X_rem, y_rem, test_size=0.5, random_state=123
Diese Aufgabe scheint für einen Random-Forest-Klassifikator gut geeignet zu sein, da die Ausgabe binär ist und es zu Interaktionen zwischen mehreren Variablen kommen kann. Wir erstellen das Modell und verfolgen seine Leistung anhand der Genauigkeitsbewertung.
import mlflow.pyfunc
import mlflow.sklearn
import numpy as np
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score, accuracy_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import time
# The predict method of sklearn's RandomForestClassifier returns a binary classification (0 or 1).
# The following code creates a wrapper function, SklearnModelWrapper, that uses
# the predict_proba method to return the probability that the observation belongs to each class.
class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
def __init__(self, model):
self.model = model
def predict(self, context, model_input):
return self.model.predict_proba(model_input)[:, 1]
# mlflow.start_run creates a new MLflow run to track the performance of this model.
# Within the context, you call mlflow.log_param to keep track of the parameters used, and
# mlflow.log_metric to record metrics like accuracy.
with mlflow.start_run(run_name="untuned_random_forest"):
n_estimators = 10
model = RandomForestClassifier(
n_estimators=n_estimators, random_state=np.random.RandomState(123)
)
model.fit(X_train, y_train)
# predict_proba returns [prob_negative, prob_positive], so slice the output with [:, 1]
predictions_test = model.predict_proba(X_test)[:, 1]
# auc_score = roc_auc_score(y_test, predictions_test)
accuracy = accuracy_score(y_test, predictions_test > 0.5)
mlflow.log_param("n_estimators", n_estimators)
# Use accuracy as a metric.
mlflow.log_metric("accuracy", accuracy)
wrappedModel = SklearnModelWrapper(model)
# Log the model with a signature that defines the schema of the model's inputs and outputs.
# When the model is deployed, this signature will be used to validate inputs.
signature = infer_signature(X_train, wrappedModel.predict(None, X_train))
# MLflow contains utilities to create a conda environment used to serve models.
# The necessary dependencies are added to a conda.yaml file which is logged along with the model.
conda_env = _mlflow_conda_env(
additional_conda_deps=None,
additional_pip_deps=[
"cloudpickle=={}".format(cloudpickle.__version__),
"scikit-learn=={}".format(sklearn.__version__),
],
additional_conda_channels=None,
)
mlflow.pyfunc.log_model(
"random_forest_model",
python_model=wrappedModel,
conda_env=conda_env,
signature=signature,
)
Wir können das Ergebnis des Experiments im Reiter „Experimente“ im Abschnitt „Machine Learning“ überprüfen.
Wir können das Modell jetzt im Unity-Katalog registrieren und überall in Databricks darauf verweisen.
run_id = (
mlflow.search_runs(filter_string='tags.mlflow.runName = "untuned_random_forest"')
.iloc[0]
.run_id
)
# Register the model to Unity Catalog.
Das Modell wird im Unity-Katalog unter der Rubrik „Modelle“ erscheinen.
Von dort aus können wir einige Details der aktuellen Version des Modells sehen.
Wie Ihr Business von Künstlicher Intelligenz
und Machine Learning profitiert -
Laden Sie sich hier Ihr Whitepaper herunter!
Wir können das Modell nun durch Angabe der Version abrufen und es zur erneuten Erstellung von Vorhersagen verwenden.
version = 1
model = mlflow.pyfunc.load_model(f"models:/{model_name}/{version}")
# Sanity-check: This should match the accuracy logged by MLflow
print(f'Accuracy: {accuracy_score(y_test, model.predict(X_test) > 0.5)}')
Das Random-Forest-Modell schnitt auch ohne Hyperparameter-Tuning gut ab. Die Genauigkeit liegt bei über 86 %, aber es gibt immer Raum für Verbesserungen. Wir können nun die leistungsstarke XGBoost-Bibliothek verwenden, um eine detaillierte Hyperparameter-Optimierung für das Modell durchzuführen. In diesem Experiment versuchen wir, die Fläche unter der Kurve (AUC) zu maximieren.
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
from math import exp
import mlflow.xgboost
import numpy as np
import xgboost as xgb
search_space = {
'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
'learning_rate': hp.loguniform('learning_rate', -3, 0),
'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
'objective': 'binary:logistic',
'seed': 123, # Set a seed for deterministic training
}
def train_model(params):
# With MLflow autologging, hyperparameters and the trained model are automatically logged to MLflow.
mlflow.xgboost.autolog()
with mlflow.start_run(nested=True):
train = xgb.DMatrix(data=X_train, label=y_train)
validation = xgb.DMatrix(data=X_val, label=y_val)
# Pass in the validation set so xgb can track an evaluation metric. XGBoost terminates training when the evaluation metric
# is no longer improving.
booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,\
evals=[(validation, "validation")], early_stopping_rounds=50)
validation_predictions = booster.predict(validation)
auc_score = roc_auc_score(y_val, validation_predictions)
mlflow.log_metric('auc', auc_score)
signature = infer_signature(X_train, booster.predict(train))
mlflow.xgboost.log_model(booster, "model", signature=signature)
# Set the loss to -1*auc_score so fmin maximizes the auc_score
return {'status': STATUS_OK, 'loss': -1*auc_score, 'booster': booster.attributes()}
# Greater parallelism will lead to speedups, but a less optimal hyperparameter sweep.
# A reasonable value for parallelism is the square root of max_evals.
spark_trials = SparkTrials(parallelism=10)
# Run fmin within an MLflow run context so that each hyperparameter configuration is logged as a child run of a parent
# run called "xgboost_models" .
with mlflow.start_run(run_name='xgboost_models'):
best_params = fmin(
fn=train_model,
space=search_space,
algo=tpe.suggest,
max_evals=96,
trials=spark_trials,
)
Die einzelnen Läufe werden auf der Seite „Experimente“ angezeigt.
Jetzt können wir das leistungsstärkste Modell auswählen, es registrieren und eine neue Version wird erstellt.
best_run = mlflow.search_runs(order_by=['metrics.auc DESC']).iloc[0]
new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/model", model_name)
Es ist an der Zeit, das optimierte Modell für einige tatsächliche Vorhersagen zu neuen, noch nicht bekannten Daten zu verwenden.
# To simulate a new corpus of data, save the existing X_train data to a Delta table.
# In the real world, this would be a new batch of data.
spark_df = spark.createDataFrame(X_train)
table_name = f"{CATALOG_NAME}.`{SCHEMA_NAME}`.wine_data"
(spark_df
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema",True)
.saveAsTable(table_name)
)
Dann rufen wir das neu erstellte, optimierte Modell ab und verwenden es, um Vorhersagen zu generieren.
from pyspark.sql.functions import struct
apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/2")
new_data = spark.read.table(f"{CATALOG_NAME}.`{SCHEMA_NAME}`.wine_data")
# Apply the model to the new data
udf_inputs = struct(*(X_train.columns.tolist()))
new_data = new_data.withColumn(
"prediction",
apply_model_udf(udf_inputs)
)
Die letzte Spalte gibt an, ob ein bestimmter Wein als hochwertig angesehen wird. Ein Wert über 0,5 könnte als hochwertiger Wein interpretiert werden.
Databricks und MLflow - Unser Fazit
Anhand dieses einfachen Beispiels für ein End-to-End-Projekt für maschinelles Lernen wird deutlich, dass MLflow ein leistungsstarkes Tool ist, das jedem Datenpraktiker zur Verfügung steht. Mit einer intuitiven (und sehr Python-ähnlichen) API reichen wenige Codezeilen aus, um Experimente zu entwerfen und durchzuführen, Metriken und Ergebnisse zu protokollieren, Modelle zu speichern und abzurufen und ihre Leistung im Laufe der Zeit zu verfolgen. Durch die nahtlose Integration in Databricks können wir Komponenten wie den Unity-Katalog für die Speicherung, ein leistungsstarkes Spark-Backend für komplexe Berechnungen und eine Jupyter-Notebook-Umgebung für die Entwicklung und Zusammenarbeit mit anderen nutzen.
Haben Sie Fragen zu MLflow, Databricks oder anderen Themen? Wir freuen wir uns darauf, mehr über Ihre Herausforderungen zu erfahren und helfen Ihnen gerne dabei, optimale Lösungen zu finden und diese umzusetzen.