I want to upload a file to Azure Blob Storage with Apache Beam, but it fails. Why?
I have set the correct environment variables.
The az command works:
$ az storage blob upload \
-c $AZURE_STORAGE_CONTAINER_NAME \
-f example.json -n example.json \
--account-name $AZURE_STORAGE_ACCOUNT \
--account-key $AZURE_STORAGE_KEY
Finished[#############################################################] 100.0000%
{
"etag": "\"0x8D9B92C4C0BE870\"",
"lastModified": "2021-12-07T02:50:15+00:00"
}
But when I run the following command:
$ mvn compile exec:java -Dexec.mainClass=jp.example.Indexer \
-Dexec.args="--runner=DirectRunner \
--destination=azfs://$AZURE_STORAGE_ACCOUNT/$AZURE_STORAGE_CONTAINER_NAME/example.json \
--source=example.json \
--azureConnectionString=$AZURE_CONNECTION_STRING \
--sasToken=$AZURE_STORAGE_SAS_TOKEN \
--accessKey=$AZURE_STORAGE_KEY \
--accountName=$AZURE_STORAGE_ACCOUNT"
I get the following error:
[WARNING]
java.lang.IllegalArgumentException: PipelineOptions specified failed to serialize to JSON.
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:171)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
at jp.example.Indexer.main (Indexer.java:24)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'azureCredentialsProvider' with value 'com.azure.identity.DefaultAzureCredential@3e88886c'
at com.fasterxml.jackson.databind.JsonMappingException.fromUnexpectedIOE (JsonMappingException.java:334)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes (ObjectMapper.java:3769)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:168)
at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
at jp.example.Indexer.main (Indexer.java:24)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13.564 s
[INFO] Finished at: 2021-12-07T11:58:57+09:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project Indexer: An exception occured while executing the Java class. PipelineOptions specified failed to serialize to JSON.: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'azureCredentialsProvider' with value 'com.azure.identity.DefaultAzureCredential@3e88886c' -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Here is Indexer.java:
package jp.example;
import java.util.logging.Logger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
public class Indexer {
private static final Logger LOG = Logger.getLogger(Indexer.class.getName());
public static void main(String[] args) {
ToAzurePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(ToAzurePipelineOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(TextIO.read().from(options.getSource()));
lines.apply(TextIO.write().to(options.getDestination()));
p.run();
}
}
Here is ToAzurePipelineOptions.java:
package jp.example;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
public interface ToAzurePipelineOptions extends DataflowPipelineOptions, BlobstoreOptions{
@Description("Root path of data files")
@Default.String("file://hoge")
String getSource();
void setSource(String value);
@Description("Address of Azure Storage")
@Default.String("azfs://hoge")
String getDestination();
void setDestination(String value);
}
The versions of beam-sdks-java-io-azure and beam-sdks-java-core are 2.31.0.
Answer:
I suspect this is caused by the fact that TokenCredentialSerializer is only implemented in Beam 2.33.0 and later. Could you upgrade your Beam dependencies to at least 2.33.0 and see whether that solves the problem?
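If you want the pipeline to fail fast with a clear message instead of the JSON serialization error, you could check the SDK version at startup. Below is a minimal sketch of a dotted-version comparison. The class and method names (`BeamVersionCheck`, `isAtLeast`) are hypothetical. It assumes you can obtain the running SDK version as a string, for example via `org.apache.beam.sdk.util.ReleaseInfo.getReleaseInfo().getVersion()` (verify that this is available in your SDK version). Pre-release qualifiers such as `-SNAPSHOT` are simply ignored.

```java
import java.util.Arrays;

/**
 * Hypothetical helper: compares dotted version strings such as "2.31.0"
 * against a minimum such as "2.33.0".
 */
public class BeamVersionCheck {

    /** Returns true if {@code actual} is greater than or equal to {@code minimum}. */
    public static boolean isAtLeast(String actual, String minimum) {
        int[] a = parse(actual);
        int[] m = parse(minimum);
        int len = Math.max(a.length, m.length);
        for (int i = 0; i < len; i++) {
            // Missing components count as 0, so "2.33" equals "2.33.0".
            int x = i < a.length ? a[i] : 0;
            int y = i < m.length ? m[i] : 0;
            if (x != y) {
                return x > y;
            }
        }
        return true; // all components are equal
    }

    /**
     * Parses "2.34.0-SNAPSHOT" into {2, 34, 0}. The qualifier after '-' is
     * dropped, so "2.33.0-SNAPSHOT" is treated the same as "2.33.0".
     */
    private static int[] parse(String version) {
        String core = version.split("-", 2)[0];
        return Arrays.stream(core.split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    public static void main(String[] args) {
        String min = "2.33.0";
        for (String v : new String[] {"2.31.0", "2.33.0", "2.34.0-SNAPSHOT"}) {
            System.out.println(v + " >= " + min + ": " + isAtLeast(v, min));
        }
    }
}
```

The check itself does not fix anything; the actual fix is to bump the Beam artifacts in your pom (beam-sdks-java-core, beam-sdks-java-io-azure, and any runner modules you use) to the same version, at least 2.33.0.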