I am trying to import a very large .csv file as:
import dask.dataframe as dd
import pandas as pd
#TO DO
dd_subf1_small = dd.read_csv('subf1_small.csv', dtype={'Unnamed: 0': 'float64','oecd_subfield':'object','paperid':'object'}, sep=None, engine = 'python').persist()
but I am getting the following error:
---------------------------------------------------------------------------
ParserError Traceback (most recent call last)
Cell In [1], line 5
2 import pandas as pd
3 #TO DO
----> 5 dd_subf1_small = dd.read_csv('subf1_small.csv', dtype={'Unnamed: 0': 'float64','oecd_subfield':'object','paperid':'object'}, sep=None, engine = 'python').persist()
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.persist(self, **kwargs)
249 def persist(self, **kwargs):
250 """Persist this dask collection into memory
251
252 This turns a lazy Dask collection into a Dask collection with the same
(...)
286 dask.base.persist
287 """
--> 288 (result,) = persist(self, traverse=False, **kwargs)
289 return result
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/base.py:904, in persist(traverse, optimize_graph, scheduler, *args, **kwargs)
901 keys.extend(a_keys)
902 postpersists.append((rebuild, a_keys, state))
--> 904 results = schedule(dsk, keys, **kwargs)
905 d = dict(zip(keys, results))
906 results2 = [r({k: d[k] for k in ks}, *s) for r, ks, s in postpersists]
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
86 elif isinstance(pool, multiprocessing.pool.Pool):
87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90 pool.submit,
91 pool._max_workers,
92 dsk,
93 keys,
94 cache=cache,
95 get_id=_thread_get_id,
96 pack_exception=pack_exception,
97 **kwargs,
98 )
100 # Cleanup pools associated to dead threads
101 with pools_lock:
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
509 _execute_task(task, data) # Re-execute locally
510 else:
--> 511 raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
513 state["cache"][key] = res
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/local.py:319, in reraise(exc, tb)
317 if exc.__traceback__ is not tb:
318 raise exc.with_traceback(tb)
--> 319 raise exc
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/dataframe/io/csv.py:129, in CSVFunctionWrapper.__call__(self, part)
126 rest_kwargs["usecols"] = columns
128 # Call `pandas_read_text`
--> 129 df = pandas_read_text(
130 self.reader,
131 block,
132 self.header,
133 rest_kwargs,
134 self.dtypes,
135 columns,
136 write_header,
137 self.enforce,
138 path_info,
139 )
140 if project_after_read:
141 return df[self.columns]
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/dataframe/io/csv.py:182, in pandas_read_text(reader, b, header, kwargs, dtypes, columns, write_header, enforce, path)
180 bio.write(b)
181 bio.seek(0)
--> 182 df = reader(bio, **kwargs)
183 if dtypes:
184 coerce_dtypes(df, dtypes)
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
305 if len(args) > num_allow_args:
306 warnings.warn(
307 msg.format(arguments=arguments),
308 FutureWarning,
309 stacklevel=stacklevel,
310 )
--> 311 return func(*args, **kwargs)
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/readers.py:678, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
663 kwds_defaults = _refine_defaults_read(
664 dialect,
665 delimiter,
(...)
674 defaults={"delimiter": ","},
675 )
676 kwds.update(kwds_defaults)
--> 678 return _read(filepath_or_buffer, kwds)
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/readers.py:581, in _read(filepath_or_buffer, kwds)
578 return parser
580 with parser:
--> 581 return parser.read(nrows)
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/readers.py:1253, in TextFileReader.read(self, nrows)
1251 nrows = validate_integer("nrows", nrows)
1252 try:
-> 1253 index, columns, col_dict = self._engine.read(nrows)
1254 except Exception:
1255 self.close()
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/python_parser.py:270, in PythonParser.read(self, rows)
267 indexnamerow = content[0]
268 content = content[1:]
--> 270 alldata = self._rows_to_cols(content)
271 data, columns = self._exclude_implicit_index(alldata)
273 conv_data = self._convert_data(data)
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/python_parser.py:1013, in PythonParser._rows_to_cols(self, content)
1007 reason = (
1008 "Error could possibly be due to quotes being "
1009 "ignored when a multi-char delimiter is used."
1010 )
1011 msg += ". " + reason
-> 1013 self._alert_malformed(msg, row_num + 1)
1015 # see gh-13320
1016 zipped_content = list(lib.to_object_array(content, min_width=col_len).T)
File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/python_parser.py:739, in PythonParser._alert_malformed(self, msg, row_num)
722 """
723 Alert a user about a malformed row, depending on value of
724 `self.on_bad_lines` enum.
(...)
736 even though we 0-index internally.
737 """
738 if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
--> 739 raise ParserError(msg)
740 elif self.on_bad_lines == self.BadLineHandleMethod.WARN:
741 base = f"Skipping line {row_num}: "
ParserError: Expected 3 fields in line 1811036, saw 5
Actually, I don't know how the data is structured, as the CSV file is 36 GB and I did not manage to open it. I saw another question where the error was caused by passing header=None, which I am not doing.
How can I avoid the above error?
Thanks!
CodePudding user response:
As the error says, your CSV file probably contains rows with 5 values instead of 3.
You have two options:
- Find those rows and fix/remove them from the file. This might be challenging given the file is huge.
- Use the parameter
on_bad_lines="skip"
to let pandas skip them and continue loading the file.
Learn more about on_bad_lines
here: https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
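Also, I noticed you are using sep=None
. Why? Note that with engine='python', sep=None does not mean the values are separated by nothing; it asks pandas to auto-detect the delimiter by sniffing the data, and a wrong guess can itself produce rows with an unexpected number of fields. The default (and most common) delimiter (aka separator) is a comma (,), so if you know the delimiter, pass it explicitly. Post an example of 3 lines from the file here so I can assist with that.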