How to use custom JDBC jar file from GCS in Apache Beam Java SDK


I have a use case: read a file from GCS and write it to our own data-warehouse product through Apache Beam. We have a custom JDBC driver (.jar) for connecting to the warehouse, and I am trying to use Apache Beam's JdbcIO to perform the ETL, with a Maven POM to manage dependencies. Can someone help me understand how to leverage this custom jar file in Apache Beam?


p.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "MYDRIVERCLASS", "DATABASE_URL")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id,name from Person")
    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    }));

CodePudding user response:

You can refer to the JdbcIO source excerpt below to see how a DataSourceConfiguration is built from a driver class name and URL; your custom driver plugs in the same way as any other JDBC driver.

@Experimental(Experimental.Kind.SOURCE_SINK)
public class JdbcIO {
  /**
   * Read data from a JDBC datasource.
   *
   * @param <T> Type of the data to be read.
   */
  public static <T> Read<T> read() {
    return new AutoValue_JdbcIO_Read.Builder<T>().build();
  }

  /**
   * Like {@link #read}, but executes multiple instances of the query substituting each element
   * of a {@link PCollection} as query parameters.
   *
   * @param <ParameterT> Type of the data representing query parameters.
   * @param <OutputT> Type of the data to be read.
   */
  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
    return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>().build();
  }

  /**
   * Write data to a JDBC datasource.
   *
   * @param <T> Type of the data to be written.
   */
  public static <T> Write<T> write() {
    return new AutoValue_JdbcIO_Write.Builder<T>().build();
  }

  private JdbcIO() {}

  /**
   * An interface used by {@link JdbcIO.Read} for converting each row of the {@link ResultSet} into
   * an element of the resulting {@link PCollection}.
   */
  @FunctionalInterface
  public interface RowMapper<T> extends Serializable {
    T mapRow(ResultSet resultSet) throws Exception;
  }

  /**
   * A POJO describing a {@link DataSource}, either providing directly a {@link DataSource} or all
   * properties allowing to create a {@link DataSource}.
   */
  @AutoValue
  public abstract static class DataSourceConfiguration implements Serializable {
    @Nullable abstract String getDriverClassName();
    @Nullable abstract String getUrl();
    @Nullable abstract String getUsername();
    @Nullable abstract String getPassword();
    @Nullable abstract String getConnectionProperties();
    @Nullable abstract DataSource getDataSource();

    abstract Builder builder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setDriverClassName(String driverClassName);
      abstract Builder setUrl(String url);
      abstract Builder setUsername(String username);
      abstract Builder setPassword(String password);
      abstract Builder setConnectionProperties(String connectionProperties);
      abstract Builder setDataSource(DataSource dataSource);
      abstract DataSourceConfiguration build();
    }

    public static DataSourceConfiguration create(DataSource dataSource) {
      checkArgument(dataSource != null, "dataSource can not be null");
      checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
      return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
          .setDataSource(dataSource)
          .build();
    }

    public static DataSourceConfiguration create(String driverClassName, String url) {
      checkArgument(driverClassName != null, "driverClassName can not be null");
      checkArgument(url != null, "url can not be null");
      return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
          .setDriverClassName(driverClassName)
          .setUrl(url)
          .build();
    }

    public DataSourceConfiguration withUsername(String username) {
      return builder().setUsername(username).build();
    }

    public DataSourceConfiguration withPassword(String password) {
      return builder().setPassword(password).build();
    }

    // ... remainder of DataSourceConfiguration elided ...
  }
}
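
For the write side (loading rows into the warehouse), usage follows the same pattern. The sketch below uses the real JdbcIO.write() API, but the driver class, JDBC URL, and target table are hypothetical placeholders for your custom driver's values, and output stands for the PCollection<KV<Integer, String>> to be written:

output.apply(JdbcIO.<KV<Integer, String>>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mycompany.jdbc.WarehouseDriver", "jdbc:mywarehouse://host:port/db")
        .withUsername("username")
        .withPassword("password"))
    // Placeholder statement; one parameter per column being written.
    .withStatement("insert into Person values(?, ?)")
    .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
      public void setParameters(KV<Integer, String> element, PreparedStatement statement)
          throws SQLException {
        statement.setInt(1, element.getKey());
        statement.setString(2, element.getValue());
      }
    }));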

You can build and run your pipeline as in this example; see the Apache Beam documentation for more details.

# Build the project.
gradle build

# Check the generated build files.
ls -lh build/libs/

# Run the shadow (fat jar) build.
gradle runShadow

# Sample the first 20 results; remember there are no ordering guarantees.
head -n 20 outputs/part-00000-of-*

CodePudding user response:

To use additional dependency jars, you can simply add them to the CLASSPATH when running the Beam Java pipeline. All jars on the CLASSPATH will be staged by Beam runners.

You can also use the filesToStage pipeline option to specify dependencies explicitly.
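
As a minimal sketch of setting it programmatically, assuming the Dataflow runner and a hypothetical local path for the driver jar (downloaded from GCS beforehand, for example with gsutil):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StageCustomDriver {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // filesToStage normally defaults to the detected classpath; once it is set
    // explicitly, the workers get only what is listed here, so start from the
    // detected list when one is present.
    List<String> filesToStage =
        options.getFilesToStage() == null
            ? new ArrayList<>()
            : new ArrayList<>(options.getFilesToStage());
    filesToStage.add("lib/custom-jdbc-driver.jar"); // hypothetical local path
    options.setFilesToStage(filesToStage);

    Pipeline p = Pipeline.create(options);
    // ... apply the JdbcIO transforms shown above ...
    p.run().waitUntilFinish();
  }
}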
