Use RAPIDS on a GPU Cluster
Overview
This example extends the single-GPU RAPIDS example, which trained a random forest model on NYC taxi data, to multiple GPUs. We will be using Dask to orchestrate the model training over multiple worker machines, each with a GPU. GPU clusters can be valuable for training models quickly and can be necessary if your data is too large to fit into a single GPU’s memory.
We recommend you skim the single GPU example first if you haven’t read it already.
Modeling Process
Imports
Compared to the first exercise, this exercise uses a few new packages:

- dask_saturn and dask.distributed: Set up and run the Dask cluster in Saturn Cloud.
- dask-cudf: Create distributed cudf dataframes using Dask.
from dask_saturn import SaturnCluster
from dask.distributed import Client, wait
import dask_cudf
from cuml.dask.ensemble import RandomForestClassifier
from cuml.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt
Start the Dask Cluster
The template resource you are running has a Dask cluster already attached to it with three workers. The dask-saturn code below creates two important objects: a cluster and a client.

cluster: knows about and manages the scheduler and workers
- can be used to create, resize, reconfigure, or destroy those resources (see the short sketch at the end of this section)
- knows how to communicate with the scheduler, and where to find logs and diagnostic dashboards

client: tells the cluster to do things
- can send work to the cluster
- can restart all the worker processes
- can send data to the cluster or pull data back from the cluster
n_workers = 3
cluster = SaturnCluster(n_workers=n_workers)
client = Client(cluster)
If you already started the Dask cluster on the resource page, then the code above will run much more quickly since it will not have to wait for the cluster to turn on.
Pro tip: Create and start the cluster in the Saturn Cloud UI before opening JupyterLab if you want to get a head start!
The command below ensures the kernel waits until all the desired workers are online before continuing.
client.wait_for_workers(n_workers=n_workers)
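As a quick illustration of the management capabilities listed above, the sketch below resizes the cluster and restarts its workers. This is a minimal sketch: client.restart() is part of the standard dask.distributed API, and the resize line assumes SaturnCluster supports the standard Dask cluster .scale() method.

# Resize the cluster to a new number of workers
# (assumption: SaturnCluster implements the standard Dask .scale() method)
cluster.scale(3)

# Restart all worker processes, clearing any data held in their memory
client.restart()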
Download and Examine the Dataset
The code below loads the data into a dask-cudf dataframe. You can interact with this data structure as if it were just a regular cudf dataframe, but it is actually a collection of smaller cudf dataframes spread across the workers in the Dask cluster.
taxi = (
dask_cudf.read_parquet(
"s3://saturn-public-data/nyc-taxi/data/yellow_tripdata_2019-01.parquet",
storage_options={"anon": True},
assume_missing=True,
)
.repartition(10)
.persist()
)
wait(taxi)
Many dataframe operations that you would execute on a pandas dataframe, like .head() and .dtypes, also work on a dask-cudf dataframe. Simple commands might take longer than you are used to; this is due to the distributed nature of the dataframe.
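For example, both of the following work directly on the distributed dataframe:

taxi.head()  # computes and returns the first few rows as a regular cudf dataframe
taxi.dtypes  # column types, available from metadata without any computation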
You can compute the length and memory usage of the dataset using the following code.
num_rows = len(taxi)
memory_usage = taxi.memory_usage(deep=True).sum().compute() / 1e9
print(f"Num rows: {num_rows}, Memory Usage: {memory_usage} GB")
Note: Dask is lazily evaluated. The result from a computation is not computed until you ask for it. Instead, a Dask task graph for the computation is produced. Anytime you have a Dask object and you want to get the result, call .compute().
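To see this laziness in action, here is a small sketch using one of the columns in this dataset:

mean_fare = taxi["fare_amount"].mean()  # lazy: only builds a task graph
mean_fare = mean_fare.compute()  # executes the graph on the cluster and returns a number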
When we say that a dask-cudf dataframe is a distributed dataframe, that means that it comprises multiple smaller cudf dataframes. Run the following to see how many of these pieces (called “partitions”) there are.
taxi.npartitions
Preprocess the Data
This code looks nearly identical to the code you ran in the single-node RAPIDS example. dask-cudf translates regular cudf operations into the corresponding distributed operations.
def prep_df(df: dask_cudf.DataFrame) -> dask_cudf.DataFrame:
df = df[df["fare_amount"] > 0] # to avoid a divide by zero error
df["tip_fraction"] = df["tip_amount"] / df["fare_amount"]
df["target"] = df["tip_fraction"] > 0.2
df["pickup_weekday"] = df["tpep_pickup_datetime"].dt.weekday
df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
df["pickup_week_hour"] = (df["pickup_weekday"] * 24) + df.pickup_hour
df["pickup_minute"] = df["tpep_pickup_datetime"].dt.minute
df = df[
[
"pickup_weekday",
"pickup_hour",
"pickup_week_hour",
"pickup_minute",
"passenger_count",
"PULocationID",
"DOLocationID",
"target",
]
]
df = df.astype("float32").fillna(-1)
df["target"] = df["target"].astype("int32")
return df
taxi = prep_df(taxi)
Since this is a binary classification task, before proceeding we should examine the proportion of 1s and 0s in the target. Note that we add .compute() to ask for the results of the calculation immediately.
taxi["target"].value_counts(normalize=True).compute()
Now that the dataframe has been processed, let’s check its length and size in memory again. Again, we need to add .compute() in order to get the results immediately.
num_rows = len(taxi)
memory_usage = taxi.memory_usage(deep=True).sum().compute() / 1e9
print(f"Num rows: {num_rows}, Memory Usage: {memory_usage} GB")
Train a Random Forest Model
Now that the data has been prepped, it’s time to build a model! This code is identical to the first example, except we are using the Dask version of the cuml RandomForestClassifier.
X = taxi.drop(columns=["target"])
y = taxi["target"]
rfc = RandomForestClassifier(n_estimators=100, max_depth=10, n_streams=4)  # n_streams: CUDA streams used to build trees in parallel
%%time
_ = rfc.fit(X, y)
As you might expect, this model takes less time to run than the single GPU example!
Calculate Metrics on a Test Set
We will use another month of taxi data for the test set and calculate the AUC score.
taxi_test = dask_cudf.read_parquet(
"s3://saturn-public-data/nyc-taxi/data/yellow_tripdata_2019-02.parquet",
storage_options={"anon": True},
assume_missing=True,
).persist()
wait(taxi_test)
taxi_test = prep_df(taxi_test)
As of this writing, cuml.metrics.roc_auc_score does not support Dask collections as inputs. The code below uses .compute() to create cudf series instead.
X_test = taxi_test.drop(columns=["target"])
y_test = taxi_test["target"]
preds = rfc.predict_proba(X_test)[1]  # select the probability of the positive class
y_test = y_test.compute()
preds = preds.compute()
roc_auc_score(y_test, preds)
Graph the ROC Curve
fpr, tpr, _ = roc_curve(y_test.to_numpy(), preds.to_numpy())
plt.rcParams["font.size"] = "16"
fig = plt.figure(figsize=(8, 8))
plt.plot([0, 1], [0, 1], color="navy", linestyle="--")
plt.plot(fpr, tpr, color="red")
plt.legend(["Random chance", "ROC curve"])
plt.xlabel("False positive rate")
plt.ylabel("True positive rate")
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.fill_between(fpr, tpr, color="yellow", alpha=0.1)
plt.show()
Conclusion
By only changing a few lines of code, we went from training on a single GPU to training on a GPU cluster! Wow!
Feel free to play around with parameters and the volume of data. You could, for instance, read in and train on all of 2019’s taxi data (yellow_tripdata_2019-*.parquet). Make sure you test on a different test set!
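As a sketch of loading the full year (this assumes dask_cudf.read_parquet accepts glob patterns, as its dask.dataframe counterpart does):

taxi_full = dask_cudf.read_parquet(
    "s3://saturn-public-data/nyc-taxi/data/yellow_tripdata_2019-*.parquet",  # glob pattern: assumption
    storage_options={"anon": True},
    assume_missing=True,
).persist()
wait(taxi_full)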
Take a look at our other examples for more resources on running models on single and multiple GPUs!