I am trying to use a util function (sample_listwise) from TensorFlow Recommenders (TFRS) to turn a TF Dataset into a listwise TF Dataset.
Basically, the function makes X lists of Y random samples each for every user Z in the dataset. My problem is that when I pass in a TF Dataset with a few million records in it, the program crashes.
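For reference, each element of the listwise dataset ends up looking roughly like this (made-up values, with Y = num_examples_per_list = 3):

{
    "user_id": b"42",
    "movie_title": [b"Heat (1995)", b"Fargo (1996)", b"Casino (1995)"],
    "user_rating": [4.0, 3.0, 5.0],
}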
tensor_slices = {"user_id": [], "movie_title": [], "user_rating": []}
It appears that, eventually, this tensor_slices dict inside the function fills up with so much data that the program runs out of memory and crashes.
I modified the original function by turning each user's sampled lists into a TF dataset via the from_tensor_slices() method at the end of processing each user. This keeps the tensor_slices dict from blowing up. The program loops through each user and concatenates each per-user from_tensor_slices() dataset onto the previous one before returning a full TF Dataset.
from collections import defaultdict
from typing import Optional

import numpy as np
import tensorflow as tf

# _create_feature_dict and _sample_list are the helpers from the TFRS listwise
# ranking example; they are unchanged and omitted here.

def sample_listwise(
    rating_dataset: tf.data.Dataset,
    num_list_per_user: int = 50,
    num_examples_per_list: int = 5,
    seed: Optional[int] = None,
) -> tf.data.Dataset:
    random_state = np.random.RandomState(seed)

    # Group every (movie_title, user_rating) pair by user.
    example_lists_by_user = defaultdict(_create_feature_dict)
    movie_title_vocab = set()
    for example in rating_dataset:
        user_id = example["user_id"].numpy()
        example_lists_by_user[user_id]["movie_title"].append(example["movie_title"])
        example_lists_by_user[user_id]["user_rating"].append(example["user_rating"])
        movie_title_vocab.add(example["movie_title"].numpy())

    i = 0
    for user_id, feature_lists in example_lists_by_user.items():
        tensor_slices = {"user_id": [], "movie_title": [], "user_rating": []}

        for _ in range(num_list_per_user):
            # Drop the user if they don't have enough ratings.
            if len(feature_lists["movie_title"]) < num_examples_per_list:
                continue

            sampled_movie_titles, sampled_ratings = _sample_list(
                feature_lists,
                num_examples_per_list,
                random_state=random_state,
            )

            tensor_slices["user_id"].append(user_id)
            tensor_slices["movie_title"].append(sampled_movie_titles)
            tensor_slices["user_rating"].append(sampled_ratings)

            # Check if all lists for a user are stored in tensor_slices.
            if len(tensor_slices["user_id"]) == num_list_per_user:
                tmp_tf_dataset = tf.data.Dataset.from_tensor_slices(tensor_slices)
                # Clear out the tensor_slices dict for that user.
                tensor_slices.clear()
                # Concatenate tmp_tf_dataset onto the main tf dataset.
                if i == 0:
                    tf_dataset = tmp_tf_dataset
                else:
                    tf_dataset = tf_dataset.concatenate(tmp_tf_dataset)
                i = 1

    return tf_dataset
I can pass the result of this function to a model if I keep the amount of data very small (around 250k records). If I increase the amount of data to process, the model eventually fails with a Segmentation Fault error.
So my question is: how do I properly concatenate all this data together into one coherent dataset, so my program won't crash and I can sidestep the tensor_slices dict implosion?
CodePudding user response:
The original function first creates example_lists_by_user from the input dataset, then builds the tensor_slices object, and finally converts it into another tf.data.Dataset. It takes a dataset and returns a dataset.
If the problem comes from the size of tensor_slices (and not from example_lists_by_user), there is a way to avoid creating it altogether, using a generator function and tf.data.Dataset.from_generator.
Specifically, you could have something like:
def sample_listwise(...):
    # generate example_lists_by_user as before...

    def example_generator():
        # Yield one sampled list at a time instead of accumulating
        # everything in a tensor_slices dict.
        for user_id, feature_lists in example_lists_by_user.items():
            for _ in range(num_list_per_user):
                movie_titles, ratings = _sample_list(...)
                yield {
                    'user_id': user_id,
                    'movie_title': movie_titles,
                    'user_rating': ratings,
                }

    # Create a dataset from the generator function above.
    return tf.data.Dataset.from_generator(
        example_generator,
        output_signature={
            'user_id': tf.TensorSpec([], tf.string),
            'movie_title': tf.TensorSpec([num_examples_per_list], tf.string),
            # Ratings are float values, so float32 rather than string.
            'user_rating': tf.TensorSpec([num_examples_per_list], tf.float32),
        })
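If it helps, here is a minimal usage sketch, assuming sample_listwise keeps the same signature as your original function; ratings_ds and model are placeholders for your own ratings dataset and listwise ranking model:

import tensorflow as tf

# ratings_ds: placeholder for the {"user_id", "movie_title", "user_rating"}
# dataset you already pass in; model: placeholder for your existing model.
listwise_ds = sample_listwise(
    ratings_ds,
    num_list_per_user=50,
    num_examples_per_list=5,
    seed=42,
)

# from_generator is lazy, so sampled lists are produced on demand instead of
# being accumulated in one giant tensor_slices dict or a chain of concatenates.
train = listwise_ds.shuffle(10_000).batch(4096).prefetch(tf.data.AUTOTUNE)
model.fit(train, epochs=3)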