Strangers on your property? Unwanted dogs? The mail carrier? You want to keep an eye on your garden, but the system shouldn't raise an alarm at every little movement? Then AI object detection might be the answer.
Let's look at the legal side first. I can't and won't give legal advice here. However, our consumer advice centre (Verbraucherzentrale) has published an article on the subject that I'd like to point you to, including the reminder that you are not allowed to simply film everything.
With that out of the way, on to the technical implementation and the project outline: you need a Raspberry Pi 3 or newer to have enough power for AI workloads, a camera that fits the Pi (ideally with night vision), and of course a way to mount it to a window or wall. The software consists of Python code and spawns several threads: image capture, an upstream motion detection stage (to further reduce how often the TensorFlow process has to run, in favour of your power consumption), object detection with TensorFlow, and notifications via MQTT to your smart home hub.
Hardware
Here is the hardware just outlined, listed once more:
- Raspberry Pi 3 or newer (or a compatible competitor board)
- including accessories (power supply, case, heatsinks)
- night-vision camera (Pi-compatible) with IR cut filter
- an extended camera cable, if needed
- 3D-printed camera housing
Software
First, you need to install the required libraries and prerequisites on your Raspberry Pi. I assume you have set it up with Raspberry Pi OS Buster and can already access it via SSH. Skip the desktop if you want to save energy.
#Source: https://github.com/EdjeElectronics/TensorFlow-Lite-Object-Detection-on-Android-and-Raspberry-Pi/blob/master/Raspberry_Pi_Guide.md
#Enable the camera interface via sudo raspi-config
sudo apt install git python3-pip libgtk-3-0 \
libjpeg-dev libtiff5-dev libjasper-dev libpng-dev \
libavcodec-dev libavformat-dev libswscale-dev libv4l-dev \
libxvidcore-dev libx264-dev libatlas-base-dev
git clone https://github.com/EdjeElectronics/TensorFlow-Lite-Object-Detection-on-Android-and-Raspberry-Pi.git
mv TensorFlow-Lite-Object-Detection-on-Android-and-Raspberry-Pi tflite1
cd tflite1
# -- Modified from get_pi_requirements.sh --
# Get packages required for OpenCV
# Need to get an older version of OpenCV because version 4 has errors
# Versions: https://pypi.org/project/opencv-python/#history
sudo -u pi pip3 install opencv-python==3.4.17.63 tflite-runtime
# -- End Modified from get_pi_requirements.sh --
sudo -u pi pip3 install imutils flask paho-mqtt pillow picamera pytz
#Model, Source: https://www.tensorflow.org/lite/examples/object_detection/overview
mkdir /home/pi/tflite1/captures/
mkdir /home/pi/tflite1/Sample_TF_Lite_Model
wget https://tfhub.dev/tensorflow/lite-model/ssd_mobilenet_v1/1/metadata/1?lite-format=tflite -O /home/pi/tflite1/Sample_TF_Lite_Model/detect.tflite
unzip /home/pi/tflite1/Sample_TF_Lite_Model/detect.tflite -d /home/pi/tflite1/Sample_TF_Lite_Model/
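Before adding the actual detection script, it is worth a quick sanity check that tflite-runtime and the downloaded model load at all. A minimal sketch (paths as created above; for this starter model the printed input shape should be something like [1, 300, 300, 3]):
#!/usr/bin/env python3
# quick sanity check: load the downloaded TFLite model and print its expected input shape
from tflite_runtime.interpreter import Interpreter

interpreter = Interpreter(model_path="/home/pi/tflite1/Sample_TF_Lite_Model/detect.tflite")
interpreter.allocate_tensors()
print("Input shape:", interpreter.get_input_details()[0]['shape'])

# the unzip step above should also have produced the label map next to the model
with open("/home/pi/tflite1/Sample_TF_Lite_Model/labelmap.txt") as f:
    print("Labels:", len(f.readlines()))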
Once these dependencies are in place, the only thing still missing is the actual script at /home/pi/tflite1/tensor_stream_alert.py; its content follows further below.
Once the JSON config has been received via MQTT for the first time, several threads are started. One grabs frames from the camera and makes them available. Another thread performs motion detection. Only when motion has been detected in the frame does object detection kick in. On the one hand this saves a lot of computing power, on the other hand you get fewer false alerts caused by misdetected objects.
And finally, the web server runs independently of all this via Flask. There you can retrieve the most recently captured image as well as the current live video (a short fetch example follows the list):
- http://hostname:5000/api/video/PiCamera for the video stream
- http://hostname:5000/api/last_img/PiCamera for the last image
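For a quick test from another machine, the last capture can simply be downloaded over HTTP. A minimal sketch (the hostname tensorpi.rlyeh is just the example FQDN used in the config further below):
# fetch the most recent capture from the Flask endpoint (adjust hostname to your Pi)
import urllib.request
urllib.request.urlretrieve("http://tensorpi.rlyeh:5000/api/last_img/PiCamera", "last_PiCamera.jpg")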
The image is captured with a slight delay to give an appearing object (a person, say) time to come fully into view. If it doesn't, the image from the first sighting is used instead. After that, no further alerts are sent for this object class for one minute.
For every detected motion and every detected object the script sends, subject to the one-minute interval, an MQTT message to the topic defined in the config (an example message is shown below).
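For reference, an object-detection alert on the broker looks roughly like this (topic and URLs depend on the config shown further below; the script sends the payload as a single-line JSON string, pretty-printed here for readability):
Topic: home/undef/undef/undef/tensorPi/tensorPi01/PiCamera/tensorflow
{
  "video": "http://tensorpi.rlyeh:5000/api/video/PiCamera",
  "last_img": "http://tensorpi.rlyeh:5000/api/last_img/PiCamera",
  "object": "person",
  "time": "2023-05-01T12:34:56+02:00"
}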
#!/usr/bin/env python3
######## Webcam Object Detection Using Tensorflow-trained Classifier #########
### Imports ###
# Common
import time
import json
#import math
import threading
import datetime as dt
import imutils
import numpy as np
import os
import argparse
import importlib.util
import pytz
import socket
# Python 3 server example
from flask import Flask, jsonify, make_response, render_template, Response, send_file
from imutils.video import VideoStream
# OpenCV
import cv2
import base64
from PIL import Image
# MQTT: https://www.ev3dev.org/docs/tutorials/sending-and-receiving-messages-with-mqtt/
import paho.mqtt.client as mqtt
# for motion detection
import urllib.request
class Conf:
hostname = socket.gethostname()
port = 5000
img_path = "/home/pi/tflite1/captures/"
time_format = "%Y-%m-%dT%H:%M:%S%z"
mqtt_topic_base = ""
fqdn = ""
alert_objects = {}
flip = False
watchdog_mode = True
cameras = {
#''
#'PiCamera':['PiCamera','PiCamera']
#'ESP32Cam1': ['http://esp32-arduino.rlyeh/','ESP32Cam1']
}
my_mqtt = None
#Camera stream threads parameters
#Tensorflow parameters
modeldir = "/home/pi/tflite1/Sample_TF_Lite_Model" #Folder the .tflite file is located in
graph = 'detect.tflite' #Name of the .tflite file, if different than detect.tflite
labels_file = 'labelmap.txt' #Name of the labelmap file, if different than labelmap.txt
threshold = 0.5 #Minimum confidence threshold for displaying detected objects
resolution = '1280x720' #Desired webcam resolution in WxH. If the webcam does not support the resolution entered, errors may occur.
use_TPU = False #Use Coral Edge TPU Accelerator to speed up detection. Possible Value: False|'store_true'
input_mean = 127.5
input_std = 127.5
    tensorflow_iteration_interval = 0.5
    tensorflow_activation_during_motion_detection = 6 # seconds after the last detected motion during which TensorFlow keeps running (also set via the JSON config)
#Motion detection
min_area = 500 # if the contour is too small, ignore it
last_motion_detection_message = {}
motion_detection_iteration_interval = 0.5
    #automatically calculated
resW = ''
resH = ''
imW = 0
imH = 0
min_conf_threshold = 0.5
cwd_path = '' # Get path to current working directory
path_to_ckpt = '' # Path to .tflite file, which contains the model that is used for object detection
path_to_labels = '' # Path to label map file
interpreter = None
labels = None
input_details = None
output_details = None
height = 0
width = 0
floating_model = None
@staticmethod
def read_json(json_s):
json_obj = json.loads(json_s)
for key in json_obj:
setattr(Conf, key, json_obj[key])
#print("MQTT config loaded: " + Conf.mqtt_topic_base)
Conf.resW, Conf.resH = Conf.resolution.split('x')
Conf.imW, Conf.imH = int(Conf.resW), int(Conf.resH)
Conf.min_conf_threshold = float(Conf.threshold)
Conf.calc_TF_Params()
Conf.load_TFlite_model()
Conf.load_label_map()
Conf.interpreter.allocate_tensors()
# Get model details
Conf.input_details = Conf.interpreter.get_input_details()
Conf.output_details = Conf.interpreter.get_output_details()
Conf.height = Conf.input_details[0]['shape'][1]
Conf.width = Conf.input_details[0]['shape'][2]
Conf.floating_model = (Conf.input_details[0]['dtype'] == np.float32)
@staticmethod
def calc_TF_Params():
# If using Edge TPU, assign filename for Edge TPU model
if Conf.use_TPU:
# If user has specified the name of the .tflite file, use that name, otherwise use default 'edgetpu.tflite'
if (Conf.graph == 'detect.tflite'):
Conf.graph = 'edgetpu.tflite'
# Get path to current working directory
Conf.cwd_path = os.getcwd()
# Path to .tflite file, which contains the model that is used for object detection
Conf.path_to_ckpt = os.path.join(Conf.cwd_path,Conf.modeldir,Conf.graph)
# Path to label map file
Conf.path_to_labels = os.path.join(Conf.cwd_path,Conf.modeldir,Conf.labels_file)
@staticmethod
def load_TFlite_model():
# If tflite_runtime is installed, import interpreter from tflite_runtime, else import from regular tensorflow
# If using Coral Edge TPU, import the load_delegate library
pkg = importlib.util.find_spec('tflite_runtime')
if pkg:
from tflite_runtime.interpreter import Interpreter
if Conf.use_TPU:
from tflite_runtime.interpreter import load_delegate
else:
from tensorflow.lite.python.interpreter import Interpreter
if Conf.use_TPU:
from tensorflow.lite.python.interpreter import load_delegate
# Load the Tensorflow Lite model.
# If using Edge TPU, use special load_delegate argument
if Conf.use_TPU:
Conf.interpreter = Interpreter(model_path=Conf.path_to_ckpt,
experimental_delegates=[load_delegate('libedgetpu.so.1.0')])
print(Conf.path_to_ckpt)
else:
Conf.interpreter = Interpreter(model_path=Conf.path_to_ckpt)
@staticmethod
def load_label_map():
with open(Conf.path_to_labels, 'r') as f:
Conf.labels = [line.strip() for line in f.readlines()]
# Have to do a weird fix for label map if using the COCO "starter model" from
# https://www.tensorflow.org/lite/models/object_detection/overview
# First label is '???', which has to be removed.
if Conf.labels[0] == '???':
del(Conf.labels[0])
class Results:
cameraOutputFrame = {}
cameraOutputFrameEncoded = {}
lock_cameraOutput = {}
lock_cameraOutput_encoded = {}
motion_detected = {}
last_motion_detection_message = {}
occupied = {}
outputFrame_tensor = None
lock_tensor = None
# Telegram & Capture buffer
last_buffered = {}
last_message = {}
object_found = {}
telegram_send_buffer = {}
object_in_telegram_send_buffer = {}
@staticmethod
def init():
Results.lock_tensor = threading.Lock()
class myMQTT:
client = None
refresh_config_thread = None
def __init__(self):
self.client = mqtt.Client()
self.client.username_pw_set(username="pi",password="mosquittopi")
self.refresh_config_thread = threading.Thread(target=self.refresh_config)
self.refresh_config_thread.daemon = True
self.refresh_config_thread.start()
def refresh_config(self):
while True:
self.connect()
time.sleep(120.0)
def publish(self, topic, text):
try:
self.client.connect("pi.rlyeh",1883,30)
self.client.publish(topic, text)
#print("JSON sent: " + text)
self.client.disconnect()
except Exception as e:
print(e)
print("JSON NOT sent: " + text)
def connect(self):
try:
self.client.on_message = self.on_message
self.client.connect("pi.rlyeh",1883,30)
#print("MQTT Hello")
self.client.loop_start()
self.client.subscribe("+/+/+/+/+/" + Conf.hostname + "/conf")
#print("MQTT subscribe: +/+/+/+/+/" + Conf.hostname + "/conf")
now = dt.datetime.now(pytz.timezone('Europe/Berlin')) # current date and time
topic = "undef/undef/undef/undef/undef/" + Conf.hostname + "/hello"
msg = '{"device":"' + Conf.hostname + '","time":"' + now.isoformat() + '"}'
self.client.publish(topic, msg);
#print("MQTT publish: " + msg + " > " + topic)
time.sleep(2) # wait
self.client.loop_stop() #stop the loop
self.client.disconnect()
except Exception as e:
print(e)
print("Error while connecting to MQTT-Broker")
def on_message(self, client, userdata, msg):
global last_buffered, last_message, telegram_send_buffer, object_in_telegram_send_buffer
payload = str(msg.payload.decode("utf-8"))
#print("message received " , payload)
try:
Conf.read_json(payload)
for key in Conf.alert_objects:
Results.last_buffered[key] = 0
Results.last_message[key] = 0
Results.telegram_send_buffer[key] = 0
Results.object_in_telegram_send_buffer[key] = 0
            for mqtt_name in Conf.cameras:
                Results.lock_cameraOutput[mqtt_name] = threading.Lock()
                Results.lock_cameraOutput_encoded[mqtt_name] = threading.Lock()
                Results.cameraOutputFrame[mqtt_name] = None
                Results.cameraOutputFrameEncoded[mqtt_name] = None
                Results.motion_detected[mqtt_name] = False
                Results.last_motion_detection_message[mqtt_name] = 0
                Results.occupied[mqtt_name] = False
except Exception as e:
print(e)
#Streams Camera frame to Results.cameraOutputFrame[mqtt_name]
class Camera_capture:
thread = None
video_stream = {}
def __init__(self):
for mqtt_name in Conf.cameras:
if mqtt_name == 'PiCamera':
self.video_stream[mqtt_name] = VideoStream(usePiCamera=1, resolution=(Conf.imW,Conf.imH), framerate=6).start()
time.sleep(1) # warm up camera
else:
url = Conf.cameras[mqtt_name]
                self.video_stream[mqtt_name] = None
                while self.video_stream[mqtt_name] is None:
                    try:
                        # keep the handle, otherwise this retry loop would never terminate
                        self.video_stream[mqtt_name] = urllib.request.urlopen(url = url, timeout = 10)
                        Conf.my_mqtt.publish(Conf.mqtt_topic_base + "/" + mqtt_name + "/camera/info", '{"message":"' + mqtt_name + ' connected!","time":"' + dt.datetime.now(pytz.timezone('Europe/Berlin')).isoformat() + '"}')
                    except:
                        Conf.my_mqtt.publish(Conf.mqtt_topic_base + "/" + mqtt_name + "/camera/error", '{"message":"' + mqtt_name + ' not found!","time":"' + dt.datetime.now(pytz.timezone('Europe/Berlin')).isoformat() + '"}')
                        time.sleep(5.0)
# start a thread that will perform the video stream for the Pi camera
self.thread = threading.Thread(target=self.camera_capture_thread)
#self.thread.daemon = True
self.thread.start()
    def camera_capture_thread(self):
        # per-camera byte buffer for parsing the MJPEG stream of HTTP cameras
        stream_bytes = {name: b'' for name in Conf.cameras}
        while True:
for mqtt_name in Conf.cameras:
if mqtt_name == 'PiCamera':
                    frame = self.video_stream[mqtt_name].read() # <-- this is the main CPU load of the entire process!
if(Conf.flip):
frame = cv2.flip(frame,-1)
# acquire the lock_ircut, set the output frame, and release the lock_ircut
with Results.lock_cameraOutput[mqtt_name]:
Results.cameraOutputFrame[mqtt_name] = frame.copy()
else: #HTTP-Camera (ESP32Cam)
# grab the current frame and initialize the occupied/unoccupied text
try:
buffer = self.video_stream[mqtt_name].read(1024)
except:
Conf.my_mqtt.publish(Conf.mqtt_topic_base + "/" + mqtt_name + "/camera/error", '{"message":"' + mqtt_name + ' lost!","time":"' + dt.datetime.now(pytz.timezone('Europe/Berlin')).isoformat() + '"}')
self.video_stream[mqtt_name].close()
                        continue
                    stream_bytes[mqtt_name] += buffer
                    a = stream_bytes[mqtt_name].find(b'\xff\xd8')  # JPEG start-of-image marker
                    b = stream_bytes[mqtt_name].find(b'\xff\xd9')  # JPEG end-of-image marker
                    if a != -1 and b != -1:
                        jpg = stream_bytes[mqtt_name][a:b+2]
                        stream_bytes[mqtt_name] = stream_bytes[mqtt_name][b+2:]
frame = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
# acquire the lock_ircut, set the output frame, and release the lock_ircut
with Results.lock_cameraOutput[mqtt_name]:
Results.cameraOutputFrame[mqtt_name] = frame.copy()
with Results.lock_cameraOutput_encoded[mqtt_name]:
                            #TODO: only when Flask is active!
Results.cameraOutputFrameEncoded[mqtt_name] = jpg
#reads Results.cameraOutputFrame, processes it with TensorFlow and writes to MQTT and to a file on success
class Tensor_process:
thread = None
def __init__(self):
self.thread = threading.Thread(target=self.tensor_thread)
#self.thread.daemon = True
self.thread.start()
def tensor_thread(self):
# Initialize frame rate calculation
frame_rate_calc = 1
freq = cv2.getTickFrequency()
mqtt_name = 'PiCamera'
while True:
try:
calc_start = time.time()
# Only check with tensorflow if motion detection succeeded during the last 6 seconds
if (time.time() - Results.last_motion_detection_message[mqtt_name]) > Conf.tensorflow_activation_during_motion_detection:
time.sleep(Conf.tensorflow_iteration_interval)
continue
# wait until the lock_ircut is acquired
with Results.lock_cameraOutput[mqtt_name]:
if Results.cameraOutputFrame[mqtt_name] is None:
continue
# get image from memory
frame = Results.cameraOutputFrame[mqtt_name].copy()
now = dt.datetime.now(pytz.timezone('Europe/Berlin')) # current date and time
# Start timer (for calculating frame rate)
t1 = cv2.getTickCount()
# Acquire frame and resize to expected shape [1xHxWx3]
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_resized = cv2.resize(frame_rgb, (Conf.width, Conf.height))
input_data = np.expand_dims(frame_resized, axis=0)
# Normalize pixel values if using a floating model (i.e. if model is non-quantized)
if Conf.floating_model:
input_data = (np.float32(input_data) - Conf.input_mean) / Conf.input_std
# Perform the actual detection by running the model with the image as input
Conf.interpreter.set_tensor(Conf.input_details[0]['index'],input_data)
Conf.interpreter.invoke()
# Retrieve detection results
boxes = Conf.interpreter.get_tensor(Conf.output_details[0]['index'])[0] # Bounding box coordinates of detected objects
classes = Conf.interpreter.get_tensor(Conf.output_details[1]['index'])[0] # Class index of detected objects
scores = Conf.interpreter.get_tensor(Conf.output_details[2]['index'])[0] # Confidence of detected objects
#num = Conf.interpreter.get_tensor(Conf.output_details[3]['index'])[0] # Total number of detected objects (inaccurate and not needed)
# Loop over all detections and draw detection box if confidence is above minimum threshold
for key in Conf.alert_objects:
Results.object_found[key] = 0
for i in range(len(scores)):
if ((scores[i] > Conf.min_conf_threshold) and (scores[i] <= 1.0)):
# Get bounding box coordinates and draw box
# Interpreter can return coordinates that are outside of image dimensions, need to force them to be within image using max() and min()
ymin = int(max(1,(boxes[i][0] * Conf.imH)))
xmin = int(max(1,(boxes[i][1] * Conf.imW)))
ymax = int(min(Conf.imH,(boxes[i][2] * Conf.imH)))
xmax = int(min(Conf.imW,(boxes[i][3] * Conf.imW)))
cv2.rectangle(frame, (xmin,ymin), (xmax,ymax), (10, 255, 0), 2)
# Draw label
object_name = Conf.labels[int(classes[i])] # Look up object name from "labels" array using class index
label = '%s: %d%%' % (object_name, int(scores[i]*100)) # Example: 'person: 72%'
labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) # Get font size
label_ymin = max(ymin, labelSize[1] + 10) # Make sure not to draw label too close to top of window
cv2.rectangle(frame, (xmin, label_ymin-labelSize[1]-10), (xmin+labelSize[0], label_ymin+baseLine-10), (255, 255, 255), cv2.FILLED) # Draw white box to put label text in
cv2.putText(frame, label, (xmin, label_ymin-7), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2) # Draw label text
for key in Conf.alert_objects:
if(object_name == key):
Results.object_found[key] = scores[i]
# Draw framerate and brightness in corner of frame
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV )
brightness = cv2.mean(hsv)
cv2.putText(frame,'Brightness: {0:.2f}'.format(brightness[2]),(220,50),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,0),2,cv2.LINE_AA)
cv2.putText(frame,'FPS: {0:.2f}'.format(frame_rate_calc),(30,50),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,0),2,cv2.LINE_AA)
# All the results have been drawn on the frame, so it's time to display it.
#cv2.imshow('Object detector', frame)
with Results.lock_tensor:
Results.outputFrame_tensor = frame.copy()
### Alert on defined object sighted ###
for key in Conf.alert_objects:
                    #if the confidence for a found object exceeds the configured threshold and the image is bright enough
if (Results.object_found[key] > Conf.alert_objects[key] and brightness[2] > 30):
if ( (time.time() - Results.last_message[key]) >= 60 and Results.object_in_telegram_send_buffer[key] == 0):
Results.telegram_send_buffer[key] = frame
Results.object_in_telegram_send_buffer[key] = 1
Results.last_buffered[key] = time.time()
                        #if an image was captured and buffered 2 seconds ago
if ( (time.time() - Results.last_buffered[key]) >= 2 and Results.object_in_telegram_send_buffer[key]):
                            # and the object is still there, send the new image
if(Results.object_found[key] > Conf.alert_objects[key]):
cv2.imwrite(Conf.img_path + "frame" + mqtt_name + ".jpg",frame)
                            # and the object is gone by now, send the old (buffered) image
else:
cv2.imwrite(Conf.img_path + "frame" + mqtt_name + ".jpg",Results.telegram_send_buffer[key])
with open(Conf.img_path + "frame" + mqtt_name + ".jpg", "rb") as f:
Conf.my_mqtt.publish(Conf.mqtt_topic_base + "/" + mqtt_name + "/tensorflow", '{"video":"http://' + Conf.fqdn + ':' + str(Conf.port) + '/api/video/' + mqtt_name + '","last_img":"http://' + Conf.fqdn + ':' + str(Conf.port) + '/api/last_img/' + mqtt_name + '","object":"' + key + '","time":"' + now.isoformat() + '"}')
Results.object_in_telegram_send_buffer[key] = 0
Results.last_message[key] = time.time()
# Calculate framerate
t2 = cv2.getTickCount()
time1 = (t2-t1)/freq
frame_rate_calc = 1/time1
# Sleep Intervals
#if brightness[2] < 25:
# time.sleep(10.0)
#elif brightness[2] < 70:
# time.sleep(2.0)
#else:
except Exception as e:
print(e)
time.sleep(Conf.tensorflow_iteration_interval)
class Motion_detection_process:
thread = None
def __init__(self):
self.thread = threading.Thread(target=self.motion_detection)
#self.thread.daemon = True
self.thread.start()
def motion_detection(self):
mqtt_name = 'PiCamera'
firstFrame = None
Results.last_motion_detection_message[mqtt_name] = 0
while True:
try:
calc_start = time.time()
# wait until the lock_ircut is acquired
with Results.lock_cameraOutput[mqtt_name]:
if Results.cameraOutputFrame[mqtt_name] is None:
continue
# get image from memory
frame = Results.cameraOutputFrame['PiCamera'].copy()
now = dt.datetime.now(pytz.timezone('Europe/Berlin')) # current date and time
Results.occupied[mqtt_name] = False
text = "Unoccupied"
if Conf.watchdog_mode: # and (time.time() - Results.last_motion_detection_message[mqtt_name]) >= 30:
# resize the frame, convert it to grayscale, and blur it
frame = imutils.resize(frame, width=400)
frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame_gray = cv2.GaussianBlur(frame_gray, (21, 21), 0)
# if the first frame is None, initialize it
if firstFrame is None:
firstFrame = frame_gray
continue
# compute the absolute difference between the current frame and
# first frame
frameDelta = cv2.absdiff(firstFrame, frame_gray)
thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
# dilate the thresholded image to fill in holes, then find contours
# on thresholded image
thresh = cv2.dilate(thresh, None, iterations=2)
cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE)
cnts = imutils.grab_contours(cnts)
# loop over the contours
for c in cnts:
# if the contour is too small, ignore it
if cv2.contourArea(c) < Conf.min_area:
continue
# compute the bounding box for the contour, draw it on the frame,
# and update the text
#(x, y, w, h) = cv2.boundingRect(c)
#cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
Results.occupied[mqtt_name] = True
text = "Occupied"
#set new first frame, if unoccupied, so adapt actual image with time
#if not occupied and loopCounter > 10:
firstFrame = frame_gray
# draw the text and timestamp on the frame
#cv2.putText(frame, "Room Status: {}".format(text), (10, 20),
# cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
#cv2.putText(frame, now.strftime("%A %d %B %Y %I:%M:%S%p"),
# (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)
#print( "Motion detection: " + str(time.time() - calc_start) )
if Results.occupied[mqtt_name]:
#cv2.imwrite("frame" + mqtt_name + ".jpg",frame)
Conf.my_mqtt.publish(Conf.mqtt_topic_base + "/" + mqtt_name + "/motionDetection", '{"video":"http://' + Conf.fqdn + ':' + str(Conf.port) + '/api/video/' + mqtt_name + '","last_img":"http://' + Conf.fqdn + ':' + str(Conf.port) + '/api/last_img/' + mqtt_name + '","object":"' + text + '","time":"' + now.isoformat() + '"}')
Results.last_motion_detection_message[mqtt_name] = time.time()
except Exception as e:
print(e)
time.sleep(Conf.motion_detection_iteration_interval)
### Flask ###
# https://www.pyimagesearch.com/2019/09/02/opencv-stream-video-to-web-browser-html-page/
app = Flask(__name__)
@app.route('/api/<stream>/<mqtt_name>', methods=['GET'])
def get_stream(stream=None, mqtt_name=None):
if stream == "video":
if mqtt_name == 'PiCamera':
return Response(myFlask.CameraToJPEG_stream(),
mimetype = "multipart/x-mixed-replace; boundary=frame")
else:
return Response(myFlask.captures_jpeg_to_stream(mqtt_name),
mimetype = "multipart/x-mixed-replace; boundary=frame")
elif stream == "last_img":
try:
            return send_file(Conf.img_path + "frame" + mqtt_name + ".jpg", mimetype="image/jpeg")
except Exception as e:
return str(e)
else:
return not_found(404)
class myFlask:
def CameraToJPEG_stream():
while True:
# wait until the lock_ircut is acquired
with Results.lock_cameraOutput['PiCamera']:
if Results.cameraOutputFrame['PiCamera'] is None:
continue
# encode the frame in JPEG format
(flag, encodedImage) = cv2.imencode(".jpg", Results.cameraOutputFrame['PiCamera'])
# ensure the frame was successfully encoded
if not flag:
continue
# yield the output frame in the byte format
yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
bytearray(encodedImage) + b'\r\n')
def captures_jpeg_to_stream(mqtt_name):
while True:
            with Results.lock_cameraOutput_encoded[mqtt_name]:
if Results.cameraOutputFrameEncoded[mqtt_name] is None:
continue
# yield the output frame in the byte format
yield(b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
Results.cameraOutputFrameEncoded[mqtt_name] + b'\r\n')
### Motion Detection ###
# construct the argument parser and parse the arguments
#ap = argparse.ArgumentParser()
#ap.add_argument("-a", "--min-area", type=int, default=500, help="minimum area size")
#args = vars(ap.parse_args())
#provide some global dictionaries to save some information in context of the cameras
frames_encoded = {}
lock_frames_encoded = {}
thread_markers = {}
threads = {}
if __name__ == "__main__":
#Get Config via MQTT and refreshes regularly in own thread
Conf.my_mqtt = myMQTT()
while (Conf.mqtt_topic_base == ""):
time.sleep(2.0)
#Prepare thread-locks
Results.init()
# start a thread that will perform the video stream for the Pi camera
thread_vs = Camera_capture()
thread_tensor = Tensor_process()
thread_motion_detection = Motion_detection_process()
# start the flask app
app.run(debug=True, host='0.0.0.0', use_reloader=False, port=Conf.port)
    # Clean up: release the video stream pointer of the Pi camera
    thread_vs.video_stream['PiCamera'].stop()
The script announces itself via MQTT under "undef/undef/undef/undef/undef/[hostname]/hello" at power-on and at regular intervals afterwards. OpenHAB is then expected to respond with the configuration this device should use. That can be done with a rule:
val sendConfig = [String message_file |
executeCommandLine(Duration.ofSeconds(5), "cat", "/etc/openhab/rules/json/" + message_file + ".json")
]
rule "TensorPi01 send config"
when
Item TensorPi01Hello_Time changed
then {
var String message = sendConfig.apply("TensorPi01_watchdogON")
getActions("mqtt","mqtt:broker:myMosquitto").publishMQTT("undef/undef/undef/undef/undef/tensorPi01/conf", message, true)
}
end
I keep the configuration that gets sent in a new subfolder "json" under rules, which I found tidier. An example configuration could look like this. As you can see, it simply sets several of the script's variables dynamically.
{
"mqtt_topic_base":"home/undef/undef/undef/tensorPi/tensorPi01",
"fqdn": "tensorpi.rlyeh",
"watchdog":true,
"cameras":{
"PiCamera": "PiCamera"
},
"resolution":"1280x720",
"alert_objects":{
"dog" : 0.5,
"person" : 0.6
},
"flip":false,
"motion_detection_iteration_interval": 0.5,
"tensorflow_iteration_interval": 0.5,
"tensorflow_activation_during_motion_detection": 6
}
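If you want to test the script without the OpenHAB rule, you can also push this configuration by hand. A minimal sketch using paho-mqtt with the broker and credentials that are hard-coded in the script above (adjust broker, topic and file path to your setup; the last-but-one topic level must match the Pi's hostname):
#!/usr/bin/env python3
# manually publish the JSON config to the topic the script subscribes to
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.username_pw_set(username="pi", password="mosquittopi")
client.connect("pi.rlyeh", 1883, 30)
with open("/etc/openhab/rules/json/TensorPi01_watchdogON.json") as f:
    client.publish("undef/undef/undef/undef/undef/tensorPi01/conf", f.read(), retain=True)
client.disconnect()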
And last but not least, an autostart would be nice:
sudo sed -i 's/# By default this script does nothing./# By default this script does nothing.\nsudo -u pi python3 \/home\/pi\/tflite1\/tensor_stream_alert.py > \/home\/pi\/tflite1\/tensor_stream_alert.log \&\n/g' /etc/rc.local
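After that call, /etc/rc.local should contain a line like the following (this is exactly what the sed command inserts):
sudo -u pi python3 /home/pi/tflite1/tensor_stream_alert.py > /home/pi/tflite1/tensor_stream_alert.log &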
One reboot later, your camera should be up and running.