I'm trying to use Flink to work with Oracle. Just do a simple task copy data from table to a new one.
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE ExistedTable(\n"
" quoteid BIGINT,\n"
" requestid BIGINT,\n"
" createddt DATE,\n"
" PRIMARY KEY (quoteid) NOT ENFORCED\n"
") WITH (\n"
" 'connector' = 'jdbc',\n"
" 'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n"
" 'table-name' = 'TableName',\n"
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n"
" 'username' = 'UserName',\n"
" 'password' = 'Password'\n"
")");
tEnv.executeSql("CREATE TABLE NewTable (\n"
" quoteid BIGINT,\n"
" requestid BIGINT,\n"
" createddt DATE,\n"
" PRIMARY KEY (quoteid) NOT ENFORCED\n"
") WITH (\n"
" 'connector' = 'jdbc',\n"
" 'url' = 'jdbc:oracle:thin:@xxx.xxx.xxx.xxx:1521:DBNAME',\n"
" 'table-name' = 'NewTableName',\n"
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n"
" 'username' = 'UserName',\n"
" 'password' = 'Password'\n"
")");
Table data= tEnv.from("ExistedTable");
data.executeInsert("NewTable");
When running I've got error
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.xxx'.
Table options are:
'connector'='jdbc'
'driver'='oracle.jdbc.OracleDriver'
'password'='******'
'table-name'='xxx'
'url'='jdbc:oracle:thin:@xxx:1521:xxx'
'username'='xxx'
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.xxx'.
Is there any error in my sqlconnection. I couldn't found any example for working with oracle. Thanks,
CodePudding user response:
Which version of Flink are you using? Support for Oracle JDBC is available since Flink 1.15, which hasn't been released yet.