I am running a simple Python script in a Docker container to turn some GPIO LEDs on. I am using the gpiozero library. The script runs fine and works as expected in the container. The issue is that when I abruptly stop the container, the LEDs stay on. Here is the GPIO code:
from gpiozero import PWMLED
from gpiozero import LEDBoard
from time import sleep

leds = LEDBoard(12, 16)
freq = 1
for led in leds:
    led.on()
sleep(1000)
for led in leds:
    led.off()
And here is my Dockerfile.
FROM arm64v8/python:3.10-alpine3.15
RUN pip install gpiozero
RUN mkdir /app
COPY bread_pi.py /app/bread_pi.py
WORKDIR /app
CMD ["python3", "bread_pi.py"]
When I run the program on the machine locally (no Docker), it behaves as expected, and when I terminate the program abruptly with Control-C, the GPIO pins and their respective LEDs turn off. When I stop the Docker container with docker stop, the container stops successfully but the GPIO pins remain on. Is there a correct way to gracefully terminate a Python program running in a Docker container?
Hardware:
- Raspberry Pi Compute Module 4 and IO board
- BreadPi PiHAT
Software:
- Python 3.10
- Docker 20.10.18
Answer:
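Just because your program seems to behave as expected when abruptly stopped does not mean it performs a clean stop.
When a Docker container is asked to stop, docker stop sends SIGTERM to the container's main process and, if the process is still running after a grace period (10 seconds by default), SIGKILL. Control-C, by contrast, sends SIGINT, which Python turns into a KeyboardInterrupt, so the interpreter exits normally and its cleanup code runs, which is why the LEDs turn off in that case. Python installs no handler for SIGTERM, so no cleanup runs when the container is stopped. To stop gracefully, catch that signal in your Python application and release the GPIO pins in response.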
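For the single-threaded script in the question, a minimal sketch of this idea could look like the following. The names run_until_stopped and raise_system_exit are hypothetical helpers introduced for illustration, not part of gpiozero or Docker. The sketch assumes that turning SIGTERM into a normal SystemExit is enough for the cleanup code to run, just as it does for Control-C.

```python
import signal
import sys


def raise_system_exit(signum, frame):
    # sys.exit() raises SystemExit, so `finally` blocks and atexit hooks
    # still run, unlike with the default SIGTERM behavior
    sys.exit(0)


def run_until_stopped(turn_on, turn_off, wait):
    """Hypothetical helper: keep the outputs on until a stop signal arrives."""
    signal.signal(signal.SIGTERM, raise_system_exit)
    turn_on()
    try:
        # block here; a SIGTERM makes the handler raise SystemExit
        wait()
    finally:
        # runs on normal return, Control-C, and SIGTERM alike
        turn_off()
```

With gpiozero, this might be called as run_until_stopped(leds.on, leds.off, signal.pause), where leds is the LEDBoard from the question and signal.pause simply blocks until a signal is received.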
As an example, here is how you can catch SIGINT and SIGTERM in a threaded application:
import logging
import signal
import sys
import threading

from gpiozero import LEDBoard


def main():
    # keep the application alive until explicitly stated otherwise
    semaphore = threading.Semaphore(value=0)

    # notes:
    # - you have to implement your blink thread inheriting from
    #   the standard class `threading.Thread`.
    # - you have to implement a method `request_stop` which stops
    #   the thread execution.
    t = BlinkThread(LEDBoard(12, 16), period_ms=1000)

    # register event handler for CTRL C (SIGINT) and docker stop (SIGTERM)
    def handler(signum, frame):
        logging.critical("received stop request, terminating application...")
        # ask the thread to stop; request_stop() waits for it by default
        t.request_stop()
        # notify application can stop
        semaphore.release()

    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

    # start your application thread initialized earlier
    t.start()

    # wait for quit request to be received
    semaphore.acquire()
    return 0


# BlinkThread must be defined or imported before main() is called
if __name__ == "__main__":
    sys.exit(main())
For a complete example, here are sample implementations of a stoppable thread class and a blink thread class:
import threading
from time import sleep


class StoppableThread(threading.Thread):
    """
    Stoppable thread can be started and stopped in a clean way.
    """

    def __init__(self):
        """
        Create a stoppable thread ready to execute upon start().
        """
        threading.Thread.__init__(self)
        self.running = True

    def request_stop(self, blocking=True):
        """
        Stop the current thread and wait for it to complete.
        """
        self.running = False
        if blocking:
            self.join()

    def is_running(self):
        """
        Check whether the thread is expected to keep running.
        @return True to keep execution running, False otherwise.
        """
        return self.running

    def not_running(self):
        """
        Mark the thread as not running anymore. This function can be called
        when the thread exits its main loop if ever the request_stop() function
        has not been called.
        """
        self.running = False

    def run(self):
        pass


class BlinkThread(StoppableThread):
    def __init__(self, leds, period_ms):
        StoppableThread.__init__(self)
        self.leds = leds
        self.period_ms = period_ms

    def run(self):
        while self.is_running():
            for led in self.leds:
                led.on()
            # time.sleep() expects seconds, so convert from milliseconds
            sleep(self.period_ms / 1000)
            for led in self.leds:
                led.off()
            sleep(self.period_ms / 1000)
        self.not_running()
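One thing to keep in mind with the blink thread above is that it only notices a stop request after its current sleep finishes, so a long period can use up much of Docker's grace period. A possible variant, sketched below, waits on a threading.Event instead of sleeping so that it wakes up as soon as a stop is requested. The class name EventBlinkThread and the period_s parameter (in seconds) are assumptions made for this illustration, and the leds argument only needs to be an iterable of objects with on() and off() methods.

```python
import threading


class EventBlinkThread(threading.Thread):
    """Hypothetical variant of BlinkThread that can be interrupted mid-wait."""

    def __init__(self, leds, period_s):
        super().__init__()
        self.leds = leds
        self.period_s = period_s
        self._stop_requested = threading.Event()

    def request_stop(self):
        # wake the thread immediately, then wait for it to finish
        self._stop_requested.set()
        self.join()

    def run(self):
        try:
            while not self._stop_requested.is_set():
                for led in self.leds:
                    led.on()
                # wait() returns True as soon as a stop is requested
                if self._stop_requested.wait(self.period_s):
                    break
                for led in self.leds:
                    led.off()
                self._stop_requested.wait(self.period_s)
        finally:
            # always leave the outputs off, whatever state the loop was in
            for led in self.leds:
                led.off()
```

Because request_stop() has the same name as in StoppableThread, this class could be passed to the signal handler shown earlier without other changes, apart from the constructor taking seconds rather than milliseconds.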