I am trying to use Dask to work around a pandas DataFrame memory issue. The API seems very simple, as below:
import dask.dataframe as dd
df = dd.read_csv(input_file, encoding='utf8', dtype=str, error_bad_lines=False, sep='\t', keep_default_na=False)
# ...
date_column = [get_today()] * row_count
headers = list(df)
if 'entry_date' not in headers:
    df["entry_date"] = date_column
However, this gives an error message:
File "/builder/csv_data_preprocessor.py", line 65, in process
df["entry_date"] = date_column
File ".venv/kg/lib/python3.7/site-packages/dask/dataframe/core.py", line 4113, in __setitem__
df = self.assign(**{key: value})
File ".venv/kg/lib/python3.7/site-packages/dask/dataframe/core.py", line 4425, in assign
df2 = self._meta_nonempty.assign(**_extract_meta(kwargs, nonempty=True))
File ".venv/kg/lib/python3.7/site-packages/dask/dataframe/core.py", line 5752, in _extract_meta
res[k] = _extract_meta(x[k], nonempty)
File ".venv/kg/lib/python3.7/site-packages/dask/dataframe/core.py", line 5756, in _extract_meta
"Cannot infer dataframe metadata with a `dask.delayed` argument"
ValueError: Cannot infer dataframe metadata with a `dask.delayed` argument
How can I add the metadata info here to fix the error? The error comes from this line:
df["entry_date"] = date_column
CodePudding user response:
The problem with this line:
date_column = [get_today()] * row_count
is that it builds a large plain Python list in memory, possibly exhausting it, and Dask does not know how to split such a list across its partitions, which is why metadata inference fails. Assigning a scalar instead broadcasts the value to every row of every partition lazily, so the following should work (assuming that get_today() returns a datetime object or a string):
if 'entry_date' not in df.columns:
    df["entry_date"] = get_today()