Home > Mobile >  How to separate the tensorflow data pipeline?
How to separate the tensorflow data pipeline?

Time:12-15

I would like to split the TensorFlow pipeline into two and apply a different function to each of them using tf.data.Dataset.map().
Like this:

dataset = tf.data.Dataset.from_tensor_slices(list(range(20)))
dataset = dataset.shuffle(20).batch(10)

dataset_1 = dataset.map(lambda x: x)
dataset_2 = dataset.map(lambda x: x   1)

for d1, d2 in zip(dataset_1, dataset_2):
    print(d1.numpy())  # [13 14 12 15 18  2 16 19 6 4]
    print(d2.numpy())  # [18 16  6  7  3 15 17  9 2 4]

    break

However, this is not the output I wanted. My expectation is that when d1 is [13 14 12 15 18 2 16 19 6 4], d2 should be [14 15 13 16 19 3 17 20 7 5]. I think I know what's going on, but don't know how to write about it. I don't want to create two pipelines from the beginning (because of the large overhead). Can you give me some advice?
Thanks for reading.

update

I decided to implement it as follows.

# use the same seed for dataset_1 and dataset_2
dataset_1 = dataset.shuffle(20, seed=0).batch(10)
dataset_2 = dataset.shuffle(20, seed=0).batch(10)

dataset_1 = dataset_1.map(lambda x: x)
dataset_2 = dataset_2.map(lambda x: x   1)

CodePudding user response:

The default behavior of the tensorflow shuffle function is to reshuffle each time you called .numpy(), to prevent this you want to set reshuffle_each_itertaion=False (https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shuffle).

dataset = tf.data.Dataset.from_tensor_slices(list(range(20)))
dataset = dataset.shuffle(20, reshuffle_each_iteration=False).batch(10)
dataset_1 = dataset.map(lambda x: x)
dataset_2 = dataset.map(lambda x: x   1)

for d1, d2 in zip(dataset_1, dataset_2):
    print(d1.numpy())  # [10 13  3 19 12 16  7 11  2  8]
    print(d2.numpy())  # [11 14  4 20 13 17  8 12  3  9]

    break

But the consequences of this is if you try to call d1.numpy() or d2.numpy() the second times the value will stay the same.

CodePudding user response:

What about simple stack of two actions like

dataset = tf.data.Dataset.from_tensor_slices(list(range(20)))
dataset = dataset.shuffle(20)

def func1(x):
    return x

def func2(x):
    return x   1

dataset = dataset.map(lambda sample: tf.stack([func1(sample), func2(sample)], axis=0))

list(dataset.as_numpy_iterator())

# [array([ 9, 10], dtype=int32),
#  array([16, 17], dtype=int32),
#  array([10, 11], dtype=int32),
#  array([1, 2], dtype=int32),
#  array([11, 12], dtype=int32),
#  array([6, 7], dtype=int32),
#  array([18, 19], dtype=int32),
#  array([3, 4], dtype=int32),
#  array([8, 9], dtype=int32),
#  array([15, 16], dtype=int32),
#  array([4, 5], dtype=int32),
#  array([14, 15], dtype=int32),
#  array([0, 1], dtype=int32),
#  array([12, 13], dtype=int32),
#  array([17, 18], dtype=int32),
#  array([2, 3], dtype=int32),
#  array([5, 6], dtype=int32),
#  array([13, 14], dtype=int32),
#  array([7, 8], dtype=int32),
#  array([19, 20], dtype=int32)]

Afterwards, you can unbatch with dataset = dataset.unbatch() and batch like dataset = dataset.batch(10) if needed.

  • Related