I found a similar question here: Read CSV with PyArrow
That answer references sys.stdin.buffer and sys.stdout.buffer, but I am not exactly sure how those would be used to write the .arrow file, or how to name it. I can't find the exact information I am looking for in the pyarrow docs. My file will not have any NaNs, but it will have a timestamped index. The file is ~100 GB, so loading it into memory simply isn't an option. I tried changing the code, but as I expected, it ended up overwriting the previous file on every loop iteration.
This is my first post. I would like to thank all the contributors who answered 99.9% of my other questions before I had even asked them.
import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1  ### used one line chunks for a small test

def main():
    writer = None
    for split in pd.read_csv(sys.stdin.buffer, chunksize=SPLIT_ROWS):
        table = pa.Table.from_pandas(split)
        # Write out to file
        with pa.OSFile('test.arrow', 'wb') as sink:  ### no append mode yet
            with pa.RecordBatchFileWriter(sink, table.schema) as writer:
                writer.write_table(table)
                writer.close()

if __name__ == "__main__":
    main()
Below is the command I used on the command line:
>cat data.csv | python test.py
CodePudding user response:
Solution adapted from @Martin-Evans' code:
The writer is closed after the for loop, as suggested by @Pace.
import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main():
    schema = pa.Table.from_pandas(pd.read_csv('Data.csv', nrows=2)).schema
    ### reads first two lines to define the schema

    with pa.OSFile('test.arrow', 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('Data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

    writer.close()  ### optional; the with block above already closes the writer (see the answer below)

if __name__ == "__main__":
    main()
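To sanity-check the output without loading the whole file into memory, the resulting Arrow IPC file can be memory-mapped and read back batch by batch. A minimal sketch (the read-back step is mine, not part of the original solution; the file name is just the one used above):

import pyarrow as pa

with pa.memory_map('test.arrow', 'r') as source:
    reader = pa.ipc.open_file(source)   # RecordBatchFileReader over the mapped file
    print(reader.num_record_batches)    # typically one record batch per chunk written above
    first_batch = reader.get_batch(0)   # reads a single batch without touching the rest
    # table = reader.read_all()         # or materialise everything at once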
CodePudding user response:
As suggested by @Pace, you should consider moving the output file creation outside of the reading loop. Something like this:
import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1  ### used one line chunks for a small test

def main():
    # The writer needs the schema before the first chunk arrives,
    # so read a couple of rows up front to infer it
    schema = pa.Table.from_pandas(pd.read_csv('data.csv', nrows=2)).schema

    # Write out to file
    with pa.OSFile('test.arrow', 'wb') as sink:  ### no append mode yet
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

if __name__ == "__main__":
    main()
You also don't have to use sys.stdin.buffer if you would rather specify the input and output files explicitly. You could then just run the script as:
python test.py
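If you do want to keep piping the data in with cat data.csv | python test.py, the schema can't be pre-read from the file, because stdin can only be consumed once. One option (a sketch adapted from the code above, not part of the original answer) is to create the writer lazily from the first chunk's schema:

import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main():
    writer = None
    with pa.OSFile('test.arrow', 'wb') as sink:
        for split in pd.read_csv(sys.stdin.buffer, chunksize=SPLIT_ROWS):
            table = pa.Table.from_pandas(split)
            if writer is None:
                # stdin can only be read once, so take the schema from the first chunk
                writer = pa.RecordBatchFileWriter(sink, table.schema)
            writer.write_table(table)
        if writer is not None:
            writer.close()

if __name__ == "__main__":
    main()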
By using with statements, both writer and sink will be closed automatically afterwards (in this case when main() returns). This means it should not be necessary to include an explicit close() call.