Initial setup of MFRC522 for Pico.
This commit is contained in:
parent
7247f06bbf
commit
e7efd6278e
326
src/radiotech/rfid/umfrc522.py
Normal file
326
src/radiotech/rfid/umfrc522.py
Normal file
@ -0,0 +1,326 @@
|
|||||||
|
import logging
|
||||||
|
from machine import Pin, SPI
|
||||||
|
# Basis on 1AdityaX at https://github.com/1AdityaX/mfrc522-python
|
||||||
|
# BASEDOCU is found at: https://www.nxp.com/docs/en/data-sheet/MFRC522.pdf
|
||||||
|
|
||||||
|
class UMFRC522:
|
||||||
|
#default clean register states refer to BASEDOCU page 36ff.
|
||||||
|
CLEAN_REGISTER_STATES = {
|
||||||
|
0x2A: 0b10001101, #TModeReg, internal timer: timer automatic + non-gated + timer decrements to 0 + timer values 4 bits (higher TPrescaler value -> TMode)
|
||||||
|
0x2B: 0b00111110, #TPrescalerReg, lower 8 bits
|
||||||
|
0x2D: 0b00011110, #TReloadReg, lower 8 bits
|
||||||
|
0x2C: 0b00000000, #TReloadReg, higher 8 bits
|
||||||
|
0x15: 0b01000000, #TxASKReg
|
||||||
|
0x11: 0b00111101 #ModeReg
|
||||||
|
}
|
||||||
|
MASK_SPI_DATA = 0b01111110 # Mask for adressing data bits when acting with registers, leftest and rightest bits are special handled
|
||||||
|
MASK_LEADING_BIT = 0b10000000 #Mask e.g. to add leading bit
|
||||||
|
MASK_ERROR = 0b00011011 # Consider err (l2r): Protocol, Parity, x , collision, buffer overflow, x , x, x
|
||||||
|
|
||||||
|
STATUS_ERROR = 0x02
|
||||||
|
STATUS_NO_TOKEN = 0x01
|
||||||
|
STATUS_OK = 0x00
|
||||||
|
|
||||||
|
REG_TX_CONTROL = 0x14
|
||||||
|
REG_COMMAND = 0x01
|
||||||
|
REG_COM_I_EN = 0x02
|
||||||
|
REG_COM_IRQ = 0x04
|
||||||
|
REG_FIFO_LEVEL = 0x0A
|
||||||
|
REG_FIFO_DATA = 0x09
|
||||||
|
REG_BIT_FRAMING = 0x0D
|
||||||
|
REG_ERROR = 0x06
|
||||||
|
REG_CONTROL = 0x0C
|
||||||
|
REG_DIV_IRQ = 0x05
|
||||||
|
REG_CRC_RESULT_LEFT = 0x22
|
||||||
|
REG_CRC_RESULT_MID = 0x21
|
||||||
|
REG_STATUS = 0x08
|
||||||
|
|
||||||
|
COMMAND_CLEAN = 0b1111
|
||||||
|
COMMAND_IDLE = 0b0000
|
||||||
|
COMMAND_AUTHENTICATE = 0b1110
|
||||||
|
COMMAND_TRANSCEIV = 0b1100
|
||||||
|
COMMAND_CALC_CRC = 0b0011
|
||||||
|
COMMAND_SELECT_TOKEN = 0x93
|
||||||
|
COMMAND_ANTICOLLISION = COMMAND_SELECT_TOKEN
|
||||||
|
COMMAND_READ = 0x30
|
||||||
|
COMMAND_WRITE = 0xA0
|
||||||
|
|
||||||
|
REQUEST_IDLE = 0x26
|
||||||
|
REQUEST_ALL = 0x52
|
||||||
|
|
||||||
|
LEN_MAX = 16 # maximum 16 bytes per block in a sector
|
||||||
|
|
||||||
|
AUTH_MODE_A = 0x60
|
||||||
|
AUTH_MODE_B = 0x61
|
||||||
|
|
||||||
|
def id_to_string(serial_number):
|
||||||
|
return int.from_bytes(bytes(serial_number[:-1]),"little",False)
|
||||||
|
|
||||||
|
def __init__(self, sck, mosi, miso, rst, cs, baudrate=1000000,spi_dev=0, debug_level=logging.WARNING):
|
||||||
|
self._logger = logging.getLogger("Umfrc522Logger")
|
||||||
|
self._logger.setLevel(debug_level)
|
||||||
|
|
||||||
|
self._baudrate = baudrate
|
||||||
|
self._sck = Pin(sck, Pin.OUT) #serial clock
|
||||||
|
self._mosi = Pin(mosi, Pin.OUT) #master out, slave in
|
||||||
|
self._miso = Pin(miso) #master in, slave out
|
||||||
|
self._rst = Pin(rst, Pin.OUT, value = 0)
|
||||||
|
self._cs = Pin(cs, Pin.OUT, value = 1) #chip select
|
||||||
|
self._spi = SPI(
|
||||||
|
spi_dev, baudrate=self._baudrate, polarity=0, phase=0, sck=self._sck,
|
||||||
|
mosi=self._mosi, miso=self._miso
|
||||||
|
)
|
||||||
|
self._rst.on()
|
||||||
|
self.clear_state()
|
||||||
|
|
||||||
|
def _register_write(self, addr, data):
|
||||||
|
# Follows BASEDOCU logic on Chapter 8.1.2.3
|
||||||
|
# leftest bit (#7) of data byte defines action, '1' is read, '0' write
|
||||||
|
# rightest bit (#0) is always 0.
|
||||||
|
self._cs.off()
|
||||||
|
addr_byte = bytes([(addr << 1) & self.MASK_SPI_DATA])
|
||||||
|
data_byte = bytes([data & 0b11111111])
|
||||||
|
self._spi.write(addr_byte)
|
||||||
|
self._spi.write(data_byte)
|
||||||
|
self._cs.on()
|
||||||
|
def _register_read(self, addr):
|
||||||
|
# Follows BASEDOCU logic on Chapter 8.1.2.3
|
||||||
|
# leftest bit (#7) of data byte defines action, '1' is read, '0' write
|
||||||
|
# rightest bit (#0) is always 0.
|
||||||
|
self._cs.off()
|
||||||
|
addr_byte = bytes([(addr << 1) & self.MASK_SPI_DATA | self.MASK_LEADING_BIT]) # shift bits to left by 1 position for trailing 0 and ensure length with mask + attach leading '1' for reading access.
|
||||||
|
self._spi.write(addr_byte)
|
||||||
|
data = self._spi.read(1)
|
||||||
|
self._cs.on()
|
||||||
|
return data[0]
|
||||||
|
|
||||||
|
def _bitmask_set(self, reg, mask):
|
||||||
|
masked_byte = self._register_read(reg) | mask
|
||||||
|
self._register_write(reg, masked_byte)
|
||||||
|
def _bitmask_clear(self, reg, mask):
|
||||||
|
unmasked_byte = self._register_read(reg) & (~mask)
|
||||||
|
self._register_write(reg, unmasked_byte)
|
||||||
|
|
||||||
|
def _antenna_on(self):
|
||||||
|
mask = 0b00000011
|
||||||
|
data = self._register_read(self.REG_TX_CONTROL)
|
||||||
|
if (data & mask) != mask:
|
||||||
|
self._bitmask_set(self.REG_TX_CONTROL, mask=mask)
|
||||||
|
def _antenna_off(self):
|
||||||
|
self._bitmask_clear(self.REG_TX_CONTROL, 0b00000011)
|
||||||
|
|
||||||
|
def _to_card(self, command, data):
|
||||||
|
recv_data = []
|
||||||
|
recv_data_leng = 0
|
||||||
|
recv_bytes = 0
|
||||||
|
status = self.STATUS_ERROR
|
||||||
|
interrupt_request = 0x0
|
||||||
|
wait_interrupt_request = 0x0
|
||||||
|
last_bits = None
|
||||||
|
|
||||||
|
if command == self.COMMAND_AUTHENTICATE:
|
||||||
|
interrupt_request = 0b00010010
|
||||||
|
wait_interrupt_request = 0b00010000
|
||||||
|
elif command == self.COMMAND_TRANSCEIV:
|
||||||
|
interrupt_request = 0b01110111
|
||||||
|
wait_interrupt_request = 0b00110000
|
||||||
|
|
||||||
|
# Interrupt enabling and reset FIFO buffer of data
|
||||||
|
self._register_write(self.REG_COM_I_EN, interrupt_request | self.MASK_LEADING_BIT)
|
||||||
|
self._bitmask_clear(self.REG_COM_IRQ, self.MASK_LEADING_BIT)
|
||||||
|
self._bitmask_set(self.REG_FIFO_LEVEL, self.MASK_LEADING_BIT)
|
||||||
|
|
||||||
|
# Set into IDLE mode
|
||||||
|
self._register_write(self.REG_COMMAND, self.COMMAND_IDLE)
|
||||||
|
|
||||||
|
# Put data into FIFO buffer
|
||||||
|
for byte in data:
|
||||||
|
self._register_write(self.REG_FIFO_DATA, byte)
|
||||||
|
|
||||||
|
# Execute Command on data
|
||||||
|
self._register_write(self.REG_COMMAND, command)
|
||||||
|
if command == self.COMMAND_TRANSCEIV:
|
||||||
|
self._bitmask_set(self.REG_BIT_FRAMING, self.MASK_LEADING_BIT)
|
||||||
|
|
||||||
|
wait_ctr = 2000
|
||||||
|
command_executed = False
|
||||||
|
while not command_executed:
|
||||||
|
recv_bytes = self._register_read(self.REG_COM_IRQ)
|
||||||
|
wait_ctr -= 1
|
||||||
|
if not wait_ctr:
|
||||||
|
self._logger.error("Could not execute command. Timeout.")
|
||||||
|
break
|
||||||
|
if (recv_bytes & 0x01) or (recv_bytes & wait_interrupt_request):
|
||||||
|
command_executed = True
|
||||||
|
|
||||||
|
if command == self.COMMAND_TRANSCEIV:
|
||||||
|
self._bitmask_clear(self.REG_BIT_FRAMING, self.MASK_LEADING_BIT)
|
||||||
|
|
||||||
|
# Status check and adjustments
|
||||||
|
errors = self._register_read(self.REG_ERROR)
|
||||||
|
if errors & self.MASK_ERROR:
|
||||||
|
# at least one bit considered is set to true -> we got an error;
|
||||||
|
# TODO: parse errors in future
|
||||||
|
status = self.STATUS_ERROR
|
||||||
|
else:
|
||||||
|
status = self.STATUS_OK
|
||||||
|
if recv_bytes & interrupt_request & 0x01:
|
||||||
|
status = self.STATUS_NO_TOKEN
|
||||||
|
if command == self.COMMAND_TRANSCEIV:
|
||||||
|
recv_bytes = self._register_read(self.REG_FIFO_LEVEL)
|
||||||
|
last_bits = self._register_read(self.REG_CONTROL) & 0b00000111
|
||||||
|
recv_data_leng = (recv_bytes - 1) * 8 + last_bits if last_bits else recv_bytes * 8
|
||||||
|
recv_bytes = min(max(recv_bytes, 1), self.LEN_MAX)
|
||||||
|
recv_data = [self._register_read(self.REG_FIFO_DATA) for _ in range(recv_bytes)]
|
||||||
|
return (status, recv_data, recv_data_leng)
|
||||||
|
|
||||||
|
def _get_crc(self, data):
|
||||||
|
# Clearing Interrupts and settion FIFO to maximum
|
||||||
|
self._bitmask_clear(self.REG_DIV_IRQ, 0b00000100) # third bit was for collision detection in BASEDOCU table 34.
|
||||||
|
self._bitmask_set(self.REG_FIFO_LEVEL, self.MASK_LEADING_BIT)
|
||||||
|
|
||||||
|
for datum in data:
|
||||||
|
self._register_write(self.REG_FIFO_DATA, datum)
|
||||||
|
|
||||||
|
self._register_write(self.REG_COMMAND, self.COMMAND_CALC_CRC)
|
||||||
|
|
||||||
|
wait_ctr = 255
|
||||||
|
crc_completed = False
|
||||||
|
while not crc_completed:
|
||||||
|
recv_bytes = self._register_read(self.REG_DIV_IRQ)
|
||||||
|
wait_ctr -= 1
|
||||||
|
if not wait_ctr:
|
||||||
|
self._logger.error("Could not calculate CRC. Timeout.")
|
||||||
|
break
|
||||||
|
if recv_bytes & 0b0100: # Bit #2 (3. pos) shows data is processed, BASEDOCU Table 32.
|
||||||
|
crc_completed = True
|
||||||
|
return [self._register_read(self.REG_CRC_RESULT_LEFT), self._register_read(self.REG_CRC_RESULT_MID)]
|
||||||
|
|
||||||
|
def _anti_collision(self):
|
||||||
|
serial_number_check = 0
|
||||||
|
serial_numbers = [self.COMMAND_ANTICOLLISION, 0x20]
|
||||||
|
|
||||||
|
self._register_write(self.REG_BIT_FRAMING, 0x00)
|
||||||
|
status, recv_data, _ = self._to_card(self.COMMAND_TRANSCEIV, serial_numbers)
|
||||||
|
|
||||||
|
if status == self.STATUS_OK and len(recv_data) == 5:
|
||||||
|
for i in range(4):
|
||||||
|
#calculating Checksum; should match 5ths byte
|
||||||
|
serial_number_check = serial_number_check ^ recv_data[i]
|
||||||
|
if serial_number_check != recv_data[4]:
|
||||||
|
status = self.STATUS_ERROR
|
||||||
|
else:
|
||||||
|
status = self.STATUS_ERROR
|
||||||
|
return status, recv_data
|
||||||
|
|
||||||
|
def _select_token(self, serial_number):
|
||||||
|
data = [self.COMMAND_SELECT_TOKEN, 0x70]
|
||||||
|
data.extend(serial_number)
|
||||||
|
data.extend(self._get_crc(data))
|
||||||
|
|
||||||
|
status, recv_data, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, data)
|
||||||
|
if status == self.STATUS_OK and (recv_data_leng == 24):
|
||||||
|
return recv_data[0]
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def _write_block(self, block_id, data):
|
||||||
|
data_buffer = [self.COMMAND_WRITE, block_id]
|
||||||
|
data_buffer.extend(self._get_crc(data_buffer))
|
||||||
|
status, recv_data, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, data_buffer)
|
||||||
|
|
||||||
|
if status != self.STATUS_OK or recv_data_leng != 4 or (recv_data[0] & 0x0F) != 0x0A:
|
||||||
|
self._logger.error(f"Could not init write-data to block {block_id}. Abort.")
|
||||||
|
return
|
||||||
|
|
||||||
|
data_buffer = list(data)
|
||||||
|
data_buffer.extend(self._get_crc(data_buffer))
|
||||||
|
status, recv_data, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, data_buffer)
|
||||||
|
|
||||||
|
if status != self.STATUS_OK or recv_data_leng != 4 or (recv_data[0] & 0x0F) != 0x0A:
|
||||||
|
self._logger.error(f"Could not write data to block {block_id}. Abort.")
|
||||||
|
def _read_block(self, block_id):
|
||||||
|
data = [self.COMMAND_READ, block_id]
|
||||||
|
data.extend(self._get_crc(data))
|
||||||
|
|
||||||
|
status, recv_data, _ = self._to_card(self.COMMAND_TRANSCEIV, data)
|
||||||
|
if status != self.STATUS_OK:
|
||||||
|
self._logger.error(f"Could not read data from block {block_id}. Abort.")
|
||||||
|
return None
|
||||||
|
if len(recv_data) != self.LEN_MAX:
|
||||||
|
self._logger.error(f"Incomplete data ({len(recv_data)} / {self.LEN_MAX} bytes) from block {block_id}. Abort.")
|
||||||
|
return None
|
||||||
|
return recv_data
|
||||||
|
|
||||||
|
def read(self, block_id):
|
||||||
|
return self._read_block(block_id)
|
||||||
|
def write(self, block_id, data):
|
||||||
|
sector = block_id // 4
|
||||||
|
sector_block = block_id % 4
|
||||||
|
if sector == 0:
|
||||||
|
# first block contains munfacturer info, do not overwrite!
|
||||||
|
self._logger.error("Unsafe write into manufacturer sector '0'. Abort.")
|
||||||
|
return
|
||||||
|
if sector_block == 3:
|
||||||
|
# trailing block in each sector contains password information, do not overwrite!
|
||||||
|
self._logger.error(f"Unsafe write into trailing credential block '{block_id}' of sector '{sector}'. Abort.")
|
||||||
|
return
|
||||||
|
return self._write_block(block_id, data)
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self._register_write(self.REG_COMMAND,self.COMMAND_CLEAN)
|
||||||
|
def clear_state(self):
|
||||||
|
self.reset()
|
||||||
|
for addr, data in self.CLEAN_REGISTER_STATES.items():
|
||||||
|
self._register_write(addr, data)
|
||||||
|
self._antenna_on()
|
||||||
|
|
||||||
|
def request(self, mode):
|
||||||
|
self._register_write(self.REG_BIT_FRAMING, 0b0111)
|
||||||
|
status, _, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, [mode])
|
||||||
|
if (status != self.STATUS_OK) or (recv_data_leng != self.LEN_MAX):
|
||||||
|
status = self.STATUS_ERROR
|
||||||
|
return status, recv_data_leng
|
||||||
|
|
||||||
|
def select_near_token(self):
|
||||||
|
status, serial_number = self._anti_collision()
|
||||||
|
if status != self.STATUS_OK:
|
||||||
|
return status, []
|
||||||
|
select_status = self._select_token(serial_number)
|
||||||
|
if select_status == 0:
|
||||||
|
return self.STATUS_ERROR, []
|
||||||
|
return status, serial_number
|
||||||
|
|
||||||
|
def authenticate(self, serial_number, block, sector_key, auth_mode):
|
||||||
|
data = [auth_mode, block]
|
||||||
|
data.extend(sector_key)
|
||||||
|
data.extend(serial_number[:-1])
|
||||||
|
status, _, _ = self._to_card(self.COMMAND_AUTHENTICATE, data)
|
||||||
|
|
||||||
|
if status != self.STATUS_OK:
|
||||||
|
self._logger.error(f"Cannot authenticate to token with ID '{UMFRC522.id_to_string(serial_number)}'")
|
||||||
|
return status
|
||||||
|
def authenticate_cancel(self):
|
||||||
|
self._bitmask_clear(self.REG_STATUS, self.REG_STATUS)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import utime
|
||||||
|
try:
|
||||||
|
reader = UMFRC522(spi_dev=0,sck=6,miso=4,mosi=7,cs=5,rst=22)
|
||||||
|
while True:
|
||||||
|
reader.clear_state()
|
||||||
|
status, token = reader.request(reader.REQUEST_IDLE)
|
||||||
|
if status == reader.STATUS_OK:
|
||||||
|
status, serial_number = reader.select_near_token()
|
||||||
|
if status == reader.STATUS_OK:
|
||||||
|
card_id = UMFRC522.id_to_string(serial_number)
|
||||||
|
print(f"CARD ID: {card_id}")
|
||||||
|
else:
|
||||||
|
print(f"No Card\r", end="")
|
||||||
|
utime.sleep_ms(500)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
#Clean the latest line
|
||||||
|
print(f"\r{" "*80}\r")
|
Loading…
Reference in New Issue
Block a user