Flink error with 'The implementation of the StreamExecutionEnvironment is not serializable'

Time: 03-06

I am new to Flink, and I tried to use it to run LFM, a recommendation algorithm, but my code throws the following error at runtime. I tried to track it down and fix it, but without success. Could someone tell me what is going wrong?

Here is the main exception:

The implementation of the StreamExecutionEnvironment is not serializable

And here is my code.

Note that sourceDataStream comes from a custom source that extends RichFunction<Tuple3<>>.

//training model
for (int iter = 0; iter < iterations; iter++) {
    sourceDataStream
        // the exception appears here
        .process(new ProcessFunction<Tuple3<String, String, Double>, Object>() {
            @Override
            public void processElement(Tuple3<String, String, Double> in,
                                                   ProcessFunction<Tuple3<String, String, Double>, Object>.Context context,
                                                   Collector<Object> collector) throws Exception {

                Double hat_rui = predict(in.f0, in.f1, qbiTable, pbuTable, streamTableEnvironment);
                Double err_ui = in.f2 - hat_rui;
                Table pbuSelectTable = pbuTable.select($("buValue"), $("pList")).where($("userId").isEqual(in.f0));
                Table qbiSelectTable = qbiTable.select($("biValue"), $("qList")).where($("itemId").isEqual(in.f1));

                DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> pbuSelectDataStream = streamTableEnvironment.toRetractStream(pbuSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                pbuSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {

                @Override
                public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> userTuple,
                                                           ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                                                           Collector<Object> collector) throws Exception {

                    DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> qbiSelectDataStream = streamTableEnvironment.toRetractStream(qbiSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                    qbiSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {

                        @Override
                        public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> itemTuple, ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context, Collector<Object> collector) throws Exception {

                            double bu = userTuple.f1.f0 + (alpha * (err_ui - lambd * userTuple.f1.f0));
                            double bi = itemTuple.f1.f0 + (alpha * (err_ui - lambd * itemTuple.f1.f0));

                            List<Double> pList = new ArrayList<>();
                            List<Double> qList = new ArrayList<>();
                            for (int fIter = 0; fIter < F; fIter++) {
                                Double pValueLast = userTuple.f1.f1.get(fIter);
                                Double qValueLast = itemTuple.f1.f1.get(fIter);
                                Double qValueNew = qValueLast + (alpha * (err_ui * pValueLast - lambd * qValueLast));
                                Double pValueNew = pValueLast + (alpha * (err_ui * qValueLast - lambd * pValueLast));
                                pList.add(pValueNew);
                                qList.add(qValueNew);
                            }
                            streamTableEnvironment.executeSql("INSERT OVERWRITE qbiTable VALUES ('qList', " + qList + "), ('biValue', " + bi + ")");
                            streamTableEnvironment.executeSql("INSERT OVERWRITE pbuTable VALUES ('pList', " + pList + "), ('buValue', " + bu + ")");
                        }
                    });
                }
            });
        }
    });
}

CodePudding user response:

There are a few things about this that aren't going to work:

Inside the implementation of a user function (such as a ProcessFunction) you cannot hold a reference to a DataStream, a Table, a StreamExecutionEnvironment, or another ProcessFunction. All such a function can do is react to each incoming stream record, optionally using state it has built up from the records it processed before.
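To illustrate the shape of a valid user function: it reacts to one record at a time and keeps only its own state. Here is a minimal plain-Java sketch of that pattern (the class and field names are made up, and a HashMap stands in for Flink's keyed ValueState, which is what you would actually use inside a KeyedProcessFunction):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a self-contained operator body: it reacts to one record at a
// time and touches only its own state -- no DataStream, Table, or
// environment references. A HashMap stands in for Flink's keyed ValueState.
class RunningAverage {
    private final Map<String, double[]> state = new HashMap<>(); // key -> {sum, count}

    // Analogous to processElement(record, ctx, out): update state, emit a value.
    double process(String key, double value) {
        double[] acc = state.computeIfAbsent(key, k -> new double[2]);
        acc[0] += value; // running sum for this key
        acc[1] += 1;     // running count for this key
        return acc[0] / acc[1];
    }
}
```

Everything the function needs at runtime lives inside the function itself, which is also what makes it serializable and shippable to the task managers.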

The DataStream and Table APIs are organized around a builder paradigm, with which you are describing a streaming dataflow pipeline. This pipeline must be a directed acyclic graph: it can split and merge but must flow from sources to sinks without any loops. The stages of that pipeline (e.g., a ProcessFunction) must be coded as independent blocks -- they cannot reach outside of themselves to access data from other pipeline stages.
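As a concrete step toward independent stages: the SGD update math buried in the question's nested callbacks can be factored into a free-standing, serializable helper that a single pipeline stage could call. A sketch, with hypothetical class/method names and alpha, lambd, F as in the original code:

```java
import java.util.ArrayList;
import java.util.List;

// The update step from the question's inner loop, extracted into a plain
// helper with no reference to any DataStream, Table, or environment, so a
// single self-contained operator can call it. Names are illustrative.
class LfmUpdate {
    // Bias update: b <- b + alpha * (err - lambd * b), as in the bu/bi lines.
    static double updateBias(double b, double err, double alpha, double lambd) {
        return b + alpha * (err - lambd * b);
    }

    // Latent-factor update for one (user, item) pair; returns {pNew, qNew}.
    static List<double[]> updateFactors(List<Double> p, List<Double> q,
                                        double err, double alpha, double lambd) {
        double[] pNew = new double[p.size()];
        double[] qNew = new double[q.size()];
        for (int f = 0; f < p.size(); f++) {
            double pLast = p.get(f);
            double qLast = q.get(f);
            qNew[f] = qLast + alpha * (err * pLast - lambd * qLast);
            pNew[f] = pLast + alpha * (err * qLast - lambd * pLast);
        }
        List<double[]> out = new ArrayList<>();
        out.add(pNew);
        out.add(qNew);
        return out;
    }
}
```

With the math isolated like this, one operator can keep the factor vectors in keyed state and apply the update per record, instead of launching nested streams from inside processElement.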

This paradigm isn't well suited to training machine learning models, since training involves iterating/looping. If that's your objective, take a look at https://github.com/apache/flink-ml.
