I have an application which uses a ScheduledExecutorService:
private final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
All it does right is now is use a Netty client to connect to a Netty server and send test heartbeats (JSON strings) to the server:
log.info("Starting up signaling client.");
this.signalingClient = new SignalingClient(host, port, this);
signalingClient.getChannelFuture().addListener(future -> {
if (future.isSuccess()) {
log.info("Connection to signaling server completed. Scheduling heartbeat task.");
ses.scheduleAtFixedRate(() -> signalingClient.sendMessage(testHeartBeat()),
3L, 30L, TimeUnit.SECONDS);
} else {
log.info("Connection to signaling server completed with exception.");
}
});
} else {
log.warn("Unable to start Signaling Gateway. Args must match --voice.adapter.endpoint=127.0.0.1:[port]");
}
Here is the Signaling Client's sendMessage():
public void sendMessage(WrappedMessage msg) {
if (channel != null && channel.isActive()) {
log.info("Writing msg to the channel.");
channel.writeAndFlush(msg);
} else {
log.info("Unable to write message to channel.");
}
}
And here is the client initialization:
public SignalingClient(String host, int port, MessageHandler handler) {
serverAddress = new InetSocketAddress(host, port);
bootstrap.group(bossGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG))
.addLast(new MessageDecoder(MessageDecoder.DecodingState.VERSION))
.addLast(new MessageEncoder())
.addLast(new AdapterClientStreamHandler(handler));
}
});
this.f = doConnect();
if (f != null) {
f.syncUninterruptibly();
}
}
private ChannelFuture doConnect() {
try {
f = bootstrap.connect(serverAddress);
f.addListener(future -> channel = f.channel());
return f;
} catch (Exception ex) {
log.warn("Connection to signaling server failed: {}.", ex.getMessage());
return null;
}
}
Given that the connection is healthy and heartbeats can be exchanged, why might it be that after 5 iterations of the executor service does the application halt?
Here's a bit of an update with some log lines and added a future listener for send message:
public void sendMessage(WrappedMessage msg) {
if (channel != null && channel.isActive()) {
log.info("Writing msg to the channel.");
try {
channel.writeAndFlush(msg).addListener(f -> {
if (f.isSuccess()) {
log.info("Successfully wrote message to the channel.");
} else {
log.info("Failed to write message to the channel.");
}
});
} catch (Exception e) {
log.info("Caught exception in writing message: ", e);
}
} else {
log.info("Unable to write message to channel.");
}
}
2023-01-09 17:05:13.635-0800 [I] [26] Writing msg to the channel.
2023-01-09 17:05:13.661-0800 [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:13.698-0800 [I] [25] Beginning message decode.
2023-01-09 17:05:13.699-0800 [I] [25] Finished decoding message.
2023-01-09 17:05:13.700-0800 [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:23.625-0800 [I] [26] Writing msg to the channel.
2023-01-09 17:05:23.626-0800 [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:23.627-0800 [I] [25] Beginning message decode.
2023-01-09 17:05:23.627-0800 [I] [25] Finished decoding message.
2023-01-09 17:05:23.627-0800 [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:33.624-0800 [I] [28] Writing msg to the channel.
2023-01-09 17:05:33.626-0800 [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:33.627-0800 [I] [25] Beginning message decode.
2023-01-09 17:05:33.627-0800 [I] [25] Finished decoding message.
2023-01-09 17:05:33.627-0800 [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:43.626-0800 [I] [26] Writing msg to the channel.
2023-01-09 17:05:43.627-0800 [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:43.628-0800 [I] [25] Beginning message decode.
2023-01-09 17:05:43.628-0800 [I] [25] Finished decoding message.
2023-01-09 17:05:43.628-0800 [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:53.625-0800 [I] [29] Writing msg to the channel.
In the last iteration, the channel future does not complete. Why could this be happening?
Here's another thing I tried in the channelActive of the client handler:
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("Connection to the signaling adapter successful. Scheduling heartbeat task.");
ctx.executor().scheduleAtFixedRate(() -> ctx.writeAndFlush(testHeartBeat()), 0L, 60L, TimeUnit.SECONDS);
}
But the executor still only executes five times.
Update: Increasing the frequency of the executor is allowing the heartbeat signals to persist. So perhaps the channel is idling out? I don't have a lot of Netty experience so I'm bumbling in the dark a little bit.
Another update: I ran my application directly from the command line, as opposed to how it will normally run under production by being a spawned process using a ProcessBuilder. I found that the application runs fine on the command line, but it stops after a few minutes after being spawned by ProcessBuilder. What gives?
CodePudding user response:
I was using a process builder to spawn a process to run this application. But I didn't inherit the IO for the process.
This is what I needed: https://stackoverflow.com/a/55629297/11450331