I have a Springboot Kotlin app running in AWS ECS/EC2 with some code written by my predecessor. It runs into OutOfMemoryError after running a while in the server. I am not familiar with ReactiveX and threading, so would like to ask for some help on this.
I see from the error logs that it should be related to the Observable.blockingForEach method?
The listOfKeys is a limited source of 5 to 50 items, so it should not be due to backpressure?
I also pasted the thread dump when OutOfMemoryError occurred.
Code inside myMethod
Observable.fromIterable(listOfKeys)
.flatMap { key ->
Maybe.fromCallable {
Pair(key, methodWhichCallsExternalAPI))
}
.toObservable()
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(4)))
}
.blockingForEach {
// some simple assignment of result to variables
}
Exception logs
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at io.reactivex.internal.schedulers.ExecutorScheduler.scheduleDirect(ExecutorScheduler.java:59)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn.subscribeActual(ObservableSubscribeOn.java:36)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:165)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.BlockingObservableIterable.iterator(BlockingObservableIterable.java:39)
at io.reactivex.Observable.blockingForEach(Observable.java:5183)
at com.myOrg.MyController.myMethod(MyController.kt:240)
at com.myOrg.MyController$$FastClassBySpringCGLIB$$38688575.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:61)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.dbs.localisemanagement.localise.LocaliseController$$EnhancerBySpringCGLIB$$3e102d4f.lokaliseProjectsS3(<generated>)
at sun.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1064)
Thread dump - each of the 3 parts are repeated many times
at sun.misc.Unsafe.park(Native Method)
"pool-30968-thread-1" Id=36794 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@394a0729
at java.lang.Thread.run(Thread.java:748)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@f0c2d74
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
- locked java.lang.Object@5e78eb81
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.socketRead0(Native Method)
"pool-30982-thread-1" Id=36808 RUNNABLE
- java.util.concurrent.ThreadPoolExecutor$Worker@7d90150f
Number of locked synchronizers = 1
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:306)
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:198)
- locked org.apache.http.pool.AbstractConnPool$2@a0abf72
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:253)
at org.apache.http.pool.AbstractConnPool.access$300(AbstractConnPool.java:70)
at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:393)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7399fe63
at sun.misc.Unsafe.park(Native Method)
"pool-30987-thread-1" Id=36813 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7399fe63
- java.util.concurrent.ThreadPoolExecutor$Worker@5b4dcfd1
Number of locked synchronizers = 1
CodePudding user response:
The most likely problem is the repeated creation of a thread pool that is not released after use.
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(4)))
The more the method runs the more abandoned threads it creates.
You should probably create a static Scheduler
and use that:
static final ExecutorService pool =
Executors.newFixedThreadPool(4);
static final Scheduler scheduler = Schedulers.from(pool);
Later on, after the app's lifecycle ended, call pool.shutdown()
to free the threads.