"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "xHxb-dlhMIzW"
},
"source": [
"## Overview\n",
"\n",
"GPUs and TPUs can radically reduce the time required to execute a single training step.\n",
"Achieving peak performance requires an efficient input pipeline that delivers data for the next step before the current step has finished.\n",
"The `tf.data` API helps to build flexible and efficient input pipelines.\n",
"This document demonstrates how to use the `tf.data` API to build highly performant TensorFlow input pipelines.\n",
"\n",
"Before you continue, check the [Build TensorFlow input pipelines](./data.ipynb) guide to learn how to use the `tf.data` API."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "UhNtHfuxCGVy"
},
"source": [
"## Resources\n",
"\n",
"* [Build TensorFlow input pipelines](./data.ipynb)\n",
"* `tf.data.Dataset` API\n",
"* [Analyze `tf.data` performance with the TF Profiler](./data_performance_analysis.md)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "MUXex9ctTuDB"
},
"source": [
"## Setup"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:29:59.608267Z",
"iopub.status.busy": "2024-08-15T02:29:59.607850Z",
"iopub.status.idle": "2024-08-15T02:30:01.973658Z",
"shell.execute_reply": "2024-08-15T02:30:01.972911Z"
},
"id": "IqR2PQG4ZaZ0"
},
"outputs": [
],
"source": [
"import tensorflow as tf\n",
"\n",
"import time"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "QthTHCKF-jKD"
},
"source": [
"Throughout this guide, you will iterate across a dataset and measure the performance.\n",
"Making reproducible performance benchmarks can be difficult. Different factors affecting reproducibility include:\n",
"\n",
"- The current CPU load\n",
"- The network traffic\n",
"- Complex mechanisms, such as caching\n",
"\n",
"To get a reproducible benchmark, you will build an artificial example."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3bU5gsSI-jKF"
},
"source": [
"### The dataset\n",
"\n",
"Start by defining a class called `ArtificialDataset` that inherits from `tf.data.Dataset`.\n",
"This dataset:\n",
"\n",
"- Generates `num_samples` samples (default is 3)\n",
"- Sleeps for some time before the first item to simulate opening a file\n",
"- Sleeps for some time before producing each item to simulate reading data from a file"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:01.977718Z",
"iopub.status.busy": "2024-08-15T02:30:01.977354Z",
"iopub.status.idle": "2024-08-15T02:30:01.982291Z",
"shell.execute_reply": "2024-08-15T02:30:01.981698Z"
},
"id": "zUQv4kCd-jKH"
},
"outputs": [],
"source": [
"class ArtificialDataset(tf.data.Dataset):\n",
"    def _generator(num_samples):\n",
"        # Opening the file\n",
"        time.sleep(0.03)\n",
"\n",
"        for sample_idx in range(num_samples):\n",
"            # Reading data (line, record) from the file\n",
"            time.sleep(0.015)\n",
"\n",
"            yield (sample_idx,)\n",
"\n",
"    def __new__(cls, num_samples=3):\n",
"        return tf.data.Dataset.from_generator(\n",
"            cls._generator,\n",
"            output_signature=tf.TensorSpec(shape=(1,), dtype=tf.int64),\n",
"            args=(num_samples,)\n",
"        )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "O9y1WjNv-jKL"
},
"source": [
"This dataset is similar to `tf.data.Dataset.range`, but adds a fixed delay at the beginning and before each sample."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "FGK1Y4jn-jKM"
},
"source": [
"### The training loop\n",
"\n",
"Next, write a dummy training loop that measures how long it takes to iterate over a dataset.\n",
"Training time is simulated."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:01.985429Z",
"iopub.status.busy": "2024-08-15T02:30:01.985202Z",
"iopub.status.idle": "2024-08-15T02:30:01.988833Z",
"shell.execute_reply": "2024-08-15T02:30:01.988266Z"
},
"id": "MIaM3u00-jKP"
},
"outputs": [],
"source": [
"def benchmark(dataset, num_epochs=2):\n",
"    start_time = time.perf_counter()\n",
"    for epoch_num in range(num_epochs):\n",
"        for sample in dataset:\n",
"            # Performing a training step\n",
"            time.sleep(0.01)\n",
"    print(\"Execution time:\", time.perf_counter() - start_time)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KK58SuXS-jKT"
},
"source": [
"## Optimize performance\n",
"\n",
"To demonstrate how performance can be optimized, you will improve the performance of the `ArtificialDataset`."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Xi8t26y7-jKV"
},
"source": [
"### The naive approach\n",
"\n",
"Start with a naive pipeline using no tricks, iterating over the dataset as-is."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:01.992175Z",
"iopub.status.busy": "2024-08-15T02:30:01.991628Z",
"iopub.status.idle": "2024-08-15T02:30:04.538180Z",
"shell.execute_reply": "2024-08-15T02:30:04.537460Z"
},
"id": "_gP7J1y4-jKY"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.28992818999995507\n"
]
}
],
"source": [
"benchmark(ArtificialDataset())"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Lxeat5dH-jKf"
},
"source": [
"Under the hood, this is how your execution time was spent:\n",
"\n",
"\n",
"\n",
"The plot shows that performing a training step involves:\n",
"\n",
"- Opening a file if it hasn't been opened yet\n",
"- Fetching a data entry from the file\n",
"- Using the data for training\n",
"\n",
"However, in a naive synchronous implementation like this one, while your pipeline is fetching the data, your model is sitting idle.\n",
"Conversely, while your model is training, the input pipeline is sitting idle.\n",
"The training step time is thus the sum of opening, reading and training times.\n",
"\n",
"The next sections build on this input pipeline, illustrating best practices for designing performant TensorFlow input pipelines."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "mfukBGNz-jKh"
},
"source": [
"### Prefetching\n",
"\n",
"\n",
"\n",
"Prefetching overlaps the preprocessing and model execution of a training step.\n",
"While the model is executing training step `s`, the input pipeline is reading the data for step `s+1`.\n",
"Doing so reduces the step time to the maximum (as opposed to the sum) of the training time and the time it takes to extract the data.\n",
"\n",
"The `tf.data` API provides the `tf.data.Dataset.prefetch` transformation.\n",
"It can be used to decouple the time when data is produced from the time when data is consumed.\n",
"In particular, the transformation uses a background thread and an internal buffer to prefetch elements from the input dataset ahead of the time they are requested.\n",
"The number of elements to prefetch should be equal to (or possibly greater than) the number of batches consumed by a single training step.\n",
"You could either manually tune this value, or set it to `tf.data.AUTOTUNE`, which will prompt the\n",
"`tf.data` runtime to tune the value dynamically at runtime.\n",
"\n",
"Note that the prefetch transformation provides benefits any time there is an opportunity to overlap the work of a \"producer\" with the work of a \"consumer.\""
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:04.542348Z",
"iopub.status.busy": "2024-08-15T02:30:04.541704Z",
"iopub.status.idle": "2024-08-15T02:30:04.856786Z",
"shell.execute_reply": "2024-08-15T02:30:04.856143Z"
},
"id": "DHpUVqH1-jKi"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.282483695999872\n"
]
}
],
"source": [
"benchmark(\n",
"    ArtificialDataset()\n",
"    .prefetch(tf.data.AUTOTUNE)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "h7z_kzo--jKn"
},
"source": [
"\n",
"\n",
"Now, as the data execution time plot shows, while the training step is running for sample 0, the input pipeline is reading the data for sample 1, and so on."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "52QMKfaY-jKq"
},
"source": [
"### Parallelizing data extraction\n",
"\n",
"\n",
"\n",
"In a real-world setting, the input data may be stored remotely (for example, on Google Cloud Storage or HDFS).\n",
"A dataset pipeline that works well when reading data locally might become bottlenecked on I/O when reading data remotely because of the following differences between local and remote storage:\n",
"\n",
"- **Time-to-first-byte**: Reading the first byte of a file from remote storage can take orders of magnitude longer than from local storage.\n",
"- **Read throughput**: While remote storage typically offers large aggregate bandwidth, reading a single file might only be able to utilize a small fraction of this bandwidth.\n",
"\n",
"In addition, once the raw bytes are loaded into memory, it may also be necessary to deserialize and/or decrypt the data (e.g. [protobuf](https://developers.google.com/protocol-buffers/)), which requires additional computation.\n",
"This overhead is present irrespective of whether the data is stored locally or remotely, but can be worse in the remote case if data is not prefetched effectively.\n",
"\n",
"To mitigate the impact of the various data extraction overheads, the `tf.data.Dataset.interleave` transformation can be used to parallelize the data loading step, interleaving the contents of other datasets (such as data file\n",
"readers).\n",
"The number of datasets to overlap can be specified by the `cycle_length` argument, while the level of parallelism can be specified by the `num_parallel_calls` argument. Similar to the `prefetch` transformation, the `interleave` transformation supports `tf.data.AUTOTUNE`, which will delegate the decision about what level of parallelism to use to the `tf.data` runtime."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "gs8O8Vbu-jKu"
},
"source": [
"#### Sequential interleave\n",
"\n",
"The default arguments of the `tf.data.Dataset.interleave` transformation make it interleave single samples from two datasets sequentially."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:04.860656Z",
"iopub.status.busy": "2024-08-15T02:30:04.860246Z",
"iopub.status.idle": "2024-08-15T02:30:05.380865Z",
"shell.execute_reply": "2024-08-15T02:30:05.380192Z"
},
"id": "fDH12GiK-jKw"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.4770545540004605\n"
]
}
],
"source": [
"benchmark(\n",
"    tf.data.Dataset.range(2)\n",
"    .interleave(lambda _: ArtificialDataset())\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "78CsSOnf-jK0"
},
"source": [
"\n",
"\n",
"This data execution time plot shows the behavior of the `interleave` transformation, which fetches samples alternately from the two available datasets.\n",
"However, this yields no performance improvement."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "j3cqqmYl-jK2"
},
"source": [
"#### Parallel interleave\n",
"\n",
"Now, use the `num_parallel_calls` argument of the `interleave` transformation.\n",
"This loads multiple datasets in parallel, reducing the time waiting for the files to be opened."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:05.384554Z",
"iopub.status.busy": "2024-08-15T02:30:05.384264Z",
"iopub.status.idle": "2024-08-15T02:30:05.805199Z",
"shell.execute_reply": "2024-08-15T02:30:05.804480Z"
},
"id": "a3FQcTPY-jK4"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.37743708600009995\n"
]
}
],
"source": [
"benchmark(\n",
"    tf.data.Dataset.range(2)\n",
"    .interleave(\n",
"        lambda _: ArtificialDataset(),\n",
"        num_parallel_calls=tf.data.AUTOTUNE\n",
"    )\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "RxRLPB6C-jLA"
},
"source": [
"\n",
"\n",
"This time, as the data execution time plot shows, the reading of the two datasets is parallelized, reducing the global data processing time."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5ZCLFWyv-jLB"
},
"source": [
"### Parallelizing data transformation\n",
"\n",
"\n",
"\n",
"When preparing data, input elements may need to be pre-processed.\n",
"To this end, the `tf.data` API offers the `tf.data.Dataset.map` transformation, which applies a user-defined function to each element of the input dataset.\n",
"Because input elements are independent of one another, the pre-processing can be parallelized across multiple CPU cores.\n",
"To make this possible, similarly to the `interleave` transformation, the `map` transformation provides the `num_parallel_calls` argument to specify the level of parallelism.\n",
"\n",
"Choosing the best value for the `num_parallel_calls` argument depends on your hardware, characteristics of your training data (such as its size and shape), the cost of your map function, and what other processing is happening on the CPU at the same time.\n",
"A simple heuristic is to use the number of available CPU cores.\n",
"However, as with the `prefetch` and `interleave` transformations, the `map` transformation supports `tf.data.AUTOTUNE`, which will delegate the decision about what level of parallelism to use to the `tf.data` runtime."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:05.809046Z",
"iopub.status.busy": "2024-08-15T02:30:05.808779Z",
"iopub.status.idle": "2024-08-15T02:30:05.812585Z",
"shell.execute_reply": "2024-08-15T02:30:05.811987Z"
},
"id": "GSkKetpx-jLD"
},
"outputs": [],
"source": [
"def mapped_function(s):\n",
"    # Do some hard pre-processing\n",
"    tf.py_function(lambda: time.sleep(0.03), [], ())\n",
"    return s"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "wiU7W_QC-jLI"
},
"source": [
"#### Sequential mapping\n",
"\n",
"Start by using the `map` transformation without parallelism as a baseline example."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:05.816153Z",
"iopub.status.busy": "2024-08-15T02:30:05.815573Z",
"iopub.status.idle": "2024-08-15T02:30:06.364742Z",
"shell.execute_reply": "2024-08-15T02:30:06.363940Z"
},
"id": "ZSBvDpJG-jLL"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.49222556600034295\n"
]
}
],
"source": [
"benchmark(\n",
"    ArtificialDataset()\n",
"    .map(mapped_function)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ngwMTDb6-jLR"
},
"source": [
"\n",
"\n",
"As with the [naive approach](#The-naive-approach), the plot shows that the times spent opening, reading, pre-processing (mapping), and training sum together for a single iteration."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "U-10PE1D-jLU"
},
"source": [
"#### Parallel mapping\n",
"\n",
"Now, use the same pre-processing function but apply it in parallel on multiple samples."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:06.368778Z",
"iopub.status.busy": "2024-08-15T02:30:06.368286Z",
"iopub.status.idle": "2024-08-15T02:30:06.765549Z",
"shell.execute_reply": "2024-08-15T02:30:06.764838Z"
},
"id": "F8AYLZbg-jLV"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.36392719900049997\n"
]
}
],
"source": [
"benchmark(\n",
"    ArtificialDataset()\n",
"    .map(\n",
"        mapped_function,\n",
"        num_parallel_calls=tf.data.AUTOTUNE\n",
"    )\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-MoJklzP-jLe"
},
"source": [
"\n",
"\n",
"As the data plot demonstrates, the pre-processing steps overlap, reducing the overall time for a single iteration."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZY1Q9kJO-jLh"
},
"source": [
"### Caching\n",
"\n",
"\n",
"\n",
"The `tf.data.Dataset.cache` transformation can cache a dataset, either in memory or on local storage.\n",
"This will save some operations (like file opening and data reading) from being executed during each epoch."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:06.769615Z",
"iopub.status.busy": "2024-08-15T02:30:06.768968Z",
"iopub.status.idle": "2024-08-15T02:30:07.190903Z",
"shell.execute_reply": "2024-08-15T02:30:07.190097Z"
},
"id": "xieLApaI-jLi"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.3864543270001377\n"
]
}
],
"source": [
"benchmark(\n",
"    ArtificialDataset()\n",
"    .map(  # Apply time consuming operations before cache\n",
"        mapped_function\n",
"    ).cache(),\n",
"    num_epochs=5\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KeMgW9XI-jLn"
},
"source": [
"\n",
"\n",
"Here, the data execution time plot shows that when you cache a dataset, the transformations before the `cache` one (like the file opening and data reading) are executed only during the first epoch.\n",
"The next epochs will reuse the data cached by the `cache` transformation.\n",
"\n",
"If the user-defined function passed into the `map` transformation is expensive, apply the `cache` transformation after the `map` transformation as long as the resulting dataset can still fit into memory or local storage.\n",
"If the user-defined function increases the space required to store the dataset beyond the cache capacity, either apply it after the `cache` transformation or consider pre-processing your data before your training job to reduce resource usage."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "i3NtGI3r-jLp"
},
"source": [
"### Vectorizing mapping\n",
"\n",
"\n",
"\n",
"Invoking a user-defined function passed into the `map` transformation has overhead related to scheduling and executing the user-defined function.\n",
"Vectorize the user-defined function (that is, have it operate over a batch of inputs at once) and apply the `batch` transformation _before_ the `map` transformation.\n",
"\n",
"Your artificial dataset is not suitable for illustrating this good practice.\n",
"The scheduling delay is around 10 microseconds (10e-6 seconds), far less than the tens of milliseconds used in the `ArtificialDataset`, and thus its impact is hard to see.\n",
"\n",
"For this example, use the base `tf.data.Dataset.range` function and simplify the training loop to its simplest form."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.194856Z",
"iopub.status.busy": "2024-08-15T02:30:07.194224Z",
"iopub.status.idle": "2024-08-15T02:30:07.200596Z",
"shell.execute_reply": "2024-08-15T02:30:07.199991Z"
},
"id": "xqtiYPmb-jLt"
},
"outputs": [],
"source": [
"fast_dataset = tf.data.Dataset.range(10000)\n",
"\n",
"def fast_benchmark(dataset, num_epochs=2):\n",
"    start_time = time.perf_counter()\n",
"    for _ in tf.data.Dataset.range(num_epochs):\n",
"        for _ in dataset:\n",
"            pass\n",
"    tf.print(\"Execution time:\", time.perf_counter() - start_time)\n",
"\n",
"def increment(x):\n",
"    return x + 1"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Fj2gmsMT-jL5"
},
"source": [
"#### Scalar mapping"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.204115Z",
"iopub.status.busy": "2024-08-15T02:30:07.203558Z",
"iopub.status.idle": "2024-08-15T02:30:07.475971Z",
"shell.execute_reply": "2024-08-15T02:30:07.475223Z"
},
"id": "Imn3SslJ-jMA"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.23993100699954084\n"
]
}
],
"source": [
"fast_benchmark(\n",
"    fast_dataset\n",
"    # Apply function one item at a time\n",
"    .map(increment)\n",
"    # Batch\n",
"    .batch(256)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BWUNbPqv-jMF"
},
"source": [
"\n",
"\n",
"The plot above illustrates what is going on (with fewer samples) using the scalar mapping method.\n",
"It shows that the mapped function is applied to each sample individually.\n",
"While this function is very fast, each call carries some overhead that impacts the time performance."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "tDVSM0A--jMG"
},
"source": [
"#### Vectorized mapping"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.479399Z",
"iopub.status.busy": "2024-08-15T02:30:07.479112Z",
"iopub.status.idle": "2024-08-15T02:30:07.541914Z",
"shell.execute_reply": "2024-08-15T02:30:07.541303Z"
},
"id": "nAw1mDLw-jMI"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.04984995199993136\n"
]
}
],
"source": [
"fast_benchmark(\n",
"    fast_dataset\n",
"    .batch(256)\n",
"    # Apply function on a batch of items\n",
"    # The tf.Tensor.__add__ method already handles batches\n",
"    .map(increment)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "DbMteMY9-jMO"
},
"source": [
"\n",
"\n",
"This time, the mapped function is called once per batch and applies to a whole batch of samples.\n",
"As the data execution time plot shows, while the function could take more time to execute, the overhead appears only once per batch, improving the overall time performance."
]
},
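{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a back-of-the-envelope check, the sketch below counts how many times the mapped function is scheduled per epoch in the two benchmarks above. It assumes the roughly 10 microsecond scheduling delay quoted earlier; the variable names are illustrative and not part of the `tf.data` API.\n",
"\n",
"```python\n",
"import math\n",
"\n",
"num_elements = 10_000      # size of fast_dataset\n",
"batch_size = 256\n",
"overhead_per_call = 10e-6  # assumed scheduling delay per call, in seconds\n",
"\n",
"# Scalar mapping: map(increment) runs before batch, once per element.\n",
"scalar_calls = num_elements\n",
"# Vectorized mapping: map(increment) runs after batch, once per batch.\n",
"vectorized_calls = math.ceil(num_elements / batch_size)\n",
"\n",
"scalar_overhead = scalar_calls * overhead_per_call\n",
"vectorized_overhead = vectorized_calls * overhead_per_call\n",
"\n",
"print(f'scalar:     {scalar_calls} calls, about {scalar_overhead * 1e3:.1f} ms of overhead per epoch')\n",
"print(f'vectorized: {vectorized_calls} calls, about {vectorized_overhead * 1e3:.1f} ms of overhead per epoch')\n",
"```\n",
"\n",
"The ratio of call counts is the point of this estimate. The measured execution times also include work unrelated to scheduling, so the observed speedup is smaller than this ratio suggests."
]
},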
{
"cell_type": "markdown",
"metadata": {
"id": "hfueG0Wj-jMR"
},
"source": [
"### Reducing memory footprint\n",
"\n",
"\n",
"\n",
"A number of transformations, including `interleave`, `prefetch`, and `shuffle`, maintain an internal buffer of elements. If the user-defined function passed into the `map` transformation changes the size of the elements, then the ordering of the map transformation and the transformations that buffer elements affects the memory usage. In general, choose the order that results in a lower memory footprint, unless a different ordering is desirable for performance.\n",
"\n",
"#### Caching partial computations\n",
"\n",
"Cache the dataset after the `map` transformation, unless that transformation makes the data too large to fit in memory.\n",
"A trade-off can be achieved if your mapped function can be split into two parts: a time-consuming part and a memory-consuming part.\n",
"In this case, you can chain your transformations as follows:\n",
"\n",
"```python\n",
"dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)\n",
"```\n",
"\n",
"This way, the time-consuming part is executed only during the first epoch, and you avoid using too much cache space."
]
},
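{
"cell_type": "markdown",
"metadata": {},
"source": [
"The effect of splitting the mapped function around `cache` can be checked with a small experiment. The sketch below is illustrative only: `expensive_step` and `large_output_step` are hypothetical stand-ins for the time-consuming and memory-consuming parts, and plain Python counters (updated through `tf.py_function`) record how often each one runs over three epochs.\n",
"\n",
"```python\n",
"import tensorflow as tf\n",
"\n",
"calls = {'expensive': 0, 'large': 0}\n",
"\n",
"def expensive_step(x):\n",
"    # Stand-in for the time-consuming part; counts how often it runs.\n",
"    calls['expensive'] += 1\n",
"    return x * 2\n",
"\n",
"def large_output_step(x):\n",
"    # Stand-in for the memory-consuming part: expands a scalar into 1000 values.\n",
"    calls['large'] += 1\n",
"    return tf.fill([1000], x)\n",
"\n",
"dataset = (\n",
"    tf.data.Dataset.range(5)\n",
"    .map(lambda x: tf.py_function(expensive_step, [x], tf.int64))\n",
"    .cache()  # Stores the small intermediate elements\n",
"    .map(lambda x: tf.py_function(large_output_step, [x], tf.int64))\n",
")\n",
"\n",
"for _ in range(3):  # Three epochs\n",
"    for _ in dataset:\n",
"        pass\n",
"\n",
"print(calls)\n",
"```\n",
"\n",
"With the cache placed between the two maps, the first counter should stop growing after the first epoch while the second keeps growing every epoch, and only the small intermediate elements are held in the cache."
]
},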
{
"cell_type": "markdown",
"metadata": {
"id": "MYOHG69M-jMT"
},
"source": [
"## Best practice summary\n",
"\n",
"Here is a summary of the best practices for designing performant TensorFlow\n",
"input pipelines:\n",
"\n",
"* [Use the `prefetch` transformation](#prefetching) to overlap the work of a producer and consumer\n",
"* [Parallelize the data reading transformation](#parallelizing_data_extraction) using the `interleave` transformation\n",
"* [Parallelize the `map` transformation](#parallelizing_data_transformation) by setting the `num_parallel_calls` argument\n",
"* [Use the `cache` transformation](#caching) to cache data in memory during the first epoch\n",
"* [Vectorize user-defined functions](#vectorizing_mapping) passed in to the `map` transformation\n",
"* [Reduce memory usage](#reducing_memory_footprint) when applying the `interleave`, `prefetch`, and `shuffle` transformations"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "mP_EMFsQ-jMU"
},
"source": [
"## Reproducing the figures\n",
"\n",
"Note: The rest of this notebook is about how to reproduce the above figures. Feel free to play around with this code, but understanding it is not an essential part of this tutorial.\n",
"\n",
"To deepen your understanding of the `tf.data.Dataset` API, you can experiment with your own pipelines.\n",
"Below is the code used to plot the images from this guide.\n",
"It can be a good starting point, showing some workarounds for common difficulties such as:\n",
"\n",
"- Reproducibility of execution times\n",
"- Eager execution of mapped functions\n",
"- The callable passed to the `interleave` transformation"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.546005Z",
"iopub.status.busy": "2024-08-15T02:30:07.545474Z",
"iopub.status.idle": "2024-08-15T02:30:07.823712Z",
"shell.execute_reply": "2024-08-15T02:30:07.823057Z"
},
"id": "7M_jFLer-jMV"
},
"outputs": [],
"source": [
"import itertools\n",
"from collections import defaultdict\n",
"\n",
"import numpy as np\n",
"import matplotlib as mpl\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Z3pjnxtK-jMa"
},
"source": [
"### The dataset\n",
"\n",
"Similar to the `ArtificialDataset`, you can build a dataset that returns the time spent in each step."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.827441Z",
"iopub.status.busy": "2024-08-15T02:30:07.827202Z",
"iopub.status.idle": "2024-08-15T02:30:07.834182Z",
"shell.execute_reply": "2024-08-15T02:30:07.833596Z"
},
"id": "OgGl4U7t-jMc"
},
"outputs": [],
"source": [
"class TimeMeasuredDataset(tf.data.Dataset):\n",
"    # OUTPUT: (steps, timings, counters)\n",
"    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)\n",
"    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))\n",
"\n",
"    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated\n",
"    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset\n",
"\n",
"    def _generator(instance_idx, num_samples):\n",
"        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])\n",
"\n",
"        # Opening the file\n",
"        open_enter = time.perf_counter()\n",
"        time.sleep(0.03)\n",
"        open_elapsed = time.perf_counter() - open_enter\n",
"\n",
"        for sample_idx in range(num_samples):\n",
"            # Reading data (line, record) from the file\n",
"            read_enter = time.perf_counter()\n",
"            time.sleep(0.015)\n",
"            read_elapsed = time.perf_counter() - read_enter\n",
"\n",
"            yield (\n",
"                [(\"Open\",), (\"Read\",)],\n",
"                [(open_enter, open_elapsed), (read_enter, read_elapsed)],\n",
"                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]\n",
"            )\n",
"            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered\n",
"\n",
"\n",
"    def __new__(cls, num_samples=3):\n",
"        return tf.data.Dataset.from_generator(\n",
"            cls._generator,\n",
"            output_types=cls.OUTPUT_TYPES,\n",
"            output_shapes=cls.OUTPUT_SHAPES,\n",
"            args=(next(cls._INSTANCES_COUNTER), num_samples)\n",
"        )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "YQqDP4jk-jMj"
},
"source": [
"This dataset provides samples of shape `[[2, 1], [2, 2], [2, 3]]` and of type `[tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]`.\n",
"Each sample is:\n",
"```\n",
"(\n",
" [(\"Open\"), (\"Read\")],\n",
" [(t0, d), (t0, d)],\n",
" [(i, e, -1), (i, e, s)]\n",
")\n",
"```\n",
"\n",
"Where:\n",
"\n",
"- `Open` and `Read` are steps identifiers\n",
"- `t0` is the timestamp when the corresponding step started\n",
"- `d` is the time spent in the corresponding step\n",
"- `i` is the instance index\n",
"- `e` is the epoch index (number of times the dataset has been iterated)\n",
"- `s` is the sample index"
]
},
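{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the layout concrete, the sketch below builds two such samples by hand with NumPy, using made-up timestamps, and applies the same kind of positive-time filter that the plotting code uses to drop the placeholder `Open` entry. It does not call `TimeMeasuredDataset`; the arrays only mimic its output.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"# First sample of a file: both the Open and Read steps carry real timings.\n",
"first = (\n",
"    np.array([(b'Open',), (b'Read',)]),\n",
"    np.array([(1.000, 0.030), (1.030, 0.015)], dtype=np.float32),\n",
"    np.array([(0, 0, -1), (0, 0, 0)], dtype=np.int32),\n",
")\n",
"# Later samples: the Open timing is replaced by the -1 placeholder.\n",
"later = (\n",
"    np.array([(b'Open',), (b'Read',)]),\n",
"    np.array([(-1.0, -1.0), (1.045, 0.015)], dtype=np.float32),\n",
"    np.array([(0, 0, -1), (0, 0, 1)], dtype=np.int32),\n",
")\n",
"\n",
"# Each component has the shape listed above: (2, 1), (2, 2), (2, 3).\n",
"print([part.shape for part in first])\n",
"\n",
"# Stack both samples and keep only rows whose start time is positive.\n",
"steps = np.concatenate([first[0], later[0]])\n",
"times = np.concatenate([first[1], later[1]])\n",
"valid = times[:, 0] > 0\n",
"print(steps[valid][:, 0])\n",
"```\n",
"\n",
"Only the first sample of each file contributes an `Open` entry after filtering, which is why the file opening appears once per file in the timelines."
]
},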
{
"cell_type": "markdown",
"metadata": {
"id": "IQK913bB-jMm"
},
"source": [
"### The iteration loop\n",
"\n",
"Make the iteration loop a little bit more complicated to aggregate all timings.\n",
"This will only work with datasets generating samples as detailed above."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.837625Z",
"iopub.status.busy": "2024-08-15T02:30:07.837360Z",
"iopub.status.idle": "2024-08-15T02:30:07.845075Z",
"shell.execute_reply": "2024-08-15T02:30:07.844437Z"
},
"id": "zAy-K_Cq-jMn"
},
"outputs": [],
"source": [
"def timelined_benchmark(dataset, num_epochs=2):\n",
"    # Initialize accumulators\n",
"    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)\n",
"    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)\n",
"    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)\n",
"\n",
"    start_time = time.perf_counter()\n",
"    for epoch_num in range(num_epochs):\n",
"        epoch_enter = time.perf_counter()\n",
"        for (steps, times, values) in dataset:\n",
"            # Record dataset preparation information\n",
"            steps_acc = tf.concat((steps_acc, steps), axis=0)\n",
"            times_acc = tf.concat((times_acc, times), axis=0)\n",
"            values_acc = tf.concat((values_acc, values), axis=0)\n",
"\n",
"            # Simulate training time\n",
"            train_enter = time.perf_counter()\n",
"            time.sleep(0.01)\n",
"            train_elapsed = time.perf_counter() - train_enter\n",
"\n",
"            # Record training information\n",
"            steps_acc = tf.concat((steps_acc, [[\"Train\"]]), axis=0)\n",
"            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)\n",
"            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)\n",
"\n",
"        epoch_elapsed = time.perf_counter() - epoch_enter\n",
"        # Record epoch information\n",
"        steps_acc = tf.concat((steps_acc, [[\"Epoch\"]]), axis=0)\n",
"        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)\n",
"        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)\n",
"        time.sleep(0.001)\n",
"\n",
"    tf.print(\"Execution time:\", time.perf_counter() - start_time)\n",
"    return {\"steps\": steps_acc, \"times\": times_acc, \"values\": values_acc}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jw_WSQC8-jMs"
},
"source": [
"### The plotting method\n",
"\n",
"Finally, define a function that plots a timeline from the values returned by the `timelined_benchmark` function."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.848514Z",
"iopub.status.busy": "2024-08-15T02:30:07.847974Z",
"iopub.status.idle": "2024-08-15T02:30:07.858065Z",
"shell.execute_reply": "2024-08-15T02:30:07.857473Z"
},
"id": "1j73RxiP-jMw"
},
"outputs": [],
"source": [
"def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):\n",
"    # Keep only valid entries (positive times and non-empty steps) from the timelines\n",
"    valid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]\n",
"    steps = timeline['steps'][valid_mask].numpy()\n",
"    times = timeline['times'][valid_mask].numpy()\n",
"    values = timeline['values'][valid_mask].numpy()\n",
"\n",
"    # Get a set of different steps, ordered by the first time they are encountered\n",
"    step_ids, indices = np.unique(steps, return_index=True)\n",
"    step_ids = step_ids[np.argsort(indices)]\n",
"\n",
"    # Shift the starting time to 0 and compute the maximal time value\n",
"    min_time = times[:,0].min()\n",
"    times[:,0] = (times[:,0] - min_time)\n",
"    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)\n",
"\n",
"    # plt.get_cmap avoids the deprecated mpl.cm.get_cmap\n",
"    cmap = plt.get_cmap(\"plasma\")\n",
"    plt.close()\n",
"    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})\n",
"    fig.suptitle(title)\n",
"    fig.set_size_inches(17.0, len(step_ids))\n",
"    plt.xlim(-0.01, end)\n",
"\n",
"    for i, step in enumerate(step_ids):\n",
"        step_name = step.decode()\n",
"        ax = axs[i]\n",
"        ax.set_ylabel(step_name)\n",
"        ax.set_ylim(0, 1)\n",
"        ax.set_yticks([])\n",
"        ax.set_xlabel(\"time (s)\")\n",
"        ax.set_xticklabels([])\n",
"        ax.grid(which=\"both\", axis=\"x\", color=\"k\", linestyle=\":\")\n",
"\n",
"        # Get timings and annotation for the given step\n",
"        entries_mask = np.squeeze(steps==step)\n",
"        serie = np.unique(times[entries_mask], axis=0)\n",
"        annotations = values[entries_mask]\n",
"\n",
"        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)\n",
"        if annotate:\n",
"            # The bar duration is unused here; `_` avoids shadowing the `width` argument\n",
"            for j, (start, _) in enumerate(serie):\n",
"                annotation = \"\\n\".join([f\"{l}: {v}\" for l,v in zip((\"i\", \"e\", \"s\"), annotations[j])])\n",
"                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,\n",
"                        horizontalalignment='left', verticalalignment='center')\n",
"    if save:\n",
"        plt.savefig(title.lower().translate(str.maketrans(\" \", \"_\")) + \".svg\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "xto6GNdO-jM1"
},
"source": [
"### Use wrappers for mapped functions\n",
"\n",
"To run mapped functions in an eager context, wrap them inside a `tf.py_function` call."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.861450Z",
"iopub.status.busy": "2024-08-15T02:30:07.860827Z",
"iopub.status.idle": "2024-08-15T02:30:07.864790Z",
"shell.execute_reply": "2024-08-15T02:30:07.864103Z"
},
"id": "39v7JD4L-jM2"
},
"outputs": [],
"source": [
"def map_decorator(func):\n",
"    def wrapper(steps, times, values):\n",
"        # Use a tf.py_function to prevent AutoGraph from compiling the method\n",
"        return tf.py_function(\n",
"            func,\n",
"            inp=(steps, times, values),\n",
"            Tout=(steps.dtype, times.dtype, values.dtype)\n",
"        )\n",
"    return wrapper"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "7eJRCinb-jM5"
},
"source": [
"### Pipeline comparison"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.868084Z",
"iopub.status.busy": "2024-08-15T02:30:07.867558Z",
"iopub.status.idle": "2024-08-15T02:30:07.870675Z",
"shell.execute_reply": "2024-08-15T02:30:07.870064Z"
},
"id": "YwX4ndHE-jM6"
},
"outputs": [],
"source": [
"_batch_map_num_items = 50\n",
"\n",
"def dataset_generator_fun(*args):\n",
"    return TimeMeasuredDataset(num_samples=_batch_map_num_items)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "EwxJT2aR-jNA"
},
"source": [
"#### Naive"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:07.873537Z",
"iopub.status.busy": "2024-08-15T02:30:07.873318Z",
"iopub.status.idle": "2024-08-15T02:30:21.186494Z",
"shell.execute_reply": "2024-08-15T02:30:21.185705Z"
},
"id": "wLKgurx_-jNC"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:From /tmpfs/tmp/ipykernel_112933/64197174.py:32: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"Use output_signature instead\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:From /tmpfs/tmp/ipykernel_112933/64197174.py:32: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version.\n",
"Instructions for updating:\n",
"Use output_signature instead\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 13.208576904999973\n"
]
}
],
"source": [
"@map_decorator\n",
"def naive_map(steps, times, values):\n",
"    map_enter = time.perf_counter()\n",
"    time.sleep(0.001)  # Time consuming step\n",
"    time.sleep(0.0001)  # Memory consuming step\n",
"    map_elapsed = time.perf_counter() - map_enter\n",
"\n",
"    return (\n",
"        tf.concat((steps, [[\"Map\"]]), axis=0),\n",
"        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),\n",
"        tf.concat((values, [values[-1]]), axis=0)\n",
"    )\n",
"\n",
"naive_timeline = timelined_benchmark(\n",
"    tf.data.Dataset.range(2)\n",
"    .flat_map(dataset_generator_fun)\n",
"    .map(naive_map)\n",
"    .batch(_batch_map_num_items, drop_remainder=True)\n",
"    .unbatch(),\n",
"    5\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "EJqUMDsO-jNG"
},
"source": [
"#### Optimized"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:21.190475Z",
"iopub.status.busy": "2024-08-15T02:30:21.189814Z",
"iopub.status.idle": "2024-08-15T02:30:28.084324Z",
"shell.execute_reply": "2024-08-15T02:30:28.083578Z"
},
"id": "HYHcwabr-jNH"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 6.8234945540007175\n"
]
}
],
"source": [
"@map_decorator\n",
"def time_consuming_map(steps, times, values):\n",
"    map_enter = time.perf_counter()\n",
"    time.sleep(0.001 * values.shape[0])  # Time consuming step\n",
"    map_elapsed = time.perf_counter() - map_enter\n",
"\n",
"    # Use tf.tile to handle batch dimension\n",
"    return (\n",
"        tf.concat((steps, tf.tile([[[\"1st map\"]]], [steps.shape[0], 1, 1])), axis=1),\n",
"        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),\n",
"        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)\n",
"    )\n",
"\n",
"\n",
"@map_decorator\n",
"def memory_consuming_map(steps, times, values):\n",
"    map_enter = time.perf_counter()\n",
"    time.sleep(0.0001 * values.shape[0])  # Memory consuming step\n",
"    map_elapsed = time.perf_counter() - map_enter\n",
"\n",
"    # Use tf.tile to handle batch dimension\n",
"    return (\n",
"        tf.concat((steps, tf.tile([[[\"2nd map\"]]], [steps.shape[0], 1, 1])), axis=1),\n",
"        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),\n",
"        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)\n",
"    )\n",
"\n",
"\n",
"optimized_timeline = timelined_benchmark(\n",
"    tf.data.Dataset.range(2)\n",
"    .interleave(  # Parallelize data reading\n",
"        dataset_generator_fun,\n",
"        num_parallel_calls=tf.data.AUTOTUNE\n",
"    )\n",
"    .batch(  # Vectorize your mapped function\n",
"        _batch_map_num_items,\n",
"        drop_remainder=True)\n",
"    .map(  # Parallelize map transformation\n",
"        time_consuming_map,\n",
"        num_parallel_calls=tf.data.AUTOTUNE\n",
"    )\n",
"    .cache()  # Cache data\n",
"    .map(  # Reduce memory usage\n",
"        memory_consuming_map,\n",
"        num_parallel_calls=tf.data.AUTOTUNE\n",
"    )\n",
"    .prefetch(  # Overlap producer and consumer work\n",
"        tf.data.AUTOTUNE\n",
"    )\n",
"    .unbatch(),\n",
"    5\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:28.088090Z",
"iopub.status.busy": "2024-08-15T02:30:28.087533Z",
"iopub.status.idle": "2024-08-15T02:30:28.682784Z",
"shell.execute_reply": "2024-08-15T02:30:28.682068Z"
},
"id": "b_CSUbxL-jNK"
},
"outputs": [
{
"data": {
"image/png": "",
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"draw_timeline(naive_timeline, \"Naive\", 15)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"execution": {
"iopub.execute_input": "2024-08-15T02:30:28.686209Z",
"iopub.status.busy": "2024-08-15T02:30:28.685879Z",
"iopub.status.idle": "2024-08-15T02:30:29.224088Z",
"shell.execute_reply": "2024-08-15T02:30:29.223400Z"
},
"id": "DoovY7qr-jNR"
},
"outputs": [
{
"data": {
"image/png": "",
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"draw_timeline(optimized_timeline, \"Optimized\", 15)"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [],
"name": "data_performance.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.19"
}
},
"nbformat": 4,
"nbformat_minor": 0
}