From 1b81b8d8fd2f2e377c0e9b1b41868832e53d81af Mon Sep 17 00:00:00 2001 From: lhahn Date: Tue, 22 Oct 2024 19:24:50 +0200 Subject: [PATCH] Example code from internet, will be redefined. Initial setup of MFRC522 for Pico. --- src/radiotech/rfid/mfrc522.py | 380 +++++++++++++++++++++++++++++++++ src/radiotech/rfid/umfrc522.py | 267 +++++++++++++++++++++++ 2 files changed, 647 insertions(+) create mode 100644 src/radiotech/rfid/mfrc522.py create mode 100644 src/radiotech/rfid/umfrc522.py diff --git a/src/radiotech/rfid/mfrc522.py b/src/radiotech/rfid/mfrc522.py new file mode 100644 index 0000000..5f06ca6 --- /dev/null +++ b/src/radiotech/rfid/mfrc522.py @@ -0,0 +1,380 @@ +from machine import Pin, SPI +from os import uname + + +class MFRC522: + + DEBUG = False + OK = 0 + NOTAGERR = 1 + ERR = 2 + + REQIDL = 0x26 + REQALL = 0x52 + AUTHENT1A = 0x60 + AUTHENT1B = 0x61 + + PICC_ANTICOLL1 = 0x93 + PICC_ANTICOLL2 = 0x95 + PICC_ANTICOLL3 = 0x97 + + + def __init__(self, sck, mosi, miso, rst, cs,baudrate=1000000,spi_id=0): + + self.sck = Pin(sck, Pin.OUT) + self.mosi = Pin(mosi, Pin.OUT) + self.miso = Pin(miso) + self.rst = Pin(rst, Pin.OUT) + self.cs = Pin(cs, Pin.OUT) + + self.rst.value(0) + self.cs.value(1) + + board = uname()[0] + + if board == 'WiPy' or board == 'LoPy' or board == 'FiPy': + self.spi = SPI(0) + self.spi.init(SPI.MASTER, baudrate=1000000, pins=(self.sck, self.mosi, self.miso)) + elif (board == 'esp8266') or (board == 'esp32'): + self.spi = SPI(baudrate=100000, polarity=0, phase=0, sck=self.sck, mosi=self.mosi, miso=self.miso) + self.spi.init() + elif board == 'rp2': + self.spi = SPI(spi_id,baudrate=baudrate,sck=self.sck, mosi= self.mosi, miso= self.miso) + else: + raise RuntimeError("Unsupported platform") + + self.rst.value(1) + self.init() + + def _wreg(self, reg, val): + + self.cs.value(0) + self.spi.write(b'%c' % int(0xff & ((reg << 1) & 0x7e))) + self.spi.write(b'%c' % int(0xff & val)) + self.cs.value(1) + + def _rreg(self, reg): + + self.cs.value(0) + self.spi.write(b'%c' % int(0xff & (((reg << 1) & 0x7e) | 0x80))) + val = self.spi.read(1) + self.cs.value(1) + + return val[0] + + def _sflags(self, reg, mask): + self._wreg(reg, self._rreg(reg) | mask) + + def _cflags(self, reg, mask): + self._wreg(reg, self._rreg(reg) & (~mask)) + + def _tocard(self, cmd, send): + + recv = [] + bits = irq_en = wait_irq = n = 0 + stat = self.ERR + + if cmd == 0x0E: + irq_en = 0x12 + wait_irq = 0x10 + elif cmd == 0x0C: + irq_en = 0x77 + wait_irq = 0x30 + + self._wreg(0x02, irq_en | 0x80) + self._cflags(0x04, 0x80) + self._sflags(0x0A, 0x80) + self._wreg(0x01, 0x00) + + for c in send: + self._wreg(0x09, c) + self._wreg(0x01, cmd) + + if cmd == 0x0C: + self._sflags(0x0D, 0x80) + + i = 2000 + while True: + n = self._rreg(0x04) + i -= 1 + if ~((i != 0) and ~(n & 0x01) and ~(n & wait_irq)): + break + + self._cflags(0x0D, 0x80) + + if i: + if (self._rreg(0x06) & 0x1B) == 0x00: + stat = self.OK + + if n & irq_en & 0x01: + stat = self.NOTAGERR + elif cmd == 0x0C: + n = self._rreg(0x0A) + lbits = self._rreg(0x0C) & 0x07 + if lbits != 0: + bits = (n - 1) * 8 + lbits + else: + bits = n * 8 + + if n == 0: + n = 1 + elif n > 16: + n = 16 + + for _ in range(n): + recv.append(self._rreg(0x09)) + else: + stat = self.ERR + + return stat, recv, bits + + def _crc(self, data): + + self._cflags(0x05, 0x04) + self._sflags(0x0A, 0x80) + + for c in data: + self._wreg(0x09, c) + + self._wreg(0x01, 0x03) + + i = 0xFF + while True: + n = self._rreg(0x05) + i -= 1 + if not ((i != 0) and not (n & 0x04)): + break + + return [self._rreg(0x22), self._rreg(0x21)] + + def init(self): + + self.reset() + self._wreg(0x2A, 0x8D) + self._wreg(0x2B, 0x3E) + self._wreg(0x2D, 30) + self._wreg(0x2C, 0) + self._wreg(0x15, 0x40) + self._wreg(0x11, 0x3D) + self.antenna_on() + + def reset(self): + self._wreg(0x01, 0x0F) + + def antenna_on(self, on=True): + + if on and ~(self._rreg(0x14) & 0x03): + self._sflags(0x14, 0x03) + else: + self._cflags(0x14, 0x03) + + def request(self, mode): + + self._wreg(0x0D, 0x07) + (stat, recv, bits) = self._tocard(0x0C, [mode]) + + if (stat != self.OK) | (bits != 0x10): + stat = self.ERR + + return stat, bits + + def anticoll(self,anticolN): + + ser_chk = 0 + ser = [anticolN, 0x20] + + self._wreg(0x0D, 0x00) + (stat, recv, bits) = self._tocard(0x0C, ser) + + if stat == self.OK: + if len(recv) == 5: + for i in range(4): + ser_chk = ser_chk ^ recv[i] + if ser_chk != recv[4]: + stat = self.ERR + else: + stat = self.ERR + + return stat, recv + + + def PcdSelect(self, serNum,anticolN): + backData = [] + buf = [] + buf.append(anticolN) + buf.append(0x70) + #i = 0 + ###xorsum=0; + for i in serNum: + buf.append(i) + #while i<5: + # buf.append(serNum[i]) + # i = i + 1 + pOut = self._crc(buf) + buf.append(pOut[0]) + buf.append(pOut[1]) + (status, backData, backLen) = self._tocard( 0x0C, buf) + if (status == self.OK) and (backLen == 0x18): + return 1 + else: + return 0 + + + def SelectTag(self, uid): + byte5 = 0 + + #(status,puid)= self.anticoll(self.PICC_ANTICOLL1) + #print("uid",uid,"puid",puid) + for i in uid: + byte5 = byte5 ^ i + puid = uid + [byte5] + + if self.PcdSelect(puid,self.PICC_ANTICOLL1) == 0: + return (self.ERR,[]) + return (self.OK , uid) + + def tohexstring(self,v): + s="[" + for i in v: + if i != v[0]: + s = s+ ", " + s=s+ "0x{:02X}".format(i) + s= s+ "]" + return s + + + + + def SelectTagSN(self): + valid_uid=[] + (status,uid)= self.anticoll(self.PICC_ANTICOLL1) + #print("Select Tag 1:",self.tohexstring(uid)) + if status != self.OK: + return (self.ERR,[]) + + if self.DEBUG: print("anticol(1) {}".format(uid)) + if self.PcdSelect(uid,self.PICC_ANTICOLL1) == 0: + return (self.ERR,[]) + if self.DEBUG: print("pcdSelect(1) {}".format(uid)) + + #check if first byte is 0x88 + if uid[0] == 0x88 : + #ok we have another type of card + valid_uid.extend(uid[1:4]) + (status,uid)=self.anticoll(self.PICC_ANTICOLL2) + #print("Select Tag 2:",self.tohexstring(uid)) + if status != self.OK: + return (self.ERR,[]) + if self.DEBUG: print("Anticol(2) {}".format(uid)) + rtn = self.PcdSelect(uid,self.PICC_ANTICOLL2) + if self.DEBUG: print("pcdSelect(2) return={} uid={}".format(rtn,uid)) + if rtn == 0: + return (self.ERR,[]) + if self.DEBUG: print("PcdSelect2() {}".format(uid)) + #now check again if uid[0] is 0x88 + if uid[0] == 0x88 : + valid_uid.extend(uid[1:4]) + (status , uid) = self.anticoll(self.PICC_ANTICOLL3) + #print("Select Tag 3:",self.tohexstring(uid)) + if status != self.OK: + return (self.ERR,[]) + if self.DEBUG: print("Anticol(3) {}".format(uid)) + if self.MFRC522_PcdSelect(uid,self.PICC_ANTICOLL3) == 0: + return (self.ERR,[]) + if self.DEBUG: print("PcdSelect(3) {}".format(uid)) + valid_uid.extend(uid[0:5]) + # if we are here than the uid is ok + # let's remove the last BYTE whic is the XOR sum + + return (self.OK , valid_uid[:len(valid_uid)-1]) + #return (self.OK , valid_uid) + + + + + + + def auth(self, mode, addr, sect, ser): + return self._tocard(0x0E, [mode, addr] + sect + ser[:4])[0] + + def authKeys(self,uid,addr,keyA=None, keyB=None): + status = self.ERR + if keyA is not None: + status = self.auth(self.AUTHENT1A, addr, keyA, uid) + elif keyB is not None: + status = self.auth(self.AUTHENT1B, addr, keyB, uid) + return status + + + def stop_crypto1(self): + self._cflags(0x08, 0x08) + + def read(self, addr): + + data = [0x30, addr] + data += self._crc(data) + (stat, recv, _) = self._tocard(0x0C, data) + return stat, recv + + def write(self, addr, data): + + buf = [0xA0, addr] + buf += self._crc(buf) + (stat, recv, bits) = self._tocard(0x0C, buf) + + if not (stat == self.OK) or not (bits == 4) or not ((recv[0] & 0x0F) == 0x0A): + stat = self.ERR + else: + buf = [] + for i in range(16): + buf.append(data[i]) + buf += self._crc(buf) + (stat, recv, bits) = self._tocard(0x0C, buf) + if not (stat == self.OK) or not (bits == 4) or not ((recv[0] & 0x0F) == 0x0A): + stat = self.ERR + return stat + + + def writeSectorBlock(self,uid, sector, block, data, keyA=None, keyB = None): + absoluteBlock = sector * 4 + (block % 4) + if absoluteBlock > 63 : + return self.ERR + if len(data) != 16: + return self.ERR + if self.authKeys(uid,absoluteBlock,keyA,keyB) != self.ERR : + return self.write(absoluteBlock, data) + return self.ERR + + def readSectorBlock(self,uid ,sector, block, keyA=None, keyB = None): + absoluteBlock = sector * 4 + (block % 4) + if absoluteBlock > 63 : + return self.ERR, None + if self.authKeys(uid,absoluteBlock,keyA,keyB) != self.ERR : + return self.read(absoluteBlock) + return self.ERR, None + + def MFRC522_DumpClassic1K(self,uid, Start=0, End=64, keyA=None, keyB=None): + for absoluteBlock in range(Start,End): + status = self.authKeys(uid,absoluteBlock,keyA,keyB) + # Check if authenticated + print("{:02d} S{:02d} B{:1d}: ".format(absoluteBlock, absoluteBlock//4 , absoluteBlock % 4),end="") + if status == self.OK: + status, block = self.read(absoluteBlock) + if status == self.ERR: + break + else: + for value in block: + print("{:02X} ".format(value),end="") + print(" ",end="") + for value in block: + if (value > 0x20) and (value < 0x7f): + print(chr(value),end="") + else: + print('.',end="") + print("") + else: + break + if status == self.ERR: + print("Authentication error") + return self.ERR + return self.OK diff --git a/src/radiotech/rfid/umfrc522.py b/src/radiotech/rfid/umfrc522.py new file mode 100644 index 0000000..519d5c0 --- /dev/null +++ b/src/radiotech/rfid/umfrc522.py @@ -0,0 +1,267 @@ +from machine import Pin, SPI +# Basis on 1AdityaX at https://github.com/1AdityaX/mfrc522-python +# BASEDOCU is found at: https://www.nxp.com/docs/en/data-sheet/MFRC522.pdf + +class UMFRC522: + #default clean register states refer to BASEDOCU page 36ff. + CLEAN_REGISTER_STATES = { + 0x2A: 0b10001101, #TModeReg, internal timer: timer automatic + non-gated + timer decrements to 0 + timer values 4 bits (higher TPrescaler value -> TMode) + 0x2B: 0b00111110, #TPrescalerReg, lower 8 bits + 0x2D: 0b00011110, #TReloadReg, lower 8 bits + 0x2C: 0b00000000, #TReloadReg, higher 8 bits + 0x15: 0b00010101, #TxASKReg + 0x11: 0b00111101 #ModeReg + } + MASK_SPI_DATA = 0b01111110 # Mask for adressing data bits when acting with registers, leftest and rightest bits are special handled + MASK_LEADING_BIT = 0b10000000 #Mask e.g. to add leading bit + MASK_ERROR = 0b00011011 # Consider err (l2r): Protocol, Parity, x , collision, buffer overflow, x , x, x + + STATUS_ERROR = 0x02 + STATUS_NO_TOKEN = 0x01 + STATUS_OK = 0x00 + + REG_TX_CONTROL = 0x14 + REG_COMMAND = 0x01 + REG_COM_I_EN = 0x02 + REG_COM_IRQ = 0x04 + REG_FIFO_LEVEL = 0x0A + REG_FIFO_DATA = 0x09 + REG_BIT_FRAMING = 0x0D + REG_ERROR = 0x06 + REG_CONTROL = 0x0C + REG_DIV_IRQ = 0x05 + REG_CRC_RESULT_LEFT = 0x22 + REG_CRC_RESULT_MID = 0x21 + REG_STATUS = 0x08 + + COMMAND_IDLE = 0b0000 + COMMAND_AUTHENTICATE = 0b1110 + COMMAND_TRANSCEIV = 0b1100 + COMMAND_CALC_CRC = 0b0011 + COMMAND_SELECT_TOKEN = 0x93 + COMMAND_ANTICOLLISION = COMMAND_SELECT_TOKEN + COMMAND_READ = 0x30 + COMMAND_WRITE = 0xA0 + + LEN_MAX = 16 # maximum 16 bytes per block in a sector + + def __init__(self, sck, mosi, miso, rst, cs, baudrate=1000000,spi_dev=0): + self._baudrate = baudrate + self._sck = Pin(sck, Pin.OUT) #serial clock + self._mosi = Pin(mosi, Pin.OUT) #master out, slave in + self._miso = Pin(miso) #master in, slave out + self._spi = SPI( + spi_dev, baudrate=self._baudrate, polarity=0, phase=0, sck=self._sck, + mosi=self._mosi, miso=self._miso + ) + self._rst = Pin(rst, Pin.OUT) #reset + self._cs = Pin(cs, Pin.OUT, value=1) #chip select + + def reset(self): + clean_command_byte = 0b00001111 + self._register_write(self.REG_COMMAND,clean_command_byte) + + def _register_write(self, addr, data): + # Follows BASEDOCU logic on Chapter 8.1.2.3 + # leftest bit (#7) of data byte defines action, '1' is read, '0' write + # rightest bit (#0) is always 0. + self._cs.off() + addr_byte = (addr << 1) & self.MASK_SPI_DATA + data_byte = data & 0b11111111 + self._spi.write(f"{addr_byte:08b}") + self._spi.write(f"{data_byte:08b}") + self._cs.on() + def _register_read(self, addr): + # Follows BASEDOCU logic on Chapter 8.1.2.3 + # leftest bit (#7) of data byte defines action, '1' is read, '0' write + # rightest bit (#0) is always 0. + self._cs.off() + addr_byte = (addr << 1) & self.MASK_SPI_DATA | self.MASK_LEADING_BIT # shift bits to left by 1 position for trailing 0 and ensure length with mask + attach leading '1' for reading access. + self._spi.write(f"{addr_byte:08b}") + data = self._spi.read(1) + self._cs.on() + return data[0] + + def _bitmask_set(self, reg, mask): + masked_byte = self._register_read(reg) | mask + self._register_write(reg, masked_byte) + def _bitmask_clear(self, reg, mask): + unmasked_byte = self._register_read(reg) & (~mask) + self._register_write(reg, unmasked_byte) + + def _antenna_on(self): + mask = 0b00000011 + data = self._register_read(self.REG_TX_CONTROL) + if (data & mask) != mask: + self._bitmask_set(self.REG_TX_CONTROL, mask=mask) + def _antenna_off(self): + self._bitmask_clear(self.REG_TX_CONTROL, 0b00000011) + + def _to_card(self, command, data): + recv_data = [] + recv_data_leng = 0 + recv_bytes = 0 + status = self.STATUS_ERROR + interrupt_request = 0x0 + wait_interrupt_request = 0x0 + last_bits = None + + if command == self.COMMAND_AUTHENTICATE: + interrupt_request = 0b00010010 + wait_interrupt_request = 0b00010000 + elif command == self.COMMAND_TRANSCEIV: + interrupt_request = 0b01110111 + wait_interrupt_request = 0b00110000 + + # Interrupt enabling and reset FIFO buffer of data + self._register_write(self.REG_COM_I_EN, interrupt_request | self.MASK_LEADING_BIT) + self._bitmask_clear(self.REG_COM_IRQ, self.MASK_LEADING_BIT) + self._bitmask_set(self.REG_FIFO_LEVEL, self.MASK_LEADING_BIT) + + # Set into IDLE mode + self._register_write(self.REG_COMMAND, self.COMMAND_IDLE) + + # Put data into FIFO buffer + for byte in data: + self._register_write(self.REG_FIFO_DATA, byte) + + # Execute Command on data + self._register_write(self.REG_COMMAND, command) + if command == self.COMMAND_TRANSCEIV: + self._bitmask_set(self.REG_BIT_FRAMING, self.MASK_LEADING_BIT) + + wait_ctr = 2000 + command_executed = False + while not command_executed: + recv_bytes = self._register_read(self.REG_COM_IRQ) + wait_ctr -= 1 + if not wait_ctr: + raise ConnectionError("Could not execute command. Timeout.") + if (recv_bytes & 0x01) or (recv_bytes & wait_interrupt_request): + command_executed = True + + if command == self.COMMAND_TRANSCEIV: + self._bitmask_clear(self.REG_BIT_FRAMING, self.MASK_LEADING_BIT) + + # Status check and adjustments + errors = self._register_read(self.REG_ERROR) + if errors & self.MASK_ERROR: + # at least one bit considered is set to true -> we got an error; + # TODO: parse errors in future + status = self.STATUS_ERROR + else: + status = self.STATUS_OK + if recv_bytes & interrupt_request & 0x01: + status = self.STATUS_NO_TOKEN + if command == self.COMMAND_TRANSCEIV: + recv_bytes = self._register_read(self.REG_FIFO_LEVEL) + last_bits = self._register_read(self.REG_CONTROL) & 0b00000111 + recv_data_leng = (recv_bytes - 1) * 8 + last_bits if last_bits else recv_bytes * 8 + recv_bytes = min(max(recv_bytes, 1), self.LEN_MAX) + recv_data = [self._register_read(self.REG_FIFO_DATA) for _ in range(recv_bytes)] + return (status, recv_data, recv_data_leng) + + def _get_crc(self, data): + # Clearing Interrupts and settion FIFO to maximum + self._bitmask_clear(self.REG_DIV_IRQ, 0b00000100) # third bit was for collision detection in BASEDOCU table 34. + self._bitmask_set(self.REG_FIFO_LEVEL, self.MASK_LEADING_BIT) + + for datum in data: + self._register_write(self.REG_FIFO_DATA, datum) + + self._register_write(self.REG_COMMAND, self.COMMAND_CALC_CRC) + + wait_ctr = 255 + crc_completed = False + while not crc_completed: + recv_bytes = self._register_read(self.REG_DIV_IRQ) + wait_ctr -= 1 + if not wait_ctr: + raise ConnectionError("Could not calculate CRC. Timeout.") + if recv_bytes & 0b0100: # Bit #2 (3. pos) shows data is processed, BASEDOCU Table 32. + crc_completed = True + + return [self._register_read(self.REG_CRC_RESULT_LEFT), self._register_read(self.REG_CRC_RESULT_MID)] + + def _request(self, mode): + self._register_write(self.REG_BIT_FRAMING, 0b0111) + status, _, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, [mode]) + if (status != self.STATUS_OK) or (recv_data_leng != self.LEN_MAX): + status = self.STATUS_ERROR + return status, recv_data_leng + + def _anti_collision(self): + serial_number_check = 0 + serial_numbers = [self.COMMAND_ANTICOLLISION, 0x20] + + status, recv_data, _ = self._to_card(self.COMMAND_TRANSCEIV, serial_numbers) + + if status != self.STATUS_OK: + raise ConnectionError("Could not execute anticollision command. Timeout.") + if len(recv_data) != 5: + raise ConnectionError(f"Expected 5 bytes in return, got only '{len(recv_data)}' from anti-collision command response.") + for i in range(4): + #calculating Checksum; should match 5ths byte + serial_number_check = serial_number_check ^ recv_data[i] + if serial_number_check != recv_data[4]: + raise ConnectionError(f"Checksum for anticollision command was calculated '{serial_number_check}' but should have been '{recv_data[4]}'") + return status, recv_data + + def _select_token(self, serial_number): + if len(serial_number) != 5: + raise ValueError("Expected exact 5 values for the serial number!") + data = [self.COMMAND_SELECT_TOKEN, 0x70] + data.extend(serial_number) + data.extend(self._get_crc(data)) + + status, recv_data, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, data) + if status == self.STATUS_OK and (recv_data_leng == 24): + return recv_data[0] + else: + return 0 + + def authenticate(self, serial_number, sector_key, block, auth_mode): + data = [auth_mode, block] + data.extend(sector_key) + data.extend(serial_number) + + status, _, _ = self._to_card(self.COMMAND_AUTHENTICATE, data) + + if status != self.STATUS_OK: + raise ConnectionError(f"Cannot authenticate to token with ID '{''.join(serial_number)}'. Abort.") + if not (self._register_read(self.REG_STATUS) & self.REG_STATUS): + raise ConnectionError(f"Error during Authentication. Abort.") + return status + def authenticate_cancel(self): + self._bitmask_clear(self.REG_STATUS, self.REG_STATUS) + + def read_block(self, block): + data = [self.COMMAND_READ, block] + data.extend(self._get_crc(data)) + + status, recv_data, _ = self._to_card(self.COMMAND_TRANSCEIV, data) + if status != self.STATUS_OK: + raise ConnectionError(f"Could not read data from block {block}. Abort.") + if len(recv_data) != self.LEN_MAX: + return None + return recv_data + def write_block(self, block, data): + data_buffer = [self.COMMAND_WRITE, block] + data_buffer.extend(self._get_crc(data_buffer)) + status, recv_data, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, data_buffer) + + if status != self.STATUS_OK or recv_data_leng != 4 or (recv_data[0] & 0x0F) != 0x0A: + raise ConnectionError(f"Could not init write-data to block {block}. Abort.") + + data_buffer = list(data) + data_buffer.extend(self._get_crc(data_buffer)) + status, recv_data, recv_data_leng = self._to_card(self.COMMAND_TRANSCEIV, data_buffer) + + if status != self.STATUS_OK or recv_data_leng != 4 or (recv_data[0] & 0x0F) != 0x0A: + raise ConnectionError(f"Could not write data to block {block}. Abort.") + + def clear_state(self): + self.reset() + for addr, data in self.CLEAN_REGISTER_STATES.items(): + self._register_write(addr, data) + self._antenna_on() \ No newline at end of file