"""Minimal threaded HTTP/HTTPS forward proxy built on ``select``."""

import socket
from queue import Empty, Queue
from select import select
from threading import Event, Thread
from time import sleep


class PicoProxy:
    """Forward proxy that relays client connections to the requested host.

    A single background thread multiplexes the listening socket and every
    open channel with ``select``.  Each accepted client socket is paired with
    an upstream ("reverse") socket; bytes read on one side are written to the
    other.
    """

    ### CLS FUNCTIONS #############################################################
    # Spinner frames printed while waiting for the listener thread to exit.
    wait_symbols = "-\\|/"

    @staticmethod
    def _get_socket(proxy_type="TCP"):
        """Return a new IPv4 socket: stream for ``"TCP"``, datagram otherwise."""
        return socket.socket(
            socket.AF_INET,
            socket.SOCK_STREAM if proxy_type == "TCP" else socket.SOCK_DGRAM,
        )

    @staticmethod
    def init_tunnle(request, sock_in, sock_out, host, port):
        """Open an HTTPS tunnel in response to a ``CONNECT`` request.

        Args:
            request: Raw request bytes received from the client.
            sock_in: Client-side socket that sent the request.
            sock_out: Unconnected upstream socket to connect to the target.
            host: Target host name or address.
            port: Target port.

        Returns:
            ``True`` when the upstream connection was made and the client was
            told so; ``False`` when the request is not a ``CONNECT`` or the
            connection attempt failed (the error is printed, not raised).
        """
        if not request.startswith(b"CONNECT"):
            return False
        try:
            sock_out.connect((host, port))
            sock_in.sendall(b"HTTP/1.1 200 established\r\n\r\n")
        except OSError as e:
            print("Cannot initiate HTTPS connection:", e)
            return False
        return True

    @staticmethod
    def proxy_forward_filter(request):
        """Extract ``(protocol, host, port)`` from the first request line.

        Args:
            request: Decoded request text; only the first line is inspected.

        Returns:
            Tuple ``(protocol, host_domain, port)``.  ``protocol`` is the URL
            scheme when one is present, is derived from the port when only a
            port is given (443 -> ``"https"``, anything else -> ``"http"``),
            and is ``None`` when neither is given.  ``port`` defaults to 443
            for ``https`` and to 80 otherwise.

        Raises:
            IndexError: The request line has no target field.
            ValueError: The port is not an integer.
        """
        # looks ugly, yes; but is able to run on Pico W micro-controller :D
        header = request.split('\n')[0]
        url = header.split()[1]

        protocol = None
        host_part = url
        # Require "://" as well: a bare host such as "httpbin.org:443" also
        # starts with "http" but carries no scheme.
        if url.startswith("http") and "://" in url:
            protocol, host_part = url.split("://", 1)

        # Drop any path first so "host:8080/path" still yields a clean port.
        authority = host_part.split("/", 1)[0]
        host_domain, _, port_text = authority.partition(":")

        if port_text:
            port = int(port_text)
            if protocol is None:
                protocol = "https" if port == 443 else "http"
        else:
            port = 443 if protocol == "https" else 80

        return (protocol, host_domain, port)

    ### OBJ FUNCTIONS #############################################################
    def __init__(self, buf_byte_size=4096, client_timeout=0.5):
        """Create an idle proxy.

        Args:
            buf_byte_size: Maximum number of bytes read per ``recv`` call.
            client_timeout: Seconds the listener thread waits in ``select``
                before re-checking whether it has been asked to stop.
        """
        self._buf_byte_size = buf_byte_size
        self._client_timeout = client_timeout

        self._listener_event = Event()
        self._is_listening = False

        # Sockets handed to select() for reading / writing.
        self._incoming = []
        self._outgoing = []

        # Per-socket bookkeeping; every channel has one entry per side.
        self._channel_msg_map = {}     # socket -> Queue of pending messages
        self._channel_map = {}         # socket -> its peer socket
        self._channel_init = {}        # socket -> upstream connected yet?
        self._channel_from_client = []  # sockets accepted from clients

    def _set_listener(self):
        """Resolve the bind address and create a fresh listening socket."""
        # Take the first address getaddrinfo offers and keep its sockaddr,
        # which is what bind() needs.
        self._addr_listen = socket.getaddrinfo(self._addr, self._port)[0][-1]

        if hasattr(self, '_socket_listen') and self._socket_listen is not None:
            self.stop()

        self._socket_listen = PicoProxy._get_socket()
        self._socket_listen.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
        )

    def listen(self, addr, port, proxy_type="TCP", backlog=0):
        """Bind to ``addr:port`` and start serving in a background thread.

        Does nothing when the proxy is already listening.

        Args:
            addr: Address to bind to.
            port: Port to bind to.
            proxy_type: ``"TCP"``; any other value is stored as ``"UDP"``.
                NOTE(review): the listening socket is always created as TCP;
                this value only selects the type of the upstream sockets.
            backlog: Backlog passed to ``socket.listen``.
        """
        if self._is_listening:
            return

        self._addr = addr
        self._port = port
        self._proxy_type = "UDP" if not proxy_type == "TCP" else proxy_type

        self._set_listener()
        self._socket_listen.bind(self._addr_listen)
        self._socket_listen.listen(backlog)
        self._incoming.append(self._socket_listen)

        # A stop request left over from an earlier run would make the new
        # thread exit immediately.
        self._listener_event.clear()
        self._listen_thread = Thread(
            target=self._listener_thread,
            args=(self._listener_event,),
        )
        self._listen_thread.start()
        print(f"init done for serving on {self._addr_listen}")
        self._is_listening = True

    def stop(self):
        """Stop the listener thread and close every socket the proxy holds."""
        if not self._is_listening:
            return

        self._listener_event.set()
        ctr = 0
        while self._listen_thread.is_alive():
            print(
                (
                    "Waiting for listener thread to finish... "
                    f"{PicoProxy.wait_symbols[ctr%len(PicoProxy.wait_symbols)]}\r"
                ),
                end=""
            )
            ctr += 1
            sleep(0.5)
        print("Listener thread finished closing safely.")

        # Close both sides of every open channel so no descriptor is leaked.
        for sock in list(self._channel_map):
            sock.close()
        self._socket_listen.close()
        self._is_listening = False

        self._incoming.clear()
        self._outgoing.clear()
        self._channel_msg_map.clear()
        self._channel_map.clear()
        self._channel_init.clear()
        self._channel_from_client.clear()

    def join(self):
        """Block until the listener thread exits (no-op when not listening)."""
        if self._is_listening:
            self._listen_thread.join()

    def _listener_thread(self, event):
        """Serve connections until ``event`` is set or nothing is left to watch.

        Args:
            event: Event that ``stop()`` sets to request termination.
        """
        while self._incoming and not event.is_set():
            # The timeout bounds how long a stop request can go unnoticed
            # while no socket has traffic.
            readable, _, _ = select(
                self._incoming,
                self._outgoing,
                self._incoming,
                self._client_timeout,
            )
            for sock in readable:
                if sock is self._socket_listen:
                    self._handle_connection_incoming()
                    continue
                if sock not in self._channel_map:
                    # Already closed in this pass as the peer of another
                    # socket.
                    continue
                if (
                    not self._channel_init.get(sock)
                    and sock not in self._channel_from_client
                ):
                    # Upstream socket that has not been connected yet.
                    continue
                try:
                    data = sock.recv(self._buf_byte_size)
                except OSError:
                    # Treat a reset or otherwise broken connection like EOF.
                    data = b""
                if data:
                    self._handle_connection_receive(sock, data)
                else:
                    self._handle_connection_close(sock)
            # NOTE: writable and exceptional sockets are not serviced; data
            # is forwarded synchronously in _handle_connection_receive.
        event.clear()

    def _handle_connection_incoming(self):
        """Accept a client and register it with an unconnected upstream peer."""
        conn, addr = self._socket_listen.accept()
        reverse_conn = PicoProxy._get_socket(self._proxy_type)

        self._channel_from_client.append(conn)
        self._incoming.append(conn)

        self._channel_map[conn] = reverse_conn
        self._channel_map[reverse_conn] = conn
        self._channel_msg_map[conn] = Queue()
        self._channel_msg_map[reverse_conn] = Queue()
        self._channel_init[conn] = False
        self._channel_init[reverse_conn] = False

    def _handle_connection_receive(self, sock, data):
        """Forward ``data`` to the peer of ``sock``, connecting it first if needed.

        The first chunk on a new channel is parsed as an HTTP request to find
        the target host.  Requests for ``https`` or port 443 are served as a
        ``CONNECT`` tunnel; anything else is connected and forwarded as-is.
        A channel whose request cannot be parsed, whose upstream connection
        cannot be made, or whose peer refuses the data is closed.

        Args:
            sock: Socket the data was read from.
            data: Bytes that were read.
        """
        reverse_sock = self._channel_map[sock]

        if self._channel_init[sock] or self._channel_init[reverse_sock]:
            try:
                reverse_sock.sendall(data)
            except OSError as e:
                print("Cannot forward data:", e)
                self._handle_connection_close(sock)
            return

        try:
            protocol, host_domain, port = PicoProxy.proxy_forward_filter(
                data.decode()
            )
        except (ValueError, IndexError) as e:
            # UnicodeDecodeError is a ValueError, so undecodable input lands
            # here as well.
            print("Cannot parse proxy request:", e)
            self._handle_connection_close(sock)
            return

        if protocol == "https" or port == 443:
            connected = PicoProxy.init_tunnle(
                data, sock, reverse_sock, host_domain, port
            )
        else:
            try:
                reverse_sock.connect((host_domain, port))
                # not a tunnel request, directly forward
                reverse_sock.sendall(data)
                connected = True
            except OSError as e:
                print("Cannot connect to upstream host:", e)
                connected = False

        if not connected:
            self._handle_connection_close(sock)
            return

        self._incoming.append(reverse_sock)
        self._channel_init[sock] = True
        self._channel_init[reverse_sock] = True

    def _handle_connection_send(self, sock):
        """Send the next queued message for ``sock``.

        When the queue is empty the socket is removed from the write set
        instead.
        """
        try:
            msg = self._channel_msg_map[sock].get_nowait()
        except Empty:
            self._outgoing.remove(sock)
        else:
            sock.sendall(msg)

    def _handle_connection_close(self, sock):
        """Close ``sock`` and its peer and drop all bookkeeping for both.

        Calling this again for a socket that was already removed is harmless.
        """
        reverse_sock = self._channel_map.get(sock)
        for s in (sock, reverse_sock):
            if s is None:
                continue
            for registry in (
                self._outgoing,
                self._incoming,
                self._channel_from_client,
            ):
                if s in registry:
                    registry.remove(s)
            s.close()
            self._channel_msg_map.pop(s, None)
            self._channel_init.pop(s, None)
            self._channel_map.pop(s, None)

    def _handle_connection_error(self, sock):
        """Handle a socket error by closing the whole channel."""
        self._handle_connection_close(sock)


def main():
    """Run the proxy on all interfaces, port 8080, until interrupted."""
    mitm = PicoProxy()
    mitm.listen(addr='0.0.0.0', port=8080)
    try:
        mitm.join()
    except KeyboardInterrupt:
        # Ctrl-C: shut the listener down and release every socket.
        mitm.stop()


if __name__ == "__main__":
    main()