I have two processes:
- frame_collector, this process collect JFIF frame from UDP socket and put it in
frame
queue. - flask_service, this process is just normal web server that will display JPEG series aka MJPEG in index route (
/
), it also hasframe_consumer
that will get frame from queue that has been put fromframe_collector
This is my code:
def frame_collector(frame): # process 1
import io
import socket
SERV_IPV4, SERV_PORT = ('192.168.43.150', 8888)
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.bind((SERV_IPV4,SERV_PORT))
isWriting = False
while 1:
dataRecv, CLNT_ADDR = udpSock.recvfrom(65507)
if not isWriting:
if dataRecv[:2] == b'\xff\xd8': # Start of JPEG
isWriting = True
buf = io.BytesIO()
if isWriting:
buf.write(dataRecv)
if dataRecv[-2:] == b'\xff\xd9': # End of JPEG
isWriting = False
buf.seek(0)
frame.put(buf.read())
def flask_service(frame): # process 2
from flask import Flask, Response
app = Flask(__name__)
def frame_consumer():
while 1: yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' frame.get() b'\r\n'
@app.route('/mjpeg')
def mjpeg():
return Response(frame_consumer(),mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/')
def index():
return """
<body style="background: blue;">
<div style="width: 800px; margin: 0px auto;">
<img src="/mjpeg">
</div>
</body>
"""
app.run(host='192.168.43.150', threaded=True)
if __name__ == '__main__':
from multiprocessing import Process, Queue
from time import sleep
frame = Queue()
p1 = Process(target=frame_collector, args=(frame,))
p2 = Process(target=flask_service, args=(frame,))
p1.start()
p2.start()
while 1:
try:
sleep(5)
except:
break
p1.terminate()
p2.terminate()
print('All processes terminated!')
When a client access (doing http request) my webserver, the MJPEG display smoothly 60FPS. The problem is, when many clients access it, my MJPEG start lagging. More clients more lagging, how do I handle this?
I suspect in frame
variable which it's a Queue that shared between thread in flask_service
.
Update:
Thanks for your try Booboo, I tested your code and this is what I got:
First error is flask_service
error need argument, it's okay I fix it with remove parameter in flask_service
.
Then your code is working fine the webserver is running. After it I tested access the 192.168.43.150:5000/mjpeg
. Yes it's working fine like what I did in my original code, but the problem come when another client access it at same time, I tested it with many clients. Here is the error said and that new client is stuck in loading while first client got 60FPS MJPEG.
Traceback (most recent call last):
File "C:\Users\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\process.py", line 315, in _bootstrap
self.run()
File "C:\Users\User\AppData\Local\Programs\Python\Python39\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "c:\Users\User\Documents\PlatformIO\Projects\udp-wifi-camera\src\test_punya_orang.py", line 9, in frame_collector
udpSock.bind((SERV_IPV4,SERV_PORT))
OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
Ending up.
No wonder error was occured, your code was trying create new socket with new address when new client come while that socket (with same address) already created.
CodePudding user response:
TL;DR feel free to skip down to the third code example. But if you want an explanation as to how I arrived at it, then read the following:
Since I don't have access to your server, I have modified your program so that the frame_collector
just writes sequential numbers 1, 2, 3 ... to the queue and I am using server-sent events (sse) to provide a stream of these numbers to flask_service
:
from time import sleep
def frame_collector(frame): # process 1
n = 0
while True:
n = 1
frame.put(f'data {n}')
sleep(.5)
def flask_service(frame): # process 2
from flask import Flask, Response
app = Flask(__name__)
def frame_consumer():
while True:
data = frame.get()
msg = f'data: {data}\n\n'
yield msg
@app.route('/stream')
def stream():
return Response(frame_consumer(), mimetype='text/event-stream')
@app.route('/')
def index():
return """<!doctype html>
<html>
<head>
<meta charset="UTF-8">
<title>Server-sent events demo</title>
</head>
<body>
<ul>
</ul>
<script>
try {
let eventList = document.querySelector('ul');
let evtSource = new EventSource('/stream');
evtSource.onopen = function() {
console.log("Connection to server opened.");
};
evtSource.onmessage = function(e) {
var newElement = document.createElement("li");
newElement.textContent = e.data;
eventList.appendChild(newElement);
};
evtSource.onerror = function() {
console.log("EventSource is done?.");
console.log('Closing connection ...');
evtSource.close();
};
}
catch(error) {
console.error(error);
}
</script>
</body>
</html>
"""
app.run(threaded=True)
if __name__ == '__main__':
from multiprocessing import Process, Queue
frame = Queue()
p1 = Process(target=frame_collector, args=(frame,))
p2 = Process(target=flask_service, args=(frame,))
p1.start()
p2.start()
while 1:
try:
sleep(5)
except:
break
p1.terminate()
p2.terminate()
print('All processes terminated!')
If I start this up and go to http://127.0.0.1:5000/, I will see these numbers being added to a <ul>
(user list). But if I then open up a second browser tab and go to http://127.0.0.1:5000/, you will see that each tab is printing only half of the generated numbers. In other words, you are correct that each client is getting frames (or numbers in this demo) from the same queue and that is the problem.
The solution is as follows:
First, see the documentation on multiprocessing.Pipe and its related connection
objects.
The main process creates an initially-empty managed dictionary, which is sharable among processes, and passes this to the child processes frame_collector
and frame_service
as the clients argument. Every time the '/stream' route is invoked ('/mjpeg' in your case) a new Pipe
instance is allocated and its sender connection
is added to the clients dictionary where the key is the connection's handle (an integer) and the value is the connection itself. When the frame_collector
has assembled a complete frame, it broadcasts the frame to every connection in that managed clients dictionary. If an error occurs sending to a particular connection, that client's sender connection is deleted from the dictionary of clients. A managed dictionary is used since it makes the removal of such connections easier.
Needless to say, the more clients you have the more connection.send
calls must be made for each frame. Performance will be determined by your CPU power, what the frame rate is and how many clients there are:
from time import sleep
from multiprocessing import Pipe
def frame_collector(clients):
n = 0
try:
while True:
n = 1
msg = f'data {n}'
d = dict(clients) # Make copy into local dict
for handle, sender in d.items():
try:
sender.send(msg)
except: # Client has gone away?
del clients[handle]
sleep(1/60)
except KeyboardInterrupt:
pass
def flask_service(clients):
from flask import Flask, Response
app = Flask(__name__)
def frame_consumer():
receiver, sender = Pipe(False)
clients[sender._handle] = sender
while True:
data = receiver.recv()
msg = f'data: {data}\n\n'
yield msg
@app.route('/stream')
def stream():
return Response(frame_consumer(), mimetype='text/event-stream')
@app.route('/')
def index():
return """<!doctype html>
<html>
<head>
<meta charset="UTF-8">
<title>Server-sent events demo</title>
</head>
<body>
<ul>
</ul>
<script>
try {
let eventList = document.querySelector('ul');
let evtSource = new EventSource('/stream');
evtSource.onopen = function() {
console.log("Connection to server opened.");
};
evtSource.onmessage = function(e) {
var newElement = document.createElement("li");
newElement.textContent = e.data;
eventList.appendChild(newElement);
};
evtSource.onerror = function() {
console.log("EventSource is done?.");
console.log('Closing connection ...');
evtSource.close();
};
}
catch(error) {
console.error(error);
}
</script>
</body>
</html>
"""
app.run(threaded=True)
if __name__ == '__main__':
from multiprocessing import Process, Manager
with Manager() as manager:
clients = manager.dict()
p1 = Process(target=frame_collector, args=(clients,))
p2 = Process(target=flask_service, args=(clients,))
p1.start()
p2.start()
try:
p2.join() # block while flask_service is running
except: # ctrl-C
p1.terminate()
p2.terminate()
finally:
print('Ending up.')
Your code should therefore look something like:
from time import sleep
from multiprocessing import Pipe
def frame_collector(clients):
import io
import socket
SERV_IPV4, SERV_PORT = ('192.168.43.150', 8888)
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.bind((SERV_IPV4,SERV_PORT))
isWriting = False
try:
while 1:
dataRecv, CLNT_ADDR = udpSock.recvfrom(65507)
if not isWriting:
if dataRecv[:2] == b'\xff\xd8': # Start of JPEG
isWriting = True
buf = io.BytesIO()
if isWriting:
buf.write(dataRecv)
if dataRecv[-2:] == b'\xff\xd9': # End of JPEG
isWriting = False
buf.seek(0)
frame = buf.read()
d = dict(clients) # Make copy into local dict
for handle, sender in d.items():
try:
sender.send(frame)
except: # Client has gone away?
del clients[handle]
except KeyboardInterrupt: # ctrl-C
pass
def flask_service(clients): # process 2
from flask import Flask, Response
app = Flask(__name__)
def frame_consumer():
receiver, sender = Pipe(False)
clients[sender._handle] = sender
while True:
yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' receiver.recv() b'\r\n'
@app.route('/mjpeg')
def mjpeg():
return Response(frame_consumer(),mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/')
def index():
return """
<body style="background: blue;">
<div style="width: 800px; margin: 0px auto;">
<img src="/mjpeg">
</div>
</body>
"""
app.run(host='192.168.43.150', threaded=True)
if __name__ == '__main__':
from multiprocessing import Process, Manager
with Manager() as manager:
clients = manager.dict()
p1 = Process(target=frame_collector, args=(clients,))
p2 = Process(target=flask_service, args=(clients,))
p1.start()
p2.start()
try:
p2.join() # block while flask_service is running
except: # ctrl-C
p1.terminate()
p2.terminate()
finally:
print('Ending up.')