python – How to reproduce a TCP stream via HTTP requests

Suppose a conjuncture where is needed to pass a normal TCP/IP traffic through a HTTP server, like the below scheme:

Client <> SOCKS5 proxy <> HTTP server <> Remote

First, the client connection to remote will be go through a SOCKS5 proxy. Then, the SOCKS5 proxy will be responsible for converting the ongoing data in a HTTP request to another server. Finally, the HTTP server sends the data to the remote server and spits the response, simulating a TCP stream.

For example:

Client → localhost:8080 (SOCKS5 server):

domain test.com.br

localhost:8080 → localhost:8081 (HTTP server):

GET /?target=0:whois.registro.br:43:domain+test.com HTTP/1.1
Host: localhost:8081
Accept: */*
Connection: close

Then, the HTTP server will send domain test.com.br to whois.registro.br:43 and pass the response the way back until the client.

I’ve already written the SOCKS and HTTP server algorithms in Python:

#!/usr/bin/env python3
# socks_server.py

import logging
import socket
import struct
from threading import Thread
from queue import Queue
from time import sleep
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler
from base64 import urlsafe_b64encode, b64decode

SOCKS_VERSION = 5

TUNNEL_ADDR = '0.0.0.0'             # http server IP address
TUNNEL_PORT = 8080                  # http server port
TUNNEL_HOST = 'localhost:8080'      # http server 'Host' header

class ListenThread(Thread):
    q_w:Queue                       # worker queue
    client:socket                   # client socket

    def __init__(self, client, args=(), kwargs=None):
        Thread.__init__(self, args=(), kwargs=None)
        self.q_w = Queue()
        self.daemon = True

        self.client = client

    def run(self):
        while True:
            res = self.client.recv(4096)
            self.q_w.put(res)


class ThreadedTCPServer(ThreadingMixIn, TCPServer):
    allow_reuse_address = True
    pass

class SocksProxy(StreamRequestHandler):
    remote_addr:str
    remote_port:int

    def handle(self):
        logging.info('Accepting from %s:%s' % self.client_address)

        header = self.connection.recv(2)
        version, nmethods = struct.unpack('!BB', header)

        assert version == SOCKS_VERSION
        assert nmethods > 0

        methods = self.get_available_methods(nmethods)

        # no auth
        if 0 not in set(methods):
            self.server.close_request(self.request)
            return

        # welcome msg
        self.connection.sendall(struct.pack('!BB', SOCKS_VERSION, 0))

        version, cmd, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
        assert version == SOCKS_VERSION

        if address_type == 1:
            self.remote_addr = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == 3:
            domain_length = self.connection.recv(1)(0)
            self.remote_addr = self.connection.recv(domain_length)
            self.remote_addr = socket.gethostbyname(self.remote_addr)

        self.remote_port = struct.unpack('!H', self.connection.recv(2))(0)

        try:
            if cmd == 1:
                pass
            else:
                self.server.close_request(self.request)

            addr = struct.unpack('!I', socket.inet_aton(self.remote_addr))(0)
            port = self.remote_port
            reply = struct.pack('!BBBBIH', SOCKS_VERSION, 0, 0, 1, addr, port)

        except:
            reply = self.generate_failed_reply(address_type, SOCKS_VERSION)

        self.connection.sendall(reply)

        # data exchange
        if reply(1) == 0 and cmd == 1:
            self.exchange_loop(self.connection)

        self.server.close_request(self.request)
        


    def get_available_methods(self, n):
        return ( ord(self.connection.recv(1)) for i in range(n) )
    
    def generate_failed_reply(self):
        return struck.pack('!BBBBIH', SOCKS_VERSION, error_number, 0, address_type, 0, 0)

    # relevant part here
    def exchange_loop(self, client):

        packetnum = 0

        while True:

            # reads from client
            req = b''
            while True:
                chunk = client.recv(4096)
                req += chunk

                if len(chunk) < 4096:
                    break

            segments = (
                str(packetnum).encode(),
                self.remote_addr.encode(),
                str(self.remote_port).encode(),
                req
            )

            segments = ( urlsafe_b64encode(s).decode() for s in segments )
            segments = ':'.join(segments)

            data = 'GET /?target=%s HTTP/1.1rn' % segments
            data += 'Host: %srn' % TUNNEL_HOST
            data += 'Accept: */*rn'
            data += 'Connection: closernrn'

            print('>> ' + repr(data))

            # connects to the HTTP server and send request
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.connect((TUNNEL_ADDR, TUNNEL_PORT))
            s.sendall(data.encode())

            res = b''
            while True:
                chunk = s.recv(4096, socket.MSG_WAITALL)
                res += chunk

                if len(chunk) < 4096:
                    break

            s.close()

            print('<< ' + repr(res))

            if res.decode().find('HTTP') != 0:
                raise Exception('not a HTTP response')

            # finally, retrieves response to client
            res = res.decode().split('rnrn', 1)(1)
            res = b64decode(res)

            print(res)

            if client.send(res) <= 0:
                break

            packetnum += 1

if __name__ == '__main__':
    with ThreadedTCPServer(('0.0.0.0', 9011), SocksProxy) as server:
        server.serve_forever()

The HTTP server spawns a thread for each new connection, then append it to a dict for being reused. In our purpose, the proxy must be able to send and receive more than one packet per connection.

#!/usr/bin/env python3
# http_server.py

from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import parse
from base64 import urlsafe_b64decode, b64encode
from queue import Queue

import threading
import socket

class SocketThread(threading.Thread):
    socket.socket       # socket to remote server    
    q_m:Queue           # main queue (worker -> main)
    q_w:Queue           # worker queue (main -> worker)

    addr:str            # remote ip address
    port:int            # remote port

    def __init__(self, queue, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.q_m = queue
        self.q_w = Queue()
        self.daemon = True
        
        (self.addr, self.port, self.r) = args

    def run(self):
        print(threading.current_thread().getName())
        
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.addr, self.port))
        
        while True:
            try:
                val = self.q_w.get()

                bc = self.socket.sendall(val if isinstance(val, bytes) else val.encode())
                print(f'{bc} bytes sent to {self.addr}:{self.port}')

                ret = b''
                while True:
                    chunk = self.socket.recv(4096, socket.MSG_WAITALL)
                    ret += chunk

                    if len(chunk) < 4096:
                        break
                    
                ret = b64encode(ret)
                print('Got response!')

                self.r.push_packet(ret)
                self.q_m.put(ret)

            except:
                self.q_m.put('CONN_CLOSE')
                raise
                break


class Handler(BaseHTTPRequestHandler):
    protocol_version = 'HTTP/1.1'
    connections:dict = {}   # dict containg socket threads

    def __init__(self, *args, **kwargs):
        super(BaseHTTPRequestHandler, self).__init__(*args, **kwargs)

    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')

        try:
            parsed_uri = parse.urlsplit(self.path)
            query = dict(parse.parse_qsl(parsed_uri.query))

            segments = query('target').split(':')
            segments = ( urlsafe_b64decode(s) for s in segments )

            assert len(segments) == 4

            self.packetnum = int(segments(0) or 0)
            self.port = int(segments(2) or 0)
            self.addr = segments(1)
            self.data = segments(3)

            assert self.addr != None
            assert self.port != 0

            if self.packetnum == 0 or self.data:
                conn_name = '%s:%d' % (self.addr, self.port)
                if not self.connections.get(conn_name):
                    print('New connection, spawning new thread...')
                    q = Queue()
                    self.connections(conn_name) = SocketThread(q, args=(self.addr, self.port, self.r))
                    self.connections(conn_name).start()
                else:
                    print('Reusing thread')

                print(f'name: {conn_name}')
                self.connections(conn_name).q_w.put(self.data)

                ret = self.connections(conn_name).q_m.get()

                if ret == 'CONN_CLOSE':
                    print(ret)
                    self.connections(conn_name) = None
                    return

                print(repr(ret))
                self.send_header('Content-length', len(ret))
                self.end_headers()
                self.wfile.write(ret)

        except KeyError:
            self.end_headers()
            raise


def main():
    server = HTTPServer(('0.0.0.0', 8080), Handler)
    server.serve_forever()

if __name__ == '__main__':
    main()

This, however, doesn’t seem to be working. The SOCKS server suddenly blocks at some points, and I couldn’t yet figure out how to handle connection close. Could someone give me a hand to put it working?