Scheduled executor service of 10 threads halts after 5 iterations of using Netty client to send and

Time:01-14

I have an application which uses a ScheduledExecutorService:

private final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);

All it does right now is use a Netty client to connect to a Netty server and send test heartbeats (JSON strings) to the server:

log.info("Starting up signaling client.");
        this.signalingClient = new SignalingClient(host, port, this);

        signalingClient.getChannelFuture().addListener(future -> {
            if (future.isSuccess()) {
                log.info("Connection to signaling server completed. Scheduling heartbeat task.");
                ses.scheduleAtFixedRate(() -> signalingClient.sendMessage(testHeartBeat()),
                        3L, 30L, TimeUnit.SECONDS);
            } else {
                log.info("Connection to signaling server completed with exception.");
            }
        });
    } else {
        log.warn("Unable to start Signaling Gateway. Args must match --voice.adapter.endpoint=127.0.0.1:[port]");
    }

Here is the Signaling Client's sendMessage():

public void sendMessage(WrappedMessage msg) {
    if (channel != null && channel.isActive()) {
        log.info("Writing msg to the channel.");
        channel.writeAndFlush(msg);
    } else {
        log.info("Unable to write message to channel.");
    }
}

And here is the client initialization:

public SignalingClient(String host, int port, MessageHandler handler) {
    serverAddress = new InetSocketAddress(host, port);

    bootstrap.group(bossGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG))
                            .addLast(new MessageDecoder(MessageDecoder.DecodingState.VERSION))
                            .addLast(new MessageEncoder())
                            .addLast(new AdapterClientStreamHandler(handler));
                }
            });

    this.f = doConnect();
    if (f != null) {
        f.syncUninterruptibly();
    }
}

private ChannelFuture doConnect() {
    try {
        f = bootstrap.connect(serverAddress);
        f.addListener(future -> channel = f.channel());
        return f;
    } catch (Exception ex) {
        log.warn("Connection to signaling server failed: {}.", ex.getMessage());
        return null;
    }
}

Given that the connection is healthy and heartbeats can be exchanged, why might the application halt after 5 iterations of the scheduled task?

Here's an update with some log lines. I also added a future listener to sendMessage():

public void sendMessage(WrappedMessage msg) {
    if (channel != null && channel.isActive()) {
        log.info("Writing msg to the channel.");
        try {
            channel.writeAndFlush(msg).addListener(f -> {
                if (f.isSuccess()) {
                    log.info("Successfully wrote message to the channel.");
                } else {
                    log.info("Failed to write message to the channel.");
                }
            });
        } catch (Exception e) {
            log.info("Caught exception in writing message: ", e);
        }

    } else {
        log.info("Unable to write message to channel.");
    }
}

2023-01-09 17:05:13.635-0800  [I] [26] Writing msg to the channel.
2023-01-09 17:05:13.661-0800  [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:13.698-0800  [I] [25] Beginning message decode.
2023-01-09 17:05:13.699-0800  [I] [25] Finished decoding message.
2023-01-09 17:05:13.700-0800  [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:23.625-0800  [I] [26] Writing msg to the channel.
2023-01-09 17:05:23.626-0800  [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:23.627-0800  [I] [25] Beginning message decode.
2023-01-09 17:05:23.627-0800  [I] [25] Finished decoding message.
2023-01-09 17:05:23.627-0800  [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:33.624-0800  [I] [28] Writing msg to the channel.
2023-01-09 17:05:33.626-0800  [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:33.627-0800  [I] [25] Beginning message decode.
2023-01-09 17:05:33.627-0800  [I] [25] Finished decoding message.
2023-01-09 17:05:33.627-0800  [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:43.626-0800  [I] [26] Writing msg to the channel.
2023-01-09 17:05:43.627-0800  [I] [25] Successfully wrote message to the channel.
2023-01-09 17:05:43.628-0800  [I] [25] Beginning message decode.
2023-01-09 17:05:43.628-0800  [I] [25] Finished decoding message.
2023-01-09 17:05:43.628-0800  [I] [25] Received message from v'1' signaling server: Type: 'HEARTBEAT', Data: '{"type":"heartbeat","data":{"version":"","connections":0,"capabilities":{"udp":[{"address":"10.128.32.86","port":44678}],"tcp":[{"address":"10.128.32.86","port":36197}],"dtls":[{"address":"10.128.32.86","port":44215}]},"expire":"1671582427","instanceId":"b7ea0ad2-363c-40e2-ab15-10eeda74350a"}}'.
2023-01-09 17:05:53.625-0800  [I] [29] Writing msg to the channel.

In the last iteration, the channel future does not complete. Why could this be happening?

Here's another thing I tried in the channelActive of the client handler:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    log.info("Connection to the signaling adapter successful. Scheduling heartbeat task.");
    ctx.executor().scheduleAtFixedRate(() -> ctx.writeAndFlush(testHeartBeat()), 0L, 60L, TimeUnit.SECONDS);
}

But the executor still only executes five times.

Update: Increasing the frequency of the scheduled task allows the heartbeats to persist. So perhaps the channel is idling out? I don't have much Netty experience, so I'm fumbling in the dark a little.

Another update: I ran my application directly from the command line, rather than the way it normally runs in production, where it is spawned as a child process via a ProcessBuilder. The application runs fine from the command line, but it stops a few minutes after being spawned by ProcessBuilder. What gives?

Answer:

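I was using a ProcessBuilder to spawn the process that runs this application, but I didn't inherit the IO for that process. With nothing reading the child's stdout and stderr, the pipe buffer presumably filled up with log output, and the child then blocked on its next write.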

This is what I needed: https://stackoverflow.com/a/55629297/11450331
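
For illustration, here is a minimal sketch of what inheriting the IO looks like on the parent side. The class name ChildLauncher and its configure() helper are made up for this example and are not from the original application; the only real API used is ProcessBuilder.inheritIO(), which points the child's stdin, stdout and stderr at the parent's streams so the child never blocks on a full pipe.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical launcher; the name and structure are assumptions for this sketch.
class ChildLauncher {

    // Returns a ProcessBuilder whose child shares the parent's stdin, stdout and stderr.
    static ProcessBuilder configure(List<String> command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        // Without this call the child's output goes to pipes that the parent must drain itself.
        pb.inheritIO();
        return pb;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length == 0) {
            System.out.println("usage: ChildLauncher <command> [args...]");
            return;
        }
        // Start the child and wait for it; its log output appears on this process's console.
        Process child = configure(Arrays.asList(args)).start();
        int exitCode = child.waitFor();
        System.out.println("Child exited with code " + exitCode);
    }
}
```

If the child's output is not needed on the parent's console, redirecting it to a file with redirectOutput(File), or discarding it with ProcessBuilder.Redirect.DISCARD (Java 9+), should also keep the pipe from filling. Reading the child's streams on a separate thread is another option.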
