diff --git a/src/radiotech/rfid/umfrc522.py b/src/radiotech/rfid/umfrc522.py new file mode 100644 index 0000000..6d2a836 --- /dev/null +++ b/src/radiotech/rfid/umfrc522.py @@ -0,0 +1,646 @@ +import logging +from machine import Pin, SPI + + +class UMFRC522: + """Class for connecting and controlling to a MFRC522 RFID reader. + Can be used to interact with RFID token; writing data and even updating + authentication keys of whole sectors. + + Theoretical documentation can be found at the BASEDOCU here: + https://www.nxp.com/docs/en/data-sheet/MFRC522.pdf + + This code basis on the implementation of 1AdityaX at + https://github.com/1AdityaX/mfrc522-python + """ + + # default clean register states refer to BASEDOCU page 36ff. + CLEAN_REGISTER_STATES = { + 0x2A: 0b10001101, # TModeReg + 0x2B: 0b00111110, # TPrescalerReg, lower 8 bits + 0x2D: 0b00011110, # TReloadReg, lower 8 bits + 0x2C: 0b00000000, # TReloadReg, higher 8 bits + 0x15: 0b01000000, # TxASKReg + 0x11: 0b00111101 # ModeReg + } + # Mask for adressing data bits when acting with registers, + # leftest and rightest bits are special handled + MASK_SPI_DATA = 0b01111110 + # Mask e.g. to add leading bit + MASK_LEADING_BIT = 0b10000000 + # Consider err (l2r): Protocol, Parity, x , collision, + # buffer overflow, x , x, x + MASK_ERROR = 0b00011011 + + STATUS_ERROR = 0x02 + STATUS_NO_TOKEN = 0x01 + STATUS_OK = 0x00 + + REG_TX_CONTROL = 0x14 + REG_COMMAND = 0x01 + REG_COM_I_EN = 0x02 + REG_COM_IRQ = 0x04 + REG_FIFO_LEVEL = 0x0A + REG_FIFO_DATA = 0x09 + REG_BIT_FRAMING = 0x0D + REG_ERROR = 0x06 + REG_CONTROL = 0x0C + REG_DIV_IRQ = 0x05 + REG_CRC_RESULT_LEFT = 0x22 + REG_CRC_RESULT_MID = 0x21 + REG_STATUS = 0x08 + + COMMAND_CLEAN = 0b1111 + COMMAND_IDLE = 0b0000 + COMMAND_AUTHENTICATE = 0b1110 + COMMAND_TRANSCEIV = 0b1100 + COMMAND_CALC_CRC = 0b0011 + COMMAND_SELECT_TOKEN = 0x93 + COMMAND_ANTICOLLISION = COMMAND_SELECT_TOKEN + COMMAND_READ = 0x30 + COMMAND_WRITE = 0xA0 + + REQUEST_IDLE = 0x26 + REQUEST_ALL = 0x52 + + LEN_MAX = 16 # maximum 16 bytes per block in a sector + + AUTH_MODE_A = 0x60 + AUTH_MODE_B = 0x61 + + def id_to_string(serial_number): + """Converts 5-byte serial-number digit list into + human readable string. + + Keyword arguments: + serial_number -- list of 5 digits, digit < 256 + """ + return int.from_bytes(bytes(serial_number[:-1]), "little", False) + + def __init__( + self, sck, mosi, miso, rst, cs, + baudrate=1000000, spi_dev=0, debug_level=logging.WARNING): + """Constructor to initialise the RC522 card reader for Raspberry Pico. + + Keyword arguments: + sck -- Serial Clock pin + mosi -- MasterOutSlaveIn pin + miso -- MasterInSlaveOut pin + rst -- Reset pin + cs -- Chip select pin + baudrate -- Transmission rate (default 1000000) + spi_dev -- SerialPortInterface device (default 0) + debug_level -- Logger level for debuggin (default loggin.WARNING) + """ + self._logger = logging.getLogger("Umfrc522Logger") + self._logger.setLevel(debug_level) + + self._baudrate = baudrate + self._sck = Pin(sck, Pin.OUT) # serial clock + self._mosi = Pin(mosi, Pin.OUT) # master out, slave in + self._miso = Pin(miso) # master in, slave out + self._rst = Pin(rst, Pin.OUT, value=0) + self._cs = Pin(cs, Pin.OUT, value=1) # chip select + self._spi = SPI( + spi_dev, baudrate=self._baudrate, polarity=0, phase=0, + sck=self._sck, mosi=self._mosi, miso=self._miso + ) + self._rst.on() + self.clear_state() + + def _register_write(self, addr, data): + """Internal write function, directly writes a byte to a given address. + + Keyword arguments: + addr -- The register where data should be written into. + data -- The data to be written, a single byte. + """ + # Follows BASEDOCU logic on Chapter 8.1.2.3 + # leftest bit (#7) of data byte defines action, '1' is read, '0' write + # rightest bit (#0) is always 0. + self._cs.off() + addr_byte = bytes([(addr << 1) & self.MASK_SPI_DATA]) + data_byte = bytes([data & 0b11111111]) + self._spi.write(addr_byte) + self._spi.write(data_byte) + self._cs.on() + + def _register_read(self, addr): + """Internal read function, directly reads a byte from a given address. + + Keyword arguments: + addr -- The register where data should be read from. + + Returns: + single byte of given address. + """ + # Follows BASEDOCU logic on Chapter 8.1.2.3 + # leftest bit (#7) of data byte defines action, '1' is read, '0' write + # rightest bit (#0) is always 0. + self._cs.off() + # shift bits to left by 1 position for trailing 0 and ensure length + # with mask + attach leading '1' for reading access. + addr_byte = bytes( + [(addr << 1) & self.MASK_SPI_DATA | self.MASK_LEADING_BIT] + ) + self._spi.write(addr_byte) + data = self._spi.read(1) + self._cs.on() + return data[0] + + def _bitmask_set(self, reg, mask): + """Internal mask function to apply a bitmask to a certain + register of the RC522 reader. + + Keyword arguments: + reg -- Register, to which the mask should be applied. + mask -- The bit mask (8 bits max) to be applied. + """ + masked_byte = self._register_read(reg) | mask + self._register_write(reg, masked_byte) + + def _bitmask_clear(self, reg, mask): + """Internal mask function to un-apply a bitmask to a certain + register of the RC522 reader. + + Keyword arguments: + reg -- Register, to which the mask should be un-applied from. + mask -- The bit mask (8 bits max) to be un-applied. + """ + unmasked_byte = self._register_read(reg) & (~mask) + self._register_write(reg, unmasked_byte) + + def _antenna_on(self): + """Internal function to enable the antenna of the RC522 reader module. + """ + mask = 0b00000011 + data = self._register_read(self.REG_TX_CONTROL) + if (data & mask) != mask: + self._bitmask_set(self.REG_TX_CONTROL, mask=mask) + + def _antenna_off(self): + """Internal function to disable the antenna of the RC522 reader module. + """ + self._bitmask_clear(self.REG_TX_CONTROL, 0b00000011) + + def _to_card(self, command, data): + """Internal function to send commands to the reader and the connected + RFID card/token. + + Keyword arguments: + command -- Byte, command to be executed, depends on a + previously activated register. + data -- Byte, that should be send to the card with the command. + + Returns: + status, data, bits -- Response from the card with a status, some data + and some response bits. + """ + recv_data = [] + recv_data_leng = 0 + recv_bytes = 0 + status = self.STATUS_ERROR + interrupt_request = 0x0 + wait_interrupt_request = 0x0 + last_bits = None + + if command == self.COMMAND_AUTHENTICATE: + interrupt_request = 0b00010010 + wait_interrupt_request = 0b00010000 + elif command == self.COMMAND_TRANSCEIV: + interrupt_request = 0b01110111 + wait_interrupt_request = 0b00110000 + + # Interrupt enabling and reset FIFO buffer of data + self._register_write( + self.REG_COM_I_EN, + interrupt_request | self.MASK_LEADING_BIT + ) + self._bitmask_clear(self.REG_COM_IRQ, self.MASK_LEADING_BIT) + self._bitmask_set(self.REG_FIFO_LEVEL, self.MASK_LEADING_BIT) + + # Set into IDLE mode + self._register_write(self.REG_COMMAND, self.COMMAND_IDLE) + + # Put data into FIFO buffer + for byte in data: + self._register_write(self.REG_FIFO_DATA, byte) + + # Execute Command on data + self._register_write(self.REG_COMMAND, command) + if command == self.COMMAND_TRANSCEIV: + self._bitmask_set(self.REG_BIT_FRAMING, self.MASK_LEADING_BIT) + + wait_ctr = 2000 + command_executed = False + while not command_executed: + recv_bytes = self._register_read(self.REG_COM_IRQ) + wait_ctr -= 1 + if not wait_ctr: + self._logger.error("Could not execute command. Timeout.") + break + if (recv_bytes & 0x01) or (recv_bytes & wait_interrupt_request): + command_executed = True + + if command == self.COMMAND_TRANSCEIV: + self._bitmask_clear(self.REG_BIT_FRAMING, self.MASK_LEADING_BIT) + + # Status check and adjustments + errors = self._register_read(self.REG_ERROR) + if errors & self.MASK_ERROR: + # at least one bit considered is set to true -> we got an error; + # TODO: parse errors in future + status = self.STATUS_ERROR + else: + status = self.STATUS_OK + if recv_bytes & interrupt_request & 0x01: + status = self.STATUS_NO_TOKEN + if command == self.COMMAND_TRANSCEIV: + recv_bytes = self._register_read(self.REG_FIFO_LEVEL) + last_bits = self._register_read(self.REG_CONTROL) & 0b00000111 + recv_data_leng = ( + (recv_bytes - 1) * 8 + last_bits + if last_bits + else + recv_bytes * 8 + ) + recv_bytes = min(max(recv_bytes, 1), self.LEN_MAX) + recv_data = [ + self._register_read(self.REG_FIFO_DATA) + for _ in range(recv_bytes) + ] + return (status, recv_data, recv_data_leng) + + def _get_crc(self, data): + """Internal function to calculate the CRC / checksum given + some input data that will be written to the reader. + + Keyword arguments: + data -- The data (list of bytes) to be used for CRC calculation. + + Returns: + bit-list -- List of the CRC bits, left and then right bits. + """ + # Clearing Interrupts and settion FIFO to maximum + # third bit was for collision detection in BASEDOCU table 34. + self._bitmask_clear(self.REG_DIV_IRQ, 0b00000100) + self._bitmask_set(self.REG_FIFO_LEVEL, self.MASK_LEADING_BIT) + + for datum in data: + self._register_write(self.REG_FIFO_DATA, datum) + + self._register_write(self.REG_COMMAND, self.COMMAND_CALC_CRC) + + wait_ctr = 255 + crc_completed = False + while not crc_completed: + recv_bytes = self._register_read(self.REG_DIV_IRQ) + wait_ctr -= 1 + if not wait_ctr: + self._logger.error("Could not calculate CRC. Timeout.") + break + if recv_bytes & 0b0100: + # Bit #2 (3. pos) shows data is processed, BASEDOCU Table 32. + crc_completed = True + return [ + self._register_read(self.REG_CRC_RESULT_LEFT), + self._register_read(self.REG_CRC_RESULT_MID) + ] + + def _anti_collision(self): + """Internal function to perform an anti-collision algorithm + on the reader and return one dedicated serial number of a close + RFID token + + Returns: + status, serial_number -- The status of algorithm success and if positiv + the serial number bits of a close token/card. + """ + serial_number_check = 0 + serial_numbers = [self.COMMAND_ANTICOLLISION, 0x20] + + self._register_write(self.REG_BIT_FRAMING, 0x00) + status, recv_data, _ = self._to_card( + self.COMMAND_TRANSCEIV, serial_numbers + ) + + if status == self.STATUS_OK and len(recv_data) == 5: + for i in range(4): + # calculating Checksum; should match 5ths byte + serial_number_check = serial_number_check ^ recv_data[i] + if serial_number_check != recv_data[4]: + status = self.STATUS_ERROR + else: + status = self.STATUS_ERROR + return status, recv_data + + def _select_token(self, serial_number): + """Internal function to select and estabilish the connection/ + communication to a token close to the RFID reader. + + Keyword arguments: + serial_number -- list of bytes corresponding to the serial ID / UID + of a nearby RFID token/card. + + Returns: + byte -- first byte of the command response or 0 in fail. + """ + data = [self.COMMAND_SELECT_TOKEN, 0x70] + data.extend(serial_number) + data.extend(self._get_crc(data)) + + status, recv_data, recv_data_leng = self._to_card( + self.COMMAND_TRANSCEIV, data + ) + if status == self.STATUS_OK and (recv_data_leng == 24): + return recv_data[0] + else: + return 0 + + def _write_block(self, block_id, data): + """Internal function to write to a dedicated block on the card. + Requires authentication and selection of a card beforehand. + There is NO limitation on blocks; sector 0 and trailing blocks in + each sector can be written. + + Keyword arguments: + block_id -- the global ID of a block to be written (0-63). + data -- the list of 16 bytes to be written into the block. + """ + data_buffer = [self.COMMAND_WRITE, block_id] + data_buffer.extend(self._get_crc(data_buffer)) + status, recv_data, recv_data_leng = self._to_card( + self.COMMAND_TRANSCEIV, data_buffer + ) + + if ( + status != self.STATUS_OK or + recv_data_leng != 4 or + (recv_data[0] & 0x0F) != 0x0A + ): + self._logger.error( + f"Could not init write-data to block {block_id}. Abort." + ) + return + + data_buffer = list(data) + data_buffer.extend(self._get_crc(data_buffer)) + status, recv_data, recv_data_leng = self._to_card( + self.COMMAND_TRANSCEIV, data_buffer + ) + + if ( + status != self.STATUS_OK or + recv_data_leng != 4 or + (recv_data[0] & 0x0F) != 0x0A + ): + self._logger.error( + f"Could not write data to block {block_id}. Abort." + ) + + def _read_block(self, block_id): + """Internal function to read of a dedicated block on the card. + Requires authentication and selection of a card beforehand. + There is no limitation on blocks; sector 0 and trailing blocks in + each sector can be read. + + Keyword arguments: + block_id -- the global ID of a block to be written (0-63). + + Returns: + byte-list -- list of 16 bytes read from the given block. + """ + data = [self.COMMAND_READ, block_id] + data.extend(self._get_crc(data)) + + status, recv_data, _ = self._to_card(self.COMMAND_TRANSCEIV, data) + if status != self.STATUS_OK: + self._logger.error( + f"Could not read data from block {block_id}. Abort." + ) + return None + if len(recv_data) != self.LEN_MAX: + self._logger.error( + f"Incomplete data ({len(recv_data)} / {self.LEN_MAX} bytes) " + f"from block {block_id}. Abort." + ) + return None + return recv_data + + def reset(self): + """Function to reset the MFRC522 RFID reader.""" + self._register_write(self.REG_COMMAND, self.COMMAND_CLEAN) + + def clear_state(self): + """Function to initialise the MFRC522 RFID reader into + a clear state; ready to authenticate with close card/token. + """ + self.reset() + for addr, data in self.CLEAN_REGISTER_STATES.items(): + self._register_write(addr, data) + self._antenna_on() + + def request(self, mode): + """Funtion to send a request command for interacting with a near by + RFID card/token. + + Keyword arguments: + mode -- The authentication mode, either UMFRC522.AUTH_MODE_A or .._B. + + Returns: + status, bit number -- The status and the bits received from the token. + """ + self._register_write(self.REG_BIT_FRAMING, 0b0111) + status, _, recv_data_leng = self._to_card( + self.COMMAND_TRANSCEIV, [mode] + ) + if (status != self.STATUS_OK) or (recv_data_leng != self.LEN_MAX): + status = self.STATUS_ERROR + return status, recv_data_leng + + def select_near_token(self): + """Function to select and connect to the nearest RFID token. + Performs internally the anticollision algorithm and establishes + a connection to the card/token. + + Returns: + status, serial_number -- Status of the connection and the list of + bytes representing the serial number of the connected token/card. + """ + status, serial_number = self._anti_collision() + if status != self.STATUS_OK: + return status, [] + select_status = self._select_token(serial_number) + if select_status == 0: + return self.STATUS_ERROR, [] + return status, serial_number + + def authenticate(self, serial_number, block, sector_key, auth_mode): + """Function to authenticate to a given login block and work with data. + The block has always to be a trailing block of a sector, e.g. + for sector 3, trailing block is 3*4 +3 => block with ID 15. + + Keyword arguments: + serial_number -- List of bytes corresponding to the serial ID / UID + of a nearby RFID token/card. + block -- The global id (0-63) of a block to log into the corresponding + sector, e.g. for Sector 3, login block ID is 3*4 +3 = 15. + sector_key -- The authentication key for the block to log in. + auth_mode -- The auth mode, either UMFRC522.AUTH_MODE_A or .._B. + Decides on the password bits. _A may not be the same as _B. + + Returns: + status -- Status of success of authenticating to a sector/block. + """ + if block % 4 != 3: + recommendation = (block // 4) + 3 + self._logger.error( + "Authentication can only be done to a " + "trailing block of a sector. " + f"Login block for '{block}' " + f"is block with ID '{recommendation}'. " + "Abort." + ) + data = [auth_mode, block] + data.extend(sector_key) + data.extend(serial_number[:-1]) + status, _, _ = self._to_card(self.COMMAND_AUTHENTICATE, data) + + if status != self.STATUS_OK: + self._logger.error( + "Cannot authenticate to token with ID " + f"'{UMFRC522.id_to_string(serial_number)}'" + ) + return status + + def authenticate_cancel(self): + """Function to stop authentication on the card. + Is always needed before when switching to a differen card. + """ + self._bitmask_clear(self.REG_STATUS, self.REG_STATUS) + + def read(self, block_id): + """Function to read data of a block_id from the RFID token/card. + Requires authentication and selection of a token in beforehand. + + Keyword arguments: + block_id -- the global ID of a block to be read from (0-63). + + Returns: + bytes -- list of 16 bytes, data from the given block_id on the card. + """ + return self._read_block(block_id) + + def write(self, block_id, data): + """Function to write data to a block_id on the RFID token/card. + Writes in safe-mode, trailing blocks of sectors or the first sector + cannot be written to. + Requires authentication and selection of a token in beforehand. + + block_id -- the global ID of a block to be written to (0-63). + data -- the list of 16 bytes to be written into the block. + """ + sector = block_id // 4 + sector_block = block_id % 4 + if sector == 0: + # first block contains munfacturer info, do not overwrite! + self._logger.error( + "Unsafe write into manufacturer sector '0'. Abort." + ) + return + if sector_block == 3: + self._logger.error( + # trailing block in each sector contains password information, + # do not overwrite! + "Unsafe write into trailing credential block " + f"'{block_id}' of sector '{sector}'. Abort." + ) + return + return self._write_block(block_id, data) + + def update_sector_auth_key(self, sector, new_sector_key, auth_mode): + """Function to update the authentication key with a given auth mode + on a certain sector (0-15). Do NOT mixup sector with blocks, there are + 16 sectors on the card, each sector having 4 blocks; thus in total 64 + blocks on the card. We make use here of the SECTORS! + Requires authentication and selection of a token in beforehand. + + Keyword arguments: + sector -- The sector id (0-15) where the key change should take action. + new_sector_key -- list of 6 bytes, containing the new auth key. + auth_mode -- The auth mode, either UMFRC522.AUTH_MODE_A or .._B. + Decides on the password bits. _A may not be the same as _B. + """ + if auth_mode not in (self.AUTH_MODE_A, self.AUTH_MODE_B): + self._logger.error( + "Update key requires valid auto mode, either " + "UMFRC522.AUTH_MODE_A or UMFRC522.AUTH_MODE_B. Abort." + ) + return self.STATUS_ERROR + if ( + len(new_sector_key) != 6 or + any(not isinstance(c, int) or c > 255 for c in new_sector_key) + ): + self._logger.error( + "New sector key needs to be list of exact 6 bytes. Abort." + ) + return self.STATUS_ERROR + + credential_block = sector*4 + 3 + self._logger.info( + f"Update credential effects block with ID '{credential_block}'" + ) + + data = self._read_block(credential_block) + if auth_mode == self.AUTH_MODE_A: + new_data = new_sector_key + data[6:] + else: + new_data = data[:-6] + new_sector_key + self._write_block(credential_block, new_data) + + +if __name__ == "__main__": + import utime + try: + reader = UMFRC522(spi_dev=0, sck=6, miso=4, mosi=7, cs=5, rst=22) + while True: + reader.clear_state() + status, token = reader.request(reader.REQUEST_IDLE) + if status == reader.STATUS_OK: + status, serial_number = reader.select_near_token() + if status == reader.STATUS_OK: + card_id = UMFRC522.id_to_string(serial_number) + print(f"CARD ID: {card_id}\n") + default_key = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF] + for sector in range(16): + if sector == 0: + print( + f"__Sector__ ____{'____ ____'.join( + 'block__'+str(i) for i in range(4) + )}____" + ) + login_block = sector*4 + 3 + status = reader.authenticate( + serial_number, login_block, + default_key, reader.AUTH_MODE_A + ) + if status != reader.STATUS_OK: + continue + try: + sector_text = " ".join( + "".join( + chr(c) if c > 32 and c < 127 else '.' + for c in reader.read( + sector*4 + sector_block + ) + ) + for sector_block in range(4) + ) + print(f"Sector {sector: 02d}: {sector_text}") + except TypeError: + break # When removing card, just skip + else: + print(f"No Card\r", end="") + utime.sleep_ms(500) + except KeyboardInterrupt: + print(f"\r{" "*80}\r") # Clean the latest line