Home > Back-end >  The Flume custom interceptors # doesn't work the Flume custom interceptors don't take effe
The Flume custom interceptors # doesn't work the Flume custom interceptors don't take effe

Time:03-23

Thank you for your help, regardless of how to solve the problem...
Use version: 1.7.0
general business: an Agent (a1) is responsible for monitoring, nc and will receive messages sent to the other two Agent (a2 and a3), these two Agent (listening to) using the virtual machine

expected results: a1 to listen to the data sent to hadoop102 (a2) is responsible for receiving a letter/hadoop103 (a3 is responsible for receiving digital)

the current results: two receive did not show their own, but have to send what you

The flume configuration file detailed
[a1] - & gt; To monitor and send the listening to hear the
# Name
A1. Sources=s1
A1. Channels=c1 c2
A1. Sinks=k1, k2

# Sources
A1. Sources. S1. Type=netcat
A1. Sources. S1. Bind=localhost
A1. Sources. S1. The port=44444

# Channels
A1. Channels. C1. Type=memory
A1. Channels. C1. Capacity=1000
A1. Channels. C1. TransactionCapacity=100
A1. Channels. C2. Type=memory
A1. Channels. C2. Capacity=1000
A1. Channels. C2. TransactionCapacity=100
# Channel Interceptors
A1. Sources. R1. Interceptors=i1
A1. Sources. R1. Interceptors. I1. Type=a flume. The test. The test1 $Builder
# Channel Selector
A1. Sources. R1. The selector. Type=multiplexing
A1. Sources. R1. The selector. The header=type
A1. Sources. R1. The selector. Mapping. The letter=c1
A1. Sources. R1. The selector. Mapping. Number=c2

# Sinks
A1. Sinks. K1. Type=avro
A1. Sinks. K1. The hostname=hadoop102
A1. Sinks. K1. Port=4142
A1. Sinks. K2. Type=avro
A1. Sinks. K2. The hostname=hadoop103
A1. Sinks. K2. The port=4141

# Bind
A1. Sources. S1. Channels=c1 c2
A1. Sinks. K1. Channel=c1
A1. Sinks. K2. Channel=c2

[a2] - & gt; Receive letter
# Name
A2. Sources=s1
A2. Channels=c1
A2. Sinks=k1

# Sources
A2. Sources. S1. Type=avro
A2. Sources. S1. Bind=hadoop102
A2. Sources. S1. The port=4142

# Channels
A2. Channels. C1. Type=memory
A2. Channels. C1. Capacity=1000
A2. Channels. C1. TransactionCapacity=100

# Sinks
A2. Sinks. K1. Type=logger

# Bind
A2. Sources. S1. Channels=c1
A2. Sinks. K1. Channel=c1

[a3] - & gt; Receive digital
# Name
A3. Sources=s1
A3. Channels=c1
A3. Sinks=k1

# Sources
A3. Sources. S1. Type=avro
A3. Sources. S1. Bind=hadoop103
A3. Sources. S1. The port=4141

# Channels
A3. Channels. C1. Type=memory
A3. Channels. C1. Capacity=1000
A3. Channels. C1. TransactionCapacity=100

# Sinks
A3. Sinks. K1. Type=logger

# Bind
A3. Sources. S1. Channels=c1
A3. Sinks. K1. Channel=c1

The interceptor code in


 package dell. Flume. The test; 
Import the Java. Util. List;
The import org. Apache. The flume. The Context;
The import org. Apache. The flume. The Event;
The import org. Apache. The flume. The interceptor. The interceptor;

Public class test1 implements Interceptor
{
Public void the initialize ()
{
}
The public Event intercept (Event Event)
{
byte[] body=event.getBody();
If (body [0] <'z' & amp; & The body [0] & gt; 'a')
{
Event. GetHeaders (). The put (" type ", "letter");
} else if (body [0] & gt; '0' & amp; & The body [0] <'9')
Event. GetHeaders (). The put (" type ", "number");
return event;
}
Public List Intercept (List Events)
{
For (Event Event: events) {
Intercept (event);
}
Return the events;
}
Public void the close ()
{
}
Public static class Builder implements Interceptor. Builder
{
Public Interceptor build ()
{
Return new test1 ();
}
Public void the configure (Context Context)
{
}
}
}


Runtime screenshot


Why there's nothing in my Headers? I clearly have I use Maven generated Jar in the flume/lib directory,

The following is my the generated Jar file screenshot


The following is my introduction of POM package
 & lt; Project XMLNS="http://maven.apache.org/POM/4.0.0" XMLNS: xsi="http://www.w3.org/2001/XMLSchema-instance" xsi: schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" & gt; 
4.0.0 & lt;/modelVersion>
Test. Flume
Dell. Flume. Test
0.0.1 - SNAPSHOT




Org. Apache. Flume
The flume - ng - core
1.7.0 & lt;/version>



Org. Apache. Logging. Log4j & lt;/groupId>
Log4j - core
2.8.2 & lt;/version>



Org. Apache. Logging. Log4j & lt;/groupId>
Log4j - core
2.8.2 & lt;/version>


  • Related