# pico-test/picoproxy_not_functional_recheck.py
# 2023-08-19 23:37:59 +02:00
#
# 243 lines
# 7.9 KiB
# Python
# Executable File

import socket
from queue import Empty, Queue
from select import select
from threading import Event, Thread
from time import sleep
class PicoProxy:
    """Minimal select()-based forward proxy for HTTP and CONNECT tunnels.

    One background thread multiplexes the listening socket and every
    client/upstream socket pair. Each accepted client socket is paired with
    an upstream ("reverse") socket; bytes received on one side are written
    to the other.
    """

    ### CLS FUNCTIONS #########################################################

    # Spinner frames printed by stop() while waiting for the listener thread.
    wait_symbols = "-\\|/"

    @staticmethod
    def _get_socket(proxy_type="TCP"):
        """Return a new IPv4 socket: stream for "TCP", datagram otherwise."""
        return socket.socket(
            socket.AF_INET,
            socket.SOCK_STREAM if proxy_type == "TCP" else socket.SOCK_DGRAM,
        )

    @staticmethod
    def init_tunnle(request, sock_in, sock_out, host, port):
        """Open a CONNECT tunnel and acknowledge it to the client.

        Args:
            request: Raw bytes of the first client request.
            sock_in: Client-side socket that receives the 200 reply.
            sock_out: Not-yet-connected upstream socket.
            host: Upstream host name or address.
            port: Upstream port.

        Returns:
            True when the upstream connection was made and the client was
            told so; False when the request is not a CONNECT or the
            connection attempt failed.
        """
        if not request.startswith(b"CONNECT"):
            return False
        try:
            sock_out.connect((host, port))
            sock_in.sendall(b"HTTP/1.1 200 established\r\n\r\n")
        except Exception as e:
            # Best effort: report and let the caller tear the channel down.
            print("Cannot initiate HTTPS connection:", e)
            return False
        return True

    @staticmethod
    def proxy_forward_filter(request):
        """Extract (protocol, host, port) from the request line.

        Deliberately sticks to plain str.split/rsplit; the original author
        notes that this code must stay runnable on a Pico W.

        Args:
            request: Decoded request text; only its first line is inspected.

        Returns:
            Tuple ``(protocol, host_domain, port)``. ``protocol`` is None
            when the target names neither a scheme nor a port. A missing
            port defaults to 443 for "https" and 80 otherwise; a missing
            scheme is inferred as "https" for port 443 and "http" for any
            other explicit port.

        Raises:
            ValueError: If the request line has no target or the port is
                not a number.
        """
        tokens = request.split('\n')[0].split()
        if len(tokens) < 2:
            raise ValueError("malformed request line")
        url = tokens[1]

        protocol = None
        remainder = url
        # An origin-form target ("/path?...") never carries a scheme, even
        # if "://" happens to appear somewhere in its query string.
        if "://" in url and not url.startswith("/"):
            protocol, remainder = url.split("://", 1)
            protocol = protocol.lower()

        # Only the authority part matters; drop the path before looking for
        # a port so "host:8080/path" parses correctly.
        authority = remainder.split("/", 1)[0]
        host_domain = authority
        port = None
        if ":" in authority:
            host_domain, port_text = authority.rsplit(":", 1)
            if port_text:
                port = int(port_text)

        if port is None:
            port = 443 if protocol == "https" else 80
        elif protocol is None:
            protocol = "https" if port == 443 else "http"
        return (protocol, host_domain, port)

    ### OBJ FUNCTIONS #########################################################

    def __init__(self, buf_byte_size=4096, client_timeout=0.5):
        """Create an idle proxy.

        Args:
            buf_byte_size: Maximum number of bytes read per recv() call.
            client_timeout: Seconds select() may block before the listener
                thread re-checks whether stop() was requested.
        """
        self._buf_byte_size = buf_byte_size
        self._client_timeout = client_timeout
        self._listener_event = Event()
        self._is_listening = False
        self._socket_listen = None
        self._listen_thread = None
        # Sockets polled for readability / writability by select().
        self._incoming = []
        self._outgoing = []
        # Per-socket bookkeeping, keyed by socket object.
        self._channel_msg_map = {}   # socket -> Queue of pending messages
        self._channel_map = {}       # socket -> its paired socket
        self._channel_init = {}      # socket -> upstream connection made?
        self._channel_from_client = []  # sockets accepted from clients

    def _set_listener(self):
        """Resolve the bind address and create a fresh listening socket."""
        # The listening socket is always AF_INET/TCP, so only ask for
        # matching addresses; take the first one for binding.
        self._addr_listen = socket.getaddrinfo(
            self._addr, self._port, socket.AF_INET, socket.SOCK_STREAM
        )[0][-1]
        if getattr(self, '_socket_listen', None) is not None:
            self.stop()
        self._socket_listen = PicoProxy._get_socket()
        self._socket_listen.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
        )

    def listen(self, addr, port, proxy_type="TCP", backlog=0):
        """Bind to (addr, port) and start the listener thread.

        Does nothing when the proxy is already listening.

        Args:
            addr: Host name or address to bind to.
            port: Port to bind to.
            proxy_type: "TCP", or anything else for "UDP"; selects the
                socket type used for upstream connections.
            backlog: Backlog passed to socket.listen().

        Raises:
            OSError: If binding or listening fails; the socket is closed
                before the error propagates.
        """
        if self._is_listening:
            return
        self._addr = addr
        self._port = port
        self._proxy_type = proxy_type if proxy_type == "TCP" else "UDP"
        self._set_listener()
        try:
            self._socket_listen.bind(self._addr_listen)
            self._socket_listen.listen(backlog)
        except OSError:
            self._socket_listen.close()
            self._socket_listen = None
            raise
        # Make sure a stale stop request cannot end the new thread at once.
        self._listener_event.clear()
        self._incoming.append(self._socket_listen)
        self._listen_thread = Thread(
            target=self._listener_thread, args=(self._listener_event,)
        )
        self._listen_thread.start()
        print(f"init done for serving on {self._addr_listen}")
        self._is_listening = True

    def stop(self):
        """Stop the listener thread and close every socket the proxy holds."""
        if not self._is_listening:
            return
        self._listener_event.set()
        ctr = 0
        while self._listen_thread.is_alive():
            print(
                (
                    "Waiting for listener thread to finish... "
                    f"{PicoProxy.wait_symbols[ctr%len(PicoProxy.wait_symbols)]}\r"
                ),
                end=""
            )
            ctr += 1
            sleep(0.5)
        print("Listener thread finished closing safely.")

        # Close the listener together with all client/upstream sockets so
        # no file descriptors outlive the proxy.
        open_sockets = set(self._incoming)
        open_sockets.update(self._channel_map)
        open_sockets.add(self._socket_listen)
        for s in open_sockets:
            s.close()

        self._is_listening = False
        self._incoming.clear()
        self._outgoing.clear()
        self._channel_msg_map.clear()
        self._channel_map.clear()
        self._channel_init.clear()
        self._channel_from_client.clear()

    def join(self):
        """Block until the listener thread ends (no-op when not listening)."""
        if self._is_listening:
            self._listen_thread.join()

    def _listener_thread(self, event):
        """Thread body: poll all sockets until ``event`` is set.

        Args:
            event: threading.Event that requests termination when set.
        """
        while self._incoming and not event.is_set():
            # The timeout bounds how long a stop request can go unnoticed.
            inrecv, outsend, excpt = select(
                self._incoming, self._outgoing, self._incoming,
                self._client_timeout,
            )
            for sock in inrecv:
                if sock is self._socket_listen:
                    self._handle_connection_incoming()
                    continue
                if sock not in self._channel_map:
                    # Already closed in this pass as the peer of another
                    # socket that hit EOF or an error.
                    continue
                if (
                    not self._channel_init[sock]
                    and sock not in self._channel_from_client
                ):
                    continue
                try:
                    data = sock.recv(self._buf_byte_size)
                except OSError:
                    # Treat a reset/aborted connection like a clean EOF.
                    data = b""
                if data:
                    self._handle_connection_receive(sock, data)
                else:
                    self._handle_connection_close(sock)
            # for sock in outsend:
            #     self._handle_connection_send(sock)
            # for sock in excpt:
            #     self._handle_connection_error(sock)
        event.clear()

    def _handle_connection_incoming(self):
        """Accept a client and register it with a new upstream socket."""
        conn, _ = self._socket_listen.accept()
        reverse_conn = PicoProxy._get_socket(self._proxy_type)
        self._channel_from_client.append(conn)
        self._incoming.append(conn)
        self._channel_map[conn] = reverse_conn
        self._channel_map[reverse_conn] = conn
        self._channel_msg_map[conn] = Queue()
        self._channel_msg_map[reverse_conn] = Queue()
        self._channel_init[conn] = False
        self._channel_init[reverse_conn] = False

    def _handle_connection_receive(self, sock, data):
        """Forward ``data`` from ``sock`` to its peer.

        The first chunk on a channel is parsed to find the upstream target
        and to establish the connection; later chunks are passed through
        unchanged. Any failure closes both ends of the channel.

        Args:
            sock: Socket the data was read from.
            data: Bytes that were read.
        """
        reverse_sock = self._channel_map[sock]
        if self._channel_init[sock] or self._channel_init[reverse_sock]:
            try:
                reverse_sock.sendall(data)
            except OSError as e:
                print("Cannot forward data:", e)
                self._handle_connection_close(sock)
            return

        try:
            protocol, host_domain, port = PicoProxy.proxy_forward_filter(
                data.decode(errors="replace")
            )
        except ValueError as e:
            print("Cannot parse proxy request:", e)
            self._handle_connection_close(sock)
            return
        if not host_domain:
            # Origin-form request without an absolute target: there is
            # nowhere to forward it to.
            print("Cannot parse proxy request:", "no target host")
            self._handle_connection_close(sock)
            return

        if data.startswith(b"CONNECT") or protocol == "https" or port == 443:
            # A CONNECT must never be relayed verbatim, whatever its port.
            connected = PicoProxy.init_tunnle(
                data, sock, reverse_sock, host_domain, port
            )
        else:
            try:
                reverse_sock.connect((host_domain, port))
                # not a tunnel request, directly forward
                reverse_sock.sendall(data)
                connected = True
            except (OSError, OverflowError) as e:
                print("Cannot connect to upstream host:", e)
                connected = False

        if not connected:
            self._handle_connection_close(sock)
            return
        self._incoming.append(reverse_sock)
        self._channel_init[sock] = True
        self._channel_init[reverse_sock] = True

    def _handle_connection_send(self, sock):
        """Send one queued message on ``sock``; unregister it when idle."""
        try:
            msg = self._channel_msg_map[sock].get_nowait()
        except Empty:
            self._outgoing.remove(sock)
        else:
            sock.sendall(msg)

    def _handle_connection_close(self, sock):
        """Close ``sock`` and its peer and drop all their bookkeeping.

        Safe to call for a socket whose channel was already closed.
        """
        reverse_sock = self._channel_map.get(sock)
        if reverse_sock is None:
            return
        for s in (sock, reverse_sock):
            for registry in (
                self._outgoing, self._incoming, self._channel_from_client
            ):
                if s in registry:
                    registry.remove(s)
            s.close()
            self._channel_msg_map.pop(s, None)
            self._channel_init.pop(s, None)
            self._channel_map.pop(s, None)

    def _handle_connection_error(self, sock):
        """Handle a socket reported as exceptional by closing its channel."""
        self._handle_connection_close(sock)
def main():
    """Start a PicoProxy instance listening on all interfaces, port 8080."""
    proxy = PicoProxy()
    proxy.listen(port=8080, addr='0.0.0.0')


if __name__ == "__main__":
    main()