I have 4 numpy arrays in Python that I'm sending to a spawned C process. This works with small arrays, but once an array gets too big, the C program reads incorrect values that appear to be in the opposite byte order from the machine's (which is little-endian).
Typical values in the Python/numpy arrays are 0.25, 0.5, 0.7, etc.
The C program receives these correctly until the array reaches many thousands of elements; then it begins to receive strange values that appear to be the big-endian versions of the little-endian values it should get. It has nothing to do with syncing, because the middle array sends all of its data perfectly no matter the size. Only my first and last arrays do this, even though all the Python/numpy arrays have been forced to the same type.
Once the array gets too big, C receives values like -4.34345e-266, -5.67456e-50, etc.
Does anyone have any guesses as to why this would happen? I have been stuck on this for a while.
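One quick way to test the byte-order hypothesis is to reverse the bytes of a known value and compare the result with the garbage seen on the C side. The sketch below uses only the standard `struct` module; `byteswapped` is a hypothetical helper name, not something from the code in this question.

```python
import struct

def byteswapped(value):
    """Return the double obtained by reading the 8 bytes of `value` in reverse order."""
    # Pack as little-endian, then reinterpret the same bytes as big-endian.
    return struct.unpack('>d', struct.pack('<d', value))[0]

if __name__ == '__main__':
    for v in (0.25, 0.5, 205.0):
        print(v, '->', byteswapped(v))
```

For values with short binary expansions such as 0.25 or 205.0, the byte-swapped result is a tiny positive subnormal number, which does not resemble values like -4.34345e-266. If the received garbage does not match the byte-swapped originals, that would hint at the reader being out of step with the stream rather than at an endianness mismatch.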
Edit: It seems like a platform issue. The code works on my local computer (Ubuntu 18.04, python 3.6.9, g++ 7.5.0) but fails on Amazon AWS (Ubuntu 18.04, python 3.6.9, g++ 7.5.0), which seems to have the same specs. Are there other platform dependencies I should be considering?
Edit 2: I've narrowed it down more. The issue appears to occur once a certain amount of time has elapsed, rather than after a certain array size, which is why my local machine transferred all the data before the issue arose. AWS uses vCPUs (virtual) and it may take longer to communicate between processes, which is why I notice it on the cloud server and not on my local server. Any suggestions as to what may be causing this would be helpful.
# python
...
p = subprocess.Popen(
    'shell text to launch C process',
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
...
<loop through all arrays>
    array = arrays[i]
    p.stdin.write(<the size and type of array>)
    <looping through numpy array>
        value = array[x][y]
        b = struct.pack('<d', value)
        p.stdin.write(b)
    p.stdin.flush()
// c
...
<loop through expected arrays>
    int size;
    r = fread(&size, sizeof(int), 1, stdin);
    <allocate array with size and type>
    <loop through size of array>
        double value;
        r = fread(&value, sizeof(double), 1, stdin);
        <input values into allocated array>
Here are the unique values in python right before sending, and in c right after receiving (aid is the array's id):
py >> aid: 0 val : 1.5147748861987186e-07
py >> aid: 0 val : 0.00027064327098285035
py >> aid: 0 val : 0.00032440267655912817
py >> aid: 0 val : 0.00039440984799195717
py >> aid: 0 val : 0.00048666128622491974
py >> aid: 0 val : 0.0006088895533821487
py >> aid: 0 val : 0.0007693269194044289
py >> aid: 0 val : 0.0009713703111738414
py >> aid: 0 val : 0.0011995490513889394
py >> aid: 0 val : 0.0013989766453051436
py >> aid: 0 val : 0.0014816246426333445
py >> aid: 0 val : 0.000971370311173842
py >> aid: 0 val : 0.000608889553382149
py >> aid: 0 val : 0.0003944098479919573
py >> aid: 0 val : 0.0003244026765591282
py >> aid: 0 val : 0.0006091367065721885
py >> aid: 0 val : 1.1195904948938864
py >> aid: 0 val : 0.0006535893828657839
py >> aid: 0 val : 0.0006896927523924802
py >> aid: 0 val : 0.0007134077346636406
py >> aid: 0 val : 0.0007216922125962245
py >> aid: 0 val : 0.17
py >> aid: 0 val : 205.0
py >> aid: 0 val : 0.0016111429295754896
py >> aid: 0 val : 0.001897252365911768
py >> aid: 0 val : 0.0020511342434220143
py >> aid: 0 val : 0.0021170160163027038
py >> aid: 0 val : 0.0021201443585756935
py >> aid: 0 val : 2.407359807268656e-06
py >> aid: 0 val : 0.0011768390663802282
py >> aid: 0 val : 0.0013958354194843637
py >> aid: 0 val : 0.001670834944478645
py >> aid: 0 val : 0.002016004763800768
py >> aid: 0 val : 0.002444680031953682
py >> aid: 0 val : 0.0029617508597233667
py >> aid: 0 val : 0.0035475199335043986
py >> aid: 0 val : 0.004133617431826363
py >> aid: 0 val : 0.0045900107907456715
py >> aid: 0 val : 0.0047657186623283965
py >> aid: 0 val : 0.004133617431826364
py >> aid: 0 val : 0.0035475199335044
py >> aid: 0 val : 0.0024446800319536835
py >> aid: 0 val : 0.0016708349444786454
py >> aid: 1 val : 0.0
py >> aid: 1 val : 0.1978674776035768
py >> aid: 2 val : 295.15
c >> aid: 0 val: 1.51477e-07
c >> aid: 0 val: 0.000270643
c >> aid: 0 val: 0.000324403
c >> aid: 0 val: 0.00039441
c >> aid: 0 val: 0.000486661
c >> aid: 0 val: 0.00060889
c >> aid: 0 val: 0.000769327
c >> aid: 0 val: 0.00097137
c >> aid: 0 val: 0.00119955
c >> aid: 0 val: 0.00139898
c >> aid: 0 val: 0.00148162
c >> aid: 0 val: 0.00097137
c >> aid: 0 val: 0.00060889
c >> aid: 0 val: 0.00039441
c >> aid: 0 val: 0.000324403
c >> aid: 0 val: 0.000609137
c >> aid: 0 val: 1.11959
c >> aid: 0 val: 0.000653589
c >> aid: 0 val: 0.000689693
c >> aid: 0 val: 0.000713408
c >> aid: 0 val: 0.000721692
c >> aid: 0 val: 0.17
c >> aid: 0 val: 205
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: -1.11628e-125
c >> aid: 0 val: -1.11628e-125
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: -4.31009e+12
c >> aid: 0 val: -1.49167e-154
c >> aid: 0 val: -1.49167e-154
c >> aid: 0 val: -4.31009e+12
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: -4.31009e+12
c >> aid: 0 val: 3.27272e+181
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: 3.27272e+181
c >> aid: 0 val: 3.27272e+181
c >> aid: 0 val: 2.31398e-204
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: 2.31398e-204
c >> aid: 0 val: 2.31398e-204
c >> aid: 0 val: -3.46371e+65
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: -3.46371e+65
c >> aid: 0 val: -3.46371e+65
c >> aid: 0 val: 3.12744e+114
c >> aid: 0 val: -7.26344e+201
c >> aid: 0 val: 3.12744e+114
c >> aid: 0 val: 3.12744e+114
c >> aid: 0 val: 1.23842e+146
c >> aid: 0 val: 1.23842e+146
c >> aid: 0 val: 5.81017e-69
c >> aid: 0 val: 9.07095e+38
c >> aid: 0 val: 9.07095e+38
c >> aid: 0 val: -3.08372e+147
c >> aid: 0 val: -3.08372e+147
c >> aid: 0 val: 2.46041e+154
c >> aid: 0 val: 2.46041e+154
c >> aid: 0 val: -4.86069e-290
c >> aid: 0 val: -4.86069e-290
c >> aid: 0 val: 1.33577e-275
c >> aid: 0 val: 1.33577e-275
c >> aid: 0 val: 8.41193e+15
c >> aid: 0 val: 8.41193e+15
c >> aid: 0 val: 4.09165e-233
c >> aid: 0 val: 4.09165e-233
c >> aid: 0 val: -7.36835e+223
c >> aid: 0 val: -7.36835e+223
c >> aid: 0 val: -6.59221e+62
c >> aid: 0 val: -6.59221e+62
c >> aid: 0 val: -1.60471e-283
c >> aid: 0 val: -1.60471e-283
c >> aid: 0 val: -6.59221e+62
c >> aid: 0 val: -7.36835e+223
c >> aid: 0 val: -7.36835e+223
c >> aid: 0 val: 4.09165e-233
c >> aid: 0 val: 4.09165e-233
c >> aid: 0 val: 8.41193e+15
c >> aid: 0 val: 1.33577e-275
c >> aid: 0 val: 1.33577e-275
c >> aid: 0 val: -4.86069e-290
c >> aid: 0 val: 2.46041e+154
c >> aid: 0 val: 2.46041e+154
c >> aid: 0 val: -3.08372e+147
c >> aid: 0 val: 9.07095e+38
c >> aid: 0 val: 5.81017e-69
c >> aid: 1 val: 0
c >> aid: 1 val: 0.197867
c >> aid: 2 val: 295.15
Here's a sample of what is going on. Run the Python code and see how it communicates. While creating this example, I noticed that dim was limited to 144, which means the most my computer could transfer before failing was 144 * 144 * 8 = 165888 bytes. (Edit: this is due to the pipe blocking when the read and write are in the same thread, as pointed out in an answer below, but my actual code uses threads and is non-blocking. The issue still seems to be related to elapsed time, as it works on my local server.)
# python 3
import numpy
import subprocess
import struct
import sys
import time
s = subprocess.Popen(
    'g++ -Ofast -pthread -o script comm_test.cpp && ./script',
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    bufsize=-1,
)
dim = 4
a = numpy.arange(dim*dim, dtype=numpy.float64).reshape((dim, dim))
print(a)
# send dim as a 4-byte native-endian int, then each value as a little-endian double
bdim = dim.to_bytes(4, byteorder=sys.byteorder)
s.stdin.write(bdim)
for i in range(dim):
    for j in range(dim):
        b = struct.pack('<d', a[i][j])
        s.stdin.write(b)
s.stdin.flush()
# nothing is read until every write has finished, so a large dim can block both processes
while True:
    data = s.stderr.readline()
    text = '[c++] {}'.format(data.decode('utf-8'))
    print(text, end='')
    if 'done' in text:
        break
// comm_test.cpp
#include <iostream>
#include <stdio.h>
#include <streambuf>
using namespace std;
int main(){
    cerr << "started" << endl;
    int r, dim;
    r = fread(&dim, sizeof(int), 1, stdin);
    cerr << "dim: " << dim << endl;
    double* array = new double[dim*dim];
    for(int i = 0; i < dim*dim; i++){
        double val;
        r = fread(&val, sizeof(double), 1, stdin);
        array[i] = val;
        cerr << val << endl;
    }
    cerr << "done" << endl;
    return 0;
}
Answer:
Your problem is not on the Python side. The C application is filling up its output pipe buffer (stderr in your sample, since everything is written to cerr) and blocking, because your Python code doesn't read anything until it has finished writing.
To do this kind of communication, the reads and writes need to happen simultaneously. You could use a simple TCP socket instead, but you'd have the same problem; the buffers are not infinitely large.
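As a rough illustration of reading and writing at the same time, the sketch below drains the child's output on a background thread while the main thread writes. To stay self-contained it does not use the C program from the question: `CHILD` is a hypothetical Python stand-in that reads an int and that many doubles from stdin and prints one line per value, and `run` is an assumed helper name.

```python
import struct
import subprocess
import sys
import threading

# Hypothetical stand-in for the C program: reads an int32 count, then that
# many little-endian doubles, printing one line per value and finally "done".
CHILD = r"""
import struct, sys
n = struct.unpack('<i', sys.stdin.buffer.read(4))[0]
for _ in range(n):
    v = struct.unpack('<d', sys.stdin.buffer.read(8))[0]
    print(v, flush=True)
print('done', flush=True)
"""

def run(n):
    p = subprocess.Popen([sys.executable, '-c', CHILD],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    lines = []

    def drain():
        # Keep reading until EOF so the child never blocks on a full pipe.
        for raw in p.stdout:
            lines.append(raw.decode().strip())

    t = threading.Thread(target=drain)
    t.start()
    p.stdin.write(struct.pack('<i', n))
    p.stdin.write(struct.pack('<%dd' % n, *[float(i) for i in range(n)]))
    p.stdin.close()  # flushes buffered data and signals EOF to the child
    t.join()
    p.wait()
    return lines

if __name__ == '__main__':
    out = run(20000)
    print(len(out), out[-1])
```

With 20000 values the child produces more output than a typical 64 KiB pipe buffer holds, so the same transfer without the `drain` thread would be expected to stall with both processes waiting on each other.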
Answer:
I figured it out. I had a loop somewhere that was pinging the C process on a timer. It started running before the initial data transfer had completed, so the ping would send a couple of bytes somewhere in the middle of the transfer (which is why I couldn't nail down an exact point of error), and this would throw off all the subsequent pieces of data. I was right in assuming that IPC on my local computer was faster than IPC on AWS cloud vCPUs. That's why my local computer worked: the initial data transfer was able to outpace the timer loop.
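The effect of a couple of stray bytes landing mid-transfer can be reproduced without any subprocess. The sketch below is an illustration under assumptions, not the original code: the two-byte "ping" payload and the `LockedWriter` helper are hypothetical. It shows that every double after the stray bytes is decoded from the wrong offset, and one possible way to keep whole messages from interleaving when several threads share a stream.

```python
import io
import struct
import threading

values = [0.25, 0.5, 0.7, 205.0]
payload = struct.pack('<4d', *values)

# Simulate a two-byte ping landing after the second double.
corrupted = payload[:16] + b'\x01\x02' + payload[16:]

# The reader still consumes 8 bytes per value, so everything after the
# stray bytes is decoded from shifted offsets.
received = [struct.unpack_from('<d', corrupted, i * 8)[0] for i in range(4)]

class LockedWriter:
    """Hypothetical helper: writes each message to the stream atomically."""

    def __init__(self, stream):
        self._stream = stream
        self._lock = threading.Lock()

    def send(self, data):
        # Hold the lock for the whole message so no other sender can
        # slip bytes into the middle of it.
        with self._lock:
            self._stream.write(data)
            self._stream.flush()

if __name__ == '__main__':
    print(received)
```

For the lock to help, both the bulk transfer and the timer ping would have to go through the same `LockedWriter`, with each complete message passed to a single `send` call. Simply not starting the timer until the initial transfer has finished would be another option.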
Since my particular bug won't help anyone much, here is a better version of a simple inter-process pipe comm (with threads, so it's non-blocking and there is no limit on the data transferred):
# python 3
import numpy
import subprocess
import struct
import sys
import time
import threading
from time import perf_counter
s = subprocess.Popen(
    'g++ -Ofast -pthread -o script comm_test.cpp && ./script',
    shell=True,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
def reader(s):
    # drain the child's output on its own thread so its pipe never fills up
    while True:
        data = s.stdout.readline()
        text = '[c++] {}'.format(data.decode('utf-8'))
        print(text, end='')
        if 'done' in text:
            break
r = threading.Thread(target=reader, args=(s,))
r.start()
dim = 1000
a = numpy.arange(dim*dim, dtype=numpy.float64).reshape((dim, dim))
print(a)
bdim = struct.pack('<i', dim)
s.stdin.write(bdim)
t1 = perf_counter()
for i in range(dim):
    for j in range(dim):
        val = a[i][j]
        val = numpy.float64(val)
        b = struct.pack('<d', val)
        s.stdin.write(b)
s.stdin.flush()
t2 = perf_counter()
r.join()
print('finished in {:.4f} seconds'.format(t2-t1))
// c++ comm_test.cpp
#include <iostream>
#include <stdio.h>
#include <streambuf>
using namespace std;
int main(){
    cerr << "started" << endl;
    int r, dim;
    r = fread(&dim, sizeof(int), 1, stdin);
    cerr << "dim: " << dim << endl;
    double* array = new double[dim*dim];
    for(int i = 0; i < dim*dim; i++){
        double val;
        r = fread(&val, sizeof(double), 1, stdin);
        array[i] = val;
        cerr << val << endl;
    }
    cerr << "done" << endl;
    return 0;
}
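The wire format used above (a 4-byte little-endian int for dim, followed by dim*dim little-endian doubles in row-major order) can also be built in one call instead of one `struct.pack` per element. The following is a minimal sketch using only the standard library; `pack_matrix` is a hypothetical helper name and takes plain nested lists.

```python
import struct

def pack_matrix(rows):
    """Return the int32 dim header followed by dim*dim little-endian doubles."""
    dim = len(rows)
    # Flatten row by row, matching the nested i/j loops of the sender.
    flat = [float(v) for row in rows for v in row]
    return struct.pack('<i', dim) + struct.pack('<%dd' % len(flat), *flat)

if __name__ == '__main__':
    message = pack_matrix([[0.0, 1.0], [2.0, 3.0]])
    print(len(message))
```

The result can be sent with a single `s.stdin.write(...)` followed by a flush. For a NumPy array `a`, `struct.pack('<i', dim) + a.astype('<f8').tobytes()` should produce the same bytes, since `tobytes()` emits the elements in C (row-major) order by default.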