Home > other > Structured Streaming + Kafka: @Resource injection problem
Structured Streaming + Kafka: @Resource injection problem

Time:09-17

Recently, while developing a Structured Streaming + Kafka job, I ran into the following problem: at initialization, the logs show that the beCalTask object injected with the @Resource annotation is fine (non-null), but once the job starts consuming from Kafka, beCalTask is reported as null. With a standalone deployment on Windows there is no problem; the issue only appears when the job is deployed to the server cluster. How can I solve this?
 
@ Repository
Public class RapidCalculateTask implements Serializable {
Private static Logger log=LoggerFactory. GetLogger (RapidCalculateTask. Class);
/* *
*
*/
Private static final long serialVersionUID=- 7664074371811203339 l;

@ the Resource
BeCalculateTaskDao beCalTask;

Public void startStruStram () {
Try {
CalculateService calService=new CalculateService ();

The Properties of prop=calService. The getProperties ();
String servers_kafka=prop. GetProperty (" spark. Servers_kf ");
String the subscribe=prop. GetProperty (" spark. The subscribe ");

SparkConf conf.=calService getSparkConfig (prop);
The conf. SetJars (SparkContext. JarOfClass (enclosing getClass ()). The toList ());
SparkSession spark=SparkSession. Builder (). The config (conf.) getOrCreate ();

Dataset Lines=spark. ReadStream ()
. The format (" kafka ")
Option (" kafka. The bootstrap. The servers, "servers_kafka)
Option (" subscribe ", subscribe)
The load (). SelectExpr (" CAST (value AS a STRING) ");


StreamingQuery query=lines. WriteStream ()
. Foreach (new RapidForeachWriter (beCalTask))
The start ();

Query. AwaitTermination ();
} the catch (StreamingQueryException e) {
e.printStackTrace();
}
}
}

 
Public class RapidForeachWriter Extends ForeachWriter Implements the Serializable {
/* *
*
*/
Private static final long serialVersionUID=5549303376220035143 l;
Private static Logger log=LoggerFactory. GetLogger (RapidForeachWriter. Class);


Private BeCalculateTaskDao beCalTask;
Private JavaSparkContext javaCont;



Public RapidForeachWriter (BeCalculateTaskDao beCalTask) {
The info (" RapidForeachWriter check whether BeCalculateTaskDao is empty "+ beCalTask);
Enclosing beCalTask=beCalTask;
}

@ Override
Public void close (Throwable errorOrNull) {
//TODO Auto - generated method stub
}

@ Override
Public Boolean open (long partitionId, long version) {
The info (" check whether beCalTask is empty - [partitionId: "+ partitionId +"; version: "+ version +"] "+ beCalTask);
Try {
{if (beCalTask. Equals (null))
return false;
}
return true;
} the catch (Exception e) {
return false;
}
}

@ Override
Public void process (value) Rows {

}
}
  • Related