Scaling to large datasets

pandas provides data structures for in-memory analytics, which makes using pandas to analyze datasets that are larger than memory somewhat tricky. Even datasets that are a sizable fraction of memory become unwieldy, as some pandas operations need to make intermediate copies.

This document provides a few recommendations for scaling your analysis to larger datasets. It’s a complement to Enhancing performance, which focuses on speeding up analysis for datasets that fit in memory.

Load less data

Suppose our raw dataset on disk has many columns.

In [1]: import pandas as pd

In [2]: import numpy as np

In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
   ...:     index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
   ...:     n = len(index)
   ...:     state = np.random.RandomState(seed)
   ...:     columns = {
   ...:         "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
   ...:         "id": state.poisson(1000, size=n),
   ...:         "x": state.rand(n) * 2 - 1,
   ...:         "y": state.rand(n) * 2 - 1,
   ...:     }
   ...:     df = pd.DataFrame(columns, index=index, columns=sorted(columns))
   ...:     if df.index[-1] == end:
   ...:         df = df.iloc[:-1]
   ...:     return df
   ...: 

In [4]: timeseries = [
   ...:     make_timeseries(freq="1T", seed=i).rename(columns=lambda x: f"{x}_{i}")
   ...:     for i in range(10)
   ...: ]
   ...: 

In [5]: ts_wide = pd.concat(timeseries, axis=1)

In [6]: ts_wide.head()
Out[6]: 
                     id_0 name_0       x_0  ...   name_9       x_9       y_9
timestamp                                   ...                             
2000-01-01 00:00:00   977  Alice -0.821225  ...  Charlie -0.957208 -0.757508
2000-01-01 00:01:00  1018    Bob -0.219182  ...    Alice -0.414445 -0.100298
2000-01-01 00:02:00   927  Alice  0.660908  ...  Charlie -0.325838  0.581859
2000-01-01 00:03:00   997    Bob -0.852458  ...      Bob  0.992033 -0.686692
2000-01-01 00:04:00   965    Bob  0.717283  ...  Charlie -0.924556 -0.184161

[5 rows x 40 columns]

In [7]: ts_wide.to_parquet("timeseries_wide.parquet")

To load the columns we want, we have two options. Option 1 loads in all the data and then filters to what we need.

In [8]: columns = ["id_0", "name_0", "x_0", "y_0"]

In [9]: pd.read_parquet("timeseries_wide.parquet")[columns]

Option 2 only loads the columns we request.

In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns)

If we were to measure the memory usage of the two calls, we’d see that specifying columns uses about 1/10th the memory in this case.
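
One rough way to verify this yourself is to compare the in-memory footprint of the result each option produces, using pandas.DataFrame.memory_usage() (a sketch, assuming a parquet engine such as pyarrow is installed):

all_columns = pd.read_parquet("timeseries_wide.parquet")
four_columns = pd.read_parquet("timeseries_wide.parquet", columns=columns)

# Option 1 materializes all 40 columns before filtering; option 2 reads only 4.
print(all_columns.memory_usage(deep=True).sum())
print(four_columns.memory_usage(deep=True).sum())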

With pandas.read_csv(), you can specify usecols to limit the columns read into memory. Not all file formats that can be read by pandas provide an option to read a subset of columns.
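
For example, if this dataset were stored as a CSV (a hypothetical timeseries_wide.csv), only the requested columns would be parsed into memory:

df = pd.read_csv("timeseries_wide.csv", usecols=["id_0", "name_0", "x_0", "y_0"])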

Use efficient datatypes

The default pandas data types are not the most memory efficient. This is especially true for text data columns with relatively few unique values (commonly referred to as “low-cardinality” data). By using more efficient data types, you can store larger datasets in memory.

In [11]: ts = make_timeseries(freq="30S", seed=0)

In [12]: ts.to_parquet("timeseries.parquet")

In [13]: ts = pd.read_parquet("timeseries.parquet")

In [14]: ts
Out[14]: 
                       id     name         x         y
timestamp                                             
2000-01-01 00:00:00  1041    Alice  0.889987  0.281011
2000-01-01 00:00:30   988      Bob -0.455299  0.488153
2000-01-01 00:01:00  1018    Alice  0.096061  0.580473
2000-01-01 00:01:30   992      Bob  0.142482  0.041665
2000-01-01 00:02:00   960      Bob -0.036235  0.802159
...                   ...      ...       ...       ...
2000-12-30 23:58:00  1022    Alice  0.266191  0.875579
2000-12-30 23:58:30   974    Alice -0.009826  0.413686
2000-12-30 23:59:00  1028  Charlie  0.307108 -0.656789
2000-12-30 23:59:30  1002    Alice  0.202602  0.541335
2000-12-31 00:00:00   987    Alice  0.200832  0.615972

[1051201 rows x 4 columns]

Now, let’s inspect the data types and memory usage to see where we should focus our attention.

In [15]: ts.dtypes
Out[15]: 
id        int64
name     object
x       float64
y       float64
dtype: object

In [16]: ts.memory_usage(deep=True)  # memory usage in bytes
Out[16]: 
Index     8409608
id        8409608
name     65176434
x         8409608
y         8409608
dtype: int64

The name column is taking up much more memory than any other. It has just a few unique values, so it’s a good candidate for converting to a pandas.Categorical. With a pandas.Categorical, we store each unique name once and use space-efficient integers to know which specific name is used in each row.

In [17]: ts2 = ts.copy()

In [18]: ts2["name"] = ts2["name"].astype("category")

In [19]: ts2.memory_usage(deep=True)
Out[19]: 
Index    8409608
id       8409608
name     1051495
x        8409608
y        8409608
dtype: int64
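
Under the hood, the categorical column stores the three distinct names once and a small integer code for each row; you can see both pieces through the .cat accessor:

ts2["name"].cat.categories  # the unique values, stored only once
ts2["name"].cat.codes       # one small integer per row, indexing into the categories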

We can go a bit further and downcast the numeric columns to their smallest types using pandas.to_numeric().

In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")

In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")

In [22]: ts2.dtypes
Out[22]: 
id        uint16
name    category
x        float32
y        float32
dtype: object

In [23]: ts2.memory_usage(deep=True)
Out[23]: 
Index    8409608
id       2102402
name     1051495
x        4204804
y        4204804
dtype: int64

In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()

In [25]: print(f"{reduction:0.2f}")
0.20

In all, we’ve reduced the in-memory footprint of this dataset to 1/5 of its original size.

See Categorical data for more on pandas.Categorical and dtypes for an overview of all of pandas’ dtypes.

Use chunking

Some workloads can be achieved with chunking: splitting a large problem into a bunch of smaller problems, for example converting each CSV file in a directory into a Parquet file, one file at a time. As long as each chunk fits in memory, you can work with datasets that are much larger than memory.
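
A sketch of that per-file conversion might look like the following (the data/csvs directory is hypothetical, and a parquet engine such as pyarrow is assumed to be installed):

import pathlib

import pandas as pd

# Only one file's worth of data is in memory at any time.
for path in sorted(pathlib.Path("data/csvs").glob("*.csv")):
    df = pd.read_csv(path)
    df.to_parquet(path.with_suffix(".parquet"))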

Note

Chunking works well when the operation you’re performing requires zero or minimal coordination between chunks. For more complicated workflows, you’re better off using another library.

Suppose we have an even larger “logical dataset” on disk that’s a directory of parquet files. Each file in the directory represents a different year of the entire dataset.

In [26]: import pathlib

In [27]: N = 12

In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [30]: pathlib.Path("data/timeseries").mkdir(exist_ok=True)

In [31]: for i, (start, end) in enumerate(zip(starts, ends)):
   ....:     ts = make_timeseries(start=start, end=end, freq="1T", seed=i)
   ....:     ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
   ....: 

data
└── timeseries
    ├── ts-00.parquet
    ├── ts-01.parquet
    ├── ts-02.parquet
    ├── ts-03.parquet
    ├── ts-04.parquet
    ├── ts-05.parquet
    ├── ts-06.parquet
    ├── ts-07.parquet
    ├── ts-08.parquet
    ├── ts-09.parquet
    ├── ts-10.parquet
    └── ts-11.parquet

Now we’ll implement an out-of-core pandas.Series.value_counts(). The peak memory usage of this workflow is the single largest chunk, plus a small series storing the unique value counts up to this point. As long as each individual file fits in memory, this will work for arbitrary-sized datasets.

In [32]: %%time
   ....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
   ....: counts = pd.Series(dtype=int)
   ....: for path in files:
   ....:     df = pd.read_parquet(path)
   ....:     counts = counts.add(df["name"].value_counts(), fill_value=0)
   ....: counts.astype(int)
   ....: 

Some readers, like pandas.read_csv(), offer parameters to control the chunksize when reading a single file.
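
For example, passing chunksize to pandas.read_csv() returns an iterator of DataFrames, so a single large file can be processed piece by piece (a sketch against a hypothetical timeseries.csv):

counts = pd.Series(dtype=int)
with pd.read_csv("timeseries.csv", chunksize=1_000_000) as reader:
    # Each chunk is an ordinary DataFrame with at most 1,000,000 rows.
    for chunk in reader:
        counts = counts.add(chunk["name"].value_counts(), fill_value=0)
counts = counts.astype(int)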

Manually chunking is an OK option for workflows that don't require overly sophisticated operations. Some operations, like pandas.DataFrame.groupby(), are much harder to do chunkwise. In these cases, you may be better off switching to a different library that implements these out-of-core algorithms for you.

Use Dask

pandas is just one library offering a DataFrame API. Because of its popularity, pandas’ API has become something of a standard that other libraries implement. The pandas documentation maintains a list of libraries implementing a DataFrame API in the ecosystem page.

For example, Dask, a parallel computing library, has dask.dataframe, a pandas-like API for working with larger than memory datasets in parallel. Dask can use multiple threads or processes on a single machine, or a cluster of machines to process data in parallel.

We’ll import dask.dataframe and notice that the API feels similar to pandas. We can use Dask’s read_parquet function, but provide a globstring of files to read in.

In [33]: import dask.dataframe as dd

In [34]: ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")

In [35]: ddf

Inspecting the ddf object, we see a few things:

  • There are familiar attributes like .columns and .dtypes

  • There are familiar methods like .groupby, .sum, etc.

  • There are new attributes like .npartitions and .divisions

The partitions and divisions are how Dask parallelizes computation. A Dask DataFrame is made up of many pandas.DataFrame objects. A single method call on a Dask DataFrame ends up making many pandas method calls, and Dask knows how to coordinate everything to get the result.

In [36]: ddf.columns

In [37]: ddf.dtypes

In [38]: ddf.npartitions

One major difference: the dask.dataframe API is lazy. If you inspect the repr of ddf, you'll notice that the values aren't actually printed out; just the column names and dtypes. That's because Dask hasn't actually read the data yet. Rather than executing immediately, operations build up a task graph.

In [39]: ddf

In [40]: ddf["name"]

In [41]: ddf["name"].value_counts()

Each of these calls is instant because the result isn't being computed yet. We're just building up a list of computations to do when someone needs the result. Dask knows that the return type of a pandas.Series.value_counts is a pandas.Series with a certain dtype and a certain name. So the Dask version returns a Dask Series with the same dtype and the same name.

To get the actual result you can call .compute().

In [42]: %time ddf["name"].value_counts().compute()

At that point, you get back the same thing you'd get with pandas, in this case a concrete pandas.Series with the count of each name.

Calling .compute causes the full task graph to be executed. This includes reading the data, selecting the columns, and doing the value_counts. The execution is done in parallel where possible, and Dask tries to keep the overall memory footprint small. You can work with datasets that are much larger than memory, as long as each partition (a regular pandas.DataFrame) fits in memory.

By default, dask.dataframe operations use a threadpool to do operations in parallel. We can also connect to a cluster to distribute the work on many machines. In this case we’ll connect to a local “cluster” made up of several processes on this single machine.

>>> from dask.distributed import Client, LocalCluster

>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
<Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>

Once this client is created, all of Dask’s computation will take place on the cluster (which is just processes in this case).

Dask implements the most used parts of the pandas API. For example, we can do a familiar groupby aggregation.

In [43]: %time ddf.groupby("name")[["x", "y"]].mean().compute().head()

The grouping and aggregation is done out-of-core and in parallel.

When Dask knows the divisions of a dataset, certain optimizations are possible. When reading parquet datasets written by dask, the divisions will be known automatically. In this case, since we created the parquet files manually, we need to supply the divisions manually.

In [44]: N = 12

In [45]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [46]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [47]: divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)

In [48]: ddf.divisions = divisions

In [49]: ddf

Now we can do things like fast random access with .loc.

In [50]: ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()

Dask knows to just look in the 3rd partition for selecting values in 2002. It doesn’t need to look at any other data.

Many workflows involve a large amount of data and processing it in a way that reduces the size to something that fits in memory. In this case, we’ll resample to daily frequency and take the mean. Once we’ve taken the mean, we know the results will fit in memory, so we can safely call compute without running out of memory. At that point it’s just a regular pandas object.

In [51]: ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
[figure: dask_resample.png, the cumulative sum of the daily mean of x and y]

These Dask examples have all been done using multiple processes on a single machine. Dask can be deployed on a cluster to scale up to even larger datasets.

You can see more Dask examples at https://examples.dask.org.