Following this question, I face the following problem:
When I loop over some hashes in a couple of threads and update them, I get this error:
Thread .. terminated abnormally: Invalid value for shared scalar at ...
This is the code:
use feature qw(say);
use strict;
use warnings;
use threads;
use threads::shared;
use Data::Dumper qw(Dumper);

my %h1 = (a => 1, b => 2);
my %h2 = (c => 3, d => 4);
my $a1 = \%h1;
my $b1 = \%h2;
my $a1c = shared_clone($a1);
my $b1c = shared_clone($b1);
my $lockvar :shared;
my $nthreads = 3;

for ( 1..$nthreads ) {
    threads->create('job_to_parallelize', $a1c, $b1c, \$lockvar);
}
$_->join() for threads->list();

sub job_to_parallelize {
    my ($a1, $b1, $lockvar) = @_;
    {
        lock $lockvar;
        $a1->{a}++;
        $b1->{d}++;
        $a1->{scalar}{10} = 1;   # this line triggers the error
    }
}

print Dumper($a1c);
print Dumper($b1c);
From what I understand, it's because the "scalar" key didn't exist before I cloned. Any idea how to solve this (i.e. how to let me create new nested entries inside the threads on the cloned structures)?
CodePudding user response:
The issue is that $a1->{scalar}{10} = 1 creates a new hash, which is stored in $a1->{scalar} and has one key (10) whose value is 1. This new hash is not shared (you never shared it), hence the error. To fix it, you need to share this new hash as well. For instance:
$a1->{scalar} //= shared_clone({});
$a1->{scalar}{10} = 1;
Which can also be written as:
($a1->{scalar} //= shared_clone({}))->{10} = 1;
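Put back into the sub from the question, a minimal sketch of the fix could look like this (same names as in the question; only the shared_clone line is new):

sub job_to_parallelize {
    my ($a1, $b1, $lockvar) = @_;
    {
        lock $lockvar;
        $a1->{a}++;
        $b1->{d}++;
        # share the nested hash before storing into it
        $a1->{scalar} //= shared_clone({});
        $a1->{scalar}{10} = 1;
    }
}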
I recognize that it's a bit tedious, especially if you have a lot of data to share. If you don't update the same keys from different threads, then you could create arbitrarily complex hashes within the threads and share them at the end. For instance,
sub job_to_parallelize {
    my ($a1, $b1, $lockvar) = @_;
    my %thr_data = (key1 => 'data1',
                    key2 => { inside => 42 });
    $a1->{threads->tid} = shared_clone(\%thr_data);
}
Even if you do update the same keys in the threads, you might be able to use this approach and recombine the hashes at the end, for instance with Hash::Merge.
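As a hedged sketch of that recombination step (assuming the per-thread sub above, and that Hash::Merge's default LEFT_PRECEDENT behavior is acceptable; %combined is just an illustrative name), the main script could do something like this after joining the threads:

use Hash::Merge qw(merge);

# fold the per-thread results into one plain (non-shared) hash
my %combined;
for my $tid (keys %$a1c) {
    %combined = %{ merge(\%combined, $a1c->{$tid}) };
}
print Dumper(\%combined);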
CodePudding user response:
As explained in the answer by @Dada, if you store a new reference into the shared hash (one that did not exist at the time you cloned it), you need to mark that new reference as shared too. Here is another example of how you could do that recursively:
use feature qw(say);
use strict;
use warnings;
use threads;
use threads::shared;
use Data::Dumper qw(Dumper);

my %h1 = (a => 1, b => 2);
my %h2 = (c => 3, d => 4);
my $a1 = \%h1;
my $b1 = \%h2;
my $a1c = shared_clone($a1);
my $b1c = shared_clone($b1);
my $lockvar :shared;
my $nthreads = 3;

for ( 1..$nthreads ) {
    threads->create('job_to_parallelize', $a1c, $b1c, \$lockvar);
}
$_->join() for threads->list();

sub job_to_parallelize {
    my ($a1, $b1, $lockvar) = @_;
    {
        lock $lockvar;
        my $ref = check_exists_key($a1, 'a');
        $$ref++;
        $ref = check_exists_key($b1, 'd');
        $$ref++;
        $ref = check_exists_key($a1, ['scalar', '10']);
        $$ref = 1;
    }
}
sub check_exists_key {
    my ($ref, $key) = @_;
    if (ref $key eq 'ARRAY') {
        die "Unexpected: key array is empty" if @$key == 0;
        if ( @$key == 1 ) {
            $key = shift @$key;
        }
        else {
            my $k = shift @$key;
            if (exists $ref->{$k}) {
                die "Will not overwrite scalar value with a reference"
                  if ref $ref->{$k} eq '';
                return check_exists_key( $ref->{$k}, $key );
            }
            if (@$key > 0) {
                # create the missing intermediate hash and share it
                my %hash;
                share(%hash);
                $ref->{$k} = \%hash;
                return check_exists_key( $ref->{$k}, $key );
            }
        }
    }
    if ( !exists $ref->{$key} ) {
        $ref->{$key} = undef;
    }
    return \( $ref->{$key} );
}
print Dumper({a1c => $a1c});
print Dumper({b1c => $b1c});
Output:
$VAR1 = {
          'a1c' => {
                     'b' => 2,
                     'a' => 4,
                     'scalar' => {
                                   '10' => 1
                                 }
                   }
        };
$VAR1 = {
          'b1c' => {
                     'c' => 3,
                     'd' => 7
                   }
        };
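Note that the helper is not limited to two levels: since it recurses on the array of keys, a deeper path can be requested the same way. For instance (a hypothetical example, not from the code above), inside the locked block one could write:

my $ref = check_exists_key($a1, ['level1', 'level2', 'level3']);
$$ref = 'deep value';

Each missing intermediate hash is created and shared on the way down, so the assignment does not trigger the "Invalid value for shared scalar" error.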