I am building an audio-based deep learning model. As part of the preprocessing, I want to augment the audio in my datasets. One augmentation I want to apply is a room impulse response (RIR). I am working with Python 3.9.5 and TensorFlow 2.8.
In Python, if the RIR is given as a finite impulse response (FIR) filter of n taps, the standard way to do this is with SciPy's lfilter:
import numpy as np
from scipy import signal
import soundfile as sf
h = np.load("rir.npy")
x, fs = sf.read("audio.wav")
y = signal.lfilter(h, 1, x)
Running this in a loop over all the files may take a long time. Doing it with the TensorFlow map utility on TensorFlow datasets:
# define filter function
def h_filt(audio, label):
    h = np.load("rir.npy")
    x = audio.numpy()
    y = signal.lfilter(h, 1, x)
    return tf.convert_to_tensor(y, dtype=tf.float32), label
# apply it via TF map on dataset
aug_ds = ds.map(h_filt)
Using tf.numpy_function:
tf_h_filt = tf.numpy_function(h_filt, [audio, label], [tf.float32, tf.string])
# apply it via TF map on dataset
aug_ds = ds.map(tf_h_filt)
I have two questions:
- Is this way correct and fast enough (less than a minute for 50,000 files)?
- Is there a faster way to do it? E.g., replace the SciPy function with a built-in TensorFlow function. I didn't find an equivalent of lfilter or SciPy's convolve.
CodePudding user response:
Here is one way you could do it, using tf.nn.conv1d.
Notice that the TensorFlow function is designed to receive batches of inputs with multiple channels, and the filter can have multiple input channels and multiple output channels. Let N be the batch size, I the number of input channels, F the filter width, L the input width, and O the number of output channels. Using padding='SAME', it maps an input of shape (N, L, I) and a filter of shape (F, I, O) to an output of shape (N, L, O).
import numpy as np
from scipy import signal
import tensorflow as tf
# data to compare the two approaches
x = np.random.randn(100)
h = np.random.randn(11)
y_lfilt = signal.lfilter(h, 1, x)
# Since the denominator of your filter transfer function is 1,
# the output of lfilter matches the convolution (truncated to len(x))
y_np = np.convolve(h, x)
assert np.allclose(y_lfilt, y_np[:len(y_lfilt)])
# now let's do the convolution using tensorflow
y_tf = tf.nn.conv1d(
    # x must be padded with half of the size of h
    # to use padding 'SAME'
    np.pad(x, len(h) // 2).reshape(1, -1, 1),
    # the time axis of h must be flipped
    h[::-1].reshape(-1, 1, 1),  # a 1x1 matrix of filters
    stride=1,
    padding='SAME',
    data_format='NWC')
assert np.allclose(y_lfilt, np.squeeze(y_tf)[:len(y_lfilt)])
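If you stay on the NumPy/SciPy side, another option (not part of the approach above, just a sketch) is FFT-based convolution with scipy.signal.fftconvolve. Since the filter is a pure FIR, truncating the full convolution to the input length should reproduce the lfilter output, and for RIRs with thousands of taps the FFT route is usually cheaper than direct filtering. The helper name apply_rir_fft and the signal lengths below are made up for illustration.

```python
import numpy as np
from scipy import signal


def apply_rir_fft(x, h):
    """Causal FIR filtering of x by h via FFT convolution (hypothetical helper)."""
    # The full convolution has len(x) + len(h) - 1 samples;
    # lfilter(h, 1, x) corresponds to the first len(x) of them.
    return signal.fftconvolve(x, h, mode="full")[: len(x)]


# Synthetic data standing in for real audio and a real RIR (assumed sizes)
rng = np.random.default_rng(0)
x = rng.standard_normal(16000)  # e.g. 1 s of audio at 16 kHz
h = rng.standard_normal(4000)   # e.g. an RIR with 4000 taps

y_fft = apply_rir_fft(x, h)
y_lfilt = signal.lfilter(h, 1, x)

# Both approaches agree up to floating-point error
assert np.allclose(y_fft, y_lfilt)
```

Whether this is fast enough for 50,000 files depends on file length, RIR length, and I/O, so it is worth timing on a small subset first.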