On a Windows 10 system running Python 3.6, when trying to use multiprocessing.Process to create a new rq worker,
multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start()
we encounter the error
TypeError: can't pickle _thread.lock objects
and
OSError: [WinError 87] The parameter is incorrect
rq.SimpleWorker was used instead of rq.Worker because rq.Worker relies on os.fork(), which is not available on Windows.
Question: What is causing this error? How can we solve it?
import multiprocessing
import sys

from rq import SimpleWorker, Connection

def needMoreWorkers():
    return True

if __name__ == '__main__':
    qs = sys.argv[1:] or ['default']
    with Connection(connection=my_redis_conn):
        if needMoreWorkers():
            multiprocessing.Process(target=SimpleWorker(qs).work, kwargs={'burst': True}).start()
Error Traceback
Traceback (most recent call last):
  File "WorkerFactory.py", line 53, in <module>
    main(qs)
  File "WorkerFactory.py", line 45, in main
    multiprocessing.Process(target=SimpleWorker(qs)).start()
  File "C:\Users\x\AppData\Local\Continuum\anaconda3\envs\test\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\x\AppData\Local\Continuum\anaconda3\envs\test\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\x\AppData\Local\Continuum\anaconda3\envs\test\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\x\AppData\Local\Continuum\anaconda3\envs\test\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\x\AppData\Local\Continuum\anaconda3\envs\test\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
(test) C:\Code\test\source>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\x\AppData\Local\Continuum\anaconda3\envs\test\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\Users\x\AppData\Local\Continuum\anaconda3\envs\test\lib\multiprocessing\reduction.py", line 82, in steal_handle
    _winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameter is incorrect
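The issue is caused by the target passed to the multiprocessing.Process constructor: it is a SimpleWorker instance (or a bound method of one, such as SimpleWorker(qs).work) rather than a plain function. On Windows, multiprocessing starts child processes with the spawn method, which pickles the Process object, including its target, and sends it to the child. Pickling a bound method means pickling the instance it belongs to, and the worker holds objects that cannot be pickled, such as _thread.lock objects. That is the source of the TypeError. The OSError: [WinError 87] is raised in the child process and is most likely a follow-on effect: the parent failed before it finished handing over the pickled data, so the child could not attach to it.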
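The pickling behaviour described here can be reproduced without rq or Redis. The following is a minimal sketch, assuming a hypothetical FakeWorker class that stands in for any object owning a lock; the names FakeWorker, can_pickle and run_worker are illustrative and not part of rq.

```python
import pickle
import threading


class FakeWorker:
    """Hypothetical stand-in for a worker that owns a lock."""

    def __init__(self, queues):
        self.queues = queues
        self._lock = threading.Lock()  # locks cannot be pickled

    def work(self, burst=False):
        return f"working on {self.queues} (burst={burst})"


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False if it raises."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def run_worker(queues):
    """Plain function: it builds the worker itself, so no worker is pickled."""
    return FakeWorker(queues).work(burst=True)
```

Here can_pickle(FakeWorker(['default']).work) returns False, because pickling the bound method drags the instance and its lock along. A function defined at the top level of a normal script, like run_worker, is pickled by name only, which is why it is a safe target.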
To solve the issue, define a module-level function that creates a new SimpleWorker instance and calls its work method, and pass that function as the target for the multiprocessing.Process constructor. The worker is then created inside the child process and never has to be pickled. For example:
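import multiprocessing
import sys

from rq import SimpleWorker, Connection

def create_worker(qs):
    # Runs in the child process, so the worker is created there and never pickled.
    # my_redis_conn must be defined at module level so the child can see it on import.
    with Connection(connection=my_redis_conn):
        SimpleWorker(qs).work(burst=True)

if __name__ == '__main__':
    qs = sys.argv[1:] or ['default']
    multiprocessing.Process(target=create_worker, args=(qs,)).start()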
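If more than one worker is needed, the same pattern extends naturally. The sketch below is only an illustration under stated assumptions: run_burst_worker is a hypothetical stand-in for the create_worker function above (it prints instead of talking to Redis), and build_worker_processes and the process names are invented for the example.

```python
import multiprocessing


def run_burst_worker(queue_names):
    """Child-process entry point (hypothetical stand-in).

    In the real script this is where the Redis connection and the
    SimpleWorker would be created, so neither crosses the process boundary.
    """
    print(f"worker started for queues: {queue_names}")


def build_worker_processes(queue_names, count):
    """Create, but do not start, `count` processes that target the entry point."""
    ctx = multiprocessing.get_context("spawn")  # the only start method on Windows
    return [
        ctx.Process(
            target=run_burst_worker,
            args=(list(queue_names),),  # plain lists of strings pickle fine
            name=f"rq-worker-{i}",
        )
        for i in range(count)
    ]
```

Calling start() and then join() on each returned process, from under the if __name__ == '__main__': guard, would run the workers. Only the function reference and the list of queue names are sent to each child.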
Note that functools.partial does not offer a way around this on Windows. A partial built from worker.work keeps a reference to the bound method, and therefore to the SimpleWorker instance, so multiprocessing still has to pickle the worker and fails with the same TypeError. The following variant should only be expected to work where the fork start method is used (for example on Linux), because fork does not pickle the target:
import functools
import multiprocessing
import sys

from rq import SimpleWorker, Connection

def needMoreWorkers():
    return True

if __name__ == '__main__':
    qs = sys.argv[1:] or ['default']
    with Connection(connection=my_redis_conn):
        if needMoreWorkers():
            worker = SimpleWorker(qs)
            # The partial still references `worker`, so this is not spawn-safe.
            target = functools.partial(worker.work, burst=True)
            multiprocessing.Process(target=target).start()
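One point worth checking with the functools.partial approach is whether the resulting callable can be pickled at all, since that is what the spawn start method on Windows requires. The sketch below uses a hypothetical FakeWorker class that owns a lock, as a stand-in for the real worker; it is not rq code.

```python
import functools
import pickle
import threading


class FakeWorker:
    """Hypothetical stand-in for an object that owns a lock."""

    def __init__(self):
        self._lock = threading.Lock()

    def work(self, burst=False):
        return burst


worker = FakeWorker()
target = functools.partial(worker.work, burst=True)

# The partial wraps the bound method, which in turn references the worker.
holds_worker = target.func.__self__ is worker

try:
    pickle.dumps(target)
    partial_pickles = True
except (TypeError, pickle.PicklingError):
    partial_pickles = False
```

In this sketch holds_worker is True and partial_pickles is False: wrapping the bound method in a partial changes how it is called, not what has to be serialised. A module-level function that builds the worker in the child avoids the problem.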