NextLytics Blog

Databricks und MLflow: Ideales Match für skalierbares Machine Learning

Geschrieben von Apostolos Kouzoukos | 14.11.2024 10:59:18

Databricks ist wohl ein idealer Kandidat für die Verwaltung von Daten, die Entwicklung von Modellen für Machine Learning und die Durchführung von Experimenten. Der interaktive und kollaborative Charakter der Plattform erleichtert es Ingenieuren und Wissenschaftlern, zusammenzuarbeiten und Lösungen zu liefern sowie Erkenntnisse aus komplexen Datensammlungen zu gewinnen. Modelle für Machine Learning müssen ständig aktualisiert werden und sich an neue Daten anpassen. Außerdem ist es wichtig, die historische Leistung solcher Modelle im Auge zu behalten und sogar auf frühere Versionen zurückgreifen zu können. In diesem Artikel werden wir untersuchen, wie MLflow uns dabei helfen kann, einige häufige Aufgaben, mit denen Datenpraktiker täglich konfrontiert sind, zu optimieren und zu automatisieren, und warum Databricks in Kombination mit MLflow eine großartige Kombination zur Lösung von Problemen im Bereich MLOps ist.

Was ist MLflow

MLflow ist eine Open-Source-Plattform, die dabei hilft, den gesamten Lernprozess des Machine Learnings zu optimieren, zu verwalten und die Tools für Experimente, Reproduzierbarkeit und Einsatz bereitstellt. Zu den wichtigsten Funktionen gehören:

  • Experimentverfolgung, bei der Parameter, Metriken und Artefakte aus Experimenten protokolliert werden, sodass sich die Ergebnisse verschiedener Modelle und Experimente leicht vergleichen lassen.
  • Modellverwaltung, bei der die Registrierung, Versionierung und Verwaltung des Lebenszyklus der Modelle für Machine Learning von der Entwicklung bis zur Produktion unterstützt wird.
  • Modelle können in verschiedenen Umgebungen wie Cloud, lokal oder Docker bereitgestellt werden, wobei mehrere Frameworks unterstützt werden.
  • Nahtlose Integration mit beliebten ML-Bibliotheken wie TensorFlow, PyTorch und scikit-learn, was eine flexible Workflow-Integration ermöglicht.

Databricks & MLflow End-to-End-Beispiel

Wir werden ein Python-Notebook entwickeln und einen typischen Lebenszyklus eines maschinellen Lernprojekts mit Hilfe von Databricks und MLflow vorstellen. Das Endziel ist die Erstellung eines Modells, das die Qualität von Wein auf der Grundlage verschiedener Eigenschaften wie Säuregehalt, pH-Wert, Alkoholgehalt usw. vorhersagt.

Zunächst laden wir die Daten aus dem Unity-Katalog als Pandas-Datenrahmen:

import pandas as pd

CATALOG_NAME = "nextlytics"

SCHEMA_NAME = "nextlytics-demo"

white_wine = spark.table(f"{CATALOG_NAME}.`{SCHEMA_NAME}`.winequality_red").toPandas()

red_wine = spark.table(f"{CATALOG_NAME}.`{SCHEMA_NAME}`.winequality_white").toPandas()

red_wine['is_red'] = 1

white_wine['is_red'] = 0

data = pd.concat([red_wine, white_wine], axis=0)

# Remove spaces from column names

data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)

# Consider a wine’s quality as high, if it’s 7 or above

high_quality = (data.quality >= 7).astype(int)

data.quality = high_quality

 

Der endgültige Datensatz sieht wie folgt aus:


Wir können den Datensatz nun in Untergruppen für Training, Validierung und Tests aufteilen. 60 % der Gesamtdaten werden für das Training verwendet, 20 % für die Validierung und 20 % für Tests.

from sklearn.model_selection import train_test_split

X = data.drop(["quality"], axis=1)

y = data.quality

# Split out the training data

X_train, X_rem, y_train, y_rem = train_test_split(

   X, y, train_size=0.6, random_state=123

)

# Split the remaining data equally into validation and test

X_val, X_test, y_val, y_test = train_test_split(

   X_rem, y_rem, test_size=0.5, random_state=123

 

Diese Aufgabe scheint für einen Random-Forest-Klassifikator gut geeignet zu sein, da die Ausgabe binär ist und es zu Interaktionen zwischen mehreren Variablen kommen kann. Wir erstellen das Modell und verfolgen seine Leistung anhand der Genauigkeitsbewertung.

import mlflow.pyfunc

import mlflow.sklearn

import numpy as np

import sklearn

from sklearn.ensemble import RandomForestClassifier

from sklearn.metrics import roc_auc_score, accuracy_score

from mlflow.models.signature import infer_signature

from mlflow.utils.environment import _mlflow_conda_env

import cloudpickle

import time

# The predict method of sklearn's RandomForestClassifier returns a binary classification (0 or 1).

# The following code creates a wrapper function, SklearnModelWrapper, that uses

# the predict_proba method to return the probability that the observation belongs to each class.

class SklearnModelWrapper(mlflow.pyfunc.PythonModel):

   def __init__(self, model):

       self.model = model

   def predict(self, context, model_input):

       return self.model.predict_proba(model_input)[:, 1]

# mlflow.start_run creates a new MLflow run to track the performance of this model.

# Within the context, you call mlflow.log_param to keep track of the parameters used, and

# mlflow.log_metric to record metrics like accuracy.

with mlflow.start_run(run_name="untuned_random_forest"):

   n_estimators = 10

   model = RandomForestClassifier(

       n_estimators=n_estimators, random_state=np.random.RandomState(123)

   )

   model.fit(X_train, y_train)

   # predict_proba returns [prob_negative, prob_positive], so slice the output with [:, 1]

   predictions_test = model.predict_proba(X_test)[:, 1]

   # auc_score = roc_auc_score(y_test, predictions_test)

   accuracy = accuracy_score(y_test, predictions_test > 0.5)

   mlflow.log_param("n_estimators", n_estimators)

   # Use accuracy as a metric.

   mlflow.log_metric("accuracy", accuracy)

   wrappedModel = SklearnModelWrapper(model)

   # Log the model with a signature that defines the schema of the model's inputs and outputs.

   # When the model is deployed, this signature will be used to validate inputs.

   signature = infer_signature(X_train, wrappedModel.predict(None, X_train))

   # MLflow contains utilities to create a conda environment used to serve models.

   # The necessary dependencies are added to a conda.yaml file which is logged along with the model.

   conda_env = _mlflow_conda_env(

       additional_conda_deps=None,

       additional_pip_deps=[

           "cloudpickle=={}".format(cloudpickle.__version__),

           "scikit-learn=={}".format(sklearn.__version__),

       ],

       additional_conda_channels=None,

   )

   mlflow.pyfunc.log_model(

       "random_forest_model",

       python_model=wrappedModel,

       conda_env=conda_env,

       signature=signature,

   )

 

Wir können das Ergebnis des Experiments im Reiter „Experimente“ im Abschnitt „Machine Learning“ überprüfen.

 

Wir können das Modell jetzt im Unity-Katalog registrieren und überall in Databricks darauf verweisen.

run_id = (

   mlflow.search_runs(filter_string='tags.mlflow.runName = "untuned_random_forest"')

   .iloc[0]

   .run_id

)

# Register the model to Unity Catalog.

 

Das Modell wird im Unity-Katalog unter der Rubrik „Modelle“ erscheinen.

 

Von dort aus können wir einige Details der aktuellen Version des Modells sehen.

 

Wie Ihr Business von Künstlicher Intelligenz
und Machine Learning profitiert -
Laden Sie sich hier Ihr Whitepaper herunter! 

Wir können das Modell nun durch Angabe der Version abrufen und es zur erneuten Erstellung von Vorhersagen verwenden.

version = 1

model = mlflow.pyfunc.load_model(f"models:/{model_name}/{version}")

# Sanity-check: This should match the accuracy logged by MLflow

print(f'Accuracy: {accuracy_score(y_test, model.predict(X_test) > 0.5)}')

Das Random-Forest-Modell schnitt auch ohne Hyperparameter-Tuning gut ab. Die Genauigkeit liegt bei über 86 %, aber es gibt immer Raum für Verbesserungen. Wir können nun die leistungsstarke XGBoost-Bibliothek verwenden, um eine detaillierte Hyperparameter-Optimierung für das Modell durchzuführen. In diesem Experiment versuchen wir, die Fläche unter der Kurve (AUC) zu maximieren.

from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK

from hyperopt.pyll import scope

from math import exp

import mlflow.xgboost

import numpy as np

import xgboost as xgb

search_space = {

 'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),

 'learning_rate': hp.loguniform('learning_rate', -3, 0),

 'reg_alpha': hp.loguniform('reg_alpha', -5, -1),

 'reg_lambda': hp.loguniform('reg_lambda', -6, -1),

 'min_child_weight': hp.loguniform('min_child_weight', -1, 3),

 'objective': 'binary:logistic',

 'seed': 123, # Set a seed for deterministic training

}

def train_model(params):

 # With MLflow autologging, hyperparameters and the trained model are automatically logged to MLflow.

 mlflow.xgboost.autolog()

 with mlflow.start_run(nested=True):

   train = xgb.DMatrix(data=X_train, label=y_train)

   validation = xgb.DMatrix(data=X_val, label=y_val)

   # Pass in the validation set so xgb can track an evaluation metric. XGBoost terminates training when the evaluation metric

   # is no longer improving.

   booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,\

                       evals=[(validation, "validation")], early_stopping_rounds=50)

   validation_predictions = booster.predict(validation)

   auc_score = roc_auc_score(y_val, validation_predictions)

   mlflow.log_metric('auc', auc_score)

   signature = infer_signature(X_train, booster.predict(train))

   mlflow.xgboost.log_model(booster, "model", signature=signature)

  

   # Set the loss to -1*auc_score so fmin maximizes the auc_score

   return {'status': STATUS_OK, 'loss': -1*auc_score, 'booster': booster.attributes()}

# Greater parallelism will lead to speedups, but a less optimal hyperparameter sweep.

# A reasonable value for parallelism is the square root of max_evals.

spark_trials = SparkTrials(parallelism=10)

# Run fmin within an MLflow run context so that each hyperparameter configuration is logged as a child run of a parent

# run called "xgboost_models" .

with mlflow.start_run(run_name='xgboost_models'):

 best_params = fmin(

   fn=train_model,

   space=search_space,

   algo=tpe.suggest,

   max_evals=96,

   trials=spark_trials,

 )

 

Die einzelnen Läufe werden auf der Seite „Experimente“ angezeigt.

Jetzt können wir das leistungsstärkste Modell auswählen, es registrieren und eine neue Version wird erstellt.

best_run = mlflow.search_runs(order_by=['metrics.auc DESC']).iloc[0]

new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/model", model_name)

 

Es ist an der Zeit, das optimierte Modell für einige tatsächliche Vorhersagen zu neuen, noch nicht bekannten Daten zu verwenden.

# To simulate a new corpus of data, save the existing X_train data to a Delta table.

# In the real world, this would be a new batch of data.

spark_df = spark.createDataFrame(X_train)

table_name = f"{CATALOG_NAME}.`{SCHEMA_NAME}`.wine_data"

(spark_df

 .write

 .format("delta")

 .mode("overwrite")

 .option("overwriteSchema",True)

 .saveAsTable(table_name)

)

 

Dann rufen wir das neu erstellte, optimierte Modell ab und verwenden es, um Vorhersagen zu generieren.

from pyspark.sql.functions import struct

apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/2")

new_data = spark.read.table(f"{CATALOG_NAME}.`{SCHEMA_NAME}`.wine_data")

# Apply the model to the new data

udf_inputs = struct(*(X_train.columns.tolist()))

new_data = new_data.withColumn(

 "prediction",

 apply_model_udf(udf_inputs)

)

 

Die letzte Spalte gibt an, ob ein bestimmter Wein als hochwertig angesehen wird. Ein Wert über 0,5 könnte als hochwertiger Wein interpretiert werden.

Databricks und MLflow - Unser Fazit

Anhand dieses einfachen Beispiels für ein End-to-End-Projekt für maschinelles Lernen wird deutlich, dass MLflow ein leistungsstarkes Tool ist, das jedem Datenpraktiker zur Verfügung steht. Mit einer intuitiven (und sehr Python-ähnlichen) API reichen wenige Codezeilen aus, um Experimente zu entwerfen und durchzuführen, Metriken und Ergebnisse zu protokollieren, Modelle zu speichern und abzurufen und ihre Leistung im Laufe der Zeit zu verfolgen. Durch die nahtlose Integration in Databricks können wir Komponenten wie den Unity-Katalog für die Speicherung, ein leistungsstarkes Spark-Backend für komplexe Berechnungen und eine Jupyter-Notebook-Umgebung für die Entwicklung und Zusammenarbeit mit anderen nutzen.

Haben Sie Fragen zu MLflow, Databricks oder anderen Themen? Wir freuen wir uns darauf, mehr über Ihre Herausforderungen zu erfahren und helfen Ihnen gerne dabei, optimale Lösungen zu finden und diese umzusetzen.