This is similar to what has been discussed here: https://www.mail-archive.com/[email protected]/msg24204.html
The MSI endpoint that the hadoop-azure client expects is (I think) applicable only to VMs, not to Functions. So I retrieved the MSI endpoint from the environment variable IDENTITY_ENDPOINT:
sparkContext.hadoopConfiguration().set("fs.azure.account.auth.type", "OAuth");
sparkContext.hadoopConfiguration().set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider");
if (System.getenv("IDENTITY_ENDPOINT") != null) {
    sparkContext.hadoopConfiguration().set("fs.azure.account.oauth2.msi.endpoint", System.getenv("IDENTITY_ENDPOINT"));
}
sparkContext.hadoopConfiguration().set("fs.azure.account.oauth2.msi.tenant", "xx");
sparkContext.hadoopConfiguration().set("fs.azure.account.oauth2.client.id", "yy");
spark.read().parquet("");
Running the above gives me the following error:
Status code: -1 error code: null error message: Auth failure: HTTP Error 400; url='http://169.254.138.2:8081/msi/token' AADToken: HTTP connection to http://169.254.138.2:8081/msi/token failed for getting token from AzureAD.; contentType='application/json; charset=utf-8'; response '{"error":{"code":"UnsupportedApiVersion","message":"The HTTP resource that matches the request URI 'http://169.254.138.2:8081/msi/token' does not support the API version '2018-02-01'.","innerError":null}}'
org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://169.254.138.2:8081/msi/token' AADToken: HTTP connection to http://169.254.138.2:8081/msi/token failed for getting token from AzureAD.; contentType='application/json; charset=utf-8'; response '{"error":{"code":"UnsupportedApiVersion","message":"The HTTP resource that matches the request URI 'http://169.254.138.2:8081/msi/token' does not support the API version '2018-02-01'.","innerError":null}}'
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:274)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:217)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:191)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:189)
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:911)
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:892)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:358)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:932)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:609)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:599)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.exists(AzureBlobFileSystem.java:1177)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:784)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
    at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://169.254.138.2:8081/msi/token' AADToken: HTTP connection to http://169.254.138.2:8081/msi/token failed for getting token from AzureAD.; contentType='application/json; charset=utf-8'; response '{"error":{"code":"UnsupportedApiVersion","message":"The HTTP resource that matches the request URI 'http://169.254.138.2:8081/msi/token' does not support the API version '2018-02-01'.","innerError":null}}'
    at org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenSingleCall(AzureADAuthenticator.java:430)
    at org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenCall(AzureADAuthenticator.java:306)
    at org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenFromMsi(AzureADAuthenticator.java:154)
    at org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider.refreshToken(MsiTokenProvider.java:57)
    at org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider.getToken(AccessTokenProvider.java:50)
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAccessToken(AbfsClient.java:1055)
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:256)
    ... 23 more
CodePudding user response:
Below is applicable for azure-functions as well as azure-container-apps; the behavior may be different for other managed services.
hadoop-azure provides a mechanism for plugging in a custom token provider.
package org.acme;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.OffsetDateTime;
import java.util.Date;

class CustomToken implements org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee {

    private final Logger log = LoggerFactory.getLogger(getClass());
    private String accountName;
    private volatile AccessToken token;

    @Override
    public void initialize(Configuration configuration, String accountName) {
        log.info("Custom token provider initialized. Config: " + configuration + ". AccountName: " + accountName);
        this.accountName = accountName;
    }

    @Override
    public String getAccessToken() {
        // Reuse the cached token until two hours before it expires, then refresh.
        if (token != null && OffsetDateTime.now().isBefore(token.getExpiresAt().minusHours(2))) {
            return token.getToken();
        } else {
            log.info("Token has expired or has not been set yet. " + token);
            fetchAndSetToken();
            return token.getToken();
        }
    }

    private void fetchAndSetToken() {
        // DefaultAzureCredential picks up the managed identity inside Functions / Container Apps.
        DefaultAzureCredential creds = new DefaultAzureCredentialBuilder().build();
        TokenRequestContext request = new TokenRequestContext();
        request.addScopes("https://" + accountName);
        this.token = creds.getToken(request).block();
        log.info("Token has been set. Expires at: " + token.getExpiresAt() + ". Expired: " + token.isExpired());
    }

    @Override
    public Date getExpiryTime() {
        return new Date(token.getExpiresAt().toInstant().toEpochMilli());
    }
}
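Wiring the provider in is then a matter of switching the auth type to Custom and pointing the provider key at the class. A sketch, assuming CustomToken (from the class above) is on the Spark driver and executor classpath:

```java
// Config fragment: register the custom token provider with hadoop-azure.
// The "Custom" auth type makes ABFS instantiate the class named by
// fs.azure.account.oauth.provider.type, which must implement CustomTokenProviderAdaptee.
sparkContext.hadoopConfiguration().set("fs.azure.account.auth.type", "Custom");
sparkContext.hadoopConfiguration().set("fs.azure.account.oauth.provider.type", "org.acme.CustomToken");
```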
To debug further: SSH onto the Container Apps or Azure Functions VM. Obtain the MSI secret with echo $MSI_SECRET, and then obtain the token by making a curl call:
curl -v -H "X-IDENTITY-HEADER: msi_secret_from_above" "http://127.0.0.1:42356/msi/token/?resource=https://storage-account-name.dfs.core.windows.net/&api-version=2019-08-01"
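The same URL can be assembled in plain Java if curl is unavailable; a minimal sketch of the URL construction only (the MsiTokenUrl class name is made up here, and the X-IDENTITY-HEADER header must still be sent with the actual request):

```java
package org.acme;

// Hypothetical helper: builds the token URL that the Functions / Container Apps
// identity endpoint expects, mirroring the curl call above.
public final class MsiTokenUrl {

    // identityEndpoint comes from $IDENTITY_ENDPOINT; 2019-08-01 is the
    // api-version used in the curl call above.
    static String build(String identityEndpoint, String resource) {
        return identityEndpoint + "?resource=" + resource + "&api-version=2019-08-01";
    }

    public static void main(String[] args) {
        System.out.println(build("http://127.0.0.1:42356/msi/token/",
                "https://storage-account-name.dfs.core.windows.net/"));
    }
}
```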
The Java code written earlier replicates this behavior. I found this link useful for understanding MSI: https://dev.to/stratiteq/managed-identity-how-it-works-behind-the-scenes-co4