Git initial commit

This commit is contained in:
Lars Hahn 2023-08-19 23:38:13 +02:00
commit 8192cc6330
14 changed files with 1210 additions and 0 deletions

BIN
3dprints/Dashcam_Picamcase.FCStd Executable file

Binary file not shown.

Binary file not shown.

BIN
3dprints/Dashcam_Picase-Top.stl Executable file

Binary file not shown.

BIN
3dprints/Dashcam_Picase.FCStd Executable file

Binary file not shown.

BIN
3dprints/PiCam_BaseCase.stl Executable file

Binary file not shown.

BIN
3dprints/PiCam_BaseCaseCover.stl Executable file

Binary file not shown.

9
LICENSE Executable file
View File

@ -0,0 +1,9 @@
MIT License
Copyright (c) 2022 Hahn, Lars
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

90
README.md Executable file
View File

@ -0,0 +1,90 @@
# pi-dashcam
This application follows the idea to setup an OpenSource dashcam
that can be run on a low-cost hardware like the Raspberry PI 3 A+
(but also as in my intial approach with a PI Zero).
Main purpose is to have a trustfull software following the requirements
of GDPR rules in Germany (DSGVO) to store data only temporary and save
video chunks only in cases of accidents/emergencies. Data can be stored
either on RPi SD Card or external USB device (automounted).
Triggering of video data storing can also be done via acceleration sensor.
This project comes with a custom camera and rpi case:
- there is a camera case for the RPi Camera V2 (e.g. NoIR edition)
- there is a RPi 3A+ fitting base, where an RTC-DS3231 can be inserted into
- there is a top covering case, where a breadboard plate of 5x7cm can be easily inserted
(watch for the LED and Button cutouts!)
I created my own HAT for the Pi-3A+; pictures will follow...
## Requirements
- 32-Bit Raspberry Pi OS (necessary to run picamera v1!)
- min. 1 CPU
- min. 128MB RAM
- Pi Camera attached
- legacy camera support enabled in raspi-config
- RealTime-Clock RTC DS3231 (e.g.: https://www.amazon.de/dp/B077XN4LL4/ref=pe_27091401_487024491_TE_item )
- ADXL345 acceleration sensor (e.g.: https://www.amazon.de/dp/B09T376QXX/ref=pe_27091401_487027711_TE_SCE_dp_1 )
- 4 Buttons and one RGB-LED (e.g.: on amazon, the 12mm x 12mm x 7mm tactile push botton with 5 colours ...)
- 3D printed cases for camera and rpi from 3dprints subfolder
(and obviously a 3dprinter or you know somebody ;) )
- Some Dashcam carrier for your car (e.g.: https://www.amazon.de/dp/B091BVDT56/ref=pe_27091401_487024491_TE_item )
A single core and 512MB of RAM is sufficient enough to have a 1080p
dashcam running. However, I decided to stick to the Pi 3A+ in order to be
more powerfull for future approaches.
For a nicer interaction, this application allows the usage of buttons
to e.g. trigger a copy of all legal files at the moment (by default, the last
10 minutes) to a separate folder on the disk (or external, auto-mounted device),
to be available later.
In addition, three LEDs (or an RGB-LED) can be controlled; one is a Power-LED,
that indicates a running system (by default a slow heartbeat pulsar, can be adjusted via Button)
next to a Data-Copy-LED, that lights up when current legal video chunks are stored separately
and blinks for some seconds when that process is done, and finally an info LED
that e.g. shows, no external device mounted or the video capturing process stopped.
More parameter can be set (e.g. resolution, chunk size and count), but the
default values should optimal for the most cases (1080p, 10 video chunks of 60s).
Real-World approach is then to solder all com
For any question, do not hesitate to contact me.
Best regards,
Lars
___________________________
Default PIN scheme:
GND <--> Button Power <--> Board Pin 11 (GPIO 17)
GND <--> Button Copy <--> Board Pin 12 (GPIO 18)
GND <--> Button Stop <--> Board Pin 13 (GPIO 27)
GND <--> Button Info control <--> Board Pin 15 (GPIO 22)
GND <--> 220R <--> LED Data Copy <--> Board Pin 29 (GPIO 27)
GND <--> 220R <--> LED Power <--> Board Pin 33 (GPIO 13)
GND <--> 220R <--> LED Power <--> Board Pin 37 (GPIO 26)
ADXL345 VCC/3V3 <--> Board Pin 1 (3V3)
ADXL345 GND <--> Board Pin 9 (GND)
ADXL345 CS <--> Board Pin 24 (GPIO 8 / SPIO CE0)
ADXL345 SD0 <--> Board Pin 21 (GPIO 9 / SPIO MISO)
ADXL345 SDA <--> Board Pin 19 (GPIO 10 / SPIO MOSI)
ADXL345 SCL <--> Board Pin 23 (GPIO 11 / SPIO SCLK)

552
dashcam.py Executable file
View File

@ -0,0 +1,552 @@
#!/usr/bin/env python3
import os
import shutil
import argparse
import picamera
from led import LED
from switch import Switch
from time import time, sleep
from threading import Thread, Lock
from random import randbytes
from movement import Adxl345Spi
from math import fabs
def get_usb_storage_device(desired_device=None):
with open("/proc/partitions") as file:
file.readline() #skip heading line
file.readline() #skip empty line
device_info = [
[ item.strip() for item in line.strip().split() ]
for line in file
]
devices = [
device[3]
for device in device_info
if len(device) >= 3 and device[0] in ("8", "259")
]
primary_devices = [
device[3]
for device in device_info
if len(device) >= 3 and device[0] in ("8", "259") and (int(device[1]) % 16) == 0
]
block_class_path = "/sys/class/block"
primary_usb_devices = [
device
for device in primary_devices
if (
os.path.islink(f"{block_class_path}/{device}") and
os.path.realpath(f"{block_class_path}/{device}").find("/usb") > 0
)
]
usb_partitions = sorted(
[
partition
for partition in devices
for usb_device in primary_usb_devices
if partition.startswith(usb_device) and usb_device != partition
],
reverse=True #assuming the last partition of the last is correct
)
if desired_device is not None:
if desired_device in usb_partitions:
return desired_device
return None
return usb_partitions[0]
def mount_usb_device(device, id):
mnt_path = f"/mnt/{id}"
if not os.path.ismount(mnt_path):
os.system(f"mkdir -p {mnt_path}")
os.system(f"mount {device} {mnt_path}")
return mnt_path
class Dashcam():
def __init__(
self, sequence_count=10, sequence_length=60, resolution=(1920, 1080),
video_type="h264", video_name_prefix="video-dashcam", bitrate = 17000000,
framerate=30, video_file_path="/opt/dashcam", pin_btn_pwr=11, pin_btn_cpy=12,
pin_btn_stop=13, pin_btn_info=15, pin_led_cpy=29, pin_led_pwr=33,
pin_led_info=37, led_pwr_dim_perc=5, g_force_limit=1.5, salt_bytes=4):
self.pin_btn_cpy = pin_btn_cpy
self.pin_btn_pwr = pin_btn_pwr
self.pin_btn_info = pin_btn_info
self.pin_btn_stop = pin_btn_stop
self.pin_led_pwr = pin_led_pwr
self.pin_led_cpy = pin_led_cpy
self.pin_led_info = pin_led_info
self.pin_led_pwr_dim_percent = led_pwr_dim_perc
self.pin_blink_seconds = 2
self.pin_blink_on_seconds = 0.1
self.g_force_limit = g_force_limit
self.video_sequence_seconds = sequence_length
self.video_sequence_count = sequence_count
self.video_resolution = resolution
self.video_name_prefix = video_name_prefix
self.video_file_path=video_file_path
self.video_file_path_legal=f"{self.video_file_path}/legal"
self.video_type = video_type if video_type in ("h264", "mjpeg") else "h264"
self.video_bit_rate = bitrate
self.video_frame_rate = framerate
# using a salt to not eventually overwrite files
# after an unexpected reboot in car; is like
# a unique identifier for an ongoing record session
self.video_name_salt = randbytes(salt_bytes).hex()
self.LED_data = LED(self.pin_led_cpy)
self.LED_power = LED(self.pin_led_pwr)
self.LED_info = LED(self.pin_led_info)
self.BTN_data = Switch(self.pin_btn_cpy)
self.BTN_power = Switch(self.pin_btn_pwr)
self.BTN_stop = Switch(self.pin_btn_stop)
self.BTN_info = Switch(self.pin_btn_info)
self.file_lock = Lock()
self.camera_lock = Lock()
self.adxl345 = Adxl345Spi()
self.camera = picamera.PiCamera(
resolution=self.video_resolution,
framerate=self.video_frame_rate
)
self.camera_state = 0 #0: off, 1: turndown, 2: on
self.info_led_state = 0
self.segment_ctr = 0
self.video_filename = ""
def get_video_id(self):
return self.video_name_salt
def set_video_path(self, path):
self.video_file_path = path
self.video_file_path_legal = f"{self.video_file_path}/legal"
def __del__(self):
del self.LED_data, self.LED_power
def _dashcam_video_thread(self):
self.video_filename = (
f"{self.video_name_prefix}_"
f"{int(time())}-{self.video_name_salt}-"
f"{self.segment_ctr}.{self.video_type}"
)
video_path = f"{self.video_file_path}/{self.video_filename}"
print(f"Recording to '{video_path}'.")
self.camera.start_recording(
video_path, format=self.video_type, bitrate=self.video_bit_rate
)
self.camera.wait_recording(self.video_sequence_seconds)
while self.camera_state > 1:
self.segment_ctr += 1
tmp_video_filename = (
f"{self.video_name_prefix}_"
f"{int(time())}-{self.video_name_salt}-"
f"{self.segment_ctr}.{self.video_type}"
)
video_path = f"{self.video_file_path}/{tmp_video_filename}"
print(f"Recording to '{video_path}'.")
self.camera.split_recording(video_path)
# as the copy thread callback might be a bit too fast,
# we manage to set the final new filename AFTER the switch
# which guarantees, that the file is really finished.
self.video_filename = tmp_video_filename
self.camera.wait_recording(self.video_sequence_seconds)
self.camera.stop_recording()
self.camera_state = 0
def _dashcam_file_cleanup_thread(self):
while True:
self.file_lock.acquire()
video_file_list = self.get_directory_file_list(
self.video_file_path, self.video_type
)
video_file_list_legal = self.get_video_file_list_legal(
video_file_list, buffer=1
)
delete_video_file_list = [
video_file
for video_file in video_file_list
if video_file not in video_file_list_legal
]
for del_video_file in delete_video_file_list:
print(f"DELETE file '{del_video_file}'")
try:
os.remove(f"{self.video_file_path}/{del_video_file}")
except FileNotFoundError:
print(
f"WARNING! File '{self.video_file_path}/{del_video_file}'"
" is gone. Ignoreing file. Continue"
)
self.file_lock.release()
sleep(self.video_sequence_seconds)
def _led_power_heartbeat(self, LED):
LED.set_duty_cycle(self.pin_led_pwr_dim_percent)
sleep(0.15)
LED.set_duty_cycle(0)
sleep(0.15)
LED.set_duty_cycle(self.pin_led_pwr_dim_percent)
sleep(0.2)
LED.set_duty_cycle(0)
def _dashcam_powerled_thread(self):
round_cntr = 0
round_time = 0.5
LED_is_on = True
LED_state_switch = {
0: self.LED_info,
2: self.LED_power
}
while True:
if self.camera_state in LED_state_switch:
LED = LED_state_switch[self.camera_state]
if (self.info_led_state % 4) == 0 and (round_cntr % 60) == 0:
self._led_power_heartbeat(LED)
elif (self.info_led_state % 4) == 1 and (round_cntr % 1) == 0:
self._led_power_heartbeat(LED)
elif (self.info_led_state % 4) == 2:
if not LED_is_on:
LED.set_on()
LED.set_duty_cycle(self.pin_led_pwr_dim_percent)
LED_is_on = True
else:
if LED_is_on:
LED.set_off()
LED_is_on = False
elif self.camera_state == 1:
if not LED_is_on:
self.LED_info.set_duty_cycle(self.pin_led_pwr_dim_percent)
LED_is_on = True
else:
self.LED_info.set_off()
LED_is_on = False
#sleep to make catch changes in e.g. info led status changes
sleep(round_time)
round_cntr += round_time
def _g_force_surveillance(self):
for _ in range(5):
#cleanup at every start/coldstart (like at car ;) )
x,y,z = self.adxl345.get_acceleration()
sleep(1)
while self.camera_state > 0:
accl_xyz = self.adxl345.get_acceleration()
if any(
fabs(val) > self.g_force_limit
for val in accl_xyz
):
self.save_video_file_legal(self.LED_data)
sleep(0.5)
self.adxl345.stop()
def get_directory_file_list(self, path, filetype):
return [
file
for file in os.listdir(path)
if (
os.path.isfile(os.path.join(path,file)) and
file.endswith(f'.{filetype}')
)
]
def get_video_file_list_legal(self, video_file_list, reverse=True, buffer=0):
prefix_match_sorted_reduced_video_fileid_list = sorted(
[
file.removeprefix(
f"{self.video_name_prefix}_"
).removesuffix(
f'.{self.video_type}'
)
for file in video_file_list
if file.startswith(f"{self.video_name_prefix}_")
], key = (
lambda x: (
f"{int(x.split('-')[1] == self.video_name_salt)}{x}"
)
), reverse=reverse
)
return [
f"{self.video_name_prefix}_{fileid}.{self.video_type}"
for fileid in prefix_match_sorted_reduced_video_fileid_list
][:self.video_sequence_count+buffer]
def save_video_file_legal(self, LED):
self.file_lock.acquire()
LED.set_on()
video_file_list_legal = self.get_video_file_list_legal(
self.get_directory_file_list(
self.video_file_path, self.video_type
), reverse=False
)
timestamp = int(time())
legal_path = f"{self.video_file_path_legal}/{timestamp}_utc"
os.makedirs(legal_path, exist_ok=True)
current_video = self.video_filename
is_active_saving = current_video in video_file_list_legal
if is_active_saving:
video_file_list_legal.remove(current_video)
for video_file in video_file_list_legal:
src = f"{self.video_file_path}/{video_file}"
dst = f"{legal_path}/INCIDENT_{video_file}"
print(f"Copy '{src}' to '{dst}'.")
try:
shutil.copyfile(src, dst)
except FileNotFoundError:
print(f"WARNING! File '{src}' is gone. Ignoreing file. Continue")
while is_active_saving and self.video_filename == current_video:
#waiting for current video to finish
sleep(5)
if is_active_saving:
src = f"{self.video_file_path}/{current_video}"
dst = f"{legal_path}/INCIDENT_{current_video}"
print(f"Copy '{src}' to '{dst}'.")
try:
shutil.copyfile(src, dst)
except FileNotFoundError:
print(f"WARNING! File '{src}' is gone. Ignoreing file. Continue")
print("Copy done.")
for round in range(int(self.pin_blink_seconds / self.pin_blink_on_seconds)):
if round % 2 == 0:
LED.set_on()
else:
LED.set_off()
sleep(self.pin_blink_on_seconds)
LED.set_off()
self.file_lock.release()
def _button_copy_functor(self, input):
if input == 0:
self.save_video_file_legal(self.LED_data)
def _button_start_functor(self, input):
if input == 0:
if self.camera_state == 0:
self.camera_lock.acquire()
self.camera_state = 2
self.video_thread = Thread(target=self._dashcam_video_thread)
self.g_force_thread = Thread(target=self._g_force_surveillance)
self.video_thread.start()
self.LED_power.set_duty_cycle(self.pin_led_pwr_dim_percent)
self.g_force_thread.start()
sleep(10) #mainly user notification via LED on
self.camera_lock.release()
def _button_stop_functor(self, input):
if input == 0:
if self.camera_state == 2:
self.camera_lock.acquire()
self.camera_state = 1
self.LED_power.set_off()
self.video_thread.join()
self.g_force_thread.join()
del self.video_thread
del self.g_force_thread
self.camera_lock.release()
def _button_info_functor(self, input):
if input == 0:
# in order to keep numbers small and as we probably won't
# have more than 100 blinking states, lets keep it 0 < x < 100 !
self.info_led_state = (self.info_led_state + 1) % 100
def do_warning(self):
# just blink at info LED!
# can be used when e.g. mountpoint is unavailable!!!
for _ in range(10):
self.LED_info.set_on()
sleep(0.5)
self.LED_info.set_off()
sleep(0.5)
def start(self):
os.makedirs(self.video_file_path, exist_ok=True)
os.makedirs(self.video_file_path_legal, exist_ok=True)
self.power_led_thread = Thread(target=self._dashcam_powerled_thread)
self.clean_thread = Thread(target=self._dashcam_file_cleanup_thread)
self.BTN_data.set_functor(self._button_copy_functor)
self.BTN_power.set_functor(self._button_start_functor)
self.BTN_stop.set_functor(self._button_stop_functor)
self.BTN_info.set_functor(self._button_info_functor)
self.power_led_thread.start()
self.clean_thread.start()
self._button_start_functor(0)
def join_clean_thread(self):
self.clean_thread.join()
def main():
parser = argparse.ArgumentParser(
description="""DashCam-app following the German traffic and GDPR (DSGVO)
laws. Is designed to work on any Raspberry PI that provides PiCamera
library in Python; this fits basically for any 32-Bit driven RPI.
Mainly used in combination with a RPI Zero W v1.1, which is totally
sufficient for this purpose.
Instead of a single, increasing video file, the input video stream is
splitted into e.g. 60s video chunks; only the last e.g. 10 chunks are kept.
This allows to have e.g. the last 10 minutes stored but also regularly cleaned
in order to stay conform towards GDPR.
There is a special incident procedure: when an attached button is pressed,
the last e.g. 10 legal video chunks are separately stored and not cleaned
to e.g. provide some legal information in any case of accident/emergency.
In addition, two LEDs can be attached to print Power/Heartbeat information
and to indicate the current, active data copy in any case of accident (this
will only happen, when you press the button).
"""
)
parser.add_argument(
"-s", "--video_chunk_duration", metavar="S", type=int, required=False,
default=60, help="Length of a single stored video chunk."
)
parser.add_argument(
"-c", "--video_chunk_count", metavar="C", type=int, required=False,
default=10, help=(
"Max. number of sequential video chunks that are stored in parallel"
"on the disk; this correlates do GDPR laws."
)
)
parser.add_argument(
"-p", "--video_file_path", metavar="P", type=str, required=False,
default="/opt/dashcam", help="Location to store dashcam video chunks"
)
parser.add_argument(
"-f", "--video_file_prefix", metavar="F", type=str, required=False,
default="video-dashcam", help=(
"Filename prefix for the locally stored video file chunks"
)
)
parser.add_argument(
"-r", "--video_resolution", metavar="R", nargs=2, type=int, required=False,
default=(1920, 1080), help="Length of a single stored video chunk."
)
parser.add_argument(
"-br", "--video_bitrate", metavar="BR", type=int, required=False,
default=7000000, help="Bitrate used to store videos."
)
parser.add_argument(
"-vf", "--video_format", metavar="VF", type=str, required=False,
default="h264", choices=("h264","mjpeg"),help="File format used to store videos."
)
parser.add_argument(
"-lc", "--pin_led_copy", metavar="PLC", type=int, required=False,
default=29, help="Pin number in GPIO.BOARD layout for a data-copy LED."
)
parser.add_argument(
"-lp", "--pin_led_power", metavar="PLP", type=int, required=False,
default=33, help="Pin number in GPIO.BOARD layout for a power LED."
)
parser.add_argument(
"-li", "--pin_led_info", metavar="PLP", type=int, required=False,
default=37, help="Pin number in GPIO.BOARD layout for an info LED."
)
parser.add_argument(
"-bp", "--pin_button_power", metavar="PBP", type=int, required=False,
default=11, help="Pin number in GPIO.BOARD layout for a start button."
)
parser.add_argument(
"-bc", "--pin_button_copy", metavar="PBC", type=int, required=False,
default=12, help="Pin number in GPIO.BOARD layout for a data copy button."
)
parser.add_argument(
"-bs", "--pin_button_stop", metavar="PBS", type=int, required=False,
default=13, help="Pin number in GPIO.BOARD layout for a stop button."
)
parser.add_argument(
"-bi", "--pin_button_info", metavar="PBI", type=int, required=False,
default=15, help="Pin number in GPIO.BOARD layout for a info-led control button."
)
parser.add_argument(
"-d", "--pin_power_dim_percent", metavar="LPD", type=int, required=False,
default=5, help="Percent dim for the Power LED; if it might be to bright."
)
parser.add_argument(
"-g", "--g_force_limit", metavar="G", type=int, required=False, default=1.5,
help="Threshold in terms of g-Force, when a data copy should be triggered."
)
parser.add_argument(
"--external_usb_storage_device", metavar="DEVICE", type=str, required=False,
help=(
"Dev shortcut, e.g. sda1, for a partition/device you want to use as "
"video storage. Will be directly written on it."
)
)
args = parser.parse_args()
video_chunk_duration = args.video_chunk_duration
video_chunk_count = args.video_chunk_count
video_file_path = args.video_file_path
video_file_prefix = args.video_file_prefix
video_resolution = args.video_resolution
video_bitrate = args.video_bitrate
video_format = args.video_format
pin_button_copy = args.pin_button_copy
pin_button_power = args.pin_button_power
pin_button_stop = args.pin_button_stop
pin_button_info = args.pin_button_info
pin_led_power = args.pin_led_power
pin_led_copy = args.pin_led_copy
pin_led_info = args.pin_led_info
pin_power_dim_percent = args.pin_power_dim_percent
g_force_limit = args.g_force_limit
usb_storage = args.external_usb_storage_device if hasattr(args,'external_usb_storage_device') else None
dashcam = Dashcam(
sequence_count=video_chunk_count, sequence_length=video_chunk_duration,
resolution=video_resolution, video_type=video_format, bitrate=video_bitrate,
video_name_prefix=video_file_prefix, video_file_path=video_file_path,
pin_btn_cpy=pin_button_copy, pin_btn_pwr=pin_button_power,
pin_btn_info=pin_button_info, pin_btn_stop=pin_button_stop,
pin_led_cpy=pin_led_copy, pin_led_pwr=pin_led_power, pin_led_info=pin_led_info,
led_pwr_dim_perc=pin_power_dim_percent, g_force_limit=g_force_limit
)
if usb_storage is not None:
usb_device = get_usb_storage_device(usb_storage)
if usb_device is not None:
mount_path = mount_usb_device(f"/dev/{usb_device}", "dashcam-videodata")
dashcam.set_video_path(mount_path)
else:
dashcam.do_warning()
dashcam.start()
dashcam.join_clean_thread()
if __name__=='__main__':
main()

13
dashcam.service.example Executable file
View File

@ -0,0 +1,13 @@
[Unit]
Description=RaspberryPI native dashcam script to be schedule after boot
Wants=network.target
Requires=pigpiod.service
After=pigpiod.service
[Service]
ExecStart=/opt/dashcam/.venv/bin/python3 /opt/dashcam/dashcam.py
WorkingDirectory=/opt/dashcam
Restart=always
[Install]
WantedBy=multi-user.target

37
install.sh Executable file
View File

@ -0,0 +1,37 @@
#!/bin/bash
# This script is used for a simple installation
# of the dashcam application on a e.g. a Raspberry
# PI Zero.
# You will probably asked for sudo password, if you
# did not have the default settings of Raspberry PI
# OS.
# Please feel free to adjust to your needs, e.g.
# adding some mount info like sda1 for external usb sticks
DASHCAM_ROOT="/opt/dashcam"
DASHCAM_ROOT_LEGAL=$DASHCAM_ROOT"/legal"
echo "Install necessary packages (python-venv, pip)"
sudo apt install python3-venv python3-pip pigpio
for DCFile in dashcam.py led.py switch.py;
do
echo "Copy file "$DCFile" to "$DASHCAM_ROOT/$DCFile
sudo cp $DCFile $DASHCAM_ROOT/$DCFile
sudo chmod a+x $DASHCAM_ROOT/$DCFile
done
echo "Setting Python3 virtual env '.venv' at '"$DASHCAM_ROOT"/.venv'"
sudo python3 -m venv $DASHCAM_ROOT/.venv
sudo $DASHCAM_ROOT/.venv/bin/pip3 install --upgrade pip
sudo $DASHCAM_ROOT/.venv/bin/pip3 install picamera RPi.GPIO pigpio
echo "Setup systemd service at /etc/systemd/system/dashcam.service"
sudo cp dashcam.service.example /etc/systemd/system/dashcam.service
echo "Reload systemctl daemon service"
sudo systemctl daemon-reload
echo "Prepare dashcam service to start after reboot"
sudo systemctl enable dashcam.service pigpiod.service
echo "Start dashcam service"
sudo systemctl start pigpiod.service dashcam.service

134
led.py Executable file
View File

@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
This module provides a class to easy setup an LED instance for GPIO that
is capable to handle a real LED like switching on and off or to dim the light.
Each instance holds one pin and controls this pin.
The developer/user has to make sure, that there are no overlapping instances used...
In addition some basic functionality tests are provide as stand-alone script.
Classes:
LED
Functions:
main
class_test
"""
import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BOARD)
class LED:
"""
Class to controll an LED via a GPIO PIN in GPIO.BOARD configuration.
Each class instance controls exactly one pin.
Make sure they are not overlapping!
Methods:
__init__(pin, freq, is_inverse)
__del__()
freq()
set_freq(freq)
duty_cycle()
set_duty_cycle(duty_cycle)
set_on()
set_off()
"""
def __init__(self, pin, freq=2000, is_inverse=False):
"""
Constructor to create a single LED instance with one pin asociated.
The frequency can be set and also an inverse_state.
Keyword Arguments:
pin -- the GPIO.BOARD pin
freq -- the frequency for the LED (default: 2000)
is_inverse -- boolean if the LED is inverted (connected to 3.3V instead of GND) (default: False)
"""
self._pin = pin
self._is_inverse = is_inverse
GPIO.setup(self._pin, GPIO.OUT)
GPIO.output(self._pin, GPIO.LOW)
self._freq = freq
self._duty_cycle = 0 if not self._is_inverse else 100
self._pwm = GPIO.PWM(self._pin, self._freq)
self._pwm.start(self._duty_cycle)
def __del__(self):
"""
Destructor to stop PWM activated on a pin and setup the output low.
"""
self._pwm.stop()
GPIO.output(self._pin, GPIO.LOW)
def freq(self):
"""
Function to get the current used frequency.
Returns: freq
"""
return self._freq
def set_freq(self, freq):
"""
Function to set the frequency for the LED.
Keyword Arguments:
freq -- the frequency to be set
"""
self._freq = freq
self._pwm.ChangeFrequency(freq)
def duty_cycle(self):
"""
Function to get the current used duty cycle (PWM; dimming).
Is an integer 0 <= duty_cycle <= 100.
Returns: duty_cycle
"""
return self._duty_cycle
def set_duty_cycle(self, duty_cycle):
"""
Function to set the duty cycle (PWM; dimming) for the LED.
Has to be an integer 0 <= duty_cycle <= 100.
Keyword Arguments:
duty_cycle -- the frequency to be set
"""
dc = min(100,max(duty_cycle,0))
self._duty_cycle = dc if not self._is_inverse else 100 - dc
self._pwm.ChangeDutyCycle(self._duty_cycle)
def set_on(self):
"""
Function to switch an LED on and set the duty cycle to max.
"""
self.set_duty_cycle(100)
GPIO.output(self._pin, GPIO.HIGH)
def set_off(self):
"""
Function to switch an LED off and set the duty cycle to min.
"""
self.set_duty_cycle(0)
GPIO.output(self._pin, GPIO.LOW)
def class_test():
"""
Class to provide basic functionality testing.
Connect LED to pin 11,12 and GND.
Run led.py locally. The LED should turn on, switch
a bit and dim it self.
For each LED on pin 11 and 12 individually
"""
#Basic Testing
pins = (11,12)
for pin in pins:
LED1 = LED(pin)
#Basic Turn on and off
LED1.set_on()
time.sleep(1)
LED1.set_off()
time.sleep(1)
# Use simple PWM and off
LED1.set_duty_cycle(50)
time.sleep(1)
LED1.set_off()
time.sleep(2)
# Turn slowly down
for dc in range(100,-1,-1):
LED1.set_duty_cycle(dc)
time.sleep(0.1)
del LED1
GPIO.cleanup()
if __name__ == "__main__":
"""
A main function that is used, when this module is used as a stand-alone script.
Local imports in order not to disturb library import functionality (keeping it clean!)
"""
import time
class_test()

164
movement.py Executable file
View File

@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""
"""
from math import log2, fabs
from time import sleep
import pigpio
class Adxl345():
ADDR_DEV_ID = 0x00
ADDR_RATE_BW = 0x2C
ADDR_DATA_FORMAT = 0x31
ADDR_DATA_X_0 = 0x32 # from here on 6 bytes (2 per coordinate) for X,Y,Z
ADDR_OFFSET_X = 0x1E
ADDR_OFFSET_Y = 0x1F
ADDR_OFFSET_Z = 0x20
ADDR_POWER_CTL = 0x2D
def __init__(
self, sensitivity_range=4, data_rate_level=0):
# high range values means lower resolution for small movements!
# low range values means higher resolution for small movements!
self.set_sensitivity_range(sensitivity_range)
self.set_data_rate_level(data_rate_level)
def decode(self, lsb, msb):
accl = (msb << 8) | lsb
adjust = 2 * (2**(self.sensitivity_range + 1)) / (2**13) # given g range with 13 bit precision
correct_accl = (accl - (1 << 16)) * adjust if accl & (1 << 15) else accl * adjust
return correct_accl
def set_sensitivity_range(self, sensitivity_range):
errmsg = f"Sensitivity Range '{sensitivity_range}' needs to integer of 2,4,8,16"
if not isinstance(sensitivity_range, int):
raise TypeError(errmsg)
if sensitivity_range not in (2,4,8,16):
raise ValueError(errmsg)
self.sensitivity_range = int(log2(sensitivity_range))-1
data = self.from_address(Adxl345.ADDR_DATA_FORMAT, 1)[0] & ~0x0F | self.sensitivity_range | 0x08
self.to_address(Adxl345.ADDR_DATA_FORMAT, data)
def set_data_rate_level(self, data_rate_level):
errmsg = f"DataRateLevel '{data_rate_level}' needs to be integer within 0 < DRL <= 16!"
if not isinstance(data_rate_level, int):
raise TypeError(errmsg)
if data_rate_level < 0 or data_rate_level > 16:
raise ValueError(errmsg)
self.data_rate_code = 0b1111-(data_rate_level)
self.data_rate = int(3200/(2**(data_rate_level)))
self.to_address(Adxl345.ADDR_RATE_BW, self.data_rate_code)
def set_offset(self, x, y, z):
offset = {
Adxl345.ADDR_OFFSET_X: x,
Adxl345.ADDR_OFFSET_Y: y,
Adxl345.ADDR_OFFSET_Z: z,
}
for addr in offset.keys():
self.to_address(
addr,
int(offset[addr] / Adxl345.FACTOR_HIGH_RES / 4 ) & 0xFF
)
def set_on(self):
self.to_address(Adxl345.ADDR_POWER_CTL, 0x08)
def set_off(self):
self.to_address(Adxl345.ADDR_POWER_CTL, 0x00)
def calibrate(self, margin=0.1):
self.set_offset(0,0,0)
accel = self.get_acceleration()
x,y,z = accel[0], accel[1], accel[2]
if not all(
(0-margin < fabs(v) < 0+margin) or (1-margin < fabs(v) < 1+margin)
for v in (x, y, z)
):
raise ValueError(
f"WARNING! Please place sensor on appropriate surface; "
f"values should be around 0 or 1 and not {x,y,z}")
if not round(x) ^ round(y) ^ round(z):
raise ValueError(
"WARNING! Please place sensor on appropriate surface; "
"values should be around 0 or 1, with only one value 1 "
f"and not {x,y,z}"
)
calibration = [
round(v) - v
for v in (x,y,z)
]
self.set_offset(*calibration)
def get_acceleration(self,axis=0b111):
byte_ctr = 6 #2 bytes each for x,y,z values
data = self.from_address(Adxl345.ADDR_DATA_X_0, byte_ctr)
return [
self.decode(data[idx], data[idx+1])
for idx in range(0,byte_ctr, 2)
if axis & (1 << (idx//2))
]
class Adxl345Spi(Adxl345):
BITMASK_READ = 0x80
BITMASK_MULTI = 0x40
ADDR_SELECT_MASK = 0x3f
def __init__(self, channel=0, mode=0b11, baudrate=2e6):
self.channel = int(channel)
self.mode = int(mode)
self.baudrate = int(baudrate)
self.pi = pigpio.pi()
self.spi = self.pi.spi_open(self.channel, self.baudrate, self.mode)
super().__init__()
def from_address(self, addr, byte_count):
bit_msg = [
addr | Adxl345Spi.BITMASK_READ | (Adxl345Spi.BITMASK_MULTI * (byte_count > 1))
]
# add some random bytes, read bitmask is set -> no writing!
bit_msg.extend([
0xFF
for _ in range(byte_count)
])
count, data = self.pi.spi_xfer(self.spi, bit_msg)
if count != (byte_count+1) or len(data) != count:
raise ValueError(
f"Returned SPI bytes from {addr} seems not to be correct!\n"
f"Found {[x for x in data]}"
)
return data[1:]
def to_address(self, addr, values):
data_values = values if isinstance(values, list) else [values]
bit_msg = [
addr | (Adxl345Spi.BITMASK_MULTI * (len(data_values) > 1))
]
bit_msg.extend(data_values)
self.pi.spi_xfer(self.spi, bit_msg)
def stop(self):
self.pi.spi_close(self.spi)
self.pi.stop()
class Adxl345I2C(Adxl345):
pass
def main():
test = Adxl345Spi()
test.set_on()
for _ in range(10):
print(", ".join(str(val) for val in test.get_acceleration()))
sleep(1)
test.stop()
if __name__ == "__main__":
# execute only if run as a script
main()

211
switch.py Executable file
View File

@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
This module provides classes that relate/beling to the physical class of switches.
It can be used to easily read state from switches and transform physical states
into computer and human readable values.
Each switch instance will represent exactly one physical switch, so that
a certain switch pin related to exact one Switch.
The developer/user has to make sure, that there are no overlapping instances used...
In addition some basic functionality tests are provide as stand-alone script.
Classes:
Switch
Functions:
main
class_Switch_test
class_Switch_functor
"""
import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BOARD)
class Switch:
"""
Class to handle input/states of physical switches in GPIO.BOARD configuration.
Each class instance controls exactly one switch.
Can be used for simple buttons aswell as binary tilt-switches or binary
vibrations switches.
Make sure pins are mutual exclusiv!
Methods:
__init__(pin, freq, is_inverse)
bouncetime()
set_bouncetime(bouncetime)
functor()
set_functor(functor)
edge()
set_edge(edge)
_update_callback()
_press_btn(*args)
default_functor()
"""
def map_edge(edge):
"""
Translate users integer input to return corresponding
GPIO edge detection states (GPIO.RISING, GPIO.FALLING, GPIO.BOTH).
GPIO.RISING means callback connected to an voltage increase
(e.g. pull-down-resistor with button pressed)
Keyword Arguments:
edge -- Integer to set/update the wished edge detection mode.
>0: RISING, ==0: BOTH, <0: FALLING
"""
if edge > 0:
return GPIO.RISING
if edge < 0:
return GPIO.FALLING
return GPIO.BOTH
def __init__(self, pin, functor=None, bouncetime=10, edge_detector=0, pud=1):
"""
Constructor to create connect to a single switch; bouncetime, edge detection,
PUD and a functor (what should be called on switch interaction) can be set.
Mandatory to set is the pin to read the signal/state from a button.
Keyword Arguments:
pin -- the GPIO.BOARD pin
functor -- function pointer, what should be called on switch action; (default: None -> calls default functor)
bouncetime -- switch sleepyness, in which periods signals are ignored (do not react on button flickering) (default: 10)
edge_detector -- integer to indicate on which kind of signal change (edge) to be listened to (default: 0 -> GPIO.BOTH)
pud -- Integer to indicate if it is pull-up or pull-down resistor (default: 1 -> GPIO.PUD_UP)
"""
self._pin = pin
self._functor = functor if functor is not None else Switch.default_functor
self._bouncetime = bouncetime
self._edge = Switch.map_edge(edge_detector)
self._pud = GPIO.PUD_UP if pud >= 0 else GPIO.PUD_DOWN
GPIO.setup(self._pin, GPIO.IN, pull_up_down=self._pud)
self._update_callback()
def pud(self):
"""
Return the PUD (pull up or down) resistor state of switch.
Cannot be set, only during creation of switch instance.
Returns: GPIO.PUD_UP or GPIO.PUD_DOWN
"""
return self._pud
def bouncetime(self):
"""
Return the currently used bouncetime; bouncetime indicates how fast a signal change (edge)
should/can trigger the callback function.
Returns: bouncetime -- integer
"""
return self._bouncetime
def set_bouncetime(self, bouncetime):
"""
Override the currently used bouncetime; will trigger an update of the callback function
by removing and adding (with new values) an event_detection to button signal pin.
Bouncetime indicates how fast a signal change (edge) should/can trigger the callback function.
Keyword Arguments:
bouncetime -- switch sleepyness, after which time periods signals are recognised.
"""
self._bouncetime = bouncetime
self._update_callback()
def functor(self):
"""
Returns the currently used functor that is used within callback triggering of the switch.
No real usage except comporing somewhere maybe ...
Returns: functor -- function pointer
"""
return self._functor
def set_functor(self, functor):
"""
Set the functor, function pointer that is used during the callback triggering; the input
for the functor is the current state of the button when pressed/released (typical: 0,1).
Allows the overriding for general purpose usage.
Keyword Arguments:
functor -- function pointer
"""
self._functor = functor
def edge(self):
"""
Returns the currently used edge detection for the switch.
Indicates if it is configured to listen on GPIO.RISING, -.FALLING or -.BOTH.
Returns: GPIO.BOTH, GPIO.FALLING or GPIO.RISING
"""
return self._edge
def set_edge(self, edge_detector):
"""
Allows overwriting of the edge detectiong during the callback triggering.
Based on the PUD resitor type, we can listen to a falling, rising or both
voltage changes.
Input is an integer, that is mapped accordingly to the three states
RISING, BOTH, FALLING (>0, ==0 , <0).
Keyword Arguments:
edge_detector -- integer, where the sign indicates for the edge_detection
(RISING, BOTH, FALLING)
"""
self._edge = Switch.map_edge(edge_detector)
self._update_callback()
def _update_callback(self):
"""
Internal function that is used to update the callback by removing and adding
the event_detection with adjusted values.
Is used in order to included changes on edge or bouncetime.
"""
GPIO.remove_event_detect(self._pin)
GPIO.add_event_detect(self._pin, self._edge, callback=self._press_btn, bouncetime=self._bouncetime)
def _press_btn(self, *args):
"""
Internal callback function that is used when a switch is triggered;
generically makes use of the given functor (default or adjusted by needs);
the current button/pin state is provided to the functor.
Keyword Arguments:
args -- generic arguments from the callback, currently not used.
"""
self._functor(GPIO.input(self._pin))
def default_functor(input):
"""
Static method, default functor that is used if not specified otherwise in
constructor.
Gets an input (button state) and prints based on the current state some
information (button pressed or not).
Keyword Arguments:
input -- integer, 0,1 and the current button state if pressed/released
"""
if input == 0:
print("Switch pressed")
elif input == 1:
print("Switch released")
else:
print("WARNING! UNKNOWN Switch STATE")
def class_switch_functor(input):
"""
Example switch functor that mutual exclusively turns
two LEDs on and off according to button pressing.
LED1+2 are set globally in main function in order to not
disturb the class functionality.
Keyword Arguments:
input -- integer, 0,1 and the current button state if pressed/released
"""
if input == 0:
LED1.set_on()
LED2.set_off()
elif input == 1:
LED1.set_off()
LED2.set_on()
else:
LED1.set_off()
LED2.set_off()
def class_switch_test():
"""
Example function to test the basic functionality of the switch class.
Makes use of in main globally setup LED1+2 instances. Is only called whe
library is called as it's own script (not imported).
Will call the default switch functor and a functor turning off and on
LEDs in pin 12 and 13.
"""
BTN1 = Switch(11)
print("Please press Switch (default test) ... 10s")
time.sleep(10)
print("Done!")
print("Now again please press Switch (functor test) ... 10s")
BTN1.set_functor(class_switch_functor)
time.sleep(10)
print("Done!")
GPIO.cleanup()
if __name__ == "__main__":
"""
A main function that is used, when this module is used as a stand-alone script.
Local imports in order not to disturb library import functionality (keeping it clean!)
"""
import time
from led import LED
LED1 = LED(12)
LED2 = LED(13)
class_switch_test()