Running a TFX Pipeline: From Definition to Deployment

Introduction to TensorFlow Extended (TFX)

TensorFlow Extended (TFX) is an end-to-end platform for deploying production machine learning (ML) pipelines. TFX allows data scientists and ML engineers to build, evaluate, and deploy ML models in a scalable, reliable, and reproducible manner. This article will introduce you to the core components of TFX, provide practical examples using the Iris dataset, and guide you through building a simple TFX pipeline.

What is TFX?

TFX is a production-ready ML platform designed to help you build, deploy, and manage ML models. It consists of a set of libraries and tools that help automate and manage the ML lifecycle. TFX pipelines are portable and can run on various platforms, including Apache Beam, Apache Airflow, and Kubeflow.

Key Features of TFX

  • Scalability: TFX can handle large-scale data processing and training.

  • Portability: Pipelines can run on different platforms and environments.

  • Modularity: TFX components are designed to be modular, allowing you to customize and extend them as needed.

  • Production-Ready: TFX is built with production deployment in mind, ensuring reliability and robustness.

TFX Components

TFX pipelines are composed of several components, each responsible for a specific part of the ML lifecycle. Here are the core components:

ExampleGen

ExampleGen is the first component in a TFX pipeline. It ingests and splits data into training and evaluation datasets. ExampleGen supports various data sources, such as CSV files, TFRecord files, and BigQuery.

from tfx.components import CsvExampleGen

# input_base must be a directory containing the CSV file(s), not the file itself
# (here, a hypothetical /content/iris_data directory holding iris.csv).
example_gen = CsvExampleGen(input_base='/content/iris_data')

StatisticsGen

StatisticsGen computes statistics over the data for data visualization and validation. It generates statistics using TensorFlow Data Validation (TFDV).

from tfx.components import StatisticsGen

statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

SchemaGen

SchemaGen generates a schema for the data based on the statistics computed by StatisticsGen. The schema includes information about the data types, domains, and constraints.

from tfx.components import SchemaGen

schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

ExampleValidator

ExampleValidator detects anomalies in the data by comparing the data against the schema generated by SchemaGen. It helps ensure data quality.

from tfx.components import ExampleValidator

example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)

Transform

Transform performs feature engineering and data transformation using TensorFlow Transform (TFT). It preprocesses the data for model training and serving.

import tensorflow_transform as tft
from tfx.components import Transform

# This preprocessing_fn lives in the module file passed to Transform ('/content/preprocessing.py').
def preprocessing_fn(inputs):
    outputs = {
        'sepal_length': tft.scale_to_z_score(inputs['sepal.length']),
        'sepal_width': tft.scale_to_z_score(inputs['sepal.width']),
        'petal_length': tft.scale_to_z_score(inputs['petal.length']),
        'petal_width': tft.scale_to_z_score(inputs['petal.width']),
        # Encode the string class label as an integer id for training.
        'species': tft.compute_and_apply_vocabulary(inputs['variety'])
    }
    return outputs

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file='/content/preprocessing.py'
)

Trainer

Trainer trains an ML model using the preprocessed data. It supports TensorFlow models built with either the Keras API or the legacy Estimator API.

from tfx.components import Trainer
from tfx.proto import trainer_pb2

trainer = Trainer(
    module_file='/content/trainer_module.py',
    examples=transform.outputs['transformed_examples'],
    schema=schema_gen.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=trainer_pb2.TrainArgs(num_steps=1000),
    eval_args=trainer_pb2.EvalArgs(num_steps=100)
)

Evaluator

Evaluator evaluates the trained model using TensorFlow Model Analysis (TFMA). It helps in validating and comparing different models.

import tensorflow_model_analysis as tfma
from tfx.components import Evaluator

eval_config = tfma.EvalConfig(
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[
                tfma.MetricConfig(class_name='SparseCategoricalAccuracy')
            ]
        )
    ]
)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    eval_config=eval_config
)

Pusher

Pusher deploys the trained model to a serving infrastructure. It ensures that the model meets certain criteria before pushing it to production.

from tfx.components import Pusher
from tfx.proto import pusher_pb2

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory='/content/model'
        )
    )
)

Setting Up TFX

Before building a TFX pipeline, it's essential to set up the environment. This involves installing the necessary packages and configuring the runtime environment.

Installing TFX: TFX can be installed via pip. The installation includes all the required libraries and dependencies for running TFX components.

pip install tfx

Configuring the Environment: Setting up the environment involves configuring paths for data, pipelines, and model artifacts. This ensures that all components can access the necessary resources and save outputs in the correct locations.
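A minimal sketch of such a configuration is shown below. The names and paths are illustrative choices, not requirements; they simply keep the locations used throughout this article in one place.

import os

# Illustrative names and locations; adjust to your own environment.
PIPELINE_NAME = 'iris_pipeline'
DATA_ROOT = '/content/iris_data'                            # directory containing iris.csv
PIPELINE_ROOT = os.path.join('/content', PIPELINE_NAME)     # component outputs land here
METADATA_PATH = os.path.join(PIPELINE_ROOT, 'metadata.db')  # ML Metadata SQLite store
SERVING_MODEL_DIR = '/content/model'                        # Pusher destination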

Building a Simple TFX Pipeline

To illustrate the capabilities of TFX, we will build a pipeline using the Iris dataset, a well-known dataset for classification tasks. The Iris dataset contains 150 samples of iris flowers, each with four features (sepal length, sepal width, petal length, petal width) and a class label (species).

Data Ingestion

The first step in the TFX pipeline is to ingest the Iris dataset using ExampleGen. This component reads the dataset, splits it into training and evaluation sets, and converts it into the TFX internal format.

CSV ExampleGen: For the Iris dataset, we use the CsvExampleGen component, which ingests data from CSV files. It automatically splits the data into training and evaluation sets based on a specified ratio.

from tfx.components import CsvExampleGen

# input_base must be a directory containing the CSV file(s), not the file itself.
example_gen = CsvExampleGen(input_base='/content/iris_data')
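By default CsvExampleGen produces roughly a 2:1 train/eval split. The split can also be configured explicitly through an output config; the 4:1 hash-bucket ratio below is purely illustrative.

from tfx.proto import example_gen_pb2

# Explicit 80/20 train/eval split via hash buckets (ratio chosen for illustration).
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
    ]))

example_gen = CsvExampleGen(input_base='/content/iris_data', output_config=output_config)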

Data Statistics

StatisticsGen computes descriptive statistics for the dataset, providing insights into data distributions and detecting anomalies. It uses TensorFlow Data Validation (TFDV) to generate statistics such as mean, median, and standard deviation for each feature.

from tfx.components import StatisticsGen

statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

Importance: Understanding the distribution of data is crucial for identifying potential issues and ensuring that the data is suitable for training. StatisticsGen helps detect anomalies such as outliers and missing values, which can affect model performance.
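The generated statistics can also be inspected interactively with TFDV, for example in a notebook. The path below is only illustrative; substitute the actual URI of the StatisticsGen output artifact from your pipeline run.

import tensorflow_data_validation as tfdv

# Illustrative path; point this at the statistics file written by StatisticsGen.
stats_path = '/content/iris_pipeline/StatisticsGen/statistics/1/Split-train/FeatureStats.pb'
stats = tfdv.load_stats_binary(stats_path)
tfdv.visualize_statistics(stats)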

Schema Generation

Based on the statistics computed by StatisticsGen, SchemaGen generates a schema for the dataset. The schema includes information about feature types, value ranges, and presence constraints, serving as a blueprint for data validation and transformation.

from tfx.components import SchemaGen

schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

Definition: The schema defines the expected structure of the data, including feature types (numeric, categorical, etc.), value ranges, and constraints (e.g., required features). This information is critical for ensuring data consistency and preparing it for model training.
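Once generated, the schema can be loaded, reviewed, and hand-curated with TFDV. Again, the path is illustrative; use the URI of the SchemaGen output artifact from your run.

import tensorflow_data_validation as tfdv

# Illustrative path to the schema.pbtxt written by SchemaGen.
schema = tfdv.load_schema_text('/content/iris_pipeline/SchemaGen/schema/2/schema.pbtxt')
tfdv.display_schema(schema)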

Data Validation

ExampleValidator validates the dataset against the schema, identifying anomalies and missing values. It ensures that the data adheres to the expected format, which is essential for training reliable models.

from tfx.components import ExampleValidator

example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)

Anomaly Detection: ExampleValidator detects anomalies such as outliers, missing values, and unexpected feature values. These issues can affect model performance and lead to unreliable predictions, making data validation a crucial step in the pipeline.
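The same check can be reproduced interactively with TFDV, which is what ExampleValidator uses under the hood. Here stats and schema are the objects loaded in the previous snippets.

import tensorflow_data_validation as tfdv

# Validate the computed statistics against the schema and display any anomalies found.
anomalies = tfdv.validate_statistics(statistics=stats, schema=schema)
tfdv.display_anomalies(anomalies)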

Data Transformation

Transform performs feature engineering and data preprocessing using TensorFlow Transform (TFT). It applies transformations such as scaling, normalization, and encoding, preparing the data for model training.

import tensorflow_transform as tft
from tfx.components import Transform

# This preprocessing_fn belongs in the module file passed to Transform ('/content/preprocessing.py').
def preprocessing_fn(inputs):
    # Normalize the numeric features to zero mean and unit variance.
    outputs = {
        'sepal_length': tft.scale_to_z_score(inputs['sepal.length']),
        'sepal_width': tft.scale_to_z_score(inputs['sepal.width']),
        'petal_length': tft.scale_to_z_score(inputs['petal.length']),
        'petal_width': tft.scale_to_z_score(inputs['petal.width']),
        # Encode the string class label as an integer id for training.
        'species': tft.compute_and_apply_vocabulary(inputs['variety'])
    }
    return outputs

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file='/content/preprocessing.py'
)

Preprocessing Functions: Transform defines preprocessing functions that apply transformations to the raw data. These functions can include operations such as scaling numerical features, encoding categorical features, and generating new features based on existing ones.
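Other transformations follow the same pattern. The sketch below is not part of the Iris pipeline above; it simply illustrates bucketizing a numeric feature and deriving a new feature, assuming the inputs arrive as dense float tensors.

import tensorflow_transform as tft

# Illustrative preprocessing_fn showing additional TFT operations.
def preprocessing_fn(inputs):
    return {
        # Discretize petal length into 3 equal-frequency buckets.
        'petal_length_bucket': tft.bucketize(inputs['petal.length'], num_buckets=3),
        # Derive a new feature from existing ones (assumes dense float inputs).
        'petal_area': inputs['petal.length'] * inputs['petal.width'],
        # Encode a string feature as a vocabulary id.
        'species': tft.compute_and_apply_vocabulary(inputs['variety']),
    }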

Model Training

The Trainer component trains the ML model using the transformed data. It leverages TensorFlow's capabilities to define, train, and evaluate models.

import tensorflow as tf
import tensorflow_transform as tft
from tfx_bsl.public import tfxio

# Feature and label names as produced by the Transform component above.
_FEATURE_KEYS = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
_LABEL_KEY = 'species'

def _input_fn(file_pattern, data_accessor, schema, batch_size=200):
    # Build a tf.data.Dataset of (features dict, label) batches from the materialized examples.
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_LABEL_KEY),
        schema).repeat()

def _build_keras_model():
    # One scalar input per transformed feature, concatenated into a single vector.
    inputs = [tf.keras.layers.Input(shape=(1,), name=key) for key in _FEATURE_KEYS]
    x = tf.keras.layers.concatenate(inputs)
    x = tf.keras.layers.Dense(10, activation='relu')(x)
    x = tf.keras.layers.Dense(10, activation='relu')(x)
    outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

def run_fn(fn_args):
    # The Transform graph carries the schema of the transformed examples fed to the Trainer.
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
    schema = tf_transform_output.transformed_metadata.schema

    train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, schema, batch_size=200)
    eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor, schema, batch_size=200)

    model = _build_keras_model()
    model.fit(train_dataset, steps_per_epoch=fn_args.train_steps,
              validation_data=eval_dataset, validation_steps=fn_args.eval_steps)
    model.save(fn_args.serving_model_dir, save_format='tf')

Model Definition: Trainer uses a module file containing the model definition and training logic. This file defines the architecture of the model, the loss function, and the optimization algorithm. It also includes the training and evaluation steps, specifying the number of epochs, batch size, and evaluation metrics.

With the training module saved as '/content/trainer_module.py', the Trainer component is configured to use it together with the transformed examples and the transform graph:

from tfx.components import Trainer
from tfx.proto import trainer_pb2

trainer = Trainer(
    module_file='/content/trainer_module.py',
    examples=transform.outputs['transformed_examples'],
    schema=schema_gen.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=trainer_pb2.TrainArgs(num_steps=1000),
    eval_args=trainer_pb2.EvalArgs(num_steps=100)
)

Model Evaluation

Evaluator evaluates the trained model using TensorFlow Model Analysis (TFMA). It performs a detailed analysis of model performance, identifying potential issues and ensuring that the model meets the desired criteria before deployment.

import tensorflow_model_analysis as tfma
from tfx.components import Evaluator

eval_config = tfma.EvalConfig(
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[
                tfma.MetricConfig(class_name='SparseCategoricalAccuracy')
            ]
        )
    ]
)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    eval_config=eval_config
)

Evaluation Configuration: Evaluator uses an evaluation configuration to specify the metrics and slicing specifications for model evaluation. Metrics such as accuracy, precision, and recall are used to assess model performance, while slicing specifications allow for analyzing performance across different subsets of the data.
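A richer configuration might also slice results by feature and attach a threshold that the model must pass before the Evaluator "blesses" it. The sketch below is illustrative; the label key, slice feature, and 0.8 accuracy bound are example values, not requirements.

eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='species')],
    slicing_specs=[
        tfma.SlicingSpec(),                          # overall metrics
        tfma.SlicingSpec(feature_keys=['species']),  # per-class slices
    ],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(lower_bound={'value': 0.8})))
        ])
    ]
)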

Model Deployment

The final step in the TFX pipeline is to deploy the validated model using Pusher. This component ensures that only the best models are deployed to the serving infrastructure.

from tfx.components import Pusher
from tfx.proto import pusher_pb2

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory='/content/model'
        )
    )
)

Model Deployment: Pusher deploys the model to a specified serving infrastructure, such as TensorFlow Serving. It ensures that the model is production-ready and meets the desired performance criteria, facilitating continuous model improvement.
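As a quick sanity check, the pushed SavedModel can be loaded back and its serving signatures inspected. This minimal sketch assumes Pusher has written at least one timestamped model directory under /content/model.

import glob

import tensorflow as tf

# Load the most recently pushed SavedModel and list its serving signatures.
latest_model_dir = sorted(glob.glob('/content/model/*'))[-1]
loaded_model = tf.saved_model.load(latest_model_dir)
print(list(loaded_model.signatures.keys()))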

Running the Pipeline

The pipeline is executed by an orchestrator, in this case the LocalDagRunner, which processes the data, trains the model, evaluates its performance, and deploys it if it meets the required criteria.

from tfx.orchestration import metadata, pipeline
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

# Define the pipeline; ML Metadata is stored in a local SQLite database
# (the metadata.db path is a typical choice, adjust as needed).
iris_pipeline = pipeline.Pipeline(
    pipeline_name='iris_pipeline',
    pipeline_root='/content/iris_pipeline',
    components=[example_gen, statistics_gen, schema_gen, example_validator,
                transform, trainer, evaluator, pusher],
    enable_cache=True,
    metadata_connection_config=metadata.sqlite_metadata_connection_config(
        '/content/iris_pipeline/metadata.db')
)

# Run the pipeline locally
LocalDagRunner().run(iris_pipeline)

After execution, the pipeline produces outputs such as transformed data, trained models, evaluation results, and metadata, all stored in a specified directory for further analysis and deployment.

Monitoring involves checking logs and reviewing evaluation metrics, while debugging includes inspecting artifacts and re-executing specific components to resolve any issues. This process ensures the pipeline runs smoothly, producing reliable and scalable machine learning models ready for deployment.
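For example, the artifacts recorded by the pipeline can be inspected through ML Metadata (MLMD). The sketch below assumes the SQLite metadata path used when defining the pipeline above.

from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

# Connect to the pipeline's ML Metadata store (path matches the pipeline definition above).
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = '/content/iris_pipeline/metadata.db'
store = metadata_store.MetadataStore(connection_config)

# List the URIs of all Model artifacts produced so far.
for artifact in store.get_artifacts_by_type('Model'):
    print(artifact.uri)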

Conclusion

In this article, we introduced TensorFlow Extended (TFX) and its core components. We demonstrated how to set up a TFX environment, build a simple TFX pipeline using the Iris dataset, and run it. TFX provides a powerful and flexible framework for managing the end-to-end ML lifecycle, from data ingestion to model deployment. You should now have a solid foundation for building and deploying your own TFX pipelines.

TFX's modularity and scalability make it suitable for a wide range of ML applications, ensuring that you can build robust and production-ready ML systems. Happy experimenting with TFX!

Resources

GitHub Gist containing the notebook