Home > other >  Flink CEP
Flink CEP

Time:10-08

A, the CEP



One or more composed of simple event event flow through a certain rule matching, then the output of data, users want satisfy the rules of complex events,

CEP supported on the flow pattern matching, according to the conditions of different pattern, divided into the condition of continuous or discontinuous conditions; Model conditions allow time limitation when within the scope of the condition of not meet the conditions, can lead to a pattern matching timeout,

,

CEP is equivalent to the event on the flow pattern matching, such as two consecutive failed login log not more than 2 seconds, the fault early warning,



Two, the CEP use process



2.1 to obtain flow

Case class LoginEvent (userId: String, IP: String, eventType: String, eventTime: String)



Val env=StreamExecutionEnvironment. GetExecutionEnvironment

Env. SetStreamTimeCharacteristic (TimeCharacteristic. EventTime)

Env. SetParallelism (1)



Val loginEventStream=env. FromCollection (List (

LoginEvent (" 1 ", "192.168.0.1", "fail", "1558430842"),

LoginEvent (" 1 ", "192.168.0.2", "fail", "1558430843"),

LoginEvent (" 1 ", "192.168.0.3", "fail", "1558430844"),

LoginEvent (" 2 ", "192.168.10.10", "success", "1558430845")

)). AssignAscendingTimestamps (_) eventTime) toLong)



2.2 define the Pattern

Val loginFailPattern=Pattern. The begin [LoginEvent] (" begin ")

The where (_) eventType) equals (" fail "))//a login failure

.next (" next ")

The where (_) eventType) equals (" fail "))//a log on to the event and the failure

Within (Time. Seconds the interval of two (2)//no more than two seconds



2.3 to perform the Pattern

PatternStream:

Val input=...

Val pattern=...



Val patternStream=CEP. The pattern (input, the pattern)//an input stream + matching the pattern

Val patternStream=CEP. The pattern (loginEventStream keyBy (_. UserId), loginFailPattern)



2.4 through the select or flatSelect access to qualified flow



Select:

Val loginFailDataStream=patternStream

Select ((the pattern: the Map [String, Iterable [LoginEvent]])=& gt; {

Val first=pattern. The getOrElse (" begin ", null). The iterator. Next ()

Val second=pattern. The getOrElse (" next ", null). The iterator. Next ()



(second. UserId, second IP, second, eventType)

})

The return value is 1 only records,

PatternFlatSelectFunction flatSelect: through implementation, implementation and the select of similar function, the only difference is the flatSelect method can return multiple records,

In version 1.8, are out of date, use ProcessFunction,



Three, case



3.1 with the user 2 seconds twice failed to login





The object LoginFailWithCep {

Def main (args: Array [String]) : Unit={

Val env=StreamExecutionEnvironment. GetExecutionEnvironment

Env. SetStreamTimeCharacteristic (TimeCharacteristic. EventTime)

Env. SetParallelism (1)



//custom test data

Val loginStream=env. FromCollection (List (

LoginEvent (1, "192.168.0.1", "fail", 1558430842),

LoginEvent (1, "192.168.0.2", "success", 1558430843),

LoginEvent (1, "192.168.0.3", "fail", 1558430844),

LoginEvent (1, "192.168.0.3", "fail", 1558430847),

LoginEvent (1, "192.168.0.3", "fail", 1558430848),

LoginEvent (2, "192.168.10.10", "success", 1558430850)

))

AssignAscendingTimestamps (_. EventTime * 1000)



//define the pattern, the event flow pattern matching

Val loginFailPattern=Pattern. The begin [LoginEvent] (" begin ")

The where (_. EventType=="fail")

.next (" next ")

The where (_. EventType=="fail")

Within (Time. Seconds (2))



//the pattern on the basis of the input stream application, get the pattern matching stream

Val patternStream=CEP. The pattern (loginStream keyBy (_. UserId), loginFailPattern)



//use the select method of output data extracted from the pattern stream flow



//import scala. Collection. The Map

//val loginFailDataStream: DataStream [Warning]=patternStream. Select ((patternEvents: Map [String, Iterable [LoginEvent]])=& gt; {

////from the Map corresponding login failed events, then packaged into warning

.//val firstFailEvent=patternEvents getOrElse (" begin ", null). The iterator. Next ()

.//val secondFailEvent=patternEvents getOrElse (" next ", null). The iterator. Next ()

//Warning (firstFailEvent userId, firstFailEvent eventTime, secondFailEvent. EventTime, "login fail waring")

//})

Val loginFailDataStream=patternStream. Select (new MySelectFuction ())



//will receive warning information flow output sink

LoginFailDataStream. Print (" warning ")



Env. The execute (" Login Fail Detect with CEP ")

}

}



The class MySelectFuction () extends PatternSelectFunction [LoginEvent, Warning] {

Override def select (patternEvents: util. Map [String, util. The List [LoginEvent]]) : Warning={

Val firstFailEvent=patternEvents. GetOrDefault (" begin ", null). The iterator. Next ()

Val secondFailEvent=patternEvents. GetOrDefault (" next ", null). The iterator. Next ()

Warning (firstFailEvent userId, firstFailEvent eventTime, secondFailEvent. EventTime, "login fail waring")

}

}



15 minutes after the 3.2 users order without pay
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull
  • Related