Migration Flink 1.4.1 to 1.13.2

Time:10-01

We were using Flink 1.4.1 in one of our projects. We send image processing jobs to our processing nodes via Flink, with a Flink Task Manager installed on each processing node. Our main application sends jobs to the Flink Job Manager, which dispatches them to the Task Managers according to availability. We implemented a Java application (call it the node application) that Flink executes on the nodes, and this application in turn runs our processors on the processing nodes. This worked properly, but after migrating Flink from version 1.4.1 to 1.13.2 we receive the error below.

2021-09-30 14:45:56,107 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-09-30 14:45:56,108 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 9603bc4718c13ab4015372e89f89dd60: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2021-09-30 14:45:56,132 ERROR tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent  [] - An unexpected exception occurred.
org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]. 
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getAccumulatorResult(DetachedJobExecutionResult.java:56) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:419) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent.main(ProcessorInvokerAgent.java:149) ~[bdb0ecae-9aa3-4962-a392-4464d0db0ae6_tr.com.sdt.mm.wfm.processor.api-0.0-DEV-SNAPSHOT.jar:0.0-DEV-SNAPSHOT]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_301]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_301]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_301]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_301]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_301]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
2021-09-30 14:45:56,133 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application: 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]. 
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_301]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_301]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]. 
    at org.apache.flink.core.execution.DetachedJobExecutionResult.getAccumulatorResult(DetachedJobExecutionResult.java:56) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:419) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1748) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent.main(ProcessorInvokerAgent.java:149) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_301]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_301]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_301]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    ... 13 more
2021-09-30 14:45:56,136 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.
2021-09-30 14:45:56,148 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-09-30 14:45:56,148 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.

Our Flink job looks like this:

public class ProcessorInvokerAgent implements MapFunction<String[], IProcessingNodeResult> {
    .....
    .......
    ........

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        String[][] dataTaken = { args };
        DataSet<String[]> elements = environment.fromElements(dataTaken);
        MapOperator<String[], IProcessingNodeResult> map = elements.map(new ProcessorInvokerAgent());
        try {
            map.print();
            environment.execute();
        } catch (Exception exc) {
            LOGGER.error("An unexpected exception occurred.", exc);
            throw exc;
        }
    }

    /**
     * Invokes the processor controllers with specified arguments.
     *
     * @param args
     *            the arguments of processing request (first one is base64 encoded
     *            order, and second one is base64 encoded configuration)
     * @return the processing node result
     * @throws ProcessorAgentException
     *             if cannot write processor result to
     */
    public IProcessingNodeResult map(String[] args) throws ProcessorAgentException {
        IProcessingNodeResult result = null;
        // check if argument number is valid
        if (args.length == 0) {
            // cannot send result to manager raise exception
            throw new ProcessorAgentException("Invalid number of arguments(" + args.length
                    + "), must match format <command> (any-other-param)* . Agent could not be invoked", null);
        } else if ("execute".equalsIgnoreCase(args[0])) {
            if (args.length != 3) {
                // cannot send result to manager raise exception
                throw new ProcessorAgentException("Invalid number of arguments (" + args.length
                        + ") for 'execute' command. Must match 'execute' <processing-order-id> <path-to-request-file>",
                        null);
            } else {
                result = handleProcessorExecution(args);
            }
        } else {
            // cannot send result to manager raise exception
            throw new ProcessorAgentException(
                    "Invalid execution command (" + args[0] + ") is given to processor agent. Agent will not be invoked",
                    null);

        }
        return result;
    }

    private IProcessingNodeResult handleProcessorExecution(String[] args) {
        // args[0] was command thus we need to start from index 1
        try {
            // 1st position contains processing order id (same as order#getOrderIdentifier())
            String processingOrderId = getArg1_ProcessingOrderId(args);

            LOGGER.info("Processor Invoker agent is started proceeding on order {}", processingOrderId);

            // 2nd position contains handle for processing order request file (this file contains all the related parameters for processing order)
            ResourceAccessInfo processingOrderFile = getArg2_ProcessingOrderFile(args, processingOrderId);

            LOGGER.debug("Argumans are extracted and checked. Processing order file will be downloaded and will be parsed.");

            List<String> lines = downloadOrderFileAndGetLines(processingOrderFile, processingOrderId);

            LOGGER.debug("Processing order file lines are : \n{}", printLines(lines));

            IProcessingOrder order = getLine1_ProcessingOrder(lines); // order is initialized in this function
            if (order != null) { // if order read and successfully parsed
                LogUtils.putMDC(ECommonSystemProperty.MMUGS_ENTITY.getValue(), order.getInstanceId(), order.getParentProcessId());
                LOGGER.debug("Line 1: Procesing Order has extracted successfully. Order Id=\"{}\"", order.getOrderIdentifier());


                IConfiguration configuration = getLine2_ConfigurationYAML(lines); // configuration is initialized in this function

                LOGGER.debug("Line 2: Configuration YAML has extracted.Order Id=\"{}\"", order.getOrderIdentifier());

                String procConfDir = getProcessorParametersConfDir(configuration);

                LOGGER.debug("Processing Node's processor parameters conf directory =\"{}\" for Order Id=\"{}\"", procConfDir, order.getOrderIdentifier());

                // The errors occurred while extracting Line 3&4 are logged as error/info.
                getLine3And4_ProcessorParamConfAndScripts(lines, processingOrderId, procConfDir);

                // The errors occurred while extracting Line 5 are logged as error/info.
                getLine5_AdditionalQueryConf(lines, processingOrderId, procConfDir);

                // The errors occurred while extracting Line 6 are logged as error/info.
                ProcTableType wsConfFileContent = getLine6_IPFWSConf(lines, processingOrderId);

                // The errors occurred while extracting Line 7 are logged as error/info.
                getLine7_TaskTable(lines, wsConfFileContent, order.getProcessorName(), order.getProcessorVersion(), processingOrderId);

                executeProcessor(order, configuration);
            }
        } catch (ProcessorInvokerAgentException exc) {
            LOGGER.debug("Processor execution result message=\"{}\", status=\"{}\"", exc.getResult().getMessage(), exc.getResult().getRequestStatus().value());
            return exc.getResult();
        } catch (Exception exc) {
            LOGGER.error("Processor execution failed due to an exception.Ex:\"{}\"", exc.getMessage(), exc);
        }
        LOGGER.debug("handleProcessorExecution COMPLETED");
        return null;
    }
}

Following the error message, we removed the map.print() line, but after that change we received the error below.

2021-10-01 10:55:55,767 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-10-01 10:55:55,769 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2021-10-01 10:55:55,992 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: true)
2021-10-01 10:55:56,091 ERROR tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent  [] - An unexpected exception occurred.
java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1170) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1145) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1041) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent.main(ProcessorInvokerAgent.java:150) ~[0a856782-ce46-4a33-b438-2519a2f92278_tr.com.sdt.mm.wfm.processor.api-0.0-DEV-SNAPSHOT.jar:0.0-DEV-SNAPSHOT]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_301]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_301]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_301]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_301]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_301]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
2021-10-01 10:55:56,098 WARN  org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - Could not execute application: 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_301]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_301]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_301]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
Caused by: java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1170) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1145) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1041) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    at tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent.main(ProcessorInvokerAgent.java:150) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_301]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_301]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_301]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
    ... 13 more
2021-10-01 10:55:56,102 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.

What should we do? How should we change our job? Are there any migration steps?

Thank you.

CodePudding user response:

I would recommend checking the release notes of Flink 1.5, 1.6, and so on, all the way up to 1.13. Flink 1.4 was released in 2017 and a lot has changed since.
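
One migration item is already visible in the logs above: the WARN from JarRunHandler saying that configuring the job submission via query parameters is deprecated in favour of a JSON request. As a rough sketch only, the body for the jar run endpoint could be built as below. The field names entryClass and programArgsList are what I recall from the Flink REST API documentation, so please verify them against the docs for your version; the class name JarRunRequestBody and the sample arguments are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JarRunRequestBody {

    // Minimal escaping of backslashes and double quotes only.
    // A real client should use a JSON library instead.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds a JSON body carrying the entry class and the program arguments,
    // replacing the deprecated query parameters.
    static String build(String entryClass, List<String> programArgs) {
        String args = programArgs.stream()
                .map(JarRunRequestBody::quote)
                .collect(Collectors.joining(","));
        return "{\"entryClass\":" + quote(entryClass)
                + ",\"programArgsList\":[" + args + "]}";
    }

    public static void main(String[] args) {
        // Hypothetical sample values following the 'execute' command format of the agent.
        String body = build(
                "tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent",
                Arrays.asList("execute", "order-1", "/path/to/request-file"));
        System.out.println(body);
    }
}
```

The resulting string would be sent as the POST body with Content-Type application/json, instead of appending the arguments to the URL.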

CodePudding user response:

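Restore the map.print() call and remove the environment.execute() call. In the DataSet API, print() is an eager function: it adds the sink and triggers execution itself, so no separate execute() is needed.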
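
If the jar is still submitted in detached mode (the second log shows "Starting program (detached: true)"), print() may keep failing with the InvalidProgramException above, because it needs the job result back in the client. A possible alternative, sketched here under assumptions, is to keep environment.execute() and attach a non-eager sink such as DiscardingOutputFormat from flink-java. The class name DetachedFriendlyJob, the InvokeProcessor mapper, and the String result type are hypothetical stand-ins for ProcessorInvokerAgent and IProcessingNodeResult.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class DetachedFriendlyJob {

    // Hypothetical stand-in for ProcessorInvokerAgent.map(): the real work happens here.
    static class InvokeProcessor implements MapFunction<String[], String> {
        @Override
        public String map(String[] args) {
            return "processed " + args.length + " argument(s)";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Same shape as the original job: the whole argument array is one element.
        String[][] dataTaken = { args };
        DataSet<String[]> elements = env.fromElements(dataTaken);

        // A regular (non-eager) sink satisfies the "at least one sink" check
        // without asking for results on the client side.
        elements.map(new InvokeProcessor())
                .output(new DiscardingOutputFormat<>());

        // With a non-eager sink in place, execute() is what submits the job.
        env.execute("processor-invoker");
    }
}
```

With this shape the mapped results are discarded, so anything the caller needs has to be written out by map() itself, for example to logs or files.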
