#!/usr/bin/env python3
# Stereo Camera based on the Picamera2 library.
# The Raspberry Pi 5 has two CSI ports for cameras.
# A shutter button needs to be connected to a configurable GPIO pin.
# Preview streams are accessible via http.
# Picamera2 Manual:
# https://datasheets.raspberrypi.com/camera/picamera2-manual.pdf
# Picamera2 Source:
# https://github.com/raspberrypi/picamera2
# Picamera2 Example:
# https://github.com/raspberrypi/picamera2/blob/main/examples/mjpeg_server_2.py
# Raspberry Pi GPIO Pinout:
# https://pinout.xyz/
# GPIO Zero
# https://gpiozero.readthedocs.io/
# To automatically start this script on boot, define a systemd service:
# /etc/systemd/system/picam2_stereo.service
#
# [Unit]
# Description=Picamera2 Stereo Camera
#
# [Service]
# ExecStart=/usr/local/bin/picam2_stereo.py
#
# [Install]
# WantedBy=multi-user.target
__author__ = "Gernot Walzl"
__date__ = "2025-12-31"
import sys
import time
import os
from datetime import datetime
import base64
from enum import Enum
import signal
import shutil
import io
from threading import Thread, Lock, Condition
from configparser import ConfigParser
import logging
from logging.handlers import TimedRotatingFileHandler
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import libcamera
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder
from picamera2.outputs import FileOutput
from gpiozero import LED, Button
def load_config():
config = ConfigParser()
config.add_section('output')
config.set('output', 'path_rec', os.path.expanduser('~/cam/rec/'))
config.set('output', 'path_log', os.path.expanduser('~/cam/log/'))
config.add_section('gpio')
config.set('gpio', 'shutter_button', '17')
config.set('gpio', 'recording_led', '12')
config.add_section('camera')
config.set('camera', 'width', '4808')
config.set('camera', 'height', '2592')
config.set('camera', 'exposure_time', '0.02')
config.set('camera', 'flip', '0')
config.set('camera', 'preview_downscale', '4')
config.set('camera', 'initial_delay', '5.0')
config.set('camera', 'delay', '0.5')
config.add_section('server')
config.set('server', 'port', '8000')
config.set('server', 'user', 'user')
config.set('server', 'pass', 'pass')
config.read(os.path.expanduser('~/.config/picam2_stereo.ini'))
return config
def init_logging(config):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
loghandler = TimedRotatingFileHandler(
config.get('output', 'path_log') + 'picam2_stereo.log',
'midnight')
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
loghandler.setFormatter(formatter)
logger.addHandler(loghandler)
class StreamingOutput(io.BufferedIOBase):
def __init__(self):
super().__init__()
self.frame = None
self.condition = Condition()
def write(self, buf):
with self.condition:
self.frame = buf
self.condition.notify_all()
class CamState(Enum):
OFF = 0
CAPTURING = 1
STARTING_REC = 2
RECORDING = 3
STOPPING_REC = 4
class StereoCamera:
def __init__(self, config):
self._logger = logging.getLogger(self.__class__.__name__)
self._path_output = config.get('output', 'path_rec')
self._width = config.getint('camera', 'width')
self._height = config.getint('camera', 'height')
self._exposure_time = config.getfloat('camera', 'exposure_time')
self._flip = config.getint('camera', 'flip')
self._preview_downscale = config.getint('camera', 'preview_downscale')
self._initial_delay = config.getfloat('camera', 'initial_delay')
self._delay = config.getfloat('camera', 'delay')
self._state = CamState.OFF
self._lock_state = Lock()
self._recording_to = None
self._counter_recorded = 0
self._picams = []
self._encoders_serving = []
self._outputs = []
self._locks_serving = []
self._counters_serving = []
for cam_info in Picamera2.global_camera_info():
self._picams.append(Picamera2(cam_info['Num']))
self._encoders_serving.append(None)
self._outputs.append(None)
self._locks_serving.append(Lock())
self._counters_serving.append(0)
def num_cams(self):
return len(self._picams)
def configure(self):
for idx, picam2 in enumerate(self._picams):
flip = (self._flip + idx) % 2
capture_config = picam2.create_still_configuration(
main={
'size': (self._width, self._height)
},
lores={
'size': (
int(self._width/self._preview_downscale),
int(self._height/self._preview_downscale)
)
},
transform=libcamera.Transform(hflip=flip, vflip=flip)
)
if self._exposure_time:
capture_config['controls']['ExposureTime'] = \
int(self._exposure_time * 1e6)
self._logger.info("Configuring camera %d: %s", idx, capture_config)
picam2.configure(capture_config)
def autofocus(self):
success = True
af_jobs = []
for picam2 in self._picams:
af_jobs.append(picam2.autofocus_cycle(wait=False))
for idx, picam2 in enumerate(self._picams):
success &= picam2.wait(af_jobs[idx])
self._logger.info("Autofocus success=%s", success)
return success
def record_files(self, filename="img_{:01d}.jpg"):
cf_jobs = []
for idx, picam2 in enumerate(self._picams):
cf_jobs.append(picam2.capture_file(
filename.format(idx), wait=False))
for idx, picam2 in enumerate(self._picams):
picam2.wait(cf_jobs[idx])
def _start_recording(self):
success = False
dir_recording = datetime.now().strftime("%Y-%m-%d_%H%M%S")
if not os.path.exists(self._path_output + dir_recording):
os.makedirs(self._path_output + dir_recording)
if os.path.isdir(self._path_output + dir_recording):
if self._initial_delay > 0.0:
self._logger.info(
"Sleeping initial_delay=%.2f", self._initial_delay)
time.sleep(self._initial_delay)
self.autofocus()
self._counter_recorded = 0
self._recording_to = dir_recording
self._logger.info("Recording started (dir=%s)", dir_recording)
success = True
return success
def _stop_recording(self):
success = False
dir_recorded = self._recording_to
self._recording_to = None
self._logger.info(
"Recording stopped (recorded=%d)",
self._counter_recorded)
archive_name = shutil.make_archive(
self._path_output + dir_recorded, "zip",
root_dir=self._path_output, base_dir=dir_recorded)
if archive_name:
shutil.rmtree(self._path_output + dir_recorded)
self._logger.info("Created %s", os.path.basename(archive_name))
success = True
return success
def state(self):
with self._lock_state:
return self._state
def capturing_loop(self):
self._logger.info("Start capturing loop")
self.configure()
for picam2 in self._picams:
picam2.start()
with self._lock_state:
self._state = CamState.CAPTURING
state = self._state
self.autofocus()
while state is not CamState.OFF:
state = self.state()
if state is CamState.RECORDING:
time_started = time.time()
path_file = (
self._path_output + self._recording_to + "/img_" +
datetime.now().strftime("%Y-%m-%d_%H%M%S") +
"_{:03d}".format(self._counter_recorded) +
"_{:01d}.jpg")
self.record_files(path_file)
self._counter_recorded += 1
sleep_secs = self._delay - (time.time() - time_started)
if sleep_secs > 0.0:
time.sleep(sleep_secs)
elif state is CamState.CAPTURING:
time.sleep(0.05)
elif state is CamState.STARTING_REC:
if self._start_recording():
with self._lock_state:
self._state = CamState.RECORDING
elif state is CamState.STOPPING_REC:
self._stop_recording()
with self._lock_state:
self._state = CamState.CAPTURING
if self._recording_to:
self._stop_recording()
for picam2 in self._picams:
picam2.stop()
self._logger.info("Capturing stopped")
def stop_capturing(self):
self._logger.info("Stop capturing")
with self._lock_state:
self._state = CamState.OFF
def start_recording(self):
self._logger.info("Start recording")
with self._lock_state:
if self._state is CamState.CAPTURING:
self._state = CamState.STARTING_REC
def stop_recording(self):
self._logger.info("Stop recording")
with self._lock_state:
if self._state is CamState.RECORDING:
self._state = CamState.STOPPING_REC
def start_serving(self, idx):
self._logger.info("Start serving (idx=%d)", idx)
with self._locks_serving[idx]:
if self._counters_serving[idx] == 0:
self._logger.info("Start MJPEG encoder (idx=%d)", idx)
self._encoders_serving[idx] = MJPEGEncoder()
self._outputs[idx] = StreamingOutput()
self._picams[idx].start_encoder(
self._encoders_serving[idx],
FileOutput(self._outputs[idx]),
name="lores")
self._counters_serving[idx] += 1
return self._outputs[idx]
def stop_serving(self, idx):
self._logger.info("Stop serving (idx=%d)", idx)
with self._locks_serving[idx]:
self._counters_serving[idx] -= 1
if self._counters_serving[idx] == 0:
self._picams[idx].stop_encoder(self._encoders_serving[idx])
self._encoders_serving[idx] = None
self._outputs[idx] = None
self._logger.info("MJPEG encoder (idx=%d) stopped", idx)
self._logger.info("Serving (idx=%d) stopped", idx)
class GPIOThread(Thread):
def __init__(self, config, stereocam):
super().__init__()
self._logger = logging.getLogger(self.__class__.__name__)
self._button_shutter = Button(config.getint('gpio', 'shutter_button'))
self._led_recording = LED(config.getint('gpio', 'recording_led'))
self._stereocam = stereocam
self._running = False
def _on_button_shutter_pressed(self):
self._logger.info("Shutter button pressed")
state = self._stereocam.state()
if state is CamState.RECORDING:
self._stereocam.stop_recording()
elif state is CamState.CAPTURING:
self._stereocam.start_recording()
def run(self):
self._logger.info("Starting GPIO thread")
self._logger.info("button_shutter=%s", self._button_shutter)
self._logger.info("led_recording=%s", self._led_recording)
self._button_shutter.when_pressed = self._on_button_shutter_pressed
self._running = True
while self._running:
state = self._stereocam.state()
if state is CamState.RECORDING:
self._led_recording.on()
elif state is CamState.STARTING_REC or \
state is CamState.STOPPING_REC:
self._led_recording.toggle()
else:
self._led_recording.off()
time.sleep(0.5)
self._led_recording.off()
self._logger.info("GPIO thread stopped")
def stop(self):
self._logger.info("Stopping GPIO thread")
self._running = False
class CameraHTTPRequestHandler(BaseHTTPRequestHandler):
def logger(self):
return logging.getLogger('HTTPRequestHandler')
def check_auth(self):
result = False
if (self.server.auth is None or
self.server.auth == self.headers.get('authorization')):
result = True
else:
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic')
self.end_headers()
return result
def send_index(self):
html = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>Stereo Camera</title>
</head>
<body>"""
for idx in range(self.server.stereocam.num_cams()):
html += """
<img src="/cam{i:d}.mjpg" alt="cam{i:d}" /><br />""".format(i=idx)
html += """
</body>
"""
content = html.encode('utf-8')
self.send_header('Content-Type', 'text/html')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
def send_jpeg(self, output):
with output.condition:
output.condition.wait()
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', len(output.frame))
self.end_headers()
self.wfile.write(output.frame)
def is_valid_idx(self, chr_idx):
success = False
try:
idx = int(chr_idx)
if 0 <= idx and idx < self.server.stereocam.num_cams():
success = True
except ValueError:
success = False
return success
def do_GET(self):
if self.path == '/index.html':
if self.check_auth():
self.send_response(200)
self.send_index()
elif self.path[:4] == "/cam" and \
self.is_valid_idx(self.path[4]) and \
self.path[5:] == ".jpg":
if self.check_auth():
idx = int(self.path[4])
output = self.server.stereocam.start_serving(idx)
try:
self.send_response(200)
self.send_jpeg(output)
except Exception as err:
self.logger().warning(
"Exception while serving client %s: %s",
self.client_address, str(err))
finally:
self.server.stereocam.stop_serving(idx)
output = None
elif self.path[:4] == "/cam" and \
self.is_valid_idx(self.path[4]) and \
self.path[5:] == ".mjpg":
if self.check_auth():
idx = int(self.path[4])
output = self.server.stereocam.start_serving(idx)
try:
self.send_response(200)
self.send_header(
'Content-Type',
'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
while not self.wfile.closed:
self.wfile.write(b"--FRAME\r\n")
self.send_jpeg(output)
self.wfile.write(b"\r\n")
self.wfile.flush()
except Exception as err:
self.logger().warning(
"Exception while serving client %s: %s",
self.client_address, str(err))
finally:
self.server.stereocam.stop_serving(idx)
output = None
else:
self.send_error(404)
def log_error(self, format, *args):
self.logger().error("%s - %s" % (self.address_string(), format % args))
def log_message(self, format, *args):
self.logger().info("%s - %s" % (self.address_string(), format % args))
class HTTPServerThread(Thread):
def __init__(self, config, stereocam):
super().__init__()
self.logger = logging.getLogger(self.__class__.__name__)
self.server = ThreadingHTTPServer(
('', config.getint('server', 'port')),
CameraHTTPRequestHandler)
self.server.stereocam = stereocam
self.server.auth = None
if config.get('server', 'user') and config.get('server', 'pass'):
str_auth = config.get('server', 'user') + \
':' + config.get('server', 'pass')
self.server.auth = 'Basic ' + \
base64.b64encode(str_auth.encode()).decode()
def run(self):
self.logger.info(
"Starting HTTP server on port %s", self.server.server_port)
self.server.serve_forever()
def stop_serving(self):
self.logger.info("Stopping HTTP server")
self.server.shutdown()
self.logger.info("HTTP server stopped")
class SignalHandler:
def __init__(self, stereocam):
self._logger = logging.getLogger(self.__class__.__name__)
self._stereocam = stereocam
def _handle_signal(self, signum, frame):
self._logger.info("Handling signal %s", signum)
self._stereocam.stop_capturing()
def register_signal_handlers(self):
signal.signal(signal.SIGINT, self._handle_signal) # Ctrl-C
signal.signal(signal.SIGTERM, self._handle_signal) # kill
def main():
config = load_config()
if not os.path.exists(config.get('output', 'path_rec')):
os.makedirs(config.get('output', 'path_rec'))
if not os.path.exists(config.get('output', 'path_log')):
os.makedirs(config.get('output', 'path_log'))
init_logging(config)
logging.info("Logging initialized")
stereocam = StereoCamera(config)
httpthread = None
if config.getint('server', 'port') > 0:
httpthread = HTTPServerThread(config, stereocam)
httpthread.start()
gpiothread = GPIOThread(config, stereocam)
gpiothread.start()
sighandler = SignalHandler(stereocam)
sighandler.register_signal_handlers()
stereocam.capturing_loop()
gpiothread.stop()
if httpthread:
httpthread.stop_serving()
if __name__ == '__main__':
sys.exit(main())