{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Chapter 13 Loading and Preprocessing Data with TensorFlow**\n",
"\n",
"_This notebook contains all the sample code and solutions to the exercises in chapter 13._"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, let's import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures. We also check that Python 3.5 or later is installed (although Python 2.x may work, it is deprecated so we strongly recommend you use Python 3 instead), as well as Scikit-Learn ≥0.20 and TensorFlow ≥2.0-preview."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Python ≥3.5 is required\n",
"import sys\n",
"assert sys.version_info >= (3, 5)\n",
"\n",
"# Scikit-Learn ≥0.20 is required\n",
"import sklearn\n",
"assert sklearn.__version__ >= \"0.20\"\n",
"\n",
"# TensorFlow ≥2.0-preview is required\n",
"import tensorflow as tf\n",
"from tensorflow import keras\n",
"assert tf.__version__ >= \"2.0\"\n",
"\n",
"# Common imports\n",
"import numpy as np\n",
"import os\n",
"\n",
"# to make this notebook's output stable across runs\n",
"np.random.seed(42)\n",
"\n",
"# To plot pretty figures\n",
"%matplotlib inline\n",
"import matplotlib as mpl\n",
"import matplotlib.pyplot as plt\n",
"mpl.rc('axes', labelsize=14)\n",
"mpl.rc('xtick', labelsize=12)\n",
"mpl.rc('ytick', labelsize=12)\n",
"\n",
"# Where to save the figures\n",
"PROJECT_ROOT_DIR = \".\"\n",
"CHAPTER_ID = \"data\"\n",
"IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID)\n",
"os.makedirs(IMAGES_PATH, exist_ok=True)\n",
"\n",
"def save_fig(fig_id, tight_layout=True, fig_extension=\"png\", resolution=300):\n",
" path = os.path.join(IMAGES_PATH, fig_id + \".\" + fig_extension)\n",
" print(\"Saving figure\", fig_id)\n",
" if tight_layout:\n",
" plt.tight_layout()\n",
" plt.savefig(path, format=fig_extension, dpi=resolution)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Datasets"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"X = tf.range(10)\n",
"dataset = tf.data.Dataset.from_tensor_slices(X)\n",
"dataset"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Equivalently:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"dataset = tf.data.Dataset.range(10)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"for item in dataset:\n",
" print(item)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"tags": [
"raises-exception"
]
},
"outputs": [],
"source": [
"dataset = dataset.repeat(3).batch(7)\n",
"for item in dataset:\n",
" print(item)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"dataset = dataset.map(lambda x: x * 2)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"for item in dataset:\n",
" print(item)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"dataset = dataset.apply(tf.data.experimental.unbatch())"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"dataset = dataset.filter(lambda x: x < 10) # keep only items < 10"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"for item in dataset.take(3):\n",
" print(item)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"dataset = tf.data.Dataset.range(10).repeat(3)\n",
"dataset = dataset.shuffle(buffer_size=3, seed=42).batch(7)\n",
"for item in dataset:\n",
" print(item)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Split the California dataset to multiple CSV files"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's start by loading and preparing the California housing dataset. We first load it, then split it into a training set, a validation set and a test set, and finally we scale it:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.datasets import fetch_california_housing\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.preprocessing import StandardScaler\n",
"\n",
"housing = fetch_california_housing()\n",
"X_train_full, X_test, y_train_full, y_test = train_test_split(\n",
" housing.data, housing.target.reshape(-1, 1), random_state=42)\n",
"X_train, X_valid, y_train, y_valid = train_test_split(\n",
" X_train_full, y_train_full, random_state=42)\n",
"\n",
"scaler = StandardScaler()\n",
"scaler.fit(X_train)\n",
"X_mean = scaler.mean_\n",
"X_std = scaler.scale_"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For a very large dataset that does not fit in memory, you will typically want to split it into many files first, then have TensorFlow read these files in parallel. To demonstrate this, let's start by splitting the housing dataset and save it to 20 CSV files:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):\n",
" housing_dir = os.path.join(\"datasets\", \"housing\")\n",
" os.makedirs(housing_dir, exist_ok=True)\n",
" path_format = os.path.join(housing_dir, \"my_{}_{:02d}.csv\")\n",
"\n",
" filepaths = []\n",
" m = len(data)\n",
" for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):\n",
" part_csv = path_format.format(name_prefix, file_idx)\n",
" filepaths.append(part_csv)\n",
" with open(part_csv, \"wt\", encoding=\"utf-8\") as f:\n",
" if header is not None:\n",
" f.write(header)\n",
" f.write(\"\\n\")\n",
" for row_idx in row_indices:\n",
" f.write(\",\".join([repr(col) for col in data[row_idx]]))\n",
" f.write(\"\\n\")\n",
" return filepaths"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"train_data = np.c_[X_train, y_train]\n",
"valid_data = np.c_[X_valid, y_valid]\n",
"test_data = np.c_[X_test, y_test]\n",
"header_cols = housing.feature_names + [\"MedianHouseValue\"]\n",
"header = \",\".join(header_cols)\n",
"\n",
"train_filepaths = save_to_multiple_csv_files(train_data, \"train\", header, n_parts=20)\n",
"valid_filepaths = save_to_multiple_csv_files(valid_data, \"valid\", header, n_parts=10)\n",
"test_filepaths = save_to_multiple_csv_files(test_data, \"test\", header, n_parts=10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Okay, now let's take a peek at the first few lines of one of these CSV files:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"pd.read_csv(train_filepaths[0]).head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or in text mode:"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"with open(train_filepaths[0]) as f:\n",
" for i in range(5):\n",
" print(f.readline(), end=\"\")"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"train_filepaths"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Building an Input Pipeline"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"for filepath in filepath_dataset:\n",
" print(filepath)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"n_readers = 5\n",
"dataset = filepath_dataset.interleave(\n",
" lambda filepath: tf.data.TextLineDataset(filepath).skip(1),\n",
" cycle_length=n_readers)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"for line in dataset.take(5):\n",
" print(line.numpy())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice that field 4 is interpreted as a string."
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"record_defaults=[0, np.nan, tf.constant(np.nan, dtype=tf.float64), \"Hello\", tf.constant([])]\n",
"parsed_fields = tf.io.decode_csv('1,2,3,4,5', record_defaults)\n",
"parsed_fields"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice that all missing fields are replaced with their default value, when provided:"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"parsed_fields = tf.io.decode_csv(',,,,5', record_defaults)\n",
"parsed_fields"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The 5th field is compulsory (since we provided `tf.constant([])` as the \"default value\"), so we get an exception if we do not provide it:"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" parsed_fields = tf.io.decode_csv(',,,,', record_defaults)\n",
"except tf.errors.InvalidArgumentError as ex:\n",
" print(ex)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The number of fields should match exactly the number of fields in the `record_defaults`:"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)\n",
"except tf.errors.InvalidArgumentError as ex:\n",
" print(ex)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"scrolled": false
},
"outputs": [],
"source": [
"n_inputs = 8 # X_train.shape[-1]\n",
"\n",
"@tf.function\n",
"def preprocess(line):\n",
" defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]\n",
" fields = tf.io.decode_csv(line, record_defaults=defs)\n",
" x = tf.stack(fields[:-1])\n",
" y = tf.stack(fields[-1:])\n",
" return (x - X_mean) / X_std, y"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"def csv_reader_dataset(filepaths, repeat=1, n_readers=5,\n",
" n_read_threads=None, shuffle_buffer_size=10000,\n",
" n_parse_threads=5, batch_size=32):\n",
" dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)\n",
" dataset = dataset.interleave(\n",
" lambda filepath: tf.data.TextLineDataset(filepath).skip(1),\n",
" cycle_length=n_readers, num_parallel_calls=n_read_threads)\n",
" dataset = dataset.shuffle(shuffle_buffer_size)\n",
" dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)\n",
" dataset = dataset.batch(batch_size)\n",
" return dataset.prefetch(1)"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"train_set = csv_reader_dataset(train_filepaths, batch_size=3)\n",
"for X_batch, y_batch in train_set.take(2):\n",
" print(\"X =\", X_batch)\n",
" print(\"y =\", y_batch)\n",
" print()"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"train_set = csv_reader_dataset(train_filepaths, repeat=None)\n",
"valid_set = csv_reader_dataset(valid_filepaths)\n",
"test_set = csv_reader_dataset(test_filepaths)"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"model = keras.models.Sequential([\n",
" keras.layers.Dense(30, activation=\"relu\", input_shape=X_train.shape[1:]),\n",
" keras.layers.Dense(1),\n",
"])"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"model.compile(loss=\"mse\", optimizer=keras.optimizers.SGD(lr=1e-3))"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
"batch_size = 32\n",
"model.fit(train_set, steps_per_epoch=len(X_train) // batch_size, epochs=10,\n",
" validation_data=valid_set)"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [],
"source": [
"model.evaluate(test_set, steps=len(X_test) // batch_size)"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"new_set = test_set.map(lambda X, y: X) # we could instead just pass test_set, Keras would ignore the labels\n",
"X_new = X_test\n",
"model.predict(new_set, steps=len(X_new) // batch_size)"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"optimizer = keras.optimizers.Nadam(lr=0.01)\n",
"loss_fn = keras.losses.mean_squared_error\n",
"\n",
"n_epochs = 5\n",
"batch_size = 32\n",
"n_steps_per_epoch = len(X_train) // batch_size\n",
"total_steps = n_epochs * n_steps_per_epoch\n",
"global_step = 0\n",
"for X_batch, y_batch in train_set.take(total_steps):\n",
" global_step += 1\n",
" print(\"\\rGlobal step {}/{}\".format(global_step, total_steps), end=\"\")\n",
" with tf.GradientTape() as tape:\n",
" y_pred = model(X_batch)\n",
" main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))\n",
" loss = tf.add_n([main_loss] + model.losses)\n",
" gradients = tape.gradient(loss, model.trainable_variables)\n",
" optimizer.apply_gradients(zip(gradients, model.trainable_variables))"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"optimizer = keras.optimizers.Nadam(lr=0.01)\n",
"loss_fn = keras.losses.mean_squared_error\n",
"\n",
"@tf.function\n",
"def train(model, n_epochs, batch_size=32,\n",
" n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):\n",
" train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,\n",
" n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,\n",
" n_parse_threads=n_parse_threads, batch_size=batch_size)\n",
" for X_batch, y_batch in train_set:\n",
" with tf.GradientTape() as tape:\n",
" y_pred = model(X_batch)\n",
" main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))\n",
" loss = tf.add_n([main_loss] + model.losses)\n",
" gradients = tape.gradient(loss, model.trainable_variables)\n",
" optimizer.apply_gradients(zip(gradients, model.trainable_variables))\n",
"\n",
"train(model, 5)"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [],
"source": [
"optimizer = keras.optimizers.Nadam(lr=0.01)\n",
"loss_fn = keras.losses.mean_squared_error\n",
"\n",
"@tf.function\n",
"def train(model, n_epochs, batch_size=32,\n",
" n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):\n",
" train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, n_readers=n_readers,\n",
" n_read_threads=n_read_threads, shuffle_buffer_size=shuffle_buffer_size,\n",
" n_parse_threads=n_parse_threads, batch_size=batch_size)\n",
" n_steps_per_epoch = len(X_train) // batch_size\n",
" total_steps = n_epochs * n_steps_per_epoch\n",
" global_step = 0\n",
" for X_batch, y_batch in train_set.take(total_steps):\n",
" global_step += 1\n",
" if tf.equal(global_step % 100, 0):\n",
" tf.print(\"\\rGlobal step\", global_step, \"/\", total_steps)\n",
" with tf.GradientTape() as tape:\n",
" y_pred = model(X_batch)\n",
" main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))\n",
" loss = tf.add_n([main_loss] + model.losses)\n",
" gradients = tape.gradient(loss, model.trainable_variables)\n",
" optimizer.apply_gradients(zip(gradients, model.trainable_variables))\n",
"\n",
"train(model, 5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is a short description of each method in the `Dataset` class:"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"for m in dir(tf.data.Dataset):\n",
" if not (m.startswith(\"_\") or m.endswith(\"_\")):\n",
" func = getattr(tf.data.Dataset, m)\n",
" if hasattr(func, \"__doc__\"):\n",
" print(\"● {:21s}{}\".format(m + \"()\", func.__doc__.split(\"\\n\")[0]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The `TFRecord` binary format"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A TFRecord file is just a list of binary records. You can create one using a `tf.io.TFRecordWriter`:"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"with tf.io.TFRecordWriter(\"my_data.tfrecord\") as f:\n",
" f.write(b\"This is the first record\")\n",
" f.write(b\"And this is the second record\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And you can read it using a `tf.data.TFRecordDataset`:"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [],
"source": [
"filepaths = [\"my_data.tfrecord\"]\n",
"dataset = tf.data.TFRecordDataset(filepaths)\n",
"for item in dataset:\n",
" print(item)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can read multiple TFRecord files with just one `TFRecordDataset`. By default it will read them one at a time, but if you set `num_parallel_reads=3`, it will read 3 at a time in parallel and interleave their records:"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"filepaths = [\"my_test_{}.tfrecord\".format(i) for i in range(5)]\n",
"for i, filepath in enumerate(filepaths):\n",
" with tf.io.TFRecordWriter(filepath) as f:\n",
" for j in range(3):\n",
" f.write(\"File {} record {}\".format(i, j).encode(\"utf-8\"))\n",
"\n",
"dataset = tf.data.TFRecordDataset(filepaths, num_parallel_reads=3)\n",
"for item in dataset:\n",
" print(item)"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [],
"source": [
"options = tf.io.TFRecordOptions(compression_type=\"GZIP\")\n",
"with tf.io.TFRecordWriter(\"my_compressed.tfrecord\", options) as f:\n",
" f.write(b\"This is the first record\")\n",
" f.write(b\"And this is the second record\")"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
"dataset = tf.data.TFRecordDataset([\"my_compressed.tfrecord\"],\n",
" compression_type=\"GZIP\")\n",
"for item in dataset:\n",
" print(item)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A Brief Intro to Protocol Buffers"
]
},
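{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `homl.person_pb2` module used below is generated by compiling a `person.proto` file with `protoc`. Assuming a definition along these lines:\n",
"\n",
"```proto\n",
"syntax = \"proto3\";\n",
"\n",
"message Person {\n",
" string name = 1;\n",
" int32 id = 2;\n",
" repeated string email = 3;\n",
"}\n",
"```"
]
},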
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [],
"source": [
"from homl.person_pb2 import Person\n",
"\n",
"person = Person(name=\"Al\", id=123, email=[\"a@b.com\"]) # create a Person\n",
"print(person) # display the Person"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [],
"source": [
"person.name # read a field"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [],
"source": [
"person.name = \"Alice\" # modify a field"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [],
"source": [
"person.email[0] # repeated fields can be accessed like arrays"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [],
"source": [
"person.email.append(\"c@d.com\") # add an email address"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [],
"source": [
"s = person.SerializeToString() # serialize to a byte string\n",
"s"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [],
"source": [
"person2 = Person() # create a new Person\n",
"person2.ParseFromString(s) # parse the byte string (27 bytes)"
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [],
"source": [
"person == person2 # now they are equal"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### TensorFlow Protobufs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is the definition of the tf.train.Example protobuf:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```proto\n",
"syntax = \"proto3\";\n",
"\n",
"message BytesList { repeated bytes value = 1; }\n",
"message FloatList { repeated float value = 1 [packed = true]; }\n",
"message Int64List { repeated int64 value = 1 [packed = true]; }\n",
"message Feature {\n",
" oneof kind {\n",
" BytesList bytes_list = 1;\n",
" FloatList float_list = 2;\n",
" Int64List int64_list = 3;\n",
" }\n",
"};\n",
"message Features { map<string, Feature> feature = 1; };\n",
"message Example { Features features = 1; };\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [],
"source": [
"from tensorflow.train import BytesList, FloatList, Int64List\n",
"from tensorflow.train import Feature, Features, Example\n",
"\n",
"person_example = Example(\n",
" features=Features(\n",
" feature={\n",
" \"name\": Feature(bytes_list=BytesList(value=[b\"Alice\"])),\n",
" \"id\": Feature(int64_list=Int64List(value=[123])),\n",
" \"emails\": Feature(bytes_list=BytesList(value=[b\"a@b.com\", b\"c@d.com\"]))\n",
" }))\n",
"\n",
"with tf.io.TFRecordWriter(\"my_contacts.tfrecord\") as f:\n",
" f.write(person_example.SerializeToString())"
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [],
"source": [
"feature_description = {\n",
" \"name\": tf.io.FixedLenFeature([], tf.string, default_value=\"\"),\n",
" \"id\": tf.io.FixedLenFeature([], tf.int64, default_value=0),\n",
" \"emails\": tf.io.VarLenFeature(tf.string),\n",
"}\n",
"for serialized_example in tf.data.TFRecordDataset([\"my_contacts.tfrecord\"]):\n",
" parsed_example = tf.io.parse_single_example(serialized_example,\n",
" feature_description)"
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [],
"source": [
"parsed_example"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [],
"source": [
"parsed_example[\"emails\"].values[0]"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [],
"source": [
"tf.sparse.to_dense(parsed_example[\"emails\"], default_value=b\"\")"
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [],
"source": [
"parsed_example[\"emails\"].values"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Putting Images in TFRecords"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.datasets import load_sample_images\n",
"\n",
"img = load_sample_images()[\"images\"][0]\n",
"plt.imshow(img)\n",
"plt.axis(\"off\")\n",
"plt.title(\"Original Image\")\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"data = tf.io.encode_jpeg(img)\n",
"example_with_image = Example(features=Features(feature={\n",
" \"image\": Feature(bytes_list=BytesList(value=[data.numpy()]))}))\n",
"serialized_example = example_with_image.SerializeToString()\n",
"# then save to TFRecord"
]
},
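{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, here is a minimal sketch that writes the serialized `Example` to a TFRecord file (the `my_image.tfrecord` filename is hypothetical, used only for illustration):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# hypothetical filename, for illustration only\n",
"with tf.io.TFRecordWriter(\"my_image.tfrecord\") as f:\n",
" f.write(serialized_example)"
]
},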
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [],
"source": [
"feature_description = { \"image\": tf.io.VarLenFeature(tf.string) }\n",
"example_with_image = tf.io.parse_single_example(serialized_example, feature_description)\n",
"decoded_img = tf.io.decode_jpeg(example_with_image[\"image\"].values[0])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or use `decode_image()` which supports BMP, GIF, JPEG and PNG formats:"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [],
"source": [
"decoded_img = tf.io.decode_image(example_with_image[\"image\"].values[0])"
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [],
"source": [
"plt.imshow(decoded_img)\n",
"plt.title(\"Decoded Image\")\n",
"plt.axis(\"off\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Putting Tensors and Sparse Tensors in TFRecords"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Tensors can be serialized and parsed easily using `tf.io.serialize_tensor()` and `tf.io.parse_tensor()`:"
]
},
{
"cell_type": "code",
"execution_count": 65,
"metadata": {},
"outputs": [],
"source": [
"t = tf.constant([[0., 1.], [2., 3.], [4., 5.]])\n",
"s = tf.io.serialize_tensor(t)\n",
"s"
]
},
{
"cell_type": "code",
"execution_count": 66,
"metadata": {},
"outputs": [],
"source": [
"tf.io.parse_tensor(s, out_type=tf.float32)"
]
},
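{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the serialized tensor is just a byte string, it can be stored in an `Example` like any other `BytesList` feature. This is only a sketch; the `\"my_tensor\"` feature name is arbitrary:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# the serialized tensor is a byte string, so it fits in a BytesList\n",
"tensor_example = Example(features=Features(feature={\n",
" \"my_tensor\": Feature(bytes_list=BytesList(value=[s.numpy()]))}))\n",
"tensor_example"
]
},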
{
"cell_type": "code",
"execution_count": 67,
"metadata": {},
"outputs": [],
"source": [
"serialized_sparse = tf.io.serialize_sparse(parsed_example[\"emails\"])\n",
"serialized_sparse"
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [],
"source": [
"BytesList(value=serialized_sparse.numpy())"
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [],
"source": [
"dataset = tf.data.TFRecordDataset([\"my_contacts.tfrecord\"]).batch(10)\n",
"for serialized_examples in dataset:\n",
" parsed_examples = tf.io.parse_example(serialized_examples,\n",
" feature_description)"
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {},
"outputs": [],
"source": [
"parsed_examples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Handling Sequential Data Using `SequenceExample`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```proto\n",
"syntax = \"proto3\";\n",
"\n",
"message FeatureList { repeated Feature feature = 1; };\n",
"message FeatureLists { map<string, FeatureList> feature_list = 1; };\n",
"message SequenceExample {\n",
" Features context = 1;\n",
" FeatureLists feature_lists = 2;\n",
"};\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 71,
"metadata": {},
"outputs": [],
"source": [
"from tensorflow.train import FeatureList, FeatureLists, SequenceExample\n",
"\n",
"context = Features(feature={\n",
" \"author_id\": Feature(int64_list=Int64List(value=[123])),\n",
" \"title\": Feature(bytes_list=BytesList(value=[b\"A\", b\"desert\", b\"place\", b\".\"])),\n",
" \"pub_date\": Feature(int64_list=Int64List(value=[1623, 12, 25]))\n",
"})\n",
"\n",
"content = [[\"When\", \"shall\", \"we\", \"three\", \"meet\", \"again\", \"?\"],\n",
" [\"In\", \"thunder\", \",\", \"lightning\", \",\", \"or\", \"in\", \"rain\", \"?\"]]\n",
"comments = [[\"When\", \"the\", \"hurlyburly\", \"'s\", \"done\", \".\"],\n",
" [\"When\", \"the\", \"battle\", \"'s\", \"lost\", \"and\", \"won\", \".\"]]\n",
"\n",
"def words_to_feature(words):\n",
" return Feature(bytes_list=BytesList(value=[word.encode(\"utf-8\")\n",
" for word in words]))\n",
"\n",
"content_features = [words_to_feature(sentence) for sentence in content]\n",
"comments_features = [words_to_feature(comment) for comment in comments]\n",
" \n",
"sequence_example = SequenceExample(\n",
" context=context,\n",
" feature_lists=FeatureLists(feature_list={\n",
" \"content\": FeatureList(feature=content_features),\n",
" \"comments\": FeatureList(feature=comments_features)\n",
" }))"
]
},
{
"cell_type": "code",
"execution_count": 72,
"metadata": {},
"outputs": [],
"source": [
"sequence_example"
]
},
{
"cell_type": "code",
"execution_count": 73,
"metadata": {},
"outputs": [],
"source": [
"serialized_sequence_example = sequence_example.SerializeToString()"
]
},
{
"cell_type": "code",
"execution_count": 74,
"metadata": {},
"outputs": [],
"source": [
"context_feature_descriptions = {\n",
" \"author_id\": tf.io.FixedLenFeature([], tf.int64, default_value=0),\n",
" \"title\": tf.io.VarLenFeature(tf.string),\n",
" \"pub_date\": tf.io.FixedLenFeature([3], tf.int64, default_value=[0, 0, 0]),\n",
"}\n",
"sequence_feature_descriptions = {\n",
" \"content\": tf.io.VarLenFeature(tf.string),\n",
" \"comments\": tf.io.VarLenFeature(tf.string),\n",
"}\n",
"parsed_context, parsed_feature_lists = tf.io.parse_single_sequence_example(\n",
" serialized_sequence_example, context_feature_descriptions,\n",
" sequence_feature_descriptions)"
]
},
{
"cell_type": "code",
"execution_count": 75,
"metadata": {},
"outputs": [],
"source": [
"parsed_context"
]
},
{
"cell_type": "code",
"execution_count": 76,
"metadata": {},
"outputs": [],
"source": [
"parsed_context[\"title\"].values"
]
},
{
"cell_type": "code",
"execution_count": 77,
"metadata": {},
"outputs": [],
"source": [
"parsed_feature_lists"
]
},
{
"cell_type": "code",
"execution_count": 78,
"metadata": {},
"outputs": [],
"source": [
"print(tf.RaggedTensor.from_sparse(parsed_feature_lists[\"content\"]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# The Features API"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's use the variant of the California housing dataset that we used in Chapter 2, since it contains categorical features and missing values:"
]
},
{
"cell_type": "code",
"execution_count": 79,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tarfile\n",
"from six.moves import urllib\n",
"\n",
"DOWNLOAD_ROOT = \"https://raw.githubusercontent.com/ageron/handson-ml2/master/\"\n",
"HOUSING_PATH = os.path.join(\"datasets\", \"housing\")\n",
"HOUSING_URL = DOWNLOAD_ROOT + \"datasets/housing/housing.tgz\"\n",
"\n",
"def fetch_housing_data(housing_url=HOUSING_URL, housing_path=HOUSING_PATH):\n",
" if not os.path.isdir(housing_path):\n",
" os.makedirs(housing_path)\n",
" tgz_path = os.path.join(housing_path, \"housing.tgz\")\n",
" urllib.request.urlretrieve(housing_url, tgz_path)\n",
" housing_tgz = tarfile.open(tgz_path)\n",
" housing_tgz.extractall(path=housing_path)\n",
" housing_tgz.close()"
]
},
{
"cell_type": "code",
"execution_count": 80,
"metadata": {},
"outputs": [],
"source": [
"fetch_housing_data()"
]
},
{
"cell_type": "code",
"execution_count": 81,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"def load_housing_data(housing_path=HOUSING_PATH):\n",
" csv_path = os.path.join(housing_path, \"housing.csv\")\n",
" return pd.read_csv(csv_path)"
]
},
{
"cell_type": "code",
"execution_count": 82,
"metadata": {},
"outputs": [],
"source": [
"housing = load_housing_data()\n",
"housing.head()"
]
},
{
"cell_type": "code",
"execution_count": 83,
"metadata": {},
"outputs": [],
"source": [
"housing_median_age = tf.feature_column.numeric_column(\"housing_median_age\")"
]
},
{
"cell_type": "code",
"execution_count": 84,
"metadata": {},
"outputs": [],
"source": [
"age_mean, age_std = X_mean[1], X_std[1] # The median age is column in 1\n",
"housing_median_age = tf.feature_column.numeric_column(\n",
" \"housing_median_age\", normalizer_fn=lambda x: (x - age_mean) / age_std)"
]
},
{
"cell_type": "code",
"execution_count": 85,
"metadata": {},
"outputs": [],
"source": [
"median_income = tf.feature_column.numeric_column(\"median_income\")\n",
"bucketized_income = tf.feature_column.bucketized_column(\n",
" median_income, boundaries=[1.5, 3., 4.5, 6.])"
]
},
{
"cell_type": "code",
"execution_count": 86,
"metadata": {},
"outputs": [],
"source": [
"bucketized_income"
]
},
{
"cell_type": "code",
"execution_count": 87,
"metadata": {},
"outputs": [],
"source": [
"ocean_prox_vocab = ['<1H OCEAN', 'INLAND', 'ISLAND', 'NEAR BAY', 'NEAR OCEAN']\n",
"ocean_proximity = tf.feature_column.categorical_column_with_vocabulary_list(\n",
" \"ocean_proximity\", ocean_prox_vocab)"
]
},
{
"cell_type": "code",
"execution_count": 88,
"metadata": {},
"outputs": [],
"source": [
"ocean_proximity"
]
},
{
"cell_type": "code",
"execution_count": 89,
"metadata": {},
"outputs": [],
"source": [
"# Just an example, it's not used later on\n",
"city_hash = tf.feature_column.categorical_column_with_hash_bucket(\n",
" \"city\", hash_bucket_size=1000)\n",
"city_hash"
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {},
"outputs": [],
"source": [
"bucketized_age = tf.feature_column.bucketized_column(\n",
" housing_median_age, boundaries=[-1., -0.5, 0., 0.5, 1.]) # age was scaled\n",
"age_and_ocean_proximity = tf.feature_column.crossed_column(\n",
" [bucketized_age, ocean_proximity], hash_bucket_size=100)"
]
},
{
"cell_type": "code",
"execution_count": 91,
"metadata": {},
"outputs": [],
"source": [
"latitude = tf.feature_column.numeric_column(\"latitude\")\n",
"longitude = tf.feature_column.numeric_column(\"longitude\")\n",
"bucketized_latitude = tf.feature_column.bucketized_column(\n",
" latitude, boundaries=list(np.linspace(32., 42., 20 - 1)))\n",
"bucketized_longitude = tf.feature_column.bucketized_column(\n",
" longitude, boundaries=list(np.linspace(-125., -114., 20 - 1)))\n",
"location = tf.feature_column.crossed_column(\n",
" [bucketized_latitude, bucketized_longitude], hash_bucket_size=1000)"
]
},
{
"cell_type": "code",
"execution_count": 92,
"metadata": {},
"outputs": [],
"source": [
"ocean_proximity_one_hot = tf.feature_column.indicator_column(ocean_proximity)"
]
},
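{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick check (just a sketch, not used later on), we can run this indicator column through a `DenseFeatures` layer to see the resulting one-hot vectors:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# one-hot encode a couple of sample categories\n",
"keras.layers.DenseFeatures([ocean_proximity_one_hot])({\n",
" \"ocean_proximity\": [[\"NEAR BAY\"], [\"INLAND\"]]\n",
"})"
]
},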
{
"cell_type": "code",
"execution_count": 93,
"metadata": {},
"outputs": [],
"source": [
"ocean_proximity_embed = tf.feature_column.embedding_column(ocean_proximity,\n",
" dimension=2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Using Feature Columns for Parsing"
]
},
{
"cell_type": "code",
"execution_count": 94,
"metadata": {},
"outputs": [],
"source": [
"median_house_value = tf.feature_column.numeric_column(\"median_house_value\")"
]
},
{
"cell_type": "code",
"execution_count": 95,
"metadata": {},
"outputs": [],
"source": [
"columns = [housing_median_age, median_house_value]\n",
"feature_descriptions = tf.feature_column.make_parse_example_spec(columns)\n",
"feature_descriptions"
]
},
{
"cell_type": "code",
"execution_count": 96,
"metadata": {},
"outputs": [],
"source": [
"with tf.io.TFRecordWriter(\"my_data_with_features.tfrecords\") as f:\n",
" for x, y in zip(X_train[:, 1:2], y_train):\n",
" example = Example(features=Features(feature={\n",
" \"housing_median_age\": Feature(float_list=FloatList(value=[x])),\n",
" \"median_house_value\": Feature(float_list=FloatList(value=[y]))\n",
" }))\n",
" f.write(example.SerializeToString())"
]
},
{
"cell_type": "code",
"execution_count": 97,
"metadata": {},
"outputs": [],
"source": [
"def parse_examples(serialized_examples):\n",
" examples = tf.io.parse_example(serialized_examples, feature_descriptions)\n",
" targets = examples.pop(\"median_house_value\") # separate the targets\n",
" return examples, targets\n",
"\n",
"batch_size = 32\n",
"dataset = tf.data.TFRecordDataset([\"my_data_with_features.tfrecords\"])\n",
"dataset = dataset.repeat().shuffle(10000).batch(batch_size).map(parse_examples)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Warning**: the `DenseFeatures` layer currently does not work with the Functional API, see [TF issue #27416](https://github.com/tensorflow/tensorflow/issues/27416). Hopefully this will be resolved before the final release of TF 2.0."
]
},
{
"cell_type": "code",
"execution_count": 98,
"metadata": {},
"outputs": [],
"source": [
"columns_without_target = columns[:-1]\n",
"model = keras.models.Sequential([\n",
" keras.layers.DenseFeatures(feature_columns=columns_without_target),\n",
" keras.layers.Dense(1)\n",
"])\n",
"model.compile(loss=\"mse\",\n",
" optimizer=keras.optimizers.SGD(lr=1e-3),\n",
" metrics=[\"accuracy\"])\n",
"model.fit(dataset, steps_per_epoch=len(X_train) // batch_size, epochs=5)"
]
},
{
"cell_type": "code",
"execution_count": 99,
"metadata": {},
"outputs": [],
"source": [
"some_columns = [ocean_proximity_embed, bucketized_income]\n",
"dense_features = keras.layers.DenseFeatures(some_columns)\n",
"dense_features({\n",
" \"ocean_proximity\": [[\"NEAR OCEAN\"], [\"INLAND\"], [\"INLAND\"]],\n",
" \"median_income\": [[3.], [7.2], [1.]]\n",
"})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# TF Transform"
]
},
{
"cell_type": "code",
"execution_count": 100,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" import tensorflow_transform as tft\n",
"\n",
" def preprocess(inputs): # inputs is a batch of input features\n",
" median_age = inputs[\"housing_median_age\"]\n",
" ocean_proximity = inputs[\"ocean_proximity\"]\n",
" standardized_age = tft.scale_to_z_score(median_age - tft.mean(median_age))\n",
" ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)\n",
" return {\n",
" \"standardized_median_age\": standardized_age,\n",
" \"ocean_proximity_id\": ocean_proximity_id\n",
" }\n",
"except ImportError:\n",
" print(\"TF Transform is not installed. Try running: pip3 install -U tensorflow-transform\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# TensorFlow Datasets"
]
},
{
"cell_type": "code",
"execution_count": 101,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow_datasets as tfds\n",
"\n",
"datasets = tfds.load(name=\"mnist\")\n",
"mnist_train, mnist_test = datasets[\"train\"], datasets[\"test\"]"
]
},
{
"cell_type": "code",
"execution_count": 102,
"metadata": {},
"outputs": [],
"source": [
"print(tfds.list_builders())"
]
},
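{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also pass `with_info=True` to get a `DatasetInfo` object describing the dataset (a small sketch, reloading the same MNIST dataset):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"datasets, info = tfds.load(name=\"mnist\", with_info=True)\n",
"print(info.splits[\"train\"].num_examples)\n",
"print(info.features[\"label\"].num_classes)"
]
},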
{
"cell_type": "code",
"execution_count": 103,
"metadata": {},
"outputs": [],
"source": [
"plt.figure(figsize=(6,3))\n",
"mnist_train = mnist_train.repeat(5).batch(32).prefetch(1)\n",
"for item in mnist_train:\n",
" images = item[\"image\"]\n",
" labels = item[\"label\"]\n",
" for index in range(5):\n",
" plt.subplot(1, 5, index + 1)\n",
" image = images[index, ..., 0]\n",
" label = labels[index].numpy()\n",
" plt.imshow(image, cmap=\"binary\")\n",
" plt.title(label)\n",
" plt.axis(\"off\")\n",
" break # just showing part of the first batch"
]
},
{
"cell_type": "code",
"execution_count": 104,
"metadata": {},
"outputs": [],
"source": [
"datasets = tfds.load(name=\"mnist\")\n",
"mnist_train, mnist_test = datasets[\"train\"], datasets[\"test\"]\n",
"mnist_train = mnist_train.repeat(5).batch(32)\n",
"mnist_train = mnist_train.map(lambda items: (items[\"image\"], items[\"label\"]))\n",
"mnist_train = mnist_train.prefetch(1)\n",
"for images, labels in mnist_train.take(1):\n",
" print(images.shape)\n",
" print(labels.numpy())"
]
},
{
"cell_type": "code",
"execution_count": 105,
"metadata": {},
"outputs": [],
"source": [
"datasets = tfds.load(name=\"mnist\", batch_size=32, as_supervised=True)\n",
"mnist_train = datasets[\"train\"].repeat().prefetch(1)\n",
"model = keras.models.Sequential([\n",
" keras.layers.Flatten(input_shape=[28, 28, 1]),\n",
" keras.layers.Lambda(lambda images: tf.cast(images, tf.float32)),\n",
" keras.layers.Dense(10, activation=\"softmax\")])\n",
"model.compile(loss=\"sparse_categorical_crossentropy\",\n",
" optimizer=keras.optimizers.SGD(lr=1e-3),\n",
" metrics=[\"accuracy\"])\n",
"model.fit(mnist_train, steps_per_epoch=60000 // 32, epochs=5)"
]
},
{
"cell_type": "code",
"execution_count": 106,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" datasets = tfds.load(\"imagenet2012\", split=[\"train\", \"test\"])\n",
"except AssertionError as ex:\n",
" print(ex)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# TensorFlow Hub"
]
},
{
"cell_type": "code",
"execution_count": 107,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow_hub as hub\n",
"\n",
"hub_layer = hub.KerasLayer(\"https://tfhub.dev/google/tf2-preview/nnlm-en-dim50/1\",\n",
" output_shape=[50], input_shape=[], dtype=tf.string)\n",
"\n",
"model = keras.Sequential()\n",
"model.add(hub_layer)\n",
"model.add(keras.layers.Dense(16, activation='relu'))\n",
"model.add(keras.layers.Dense(1, activation='sigmoid'))\n",
"\n",
"model.summary()"
]
},
{
"cell_type": "code",
"execution_count": 108,
"metadata": {},
"outputs": [],
"source": [
"sentences = tf.constant([\"It was a great movie\", \"The actors were amazing\"])\n",
"embeddings = hub_layer(sentences)"
]
},
{
"cell_type": "code",
"execution_count": 109,
"metadata": {},
"outputs": [],
"source": [
"embeddings"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
},
"nav_menu": {
"height": "264px",
"width": "369px"
},
"toc": {
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 1
}