Distributed Learning with Vineyard

With the growth of data, distributed learning is becoming a must in real-world machine learning applications, as the data size can easily exceed the memory limit of a single machine. Thus, many distributed systems addressing different workloads are developed and they share the same objective of extending users’ single machine prototypes to distributed settings with as few modifications to the code as possible.

For example, dask.dataframe mimics the API of pandas which is the de-facto standard library for single-machine structured data processing, so that users can apply their pandas code for data preprocessing in the dask cluster with few modifications. Similarly, horovod provides easy-to-use APIs for users to transfer their single-machine code in machine learning frameworks (e.g., TensorFlow, PyTorch, MXNet) to the distributed settings with only a few additional lines of code.

However, when extending to distributed learning, the data sharing between libraries within the same python process (e.g., pandas and tensorflow) becomes inter-process sharing between engines (e.g., dask and horovod), not to mention in the distributed fashion. Existing solutions using external distributed file systems are less than optimal for the huge I/O overheads.

Vineyard shares the same design principle with the aforementioned distributed systems, which aims to provide efficient cross-engine data sharing with few modifications to the existing code. Next, we demonstrate how to transfer a single-machine learing example in keras to distributed learning with dask, horovod and Vineyard.

An Example from Keras

This example uses the Covertype dataset from the UCI Machine Learning Repository. The task is to predict forest cover type from cartographic variables. The dataset includes 506,011 instances with 12 input features: 10 numerical features and 2 categorical features. Each instance is categorized into 1 of 7 classes.

The solution contains three steps:

  1. preprocess the data in pandas to extract the 12 features and the label

  2. store the preprocessed data in files

  3. define and train the model in keras

Mapping the solution to distributed learning, we have:

  1. preprocess the data in dask.dataframe

  2. share the preprocessed data using Vineyard

  3. train the model in horovod.keras

We will walk through the code as follows.

Setup

The distributed deployment of vineyard and dask is as follows: on each machine, we launch a vineyard daemon process to handle the local data storage on that machine; and we also launch a dask worker on that machine for the computation accordingly. In this notebook, we limit the machine number as 1 (i.e., the local machine) just for demonstration.

[ ]:
import vineyard
import subprocess as sp

# launch local vineyardd
client = vineyard.init()

# launch dask scheduler and worker
dask_scheduler = sp.Popen(['dask-scheduler', '--host', 'localhost'])
dask_worker = sp.Popen(['dask-worker', 'tcp://localhost:8786'])

Preprocessing the data

To read the data, we replace pd.read_csv by dd.read_csv, which will automatically read the data in parallel.

[ ]:
import dask.dataframe as dd
raw_data = dd.read_csv('covtype.data', header=None)

Then we preprocess the data using the same code from the example, except the replacement of pd.concat to dd.concat only.

[ ]:
"""
The two categorical features in the dataset are binary-encoded.
We will convert this dataset representation to the typical representation, where each
categorical feature is represented as a single integer value.
"""
import warnings
warnings.filterwarnings('ignore')

soil_type_values = [f"soil_type_{idx+1}" for idx in range(40)]
wilderness_area_values = [f"area_type_{idx+1}" for idx in range(4)]

soil_type = raw_data.loc[:, 14:53].apply(
    lambda x: soil_type_values[0::1][x.to_numpy().nonzero()[0][0]], axis=1
)
wilderness_area = raw_data.loc[:, 10:13].apply(
    lambda x: wilderness_area_values[0::1][x.to_numpy().nonzero()[0][0]], axis=1
)

CSV_HEADER = [
    "Elevation",
    "Aspect",
    "Slope",
    "Horizontal_Distance_To_Hydrology",
    "Vertical_Distance_To_Hydrology",
    "Horizontal_Distance_To_Roadways",
    "Hillshade_9am",
    "Hillshade_Noon",
    "Hillshade_3pm",
    "Horizontal_Distance_To_Fire_Points",
    "Wilderness_Area",
    "Soil_Type",
    "Cover_Type",
]

data = dd.concat(
    [raw_data.loc[:, 0:9], wilderness_area, soil_type, raw_data.loc[:, 54]],
    axis=1,
    ignore_index=True,
)
data.columns = CSV_HEADER

# Convert the target label indices into a range from 0 to 6 (there are 7 labels in total).
data["Cover_Type"] = data["Cover_Type"] - 1

Finally, instead of saving the preprocessed data into files, we store them in Vineyard.

[ ]:
import vineyard
from vineyard.core.builder import builder_context
from vineyard.contrib.dask.dask import register_dask_types

with builder_context() as builder:
    register_dask_types(builder, None) # register dask builders
    gdf_id = client.put(data, dask_scheduler='tcp://localhost:8786')
    print(gdf_id)

We saved the preprocessed data as a global dataframe in Vineyard with the ObjectID above.

Training the model

In the single machine solution from the example. A get_dataset_from_csv function is defined to load the dataset from the files of the preprocessed data as follows:

def get_dataset_from_csv(csv_file_path, batch_size, shuffle=False):

    dataset = tf.data.experimental.make_csv_dataset(
        csv_file_path,
        batch_size=batch_size,
        column_names=CSV_HEADER,
        column_defaults=COLUMN_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        num_epochs=1,
        header=True,
        shuffle=shuffle,
    )
    return dataset.cache()

while in the training procedure, it loads the train_dataset and test_dataset seperately from two files as:

def run_experiment(model):

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    train_dataset = get_dataset_from_csv(train_data_file, batch_size, shuffle=True)

    test_dataset = get_dataset_from_csv(test_data_file, batch_size)

    print("Start training the model...")
    history = model.fit(train_dataset, epochs=num_epochs)
    print("Model training finished")

    _, accuracy = model.evaluate(test_dataset, verbose=0)

    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

In our solution, we provide a function to load dataset from the global dataframe generated in the last step.

[ ]:
from vineyard.core.resolver import resolver_context
from vineyard.contrib.ml.tensorflow import register_tf_types

def get_dataset_from_vineyard(object_id, batch_size, shuffle=False):
    with resolver_context() as resolver:
        register_tf_types(None, resolver) # register tf resolvers
        ds = vineyard.connect().get(object_id, label=TARGET_FEATURE_NAME) # specify the label column

    if shuffle:
        ds = ds.shuffle(len(ds))

    len_test = int(len(ds) * 0.15)
    test_dataset = ds.take(len_test).batch(batch_size)
    train_dataset = ds.skip(len_test).batch(batch_size)

    return train_dataset, test_dataset

And modify the training procedure with a few lines of horovod code.

[ ]:
import horovod.keras as hvd

def run_experiment(model):

    hvd.init()

    model.compile(
        optimizer=hvd.DistributedOptimizer(keras.optimizers.Adam(learning_rate=learning_rate)),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    callbacks = [
        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with random weights or restored from a checkpoint.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    train_dataset, test_dataset = get_dataset_from_vineyard(gdf_id, batch_size, shuffle=True)

    print("Start training the model...")
    history = model.fit(train_dataset, epochs=num_epochs, callbacks=callbacks)
    print("Model training finished")

    _, accuracy = model.evaluate(test_dataset, verbose=0)

    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

All the other parts of training procedure are the same as the single machine solution.

[ ]:
TARGET_FEATURE_NAME = "Cover_Type"

TARGET_FEATURE_LABELS = ["0", "1", "2", "3", "4", "5", "6"]

NUMERIC_FEATURE_NAMES = [
    "Aspect",
    "Elevation",
    "Hillshade_3pm",
    "Hillshade_9am",
    "Hillshade_Noon",
    "Horizontal_Distance_To_Fire_Points",
    "Horizontal_Distance_To_Hydrology",
    "Horizontal_Distance_To_Roadways",
    "Slope",
    "Vertical_Distance_To_Hydrology",
]

CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    "Soil_Type": soil_type_values,
    "Wilderness_Area": wilderness_area_values,
}

CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())

FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES

NUM_CLASSES = len(TARGET_FEATURE_LABELS)

learning_rate = 0.001
dropout_rate = 0.1
batch_size = 265
num_epochs = 5

hidden_units = [32, 32]

"""
## Create model inputs
Now, define the inputs for the models as a dictionary, where the key is the feature name,
and the value is a `keras.layers.Input` tensor with the corresponding feature shape
and data type.
"""
import tensorflow as tf

def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        if feature_name in NUMERIC_FEATURE_NAMES:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.float32
            )
        else:
            inputs[feature_name] = layers.Input(
                name=feature_name, shape=(), dtype=tf.string
            )
    return inputs


"""
## Encode features
We create two representations of our input features: sparse and dense:
1. In the **sparse** representation, the categorical features are encoded with one-hot
encoding using the `CategoryEncoding` layer. This representation can be useful for the
model to *memorize* particular feature values to make certain predictions.
2. In the **dense** representation, the categorical features are encoded with
low-dimensional embeddings using the `Embedding` layer. This representation helps
the model to *generalize* well to unseen feature combinations.
"""


from tensorflow.keras.layers import StringLookup


def encode_inputs(inputs, use_embedding=False):
    encoded_features = []
    for feature_name in inputs:
        if feature_name in CATEGORICAL_FEATURE_NAMES:
            vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
            # Create a lookup to convert string values to an integer indices.
            # Since we are not using a mask token nor expecting any out of vocabulary
            # (oov) token, we set mask_token to None and  num_oov_indices to 0.
            lookup = StringLookup(
                vocabulary=vocabulary,
                mask_token=None,
                num_oov_indices=0,
                output_mode="int" if use_embedding else "binary",
            )
            if use_embedding:
                # Convert the string input values into integer indices.
                encoded_feature = lookup(inputs[feature_name])
                embedding_dims = int(math.sqrt(len(vocabulary)))
                # Create an embedding layer with the specified dimensions.
                embedding = layers.Embedding(
                    input_dim=len(vocabulary), output_dim=embedding_dims
                )
                # Convert the index values to embedding representations.
                encoded_feature = embedding(encoded_feature)
            else:
                # Convert the string input values into a one hot encoding.
                encoded_feature = lookup(tf.expand_dims(inputs[feature_name], -1))
        else:
            # Use the numerical features as-is.
            encoded_feature = tf.expand_dims(inputs[feature_name], -1)

        encoded_features.append(encoded_feature)

    all_features = layers.concatenate(encoded_features)
    return all_features


"""
## Experiment 1: a baseline model
In the first experiment, let's create a multi-layer feed-forward network,
where the categorical features are one-hot encoded.
"""
from tensorflow import keras
from tensorflow.keras import layers

def create_baseline_model():
    inputs = create_model_inputs()
    features = encode_inputs(inputs)

    for units in hidden_units:
        features = layers.Dense(units)(features)
        features = layers.BatchNormalization()(features)
        features = layers.ReLU()(features)
        features = layers.Dropout(dropout_rate)(features)

    outputs = layers.Dense(units=NUM_CLASSES, activation="softmax")(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


baseline_model = create_baseline_model()

Let’s run it:

[ ]:
run_experiment(baseline_model)

We clear the environments in the end.

[ ]:
dask_worker.terminate()
dask_scheduler.terminate()

vineyard.shutdown()

Finally, we can use horovodrun to run the above code distributedly in a cluster for distributed learning on big datasets.

Conclusion

From this example, we can see that with the help of Vineyard, users can easily extend their single machine solutions to distributed learning using dedicated systems without worrying about the cross-system data sharing issues.