Question about using multiprocessing and slaves in Python. Getting error: <class 'TypeError'>

Time:12-27

I just started with trying to use multiprocessing in Python to offload some tasks. This is the basic code here, but I am using it as part of a 'Python Plug-in' that is part of Orthanc, as referenced here: Orthanc Multiprocessing

It is a little complicated, but the issue I am having seems fairly simple:

"Slave Process"

def DelegateStudyArchive(uri):

    new_zip = BytesIO()
    logging.info("In the Slave Handler")
    r = requests.get('http://localhost:8042' + uri, headers = { 'Authorization' : TOKEN })
    logging.info(r.ok)
    logging.info(r.headers)
    archive = r.text # vs. text vs. content
    with ZipFile('/python/radiant_cd.zip', 'r') as radiant_zip:
        with ZipFile(new_zip, 'w') as new_archive:
            for item in radiant_zip.filelist:
                #  To get rid of '__MACOSX' files skip them here
                if '__MACOSX' not in item.filename:
                    # logging.info("Adding " + item.filename + " to archive")
                    new_archive.writestr(item, radiant_zip.read(item.filename))
                else:
                    logging.info("Skipping " + item.filename + ", it is a Mac OS file remnant.")
            new_archive.writestr('dcmdata.zip', archive)
            # Important to read as binary, otherwise the codec fails.
            f = open("/python/ReadMe.pdf", "rb")
            new_archive.writestr('ReadMe.pdf', f.read())
            f.close()
    value = new_zip.getvalue()
    return value

Main script

def OnDownloadStudyArchive(output, uri, **request):

    # Offload the call to "SlowComputation" onto one slave process.
    # The GIL is unlocked until the slave sends its answer back.
    host = "Not Defined"
    userprofilejwt = "Not Defined"
    if "headers" in request and "host" in request['headers']:
        host = request['headers']['host']
    if "headers" in request and "userprofilejwt" in request['headers']:
        userprofilejwt = request['headers']['userprofilejwt']
    logging.info("STUDY|DOWNLOAD_ARCHIVE|ID=" + request['groups'][0] + "  HOST=" + host + "  PROFILE=  " + userprofilejwt)
    uri = uri.replace("_slave", '')
    answer = POOL.apply(DelegateStudyArchive(uri), args=(uri), kwds = {})
    pool.close()
    output.AnswerBuffer(answer, 'application/zip')

orthanc.RegisterRestCallback('/studies/(.*)/archive_slave', OnDownloadStudyArchive)

I got far enough to get the Main script to call DelegateStudyArchive(uri) because the logger is showing:

2022-12-25 04:55:24,504 | root | INFO     | In the Slave Handler
2022-12-25 04:55:24,525 | urllib3.connectionpool | DEBUG    | Starting new HTTP connection (1): localhost:8042
2022-12-25 04:55:24,686 | urllib3.connectionpool | DEBUG    | http://localhost:8042 "GET /studies/0cc9fb82-726d3dfc-e6f2b353-e96558d7-986cbb2c/archive HTTP/1.1" 200 None
2022-12-25 04:55:25,610 | root | INFO     | JOB|JOB_SUCCESS|{"CompletionTime": "20221225T095525.609389", "Content": {"ArchiveSize": "7520381", "ArchiveSizeMB": 7, "Description": "REST API", "InstancesCount": 51, "UncompressedSize": "17817326", "UncompressedSizeMB": 16}, "CreationTime": "20221225T095524.546173", "EffectiveRuntime": 0.923, "ErrorCode": 0, "ErrorDescription": "Success", "ErrorDetails": "", "ID": "8b619458-5b82-441d-9505-94e68d90398e", "Priority": 0, "Progress": 100, "State": "Success", "Timestamp": "20221225T095525.609624", "Type": "Archive"}
2022-12-25 04:55:25,612 | root | INFO     | JOB|MEDIA|ArchiveorDCMCreatedviaJOB
2022-12-25 04:55:25,622 | root | INFO     | True
2022-12-25 04:55:25,623 | root | INFO     | {'Connection': 'close', 'Content-Disposition': 'filename="0cc9fb82-726d3dfc-e6f2b353-e96558d7-986cbb2c.zip"', 'Content-Type': 'application/zip'}
2022-12-25 04:55:26,468 | charset_normalizer | DEBUG    | Encoding detection: Unable to determine any suitable charset.

But then I get an error in the main script that says:

E1225 04:55:27.163292 PluginsManager.cpp:153] Error in the REST callback, traceback:
<class 'TypeError'>
'bytes' object is not callable

  File "/python/combined.py", line 2147, in OnDownloadStudyArchive
    answer = POOL.apply(DelegateStudyArchive(uri), args=(uri), kwds = {})

  File "/usr/lib/python3.9/multiprocessing/pool.py", line 357, in apply
    return self.apply_async(func, args, kwds).get()

  File "/usr/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value

and so I think "answer" is null or just throws an exception and the zip file is not returned. I presume / hope there is an easy fix for that since it otherwise seems to be working, and if so, I have several other places where I would like to do something similar.

CodePudding user response:

Thank you for hints. I refactored my code. I think things are complicated a little by the fact that my main script is a "Plug-In" for Orthanc. What I did was to create a separate download.py file for my custom script like:

download.py

from zipfile import ZipFile, ZipInfo
import io
from io import BytesIO
import requests
import orthanc

TOKEN = orthanc.GenerateRestApiAuthorizationToken()

def DelegateStudyArchive(uri):
    print("In download.py module via multiprocessing")
    new_zip = BytesIO()
    r = requests.get('http://localhost:8042' + uri, headers = { 'Authorization' : TOKEN })
    archive = r.content # vs. text vs. content
    with ZipFile('/python/radiant_cd.zip', 'r') as radiant_zip:
        with ZipFile(new_zip, 'w') as new_archive:
            for item in radiant_zip.filelist:
                if '__MACOSX' not in item.filename:
                    new_archive.writestr(item, radiant_zip.read(item.filename))
            new_archive.writestr('dcmdata.zip', archive)
            # Read the PDF as binary; the with block closes the file even on error.
            with open("/python/ReadMe.pdf", "rb") as f:
                new_archive.writestr('ReadMe.pdf', f.read())
    return new_zip.getvalue()
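
The repacking step in DelegateStudyArchive can be tried on its own, without Orthanc or any files on disk. The following is only a minimal sketch under assumptions: repack_zip and make_template are hypothetical helper names, and the template archive is built in memory rather than read from /python/radiant_cd.zip. It shows the same idea as above: copy every member except the '__MACOSX' remnants, append the extra files, and return the result as bytes.

```python
from io import BytesIO
from zipfile import ZipFile


def repack_zip(template_bytes, extra_files, skip_marker="__MACOSX"):
    """Copy a template zip into a new in-memory zip.

    Members whose name contains skip_marker are dropped, and every
    (name -> bytes) pair in extra_files is appended afterwards.
    This is a hypothetical helper, not part of Orthanc or zipfile.
    """
    out = BytesIO()
    with ZipFile(BytesIO(template_bytes), "r") as template:
        with ZipFile(out, "w") as new_archive:
            for item in template.infolist():
                if skip_marker in item.filename:
                    continue  # skip Mac OS file remnants
                new_archive.writestr(item, template.read(item.filename))
            for name, data in extra_files.items():
                new_archive.writestr(name, data)
    # The archive is complete only after both with blocks have closed.
    return out.getvalue()


def make_template():
    """Build a small stand-in for the viewer template archive."""
    buf = BytesIO()
    with ZipFile(buf, "w") as z:
        z.writestr("viewer/run.exe", b"viewer")
        z.writestr("__MACOSX/._run.exe", b"junk")
    return buf.getvalue()


if __name__ == "__main__":
    data = repack_zip(make_template(), {"dcmdata.zip": b"\x00\x01binary"})
    with ZipFile(BytesIO(data)) as z:
        print(z.namelist())
```

Because the function takes and returns plain bytes, its result can be sent back from a pool worker the same way the return value of DelegateStudyArchive is.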

and then in my main script, I import that custom script:

main.py

from download import DelegateStudyArchive
. . .
# PROTOTYPE FOR DELEGATING TO SLAVE MODULE USING MULTIPROCESSING

def OnDownloadStudyArchive(output, uri, **request):

    # Offload the call to "SlowComputation" onto one slave process.
    # The GIL is unlocked until the slave sends its answer back.
    host = "Not Defined"
    userprofilejwt = "Not Defined"
    if "headers" in request and "host" in request['headers']:
        host = request['headers']['host']
    if "headers" in request and "userprofilejwt" in request['headers']:
        userprofilejwt = request['headers']['userprofilejwt']
    logging.info("Delegating Study Archive Download to download.py module . . . .")
    logging.info("STUDY|DOWNLOAD_ARCHIVE|ID=" + request['groups'][0] + "  HOST=" + host + "  PROFILE=  " + userprofilejwt)
    uri = uri.replace("_slave", '')
    answer = POOL.apply(DelegateStudyArchive, args=(uri, ))
    output.AnswerBuffer(answer, 'application/zip')

orthanc.RegisterRestCallback('/studies/(.*)/archive_slave', OnDownloadStudyArchive)

Now, answer = POOL.apply(DelegateStudyArchive, args=(uri, )) works and it executes the script as desired.

This is actually a much better arrangement because I have a few other methods currently in the main script that I would like to handle in the same way, and this keeps them 'modular' as well.

If you read the details in the link given at the top of this post, you'll see a more detailed explanation about why this might be necessary when using the Orthanc Python Plug-in.

CodePudding user response:

You have coded:

answer = POOL.apply(DelegateStudyArchive(uri), args=(uri), kwds = {})

What does this do? The first argument to apply is the result of calling DelegateStudyArchive(uri), which is a bytes string. That is why your log shows "In the Slave Handler": the function ran in the main process before apply was invoked, and the pool then tried to call the returned bytes object, hence "'bytes' object is not callable". This argument should instead be a reference to the "worker" function itself.

Your second argument, the value passed as the args keyword, should be an iterable enumerating all the positional arguments to be passed to DelegateStudyArchive. What you are passing is uri, a string. Note that the parentheses around uri accomplish nothing: args=(uri) is equivalent to args=uri. Since a string is an iterable, you would be passing N arguments to your worker function, where N is the length of uri and the arguments are the individual characters of the string. The following code demonstrates this:

from multiprocessing import Pool

def worker(arg0, arg1, arg2):
    print(arg0)
    print(arg1)
    print(arg2)

if __name__ == '__main__':
    with Pool(1) as pool:
        pool.apply(worker, args=('abc'))

Prints:

a
b
c

So what you need to be passing as the args keyword value is either a list or tuple of your actual parameters to be passed:

from multiprocessing import Pool

def worker(s):
    print(s)

if __name__ == '__main__':
    with Pool(1) as pool:
        pool.apply(worker, args=('abc',))

Prints:

abc

Note the comma in args=('abc',), which makes ('abc',) a one-element tuple.

So, in your case:

answer = POOL.apply(DelegateStudyArchive, args=(uri,), kwds = {})
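
To see the original error and the fix side by side, here is a small sketch. It rests on two assumptions: build_archive is a hypothetical stand-in for DelegateStudyArchive, and multiprocessing.pool.ThreadPool is used in place of the process pool so that the example runs anywhere without separate worker processes. ThreadPool exposes the same apply interface, so both mistakes behave the same way here.

```python
from multiprocessing.pool import ThreadPool


def build_archive(uri):
    # Hypothetical stand-in for DelegateStudyArchive: returns bytes.
    return b"archive for " + uri.encode()


def call_wrong(pool, uri):
    # Mistake 1: build_archive(uri) runs right here, in the caller, and its
    # bytes result is handed to the pool as if it were the function to call.
    # Mistake 2: args=(uri) is just the string uri, not a one-element tuple.
    try:
        return pool.apply(build_archive(uri), args=(uri))
    except TypeError as exc:
        return "TypeError: " + str(exc)


def call_right(pool, uri):
    # Pass the function object itself and a one-element tuple of arguments.
    return pool.apply(build_archive, args=(uri,))


if __name__ == "__main__":
    with ThreadPool(1) as pool:
        print(call_wrong(pool, "/studies/abc/archive"))
        print(call_right(pool, "/studies/abc/archive"))
```

The first call reports the same "'bytes' object is not callable" message as the traceback in the question; the second returns the bytes produced by the worker.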