Machine learning models have achieved state-of-the-art results for a wide variety of complex problems including recommandation, prevision and classification. The emerging field has been evolving quickly but is still lacking tools to correctly manage production pipelines. Compared to the software development world, machine learning has some own specificities that need to be adressed differently.

 

 

This article takes a deeper look on a recent technology (tensorflow extended, developped by Google) which provides a python API to build machine learning pipelines. It provides sample code that can be run standalone in this notebook or can be adapted for your own needs.

 

Creating a TFX pipeline for sentiment analysis with the IMDB reviews dataset

 

This tutorial aims at creating a simple tensorflow extended (TFX) pipeline to train a binary classifier for sentiment analysis on IMDB film reviews. TFX has been recently popularized among machine learning communities for the production capabilities it provides.
 
TFX pipelines are directed acyclic graphs (DAGs) that define a succession of steps (or components in TFX), each providing a defined functionality. In this example the pipeline consists in the three TFX components:
 
- An ExampleGenerator to provide data into our pipeline
- A Trainer to train our model
- A Pusher to deploy our model (locally)
 
It's a very minimalistic ML OPS pipeline that will hopefully help you grasp the basic concepts of TFX. Let's dive in.
 

Downloading, exploring and storing the IMDB film reviews dataset

 

The IMDB film reviews dataset consists in 50k samples of labelled text for binary sentiment classification. By default it is splitted in a training and a testing set (25k samples each). We further split the training set into a training and a validation set.

Once downloaded, the data is stored locally as tensorflow records for efficient storage on disk and fast I/O.

Lets's first take a look to the data ! Learn more about the IMDB dataset.
 
import tensorflow_datasets
import pandas as pd


train_data, validation_data, test_data = tensorflow_datasets.load(
    name="imdb_reviews",
    split=("train[:75%]", "train[75%:]", "test"),
    as_supervised=True,
)

pd.DataFrame(
    train_data.take(5).as_numpy_iterator(), 
    columns=["reviews", "is_positive"]
)
outputs:
 
TFX has several built-in components to ingest data into your pipelines depending of the input format. For this example, we will use tf.Records format to store data on disk as it is efficient regarding both storage and I/O. If you want to have a look at available sources and formats supported by TFX, please refer to official documentation for examleGen components.
import tensorflow as tf


dataset_filenames = {
    "imdb_train": train_data,
    "imdb_validation": validation_data,
    "imdb_test": test_data,
}

for filename, tf_dataset in dataset_filenames.items():
    with tf.io.TFRecordWriter(f"data/{filename}.tfrecord") as f_write:
        for text, label in tf_dataset:
            feature = {
                "keras_layer_input": tf.train.Feature(bytes_list=tf.train.BytesList(value=[text.numpy()])),
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label.numpy()])),
            }
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            f_write.write(example.SerializeToString())

Creating the model

 

Our model consists of a pretrained text embedding followed by a two layers neural network. As we are trying to do a binary classification, binary cross-entropy should be a good loss function.

In TFX paradigm the model should be defined in a module file with a run_fn function (take a close look at the function signature).
 
%%writefile {"models.py"}


from typing import List

import tensorflow as tf
import tensorflow_hub
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio


def create_model() -> tf.keras.models.Model:
    model = tf.keras.Sequential()
    hub_layer = tensorflow_hub.KerasLayer(
        "https://tfhub.dev/google/nnlm-en-dim50/2",
        input_shape=[],
        dtype=tf.string,
        trainable=True
    )
    model.add(hub_layer)
    model.add(tf.keras.layers.Dense(16, activation="relu"))
    model.add(tf.keras.layers.Dense(1, activation="sigmoid"))

    model.compile(
        optimizer="adam",
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=["accuracy"]
    )

    return model


def _input_fn(
    file_pattern: List[str],
    data_accessor: tfx.components.DataAccessor,
    schema: schema_pb2.Schema,
) -> tf.data.Dataset:
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(
            batch_size=256,
            num_epochs=1,
            label_key="label",
        ),
        schema,
    )


def run_fn(fn_args: tfx.components.FnArgs) -> None:
    """This function is named as required by tfx"""

    feature_spec = {
        "keras_layer_input": tf.io.FixedLenFeature(shape=[1], dtype=tf.string),
        "label": tf.io.FixedLenFeature(shape=[1], dtype=tf.int64),
    }
    schema = schema_utils.schema_from_feature_spec(
        feature_spec
    )

    train_data = _input_fn(
        file_pattern=fn_args.train_files,
        data_accessor=fn_args.data_accessor,
        schema=schema,
    )
    validation_data = _input_fn(
        file_pattern=fn_args.eval_files,
        data_accessor=fn_args.data_accessor,
        schema=schema,
    )

    model = create_model()
    model.fit(
        train_data,
        validation_data=validation_data,
        epochs=5,
        verbose=1,
    )
    model.save(fn_args.serving_model_dir, save_format="tf")

Creating the pipeline

 

Now that we have store the data and define the model, it is time to implement the TFX pipeline. As described earlier, a pipeline is a directed acyclic graph of several TFX components.

The pipeline that we are going to define in this example is straightforward:

ExampleGen ▶ Trainer ▶ Pusher

Creating a pipeline using TFX API is easy. Each node of our graph is a TFX component, and dependencies are defined directly at each component level.
 
from tfx import v1 as tfx
from tfx.proto import example_gen_pb2


def create_pipeline() -> tfx.dsl.Pipeline:
    input_config = tfx.proto.Input(splits=[
        example_gen_pb2.Input.Split(name="train", pattern="imdb_train*"),
        example_gen_pb2.Input.Split(name="eval", pattern="imdb_validation*")
    ])
    example_gen = tfx.components.ImportExampleGen(input_base=f"data/", input_config=input_config)
    trainer = tfx.components.Trainer(
        examples=example_gen.outputs['examples'],
        module_file="models.py",
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5)
    )
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory="models/"
            )
        )
    )
    components = [
        example_gen,
        trainer,
        pusher,
    ]
    return tfx.dsl.Pipeline(
        pipeline_name="imdb_pipeline",
        pipeline_root="models",
        components=components,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
            "metadata.db"
        )
    )

pipeline = create_pipeline()

Running the pipeline

 

TFX supports integration with different tools such as Apache Airflow or Kubeflow for running your pipeline in a production environment.

For the example here we are just going to run the pipeline locally.
 
tfx.orchestration.LocalDagRunner().run(
    pipeline
)

Playing with the model

 

If everything worked properly, you should expect your model to be stored under the models directory. The next step would probably be to evaluate the model using the test set that we stored earlier.

Let's play with it using some generated film reviews instead (feel free to add yours !)
 
import numpy as np
import os


_REVIEWS = [
    "I genuinely liked it. Worth your time!",
    "Watchable at best. I would not recommend it.",
    "It was a wonderful example of how great a film can be",
    "I did not like this film at all. The main actor is not charismatic, and I also found that the story is not immersive.",
]


def get_last_model(directory: str) -> str:
    for root, dirs, files in os.walk(directory):
        for sub_directory in sorted(dirs, reverse=True):
            try:
                int(sub_directory)
                return os.path.join(root, sub_directory)
            except ValueError:
                pass


model_dir = get_last_model("models/")
print(f"using model {model_dir}\n")

input = np.array([
    [val]
    for val
    in _REVIEWS
])

model = tf.keras.models.load_model(model_dir)
predictions = model.predict(input)

for text_review, prediction in zip(_REVIEWS, predictions):
  print(text_review, prediction)
here are the results:

Going further

 

In this article, we took a look at how to use Tensorflow extended to implement a simple pipeline for a machine learning use case. Real world application are usually much more complex but TFX provides a wide variety of components to answer commonly encountered issues regarding machine learning:

* An evaluator to automatically evaluate the model performance
* An example validator component to validate the data (outliers, etc.)
* An infrastructure validator to ensure proper behavior of the model on a given infrastructure
* A tuner for hyper parameters tuning.
* The pusher component can be configured to only deploy the model depending on other component's outputs (infrastructure validator, model evaluator) to ensure no major regression on production environment.


Sources and ending thoughts

 

This article has mainly been made using TFX's official penguin tutorial and Keras tutorial on IMDB classification.

As TFX is still a recent technology, the online documentation is sometimes lackluster and there is not a lot of content shared by the community. This tutorial aims at providing a simple TFX pipeline using an already explored machine learning use case. I hope you will find it useful.