Sekun utilities

sirtunnel.py
Login

File prog/sirtunnel.py from the latest check-in


#!/usr/bin/env python3

import argparse
import json
import signal
from fnmatch import fnmatch
from subprocess import check_output
from urllib import request

# Glob patterns (fnmatch syntax) a requested hostname must match.
# The default "*" accepts every hostname.
ALLOWED_HOSTS = ["*"]

parser = argparse.ArgumentParser(description="Port forwarder using caddy and ssh")
parser.add_argument("host", help="Hostname", type=str)
parser.add_argument("port", help="Port number", type=int)
parser.add_argument(
    "--auth", help="User/Password protection using user:password formatting", type=str
)

args = parser.parse_args()

host = args.host
port = args.port
# Doubles as the Caddy "@id" of the route, which is how the route is
# addressed again when it is deleted on exit.
tunnel_id = f"{host}-{port}"

if not 0 < port < 65536:
    # argparse only guarantees an int; reject values that are not TCP ports.
    parser.error(f"port must be between 1 and 65535, got {port}")

if not any(fnmatch(host, allowed_host) for allowed_host in ALLOWED_HOSTS):
    # SystemExit with a string argument writes it to stderr and exits with
    # status 1; unlike the `exit()` helper it does not depend on the `site`
    # module having been loaded.
    raise SystemExit(f"You can only choose {','.join(ALLOWED_HOSTS)} hostnames")

# Route object later POSTed to the Caddy admin API. It is assembled from its
# two parts: a matcher on the requested hostname and a reverse_proxy handler
# that dials the requested port.
_host_matcher = {"host": [host]}
_proxy_handler = {
    "handler": "reverse_proxy",
    "upstreams": [{"dial": f":{port}"}],
}

caddy_add_route_request = {
    "@id": tunnel_id,
    "match": [_host_matcher],
    "handle": [_proxy_handler],
}

if args.auth is not None:
    # Split on the first ":" only, so the password itself may contain colons.
    username, separator, raw_password = args.auth.partition(":")
    if not separator or not username or not raw_password:
        raise SystemExit("--auth must be given as user:password")
    # NOTE(review): not read anywhere in the visible script; kept in case
    # something outside this view relies on it.
    require_auth = True

    # `caddy hash-password` prints the hash followed by a newline; strip it so
    # the stored value is exactly the hash.
    # The double-dash flag spelling is used because it is the long-option form;
    # NOTE(review): the single-dash `-plaintext` appears to be understood only
    # by older Caddy CLIs - confirm against the deployed version.
    password = (
        check_output(["caddy", "hash-password", "--plaintext", raw_password])
        .decode("ascii")
        .strip()
    )
    auth_json = {
        "handler": "authentication",
        "providers": {
            "http_basic": {
                "accounts": [
                    {
                        "password": password,
                        "username": username,
                    }
                ],
                "hash": {"algorithm": "bcrypt"},
                "hash_cache": {},
            }
        },
    }
    # Caddy runs a route's handlers in list order and reverse_proxy answers the
    # request itself, so authentication must come before it; appended after
    # the proxy it would never be consulted.
    caddy_add_route_request["handle"].insert(0, auth_json)

# Register the route by POSTing it as JSON to the local Caddy admin endpoint.
body = json.dumps(caddy_add_route_request).encode("utf-8")
headers = {"Content-Type": "application/json"}
create_url = "http://127.0.0.1:2019/config/apps/http/servers/https/routes"

req = request.Request(method="POST", url=create_url, headers=headers)
# urlopen raises on a connection failure or an HTTP error status, which stops
# the script here. The response body is not needed, but the response is closed
# explicitly so its socket is released right away.
with request.urlopen(req, body):
    pass

def _exit_on_signal(signum, frame):
    """Turn a termination signal into SystemExit so the cleanup below runs."""
    raise SystemExit(128 + signum)


try:
    # By default SIGHUP (e.g. the SSH session going away) and SIGTERM kill the
    # interpreter without running `finally`, which would leave the route
    # registered in Caddy. Converting them to SystemExit makes them unwind
    # through the cleanup like Ctrl-C does.
    for _signum in (signal.SIGHUP, signal.SIGTERM):
        signal.signal(_signum, _exit_on_signal)
    print("Tunnel created successfully")
    # Block until a signal arrives; the tunnel lives as long as this process.
    signal.pause()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the tunnel; exit without a traceback.
    pass
finally:
    # Ignore further termination signals so a second one cannot abort the
    # cleanup half-way.
    for _signum in (signal.SIGHUP, signal.SIGTERM):
        signal.signal(_signum, signal.SIG_IGN)
    try:
        print("Cleaning up tunnel")
    except OSError:
        # The terminal may already be gone; still remove the route.
        pass
    # Remove the route through the admin API, addressing it by its "@id".
    delete_url = "http://127.0.0.1:2019/id/" + tunnel_id
    req = request.Request(method="DELETE", url=delete_url)
    with request.urlopen(req):
        pass