smart-mobcom/mobcom.py
2023-08-19 23:38:24 +02:00

167 lines
5.5 KiB
Python
Executable File

import logging
from RPi import GPIO as GPIO
from serial import Serial
from time import sleep
class mobile():
    """Control a SIM/GSM modem attached via a serial port and a GPIO power key.

    The modem is switched on and off by pulsing a GPIO pin (``power_key``)
    and is driven with AT commands written to the serial device. Progress
    and responses are reported through a per-device logger named
    ``dev=<device basename>``.
    """

    # Storage identifiers accepted by set_message_storage().
    message_storage_types = ("SM", "ME", "MT", "BM", "SR", "TA")
    # Byte written after the SMS body to submit it (0x1A, i.e. Ctrl-Z).
    enter_code = b'\x1A'

    @staticmethod
    def parse_message(message):
        """Placeholder for message parsing; not implemented yet.

        Declared as a static method so it can be called both on the class
        and on an instance.

        Args:
            message: Raw message text (currently unused).

        Returns:
            None.
        """
        return None

    def __init__(
            self, dev="/dev/ttyS0", baud=115200, power_key=31,
            mode=GPIO.BOARD):
        """Open the serial port, configure logging and set the GPIO mode.

        Args:
            dev: Path of the serial device the modem is attached to.
            baud: Baud rate used to open the serial device.
            power_key: GPIO channel wired to the modem's power key.
            mode: Pin numbering mode passed to ``GPIO.setmode``.
        """
        self._device = dev
        self._baud_rate = baud
        self._power_key = power_key
        self._retry = 3
        self._is_started = False

        devname = self._device.split("/")[-1]
        self._logger = logging.getLogger(f"dev={devname}")
        self._logger.setLevel(logging.DEBUG)
        # getLogger() returns the same object for the same name, so only
        # attach a console handler once; otherwise every additional
        # instance for this device would duplicate each log line.
        if not self._logger.handlers:
            console = logging.StreamHandler()
            console.setLevel(logging.DEBUG)
            self._logger.addHandler(console)

        self._serial = Serial(self._device, self._baud_rate)
        self._serial.flushInput()
        GPIO.setmode(mode)

    def __del__(self):
        """Close the serial port (if it was opened) and release the GPIOs."""
        # __init__ may have failed before the port was opened, in which
        # case the attribute does not exist.
        serial_port = getattr(self, "_serial", None)
        if serial_port is not None:
            serial_port.close()
        GPIO.cleanup()

    def _at_cmd(self, command, timeout=1):
        """Send one AT command and return whatever the modem answered.

        Args:
            command: Command text; ``\\r\\n`` is appended before sending.
            timeout: Seconds to wait after sending before reading.

        Returns:
            The decoded bytes waiting in the input buffer, or ``''`` when
            nothing arrived within the configured number of retries.
        """
        self._serial.write(f'{command}\r\n'.encode())
        sleep(timeout)

        # Poll a bounded number of times. The magnitude of the configured
        # retry value is used so that a negative setting cannot turn the
        # countdown into an endless loop.
        retries_left = abs(self._retry)
        while not self._serial.inWaiting() and retries_left > 0:
            retries_left -= 1
            sleep(0.01)

        if not self._serial.inWaiting():
            self._logger.warning(
                f"No Response for cmd '{command}' within {self._retry} retries."
            )
            return ''
        # Undecodable bytes are replaced rather than raising, so line
        # noise does not abort the caller.
        raw = self._serial.read(self._serial.inWaiting())
        return raw.decode(errors="replace")

    def set_retry(self, retry):
        """Set how often ``_at_cmd`` polls for a response.

        Args:
            retry: Number of polls; ``0`` is rejected and replaced by the
                default of 3. Only the magnitude is used when polling.
        """
        if retry == 0:
            self._logger.warning(
                "Retry must not be 0! Setting to default (3)."
            )
            retry = 3
        self._retry = retry

    def get_retry(self):
        """Return the configured retry count."""
        return self._retry

    def _pulse_power_key(self):
        """Hold the power-key pin HIGH for 5 s, release it, then wait 20 s."""
        GPIO.setwarnings(False)
        # Configure the pin here so the pulse also works when the other
        # power method has not been called on this instance before.
        GPIO.setup(self._power_key, GPIO.OUT)
        GPIO.output(self._power_key, GPIO.HIGH)
        sleep(5)
        GPIO.output(self._power_key, GPIO.LOW)
        sleep(20)

    def start_device(self):
        """Pulse the power key to start the modem and flush stale input."""
        self._logger.info(
            f"Startup SIM device on {self._device} in progress..."
        )
        self._pulse_power_key()
        self._serial.flushInput()
        self._is_started = True
        self._logger.info(f"Startup SIM device on {self._device} is done.")

    def stop_device(self):
        """Pulse the power key to stop the modem."""
        self._logger.info(
            f"Stopping SIM device on {self._device} in progress..."
        )
        self._pulse_power_key()
        self._is_started = False
        self._logger.info(f"Stopping SIM device on {self._device} is done.")

    def set_message_storage(self, storage_types):
        """Select the message storages via ``AT+CPMS``.

        Args:
            storage_types: Sequence of one to three identifiers, each taken
                from ``message_storage_types``. Anything else (empty, too
                long, unknown identifier) is replaced by
                ``["SM", "SM", "SM"]`` and an error is logged.
        """
        is_invalid = (
            not storage_types
            or len(storage_types) > 3
            or any(
                storage_type not in mobile.message_storage_types
                for storage_type in storage_types
            )
        )
        if is_invalid:
            storage_types = ["SM", "SM", "SM"]
            self._logger.error(
                "Only 1 to 3 valid message storages "
                f"({','.join(mobile.message_storage_types)}) "
                "are allowed! "
                f"Reset to default: {storage_types}"
            )

        quoted = ",".join(f'"{storage}"' for storage in storage_types)
        atcmd = f"AT+CPMS={quoted}"
        self._logger.debug(f"Setup message storage type: {atcmd}")
        response = self._at_cmd(atcmd)
        self._logger.debug(f"Response message storage type: {response}")
        # The final result code of a successful AT command is "OK"
        # (without a trailing colon).
        if "OK" in response:
            self._logger.info("Setup message storages successfully.")
        else:
            self._logger.warning(f"Cannot setup message storages: {atcmd}")

    def send_sms(self, phone_number, message):
        """Send a text-mode SMS.

        Switches the modem to text mode (``AT+CMGF=1``), addresses the
        message with ``AT+CMGS``, and - once the modem shows the ``>``
        prompt - writes the body followed by ``enter_code``. The outcome is
        only logged; nothing is returned.

        Args:
            phone_number: Recipient number, inserted verbatim into AT+CMGS.
            message: Message body (``str``).
        """
        self._logger.info(f"Sending SMS via {self._device} in progress...")

        atcmd = "AT+CMGF=1"
        self._logger.debug(f"Setup TEXT mode: {atcmd}")
        response = self._at_cmd(atcmd)
        self._logger.debug(f"Response TEXT mode: {response}")

        atcmd = f"AT+CMGS=\"{phone_number}\""
        self._logger.debug(f"Setup message target: {atcmd}")
        response = self._at_cmd(atcmd)
        self._logger.debug(f"Response message targeting: {response}")

        if ">" in response:
            self._logger.info(f"Sending message: {message}")
            self._serial.write(message.encode())
            self._logger.debug(
                f"Sending 'enter' for execution: {mobile.enter_code}"
            )
            self._serial.write(mobile.enter_code)
            self._logger.debug("Awaiting confirmation...")
            # Empty command with a long wait: gives the modem time to
            # submit the message before the result is read.
            response = self._at_cmd('', 20)
            self._logger.debug(f"Response message awaited: {response}")
            if "OK" in response:
                self._logger.info("Success! Message was sent.")
            else:
                self._logger.warning("Warning! Message sending failed.")
        else:
            self._logger.warning(f"Cannot target message target: {atcmd}")

        self._logger.info(f"Sending SMS via {self._device} is done.")

    def receive_sms(self, index=1):
        """Read the stored SMS at ``index`` in text mode.

        Args:
            index: Storage index passed to ``AT+CMGR``.

        Returns:
            The raw modem response to ``AT+CMGR``. It is returned even when
            it lacks a ``+CMGR:`` header (a warning is logged in that case).
        """
        self._logger.info(f"Receiving SMS via {self._device} in progress...")

        atcmd = "AT+CMGF=1"
        self._logger.debug(f"Setup TEXT mode: {atcmd}")
        response = self._at_cmd(atcmd)
        self._logger.debug(f"Response TEXT mode: {response}")

        atcmd = f"AT+CMGR={index}"
        self._logger.debug(f"Receive message: {atcmd}")
        response = self._at_cmd(atcmd)
        self._logger.debug(f"Response receiving message: {response}")

        if "+CMGR:" in response:
            self._logger.info("Success! Message was received.")
        else:
            self._logger.warning("Warning! Message receiving failed.")
        return response