import logging from RPi import GPIO as GPIO from serial import Serial from time import sleep class mobile(): message_storage_types = ("SM", "ME", "MT", "BM", "SR", "TA") enter_code = b'\x1A' def parse_message(message): pass def __init__( self, dev="/dev/ttyS0", baud=115200, power_key=31, mode=GPIO.BOARD): self._device = dev self._baud_rate = baud self._power_key = power_key self._retry = 3 devname = self._device.split("/")[-1] console = logging.StreamHandler() console.setLevel(logging.DEBUG) self._logger = logging.getLogger(f"dev={devname}") self._logger.addHandler(console) self._logger.setLevel(logging.DEBUG) self._serial = Serial(self._device, self._baud_rate) self._serial.flushInput() self._is_started = False GPIO.setmode(mode) def __del__(self): self._serial.close() GPIO.cleanup() def _at_cmd(self, command, timeout=1): self._serial.write(f'{command}\r\n'.encode()) sleep(timeout) rtry = self._retry while not self._serial.inWaiting() and abs(rtry) >= 0: rtry -= 1 sleep(0.01) if abs(rtry) < 0: self._logger.warn( f"No Response for cmd '{command}' within {self._retry} retries." ) return '' return self._serial.read(self._serial.inWaiting()).decode() def set_retry(self, retry): if retry == 0: self._logger.warn( "Retry must not be 0! Setting to default (3)." ) retry = 3 self._retry = retry def get_retry(self): return self._retry def start_device(self): GPIO.setwarnings(False) GPIO.setup(self._power_key,GPIO.OUT) self._logger.info( f"Startup SIM device on {self._device} in progress..." ) GPIO.output(self._power_key,GPIO.HIGH) sleep(5) GPIO.output(self._power_key,GPIO.LOW) sleep(20) self._serial.flushInput() self._is_started = True self._logger.info(f"Startup SIM device on {self._device} is done.") def stop_device(self): self._logger.info( f"Stopping SIM device on {self._device} in progress..." ) GPIO.output(self._power_key,GPIO.HIGH) sleep(5) GPIO.output(self._power_key,GPIO.LOW) sleep(20) self._is_started = False self._logger.info(f"Stopping SIM device on {self._device} is done.") def set_message_storage(self, storage_types): if len(storage_types) > 3 or any( storage_type not in mobile.message_storage_types for storage_type in storage_types ): storage_types = ["SM","SM","SM"] self._logger.error( "Only up to 3 valid message storages " f"({','.join(mobile.message_storage_types)})" "are allowed! " f"Reset to default: {storage_types}" ) storage_types = ",".join( f'\"{storage}\"' for storage in storage_types ) atcmd = f"AT+CPMS={storage_types}" self._logger.debug(f"Setup message storage type: {atcmd}") response = self._at_cmd(atcmd) self._logger.debug(f"Response message storage type: {response}") if 'OK:' in response: self._logger.info("Setup message storages successfully.") else: self._logger.warn(f"Cannot setup message storages: {atcmd}") def send_sms(self, phone_number, message): self._logger.info(f"Sending SMS via {self._device} in progress...") atcmd = "AT+CMGF=1" self._logger.debug(f"Setup TEXT mode: {atcmd}") response = self._at_cmd(atcmd) self._logger.debug(f"Response TEXT mode: {response}") atcmd = f"AT+CMGS=\"{phone_number}\"" self._logger.debug(f"Setup message target: {atcmd}") response = self._at_cmd(atcmd) self._logger.debug(f"Response message targeting: {response}") if ">" in response: self._logger.info(f"Sending message: {message}") self._serial.write(message.encode()) self._logger.debug(f"Sending 'enter' for execution: {mobile.enter_code}") self._serial.write(mobile.enter_code) self._logger.debug("Awaiting confirmation...") response = self._at_cmd('', 20) self._logger.debug(f"Response message awaited: {response}") if "OK" in response: self._logger.info("Success! Message was sent.") else: self._logger.warn("Warning! Message sending failed.") else: self._logger.warn(f"Cannot target message target: {atcmd}") self._logger.info(f"Sending SMS via {self._device} is done.") def receive_sms(self, index=1): self._logger.info(f"Receiving SMS via {self._device} in progress...") atcmd = "AT+CMGF=1" self._logger.debug(f"Setup TEXT mode: {atcmd}") response = self._at_cmd(atcmd) self._logger.debug(f"Response TEXT mode: {response}") atcmd = f"AT+CMGR={index}" self._logger.debug(f"Receive message: {atcmd}") response = self._at_cmd(atcmd) self._logger.debug(f"Response receiving message: {response}") if "+CMGR:" not in response: self._logger.warn("Warning! Message receiving failed.") return response self._logger.info("Success! Message was received.") return response