So I've been trying to load 1000 lines of a CSV into Elasticsearch as 1000 different documents. The CSV has 8 headers: release year, title, origin/ethnicity, director, cast, wiki page, plot. My current code loads the dataset using the bulk command from helpers:
import csv
from elasticsearch import helpers, Elasticsearch
# Client for the Elasticsearch node at http://localhost:9200.
es = Elasticsearch("http://localhost:9200")
# Drop any earlier copy of the index before recreating it. The 400/404 status
# codes are passed via `ignore`, presumably so that a missing index does not
# raise on the first run — confirm against the client version in use.
es.indices.delete(index='movie-plots', ignore=[400, 404])
# NOTE(review): `body` is not defined anywhere in the visible snippet; it has
# to be defined elsewhere before this line runs.
es.indices.create(index='movie-plots', body=body)
# Name of the CSV dataset (not referenced again in the visible snippet).
filename = 'wiki_movie_plots_deduped.csv'
def csv_reader(file_name):
    """Bulk-index the rows of a CSV file into the ``movie-plots`` index.

    The first line of the file supplies the field names; ``csv.DictReader``
    then yields one dict per remaining row, and that iterable is handed
    straight to ``helpers.bulk`` as the actions to index.

    Args:
        file_name: Path of the CSV file to read.

    Returns:
        Whatever ``helpers.bulk`` returns (per the elasticsearch-py docs, a
        ``(success_count, errors)`` tuple), so the caller can check how many
        documents were actually indexed.
    """
    # The csv module asks for newline='' so that newlines embedded in quoted
    # fields (likely in long free-text columns) are parsed correctly.
    with open(file_name, 'r', newline='') as csv_file:
        reader = csv.DictReader(csv_file)
        # Must stay inside the ``with`` block: DictReader reads lazily, so the
        # file has to remain open while helpers.bulk consumes the rows.
        # NOTE(review): doc_type is kept from the original call; newer client
        # versions may reject it — confirm against the installed version.
        return helpers.bulk(es, reader, index="movie-plots", doc_type="_doc")
I think this loads all 1000 lines into a single document.
CodePudding user response:
You are on the right path. The code below splits the CSV into 1000 different items by separating out the headers and turning each row into a map/dictionary keyed by the appropriate headers. Each dictionary is then appended to a list, so you upload a list of dictionary items.
import csv, sys
from elasticsearch import helpers, Elasticsearch, RequestsHttpConnection
# Elasticsearch client for a node on localhost, configured to use the
# RequestsHttpConnection connection class.
es = Elasticsearch(
hosts=[{
'host': 'localhost',
# NOTE(review): the port is given as a string rather than an int — confirm
# the client version in use accepts that.
'port': '9200'}],
use_ssl=False,
# NOTE(review): verify_certs=True presumably only matters once use_ssl is
# enabled — confirm.
verify_certs=True,
connection_class=RequestsHttpConnection
)
# Build the list of documents for upload: one dict per CSV data row, keyed by
# the column headers taken from the first line of the file.
upload_list = []

# newline='' lets the csv module handle newlines embedded in quoted fields.
with open('my_folder/my_csv_file.csv', newline='') as csvfile:
    csv_data = csv.reader(csvfile)

    # The first row holds the headers; the default keeps an empty file from
    # raising and simply leaves upload_list empty.
    headers = next(csv_data, [])

    # Pair each header with the value in the same column. Every row is
    # appended, so the list accumulates all rows instead of being overwritten
    # with just the most recent one. Blank lines (which csv.reader yields as
    # empty lists) are skipped so they do not turn into empty documents.
    upload_list.extend(
        dict(zip(headers, item))
        for item in csv_data
        if item
    )
# Index the list of documents with the helper library's Bulk API. Each dict in
# upload_list is sent as a separate document to "my-index-name".
try:
    resp = helpers.bulk(
        es,
        upload_list,
        index="my-index-name"
    )
    msg = "helpers.bulk() RESPONSE: " + str(resp)
    print(msg)  # print the response returned by Elasticsearch
except Exception as err:
    # Top-level boundary of the script: report the failure and exit with a
    # non-zero status so the caller can tell the upload did not succeed.
    msg = "Elasticsearch helpers.bulk() ERROR: " + str(err)
    print(msg)
    sys.exit(1)