Home > Back-end >  Dubbo asynchronous invocation solutions to problems
Dubbo asynchronous invocation solutions to problems

Time:09-17

Scenario description: client remote asynchronous call ServiceA, ServiceA in the process of handling the client requests to remote synchronization call ServiceB, ServiceA took the data from the response of ServiceB, get is null, the is the pits,
Use the DEBUG mode, analysis of Dubbo source code for the cause of the problem,
The analysis process is as follows:
Client and server communication, netty configuration used for network transmission, through the NettyHandler specific messaging operations, so from now on start with source code analysis,
Client to A remote method ServiceA asynchronous invocation, the RpcContext (RpcContext recorder is A temporary state, when receiving the RPC requests, or launch the RPC request, RpcContext state will change, such as: A, B, B and C, B machine, before the C B, RpcContext record information is A callback B, after the C B, RpcContext record is the information of the C B) the attachments (Map) structure attribute to add async=true of key-value pairs, as well as in RpcInvocation attachments (Map) add async=true key/value pair,
After a series of the Filter, the program runs to AbstractInvoker invoke method, pay attention to the method in the following code segment,
Map The context=RpcContext. GetContext (). GetAttachments ();
If (the context.=null) {
Invocation. AddAttachmentsIfAbsent (context);
}
Here will add attachments in the current RpcContext to call ServiceB RpcInvocation, at this time the async=true have been added, and then execute the following code snippet,
If (getUrl (). GetMethodParameter (invocation. GetMethodName (), the ASYNC_KEY, false)) {
Invocation. SetAttachment (Constants. ASYNC_KEY, Boolean. TRUE. The toString ());
}
The above code to determine whether a call ServiceB URL contains async=true, if you have to set the async=true to RpcInvocation attachments, at that time is not contain,
Continue to tracking code, run into the DubboInvoker, call doInvoke method, this method has the following code snippet, Boolean isAsync=RpcUtils. IsAsync (getUrl (), invocation), the isAsync method concrete statement as follows,
Public static Boolean isAsync (URL URL, Invocation inv) {
Boolean isAsync;
//if the Java code set priority.
If (Boolean. TRUE. The toString () equals (inv. GetAttachment (the ASYNC_KEY))) {
IsAsync=true;
} else {
IsAsync=url. GetMethodParameter (getMethodName (inv), the ASYNC_KEY, false);
}
Return isAsync;
}
In the attachments above method first determines RpcInvocation async=true whether, if found, this is an asynchronous invocation, or whether async in the request URL=true, if the establishment is an asynchronous invocation, or is a synchronous call, according to the above parameters passed, now isAsync method returns true, ServiceA synchronous invocation ServiceB became an asynchronous call, continue to look down at the asynchronous call, the code is as follows,
Else if (isAsync) {
ResponseFuture future=currentClient. Request (inv, timeout);
RpcContext. GetContext (). SetFuture (new FutureAdapter (future));
Return new RpcResult ();
}
Here returned to a RpcResult object directly, without the data content, so here, in this case is broken, ServiceA want to target data from the response of course is null, extend again, if ServiceB ServiceC synchronous calls again, this is can normal synchronous calls, because after ServiceA call ServiceB, ConsumerContextFilter invoke method can eliminate attachements, so ServiceC ServiceB can normal synchronous calls,

For the problems above, there are three solutions:
1. The method call twice, so don't recommend
ServiceA call ServiceB place to write the same call twice, like ServiceC ServiceB call this method principle, namely clear attachements, this method is the most simple, but there is a risk for people who do not understand, write the repeat the business code, you accidentally deleted, and from the point of view of writing code, this is very small, so don't recommend,
2. Modify the Dubbo source
Modify AbstractInvoker line 137, change every time to the actual assignment of async,
Boolean isAsync=getUrl (). GetMethodParameter (invocation. GetMethodName (), the ASYNC_KEY, false);
Invocation. SetAttachment (the ASYNC_KEY, String. The valueOf (isAsync));
3. Custom Filter
Implement com. Alibaba. Dubbo. RPC. Filter, remove the async in RpcContext,

@ Activate (group={the PROVIDER})
Public class AsyncFilter implements Filter {
@ Override
Public Result invoke (Invoker<?> Invoker, Invocation Invocation) throws RpcException {
RpcContext. GetContext (). GetAttachments (). Remove (the ASYNC_KEY);
Return invoker. Invoke (invocation);
}
}
At the same time in the SRC/main/resources/meta-inf/dubbo/add com. Alibaba. Dubbo. RPC. The Filter file, file content is as follows:
AsyncFilter=com. ABC. Filter. AsyncFilter

The scheduler Dispatcher scheduling policy
All: all message distributed to the thread pool, including the request, response, connect, disconnect events, such as heartbeat, the default,
Direct: all of the messages are not distributed to the thread pool, all direct execution on IO thread,
Distributed message: only the request and response message to the thread pool, other connection disconnect events, such as heartbeat message, directly on the IO thread execution,
Distributed execution: only request message to the thread pool, do not contain response, response and other connection disconnect events, such as heartbeat, directly on the IO thread execution,
Connection: in IO thread will connect disconnect event in the queue, orderly execute them one by one, the other message distributed to the thread pool,
The thread pool ThreadPool
Fixed: the fixed size thread pool, starts to build thread, not to close, has been held, the default, 200,
Cached: cached thread pool, spare one minute to automatically delete, when need to rebuild,
Limited: scalable thread pool, but the number of threads in the pool will only increase will not shrink, not only growth in order to avoid contraction when suddenly to the performance of the large flow cause problems,
Eager: first create a Worker thread pool, in the task number greater than corePoolSize but less than maximumPoolSize, first create the Worker to handle tasks, while the number of tasks than maximumPoolSize, tasks to be included in the blocking queue, blocking queue is thrown when RejectedExecutionException, (compared with the cached: cached in the task more than the maximumPoolSize directly throw an exception rather than the task into a blocking queue)

CodePudding user response:

Personally test effectively
  • Related