I need to implement a simple inner join using a replicated join, but I am finding shockingly little information on how to do this.
I have one dataset (< 1 GB) that looks like this:
A C
A D
B Y
C D
Then I have my input file, which looks very similar:
C Z
B I
A B
D Z
C O
I want to do an inner join that matches the 2nd column of my replicated dataset against the 1st column of my input file, so that I get something like this:
A C Z
A C O
A D Z
C D Z
Neither the replicated dataset nor the main dataset will contain duplicate lines.
I know I need to set the number of reducers to 0, and that I need to read the replicated dataset in during the setup phase of my mapper.
But I don't know how to read it in, where to store it, what data structure to use, or how to make the join happen.
I saw some tutorials using a hash map, but that would not work, since duplicate keys would overwrite each other.
CodePudding user response:
This is the solution I came up with. The file I want replicated across all mappers is loaded into the distributed cache in my driver code:
DistributedCache.addCacheFile(new Path(args[3]).toUri(), job.getConfiguration());
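Note that DistributedCache is deprecated in Hadoop 2.x and later; there the equivalent calls live directly on the Job object. A minimal sketch of the corresponding driver lines, keeping the same args[3] convention:
job.addCacheFile(new Path(args[3]).toUri());  // replicated file, as above
job.setNumReduceTasks(0);                     // map-only job, no reducers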
In my mapper, I declare a hash map to hold the replicated dataset:
Map<String, List<String>> adjacencyList = new HashMap<>();
In the setup phase of my mapper, I read all the files from the distributed cache:
Path[] pathLength1 = DistributedCache.getLocalCacheFiles(context.getConfiguration());
if (pathLength1 != null && pathLength1.length > 0) {
    for (Path file : pathLength1) {
        readFile(file);
    }
}
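The getLocalCacheFiles call is likewise deprecated on newer Hadoop. A minimal sketch of the 2.x-style equivalent, assuming the files were added with job.addCacheFile() as above (cached files are localized into the task's working directory, so the base file name is enough to open them):
URI[] cacheFiles = context.getCacheFiles();
if (cacheFiles != null) {
    for (URI cacheFile : cacheFiles) {
        // open the localized copy by its base file name
        readFile(new Path(new Path(cacheFile.getPath()).getName()));
    }
}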
The code for readFile is below. I get around the keys overwriting each other by appending each new value to the list already stored for that key:
BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath.toString()));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
    // split the line on whitespace into key and value
    String[] tokens = line.split("\\s+");
    // fetch the existing value list for this key, or start a new one,
    // so duplicate keys append rather than overwrite
    List<String> temp = new ArrayList<String>();
    if (adjacencyList.containsKey(tokens[0])) {
        temp = adjacencyList.get(tokens[0]);
    }
    temp.add(tokens[1]);
    adjacencyList.put(tokens[0], temp);
}
bufferedReader.close();
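As an aside, on Java 8+ the append-or-create dance can be collapsed into a single line with computeIfAbsent:
adjacencyList.computeIfAbsent(tokens[0], k -> new ArrayList<>()).add(tokens[1]);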
Then in the map phase, as I read my other file in line by line, I check whether the key exists in my hash map and join on a match.
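For completeness, here is a minimal sketch of what that map method can look like. The class name is illustrative, and it assumes adjacencyList was keyed on the join column in setup(), so that a lookup on the input line's first field returns every matching left-hand value (e.g. C -> [A], which yields "A C Z"):
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReplicatedJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // populated in setup() from the distributed cache, as shown above
    private final Map<String, List<String>> adjacencyList = new HashMap<>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        List<String> matches = adjacencyList.get(fields[0]);
        if (matches != null) {  // inner join: non-matching lines are dropped
            for (String left : matches) {
                // e.g. left = "A", fields = {"C", "Z"}  ->  "A C Z"
                context.write(new Text(left + " " + fields[0] + " " + fields[1]),
                              NullWritable.get());
            }
        }
    }
}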
No reducer required