Machine learning models have achieved state-of-the-art results on a wide variety of complex problems, including recommendation, forecasting and classification. The field is evolving quickly but still lacks tools to properly manage production pipelines. Compared to the traditional software development world, machine learning has specificities of its own that need to be addressed differently.
This article takes a closer look at a recent technology, TensorFlow Extended (TFX), developed by Google, which provides a Python API to build machine learning pipelines. It comes with sample code that can be run standalone in this notebook or adapted to your own needs.
Creating a TFX pipeline for sentiment analysis with the IMDB reviews dataset
Downloading, exploring and storing the IMDB film reviews dataset
import tensorflow_datasets
import pandas as pd

# Load the IMDB reviews dataset, keeping the last 25% of the
# training split aside for validation
train_data, validation_data, test_data = tensorflow_datasets.load(
    name="imdb_reviews",
    split=("train[:75%]", "train[75%:]", "test"),
    as_supervised=True,
)

# Peek at the first five examples
pd.DataFrame(
    train_data.take(5).as_numpy_iterator(),
    columns=["reviews", "is_positive"],
)
outputs a table with the first five raw reviews and their is_positive labels.
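Before writing the data to disk, it can also be useful to confirm how many examples each split holds. A minimal sketch using tf.data's cardinality (datasets loaded through tensorflow_datasets report a known size):

import tensorflow as tf

for name, split in [("train", train_data), ("validation", validation_data), ("test", test_data)]:
    print(name, tf.data.experimental.cardinality(split).numpy())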
import tensorflow as tf

dataset_filenames = {
    "imdb_train": train_data,
    "imdb_validation": validation_data,
    "imdb_test": test_data,
}

# Serialize each split to a TFRecord file of tf.train.Example protos.
# The text feature is named "keras_layer_input" to match the input name
# of the hub layer in the model defined below.
for filename, tf_dataset in dataset_filenames.items():
    with tf.io.TFRecordWriter(f"data/{filename}.tfrecord") as f_write:
        for text, label in tf_dataset:
            feature = {
                "keras_layer_input": tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[text.numpy()])
                ),
                "label": tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[label.numpy()])
                ),
            }
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            f_write.write(example.SerializeToString())
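To check that the files were written correctly, we can read one example back and parse it with the same feature names. A quick sketch (the feature spec mirrors the one used in models.py below):

feature_spec = {
    "keras_layer_input": tf.io.FixedLenFeature(shape=[1], dtype=tf.string),
    "label": tf.io.FixedLenFeature(shape=[1], dtype=tf.int64),
}

# Read a single serialized example back and decode it
raw_dataset = tf.data.TFRecordDataset("data/imdb_train.tfrecord")
for raw_example in raw_dataset.take(1):
    parsed = tf.io.parse_single_example(raw_example, feature_spec)
    print(parsed["keras_layer_input"].numpy(), parsed["label"].numpy())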
Creating the model
%%writefile models.py
from typing import List

import tensorflow as tf
import tensorflow_hub
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio


def create_model() -> tf.keras.models.Model:
    """Builds a binary classifier on top of a pre-trained text embedding."""
    model = tf.keras.Sequential()
    # Pre-trained 50-dimensional English text embedding from TensorFlow Hub,
    # fine-tuned along with the rest of the model (trainable=True)
    hub_layer = tensorflow_hub.KerasLayer(
        "https://tfhub.dev/google/nnlm-en-dim50/2",
        input_shape=[],
        dtype=tf.string,
        trainable=True,
    )
    model.add(hub_layer)
    model.add(tf.keras.layers.Dense(16, activation="relu"))
    model.add(tf.keras.layers.Dense(1, activation="sigmoid"))
    model.compile(
        optimizer="adam",
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=["accuracy"],
    )
    return model
def _input_fn(
    file_pattern: List[str],
    data_accessor: tfx.components.DataAccessor,
    schema: schema_pb2.Schema,
) -> tf.data.Dataset:
    """Builds a batched tf.data.Dataset of (features, label) tuples."""
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(
            batch_size=256,
            num_epochs=1,
            label_key="label",
        ),
        schema,
    )
def run_fn(fn_args: tfx.components.FnArgs) -> None:
    """Entry point of the Trainer component; the name run_fn is imposed by TFX."""
    feature_spec = {
        "keras_layer_input": tf.io.FixedLenFeature(shape=[1], dtype=tf.string),
        "label": tf.io.FixedLenFeature(shape=[1], dtype=tf.int64),
    }
    schema = schema_utils.schema_from_feature_spec(feature_spec)
    train_data = _input_fn(
        file_pattern=fn_args.train_files,
        data_accessor=fn_args.data_accessor,
        schema=schema,
    )
    validation_data = _input_fn(
        file_pattern=fn_args.eval_files,
        data_accessor=fn_args.data_accessor,
        schema=schema,
    )
    model = create_model()
    model.fit(
        train_data,
        validation_data=validation_data,
        epochs=5,
        verbose=1,
    )
    # Export the trained model in SavedModel format where the Pusher expects it
    model.save(fn_args.serving_model_dir, save_format="tf")
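Since models.py is a plain Python module, create_model can be smoke-tested outside of TFX before wiring up the pipeline. A minimal sketch (it downloads the embedding from tfhub.dev, so it needs network access):

import numpy as np
from models import create_model

model = create_model()
model.summary()
# Even untrained, the model should output a score between 0 and 1
print(model.predict(np.array(["a quick sanity-check review"])))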
Creating the pipeline
from tfx import v1 as tfx
from tfx.proto import example_gen_pb2


def create_pipeline() -> tfx.dsl.Pipeline:
    # Map our TFRecord files to the "train" and "eval" splits
    input_config = tfx.proto.Input(splits=[
        example_gen_pb2.Input.Split(name="train", pattern="imdb_train*"),
        example_gen_pb2.Input.Split(name="eval", pattern="imdb_validation*"),
    ])
    example_gen = tfx.components.ImportExampleGen(
        input_base="data/",
        input_config=input_config,
    )
    trainer = tfx.components.Trainer(
        examples=example_gen.outputs["examples"],
        module_file="models.py",
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
    )
    # Copy the trained model to its final serving destination
    pusher = tfx.components.Pusher(
        model=trainer.outputs["model"],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory="models/"
            )
        ),
    )
    components = [
        example_gen,
        trainer,
        pusher,
    ]
    return tfx.dsl.Pipeline(
        pipeline_name="imdb_pipeline",
        pipeline_root="models",
        components=components,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
            "metadata.db"
        ),
    )
pipeline = create_pipeline()
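At this point nothing has run yet: create_pipeline only declares the DAG. As a quick check, we can list the declared components by their ids (a small sketch, assuming the default TFX component naming):

for component in pipeline.components:
    print(component.id)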
Running the pipeline
tfx.orchestration.LocalDagRunner().run(pipeline)
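Each run records its artifacts in the metadata.db SQLite file passed to the pipeline. They can be inspected with the ml_metadata package (a sketch, assuming ml_metadata is installed alongside tfx):

from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = "metadata.db"
store = metadata_store.MetadataStore(connection_config)

# List every artifact (examples, models, ...) produced by the pipeline runs
for artifact in store.get_artifacts():
    print(artifact.id, artifact.uri)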
Playing with the model
import numpy as np
import os

_REVIEWS = [
    "I genuinely liked it. Worth your time!",
    "Watchable at best. I would not recommend it.",
    "It was a wonderful example of how great a film can be",
    "I did not like this film at all. The main actor is not charismatic, and I also found that the story is not immersive.",
]


def get_last_model(directory: str) -> str:
    """Returns the latest pushed model: the Pusher names directories by timestamp."""
    for root, dirs, files in os.walk(directory):
        for sub_directory in sorted(dirs, reverse=True):
            try:
                int(sub_directory)
                return os.path.join(root, sub_directory)
            except ValueError:
                pass


model_dir = get_last_model("models/")
print(f"using model {model_dir}\n")

# Shape the reviews as a batch of single-string examples
inputs = np.array([[val] for val in _REVIEWS])

model = tf.keras.models.load_model(model_dir)
predictions = model.predict(inputs)
for text_review, prediction in zip(_REVIEWS, predictions):
    print(text_review, prediction)
here are the results: one score per review, between 0 and 1, where higher means more positive.
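Since the last layer is a sigmoid, each score can be read as the model's confidence that the review is positive, and thresholding at 0.5 turns it into a label. For instance:

for text_review, prediction in zip(_REVIEWS, predictions):
    sentiment = "positive" if prediction[0] > 0.5 else "negative"
    print(f"{sentiment} ({prediction[0]:.3f}): {text_review}")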
Going further
Sources and ending thoughts