Home > other >  Kafka console can't consumption Flume to collect the production data
Kafka console can't consumption Flume to collect the production data

Time:09-26

1, his first start the zookeeper, kafka cluster after cluster after start up, look at the following process:
 
[root @ flink102 kafka - 2.11] # JPS
15459 QuorumPeerMain
Kafka 21466


2, it is the topic of kafka created, the current server check all the topic are as follows:
 
[root @ flink102 kafka - 2.11] # bin/kafka - switchable viewer. Sh - zookeeper flink102:2181 - list
Computed tomography (ct)


3, and then create your own kafka consumers
 
[root @ flink102 kafka - 2.11] # bin/kafka - the console - consumer. Sh - zookeeper flink102:2181 - from - beginning - topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider Using the new consumer by passing home [the bootstrap - server] [they are].


4, oneself in workProject file directory to create the flume - kafka. Conf file
 
/root @ flink102 ~ # CD/opt/workProject/
[root @ flink102 workProject] # ll
The total of 32
- rw - r - r - 1 root root. 4312 Mar 27 he call log
Rw - r - r - 1 root root 543 Mar 24 if then contact. The log
Rw - r - r - 1 root root 14155 Mar 24 12:53 ct - producer. Jar
Rw - r - r - 1 root root 683 Mar 27 let the flume - kafka. Conf
DRWXR xr - x 2 root root 24 Mar 25 11:11 log
[root @ flink102 workProject] # vim flume - kafka. Conf


//add configuration parameters:
# define
A1. Sources=r1
A1. Sinks=k1
A1. Channels=c1

# # source
A1. Sources. R1. Type=exec
A1.sources.r1.com mand=tail - F - c + 0/opt/workProject/call. The log
A1. Sources. R1. Shell=/bin/bash - c

# # sink
A1. Sinks. K1. Type=org. Apache. The flume. Sink. Kafka. KafkaSink
A1. Sinks. K1. Kafka. The bootstrap. The servers=flink102:9092
A1. Sinks. K1. Kafka. Topic=ct
A1. Sinks. K1. Kafka. FlumeBatchSize=20
A1. Sinks. K1. Kafka. Producer. The acks=1
A1. Sinks. K1. Kafka. Producer. Linger. Ms=1

# # channel
A1. Channels. C1. Type=memory
A1. Channels. C1. Capacity=1000
A1. Channels. C1. TransactionCapacity=100
#
# # bind
A1. Sources. R1. Channels=c1
A1. Sinks. K1. Channel=c1



There are data. Among them, the call log, as follows:
 [root @ flink102 workProject] # tail -f. Call log 
15884588694 19154926260 20180721043739 1172
16574556259 19154926260 20180311120306 0942
15280214634 15647679901 20180904154615 0234
16160892861 14171709460 20181223154548 1720
15244749863 19342117869 20180404160230 2565
15647679901 14171709460 20180801213806 0758
15884588694 14397114174 20180222050955 0458
19154926260 16569963779 20180715235743 1489
14171709460 19602240179 20181120075855 2488
19683537146 16574556259 20180724031723 0652


5, start the flume to do data collection
/root @ flink102 ~ # CD/usr/hadoop/module/flume/flume 1.7.0/
[root @ flink102 flume - 1.7.0] # conf/bin/flume - ng agent - c - f/opt/workProject/flume - kafka. Conf


Perform the process of loading data, as shown:



6, in kafka consumers view, data found no, can't consumption data


Always stay in:
 
[root @ flink102 kafka - 2.11] # bin/kafka - the console - consumer. Sh - zookeeper flink102:2181 - from - beginning - topic ct
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider Using the new consumer by passing home [the bootstrap - server] [they are].
[the 2020-03-30 10:59:11, 139] INFO [Group Metadata Manager on the Broker 3] : Removed 0 expired offsets in 0 milliseconds. (kafka. Coordinator. Group. GroupMetadataManager)




7, in/flume - 1.7.0 directory of logs log view, found an error:
 
30 Mar 2020 11:15:33, 808 ERROR [main] (org. Apache. The flume. Node. Application. Main: 348) - A fatal ERROR occurred while running. The Exception follows.
Org.apache.com mons. Cli. MissingOptionException: Missing required option: n
At org.apache.com mons. Cli. Parser. CheckRequiredOptions (299) Parser. Java:
At org.apache.com mons. Cli. Parser. Parse (231) Parser. Java:
At org.apache.com mons. Cli. Parser. Parse (85) Parser. Java:
The at org. Apache. The flume. The node. The Application. The main (263) Application. Java:



Ask next, bosses, this is what reason, how to solve, thank you!

CodePudding user response:

 # # source 
A1. Sources. R1. Type=exec
A1.sources.r1.com mand=tail - F - c + 0/opt/workProject/call. The log
A1. Sources. R1. Shell=/bin/bash - c

Tail - F - c + 0/opt/workProject/call log should be the problem here, you print out this part first test pass in the shell, copied in, should be the -f not - F

CodePudding user response:

Ok, thank you, I see

CodePudding user response:

Just in accordance with your method tried,
 
# # source
A1. Sources. R1. Type=exec
A1.sources.r1.com mand=tail - f - c + 0/opt/workProject/call. The log
A1. Sources. R1. Shell=/bin/bash - c


As shown in figure, :
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull
  • Related