How to implement custom encode Tensorflow Federated


I have created a custom encoder/decoder like so:

import tensorflow as tf

from tensorflow_model_optimization.python.core.internal import tensor_encoding as te


# noinspection PyUnresolvedReferences
class SparseTernaryCompressionEncodingStage(te.core.EncodingStageInterface):
    AVERAGE = 'average'
    NEGATIVES = 'negatives'
    POSITIVES = 'positives'
    TESTING = 'testing'
    NEW_SHAPE = 'new_shape'
    ORIGINAL_SHAPE = 'original_shape'

    @property
    def name(self):
        # Unique name identifying this encoding stage.
        return 'sparse_ternary_compression'

    @property
    def compressible_tensors_keys(self):
        # None of the encoded outputs need further compression.
        return []

    @property
    def commutes_with_sum(self):
        # Decoding must happen before summation on the server.
        return False

    @property
    def decode_needs_input_shape(self):
        # The original shape is carried inside the encoded structure.
        return False

    def get_params(self):
        # No parameters are needed by encode/decode.
        return {}, {}

    def encode(self, original_tensor, encode_params):
        original_shape = tf.shape(original_tensor)
        tensor = tf.reshape(original_tensor, [-1])
        new_shape = tensor.get_shape().as_list()
        # Keep the top 1% of elements by magnitude (at least one element).
        sparsification_rate = max(int(len(tensor) / 100), 1)
        threshold = tf.math.top_k(tf.abs(tensor), sparsification_rate)[0][-1]
        mask = tf.cast(tf.abs(tensor) >= threshold, tf.float32)
        tensor_masked = tf.multiply(tensor, mask)
        # Replace every surviving element with the mean magnitude, signed.
        average = tf.reduce_sum(tf.abs(tensor_masked)) / sparsification_rate
        compressed_tensor = tf.multiply(average, mask) * tf.sign(tensor)
        negatives = tf.where(compressed_tensor < 0)
        positives = tf.where(compressed_tensor > 0)

        encoded_x = {self.AVERAGE: average, self.NEGATIVES: negatives, self.POSITIVES: positives,
                     self.NEW_SHAPE: new_shape, self.ORIGINAL_SHAPE: original_shape}

        return encoded_x

    def decode(self, encoded_tensors, decode_params, num_summands=None, shape=None):
        # Read the encoded pieces back from the dictionary built in encode().
        average = encoded_tensors[self.AVERAGE]
        negatives = encoded_tensors[self.NEGATIVES]
        positives = encoded_tensors[self.POSITIVES]
        new_shape = encoded_tensors[self.NEW_SHAPE]
        original_shape = encoded_tensors[self.ORIGINAL_SHAPE]
        decompressed_tensor = tf.zeros(new_shape, tf.float32)
        average_values_negative = tf.fill([tf.shape(negatives)[0]], -average)
        average_values_positive = tf.fill([tf.shape(positives)[0]], average)
        decompressed_tensor = tf.tensor_scatter_nd_update(decompressed_tensor, negatives, average_values_negative)
        decompressed_tensor = tf.tensor_scatter_nd_update(decompressed_tensor, positives, average_values_positive)
        decompressed_tensor = tf.reshape(decompressed_tensor, original_shape)
        return decompressed_tensor
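To sanity-check the codec outside of TFF, the same logic can be exercised standalone in plain TensorFlow. The encode/decode helpers below are a simplified sketch of the class above (the sparsification rate is passed explicitly here, which the class hard-codes at 1%):

```python
import tensorflow as tf


def encode(original_tensor, rate_percent=1):
    original_shape = tf.shape(original_tensor)
    tensor = tf.reshape(original_tensor, [-1])
    # Keep the top rate_percent of elements by magnitude (at least one).
    k = max(int(tensor.shape[0] * rate_percent / 100), 1)
    threshold = tf.math.top_k(tf.abs(tensor), k)[0][-1]
    mask = tf.cast(tf.abs(tensor) >= threshold, tf.float32)
    # Replace each surviving element with the mean magnitude, signed.
    average = tf.reduce_sum(tf.abs(tensor * mask)) / k
    compressed = average * mask * tf.sign(tensor)
    negatives = tf.where(compressed < 0)
    positives = tf.where(compressed > 0)
    # These five pieces are all that needs to travel over the network.
    return average, negatives, positives, tensor.shape[0], original_shape


def decode(average, negatives, positives, flat_len, original_shape):
    out = tf.zeros([flat_len], tf.float32)
    out = tf.tensor_scatter_nd_update(
        out, negatives, tf.fill([tf.shape(negatives)[0]], -average))
    out = tf.tensor_scatter_nd_update(
        out, positives, tf.fill([tf.shape(positives)[0]], average))
    return tf.reshape(out, original_shape)


x = tf.constant([[0.1, -2.0], [3.0, 0.05]])
# With a 50% rate, the two largest magnitudes (-2.0 and 3.0) survive and
# are both replaced by the signed mean magnitude (2.0 + 3.0) / 2 = 2.5.
decoded = decode(*encode(x, rate_percent=50))
```

The round trip recovers the support and sign pattern of the largest entries, which is exactly the approximation this compression scheme trades bandwidth for.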

Now I would like to use the encode function to compress the weights each client sends to the server, and have the server use the decode function to recover them. Instead of sending all the weights from the client to the server, I want to send only the five pieces of information above, from which the full weight tensors can be reconstructed.

The problem is that I don't understand how to tell the client to use this encoder when sending its update, and how to tell the server to decode it before computing: round_model_delta = tff.federated_mean(client_outputs.weights_delta, weight=weight_denom)

I'm using the TensorFlow Federated simple_fedavg example as the base project.

CodePudding user response:

If you only want to modify the aggregation, you may have an easier time using the tff.learning APIs with what you have, parameterizing the aggregation with a tff.aggregators object. For instance:

def encoder_fn(value_spec):
  return te.encoders.as_gather_encoder(
     te.core.EncoderComposer(SparseTernaryCompressionEncodingStage()).make(),
     value_spec)

tff.learning.build_federated_averaging_process(
    ...,  # Other args.
    model_update_aggregation_factory=tff.aggregators.EncodedSumFactory(
        encoder_fn))
