I have model called AisSignal with about 3000 records and I am running each one against another model called Footprint with about 10 records, so we have a loop 3000 x 10.
I tried:
Parallel.each(AisSignal.all, in_processes: 8) do |signal|
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
but it runs in 10 seconds just like normal block.
I tried to change from processes to threads like below but this causes application freezing.
Parallel.each(AisSignal.all, in_threads: 8) do |signal|
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
I have 50 pool size in database.yml
Any idea or approach to have multiple threads that run in parallel to update records. I will need to update more records actually which can take about minutes.
CodePudding user response:
Threads and forks often don't play well with database connections. If not handled correctly the threads/processes can wind up trying to use the same connection at the same time.
Parallel mentions this in their documentation. You need to make use of connection pooling.
A connection pool synchronizes thread access to a limited number of database connections. The basic idea is that each thread checks out a database connection from the pool, uses that connection, and checks the connection back in. ConnectionPool is completely thread-safe, and will ensure that a connection cannot be used by two threads at the same time, as long as ConnectionPool's contract is correctly followed. It will also handle cases in which there are more threads than connections: if all connections have been checked out, and a thread tries to checkout a connection anyway, then ConnectionPool will wait until some other thread has checked in a connection.
Parallel.each(AisSignal.all, in_threads: 8) do |signal|
ActiveRecord::Base.connection_pool.with_connection do
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
end
Note that this code is very inefficient.
- It loads the entire
AisSignal
table. - For each signal it loads and scans the entire
Footprint
table.
It will use a lot of memory, and it will run in s*f time where s
is the number of signals and f
is the number of footprints.
You can reduce the memory footprint by replacing Footprint.all.each
with Footprint.find_each
. This will load rows in batches.
Threading is not how you make database queries faster. The fundamental problem is you're scanning Footprint multiple times in Ruby rather than letting the database do it. if footprint.cover([signal.lon, signal.lat])
should instead be a where clause.
AisSignal.find_each do |signal|
# With ... being the equivalent of `cover([signal.lon, signal.lat])`
# as a where clause.
signal.update!(imo: 'in') if Footprint.exists?(...)
end
This could be done even faster as a join.
# ... is the equivalent of `cover([signal.lon, signal.lat])`
AisSignal.joins("inner join footprints on ...").update_all(imo: 'in')