238 lines
7 KiB
Python
238 lines
7 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import glob
|
|
import socket
|
|
import asyncio
|
|
from collections import OrderedDict
|
|
from tabulate import tabulate
|
|
|
|
# Import from add_host.py
|
|
from add_host import add_host, print_info, print_error, print_warning, Colors
|
|
# Import the new edit_host function from edit_host.py
|
|
from edit_host import edit_host as edit_existing_host
|
|
|
|
# ------------------ Configuration ------------------
|
|
SSH_DIR = os.path.expanduser("~/.ssh")
|
|
CONF_DIR = os.path.join(SSH_DIR, "conf")
|
|
MAIN_CONFIG = os.path.join(SSH_DIR, "config")
|
|
SOCKET_DIR = os.path.join(SSH_DIR, "s") # Additional directory for ControlPath
|
|
|
|
# Default content to use if ~/.ssh/config doesn't exist
|
|
DEFAULT_CONFIG_CONTENT = """###
|
|
#Local ssh
|
|
###
|
|
|
|
Include conf/*/config
|
|
|
|
###
|
|
#Catch all ssh config
|
|
###
|
|
|
|
Host *
|
|
UserKnownHostsFile /dev/null
|
|
StrictHostKeyChecking no
|
|
ServerAliveInterval 60
|
|
ConnectTimeout 60
|
|
AddKeysToAgent yes
|
|
EscapeChar `
|
|
ControlMaster auto
|
|
ControlPersist 72000
|
|
ControlPath ~/.ssh/s/%C
|
|
"""
|
|
|
|
def ensure_ssh_setup():
|
|
"""
|
|
Ensure ~/.ssh, ~/.ssh/conf, and ~/.ssh/s exist, and if ~/.ssh/config does not exist,
|
|
create it with DEFAULT_CONFIG_CONTENT.
|
|
"""
|
|
# 1) Create ~/.ssh if it doesn't exist
|
|
if not os.path.isdir(SSH_DIR):
|
|
os.makedirs(SSH_DIR, mode=0o700, exist_ok=True)
|
|
print_info(f"Created directory: {SSH_DIR}")
|
|
|
|
# 2) Create ~/.ssh/conf if it doesn't exist
|
|
if not os.path.isdir(CONF_DIR):
|
|
os.makedirs(CONF_DIR, mode=0o700, exist_ok=True)
|
|
print_info(f"Created directory: {CONF_DIR}")
|
|
|
|
# 3) Create ~/.ssh/s if it doesn't exist
|
|
if not os.path.isdir(SOCKET_DIR):
|
|
os.makedirs(SOCKET_DIR, mode=0o700, exist_ok=True)
|
|
print_info(f"Created directory: {SOCKET_DIR}")
|
|
|
|
# 4) Create ~/.ssh/config if it doesn't exist
|
|
if not os.path.isfile(MAIN_CONFIG):
|
|
with open(MAIN_CONFIG, "w") as f:
|
|
f.write(DEFAULT_CONFIG_CONTENT)
|
|
print_info(f"Created default SSH config at: {MAIN_CONFIG}")
|
|
|
|
def load_config_file(file_path):
|
|
"""
|
|
Parse a single SSH config file and return a list of host blocks.
|
|
Each block is an OrderedDict with keys like 'Host', 'HostName', etc.
|
|
"""
|
|
blocks = []
|
|
host_data = None
|
|
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
lines = f.readlines()
|
|
except Exception as e:
|
|
print_error(f"Error reading SSH config file {file_path}: {e}")
|
|
return blocks
|
|
|
|
for line in lines:
|
|
stripped_line = line.strip()
|
|
# Skip empty lines and comments
|
|
if not stripped_line or stripped_line.startswith('#'):
|
|
continue
|
|
|
|
# Start of a new Host block
|
|
if stripped_line.lower().startswith('host '):
|
|
host_labels = stripped_line.split()[1:]
|
|
for label in host_labels:
|
|
if '*' not in label:
|
|
if host_data:
|
|
blocks.append(host_data)
|
|
host_data = OrderedDict({'Host': label})
|
|
break
|
|
elif host_data:
|
|
# Split on the first whitespace into key/value
|
|
if ' ' in stripped_line:
|
|
key, value = stripped_line.split(None, 1)
|
|
host_data[key] = value.strip()
|
|
|
|
if host_data:
|
|
blocks.append(host_data)
|
|
|
|
return blocks
|
|
|
|
async def check_ssh_port(ip_address, port):
|
|
"""
|
|
Attempt to open an SSH connection to see if the port is open.
|
|
Returns True if successful, False otherwise.
|
|
"""
|
|
try:
|
|
reader, writer = await asyncio.wait_for(
|
|
asyncio.open_connection(ip_address, port), timeout=1
|
|
)
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
async def check_host(host):
|
|
"""
|
|
Given a host block (Host, HostName, User, Port, IdentityFile, etc.),
|
|
resolve the IP, check SSH port, etc. Return a row for tabulate.
|
|
"""
|
|
host_label = host.get('Host', 'N/A')
|
|
hostname = host.get('HostName', 'N/A')
|
|
user = host.get('User', 'N/A')
|
|
port = int(host.get('Port', '22'))
|
|
identity_file = host.get('IdentityFile', 'N/A')
|
|
|
|
# Check if identity file exists
|
|
if identity_file != 'N/A':
|
|
expanded_identity = os.path.expanduser(identity_file)
|
|
identity_exists = os.path.isfile(expanded_identity)
|
|
else:
|
|
identity_exists = False
|
|
|
|
identity_display = (
|
|
f"{Colors.GREEN}{identity_file}{Colors.RESET}"
|
|
if identity_exists
|
|
else f"{Colors.RED}{identity_file}{Colors.RESET}"
|
|
)
|
|
|
|
# Resolve IP
|
|
try:
|
|
ip_address = socket.gethostbyname(hostname)
|
|
colored_ip = f"{Colors.GREEN}{ip_address}{Colors.RESET}"
|
|
except socket.error:
|
|
ip_address = None
|
|
colored_ip = f"{Colors.RED}N/A{Colors.RESET}"
|
|
|
|
# Check if port is open
|
|
if ip_address:
|
|
port_open = await check_ssh_port(ip_address, port)
|
|
colored_port = (
|
|
f"{Colors.GREEN}{port}{Colors.RESET}"
|
|
if port_open
|
|
else f"{Colors.RED}{port}{Colors.RESET}"
|
|
)
|
|
else:
|
|
colored_port = f"{Colors.RED}{port}{Colors.RESET}"
|
|
|
|
return [
|
|
host_label,
|
|
user,
|
|
colored_port,
|
|
hostname,
|
|
colored_ip,
|
|
identity_display
|
|
]
|
|
|
|
async def list_hosts():
|
|
"""
|
|
List out all hosts found in ~/.ssh/conf/*/config, showing connectivity details.
|
|
If no hosts are found, print an empty table or a warning message.
|
|
"""
|
|
pattern = os.path.join(CONF_DIR, "*", "config")
|
|
conf_files = sorted(glob.glob(pattern))
|
|
|
|
all_host_blocks = []
|
|
for conf_file in conf_files:
|
|
blocks = load_config_file(conf_file)
|
|
all_host_blocks.extend(blocks)
|
|
|
|
headers = ["No.", "Host", "User", "Port", "HostName", "IP Address", "IdentityFile"]
|
|
if not all_host_blocks:
|
|
print_warning("No hosts found. The server list is empty.")
|
|
print("\nSSH Conf Subdirectory Host List")
|
|
print(tabulate([], headers=headers, tablefmt="grid"))
|
|
return
|
|
|
|
tasks = [check_host(h) for h in all_host_blocks]
|
|
results = await asyncio.gather(*tasks)
|
|
|
|
table = [[idx + 1] + row for idx, row in enumerate(results)]
|
|
print("\nSSH Conf Subdirectory Host List")
|
|
print(tabulate(table, headers=headers, tablefmt="grid"))
|
|
|
|
def interactive_menu():
|
|
"""
|
|
A simple interactive menu:
|
|
1. List Hosts
|
|
2. Add a Host
|
|
3. Edit a Host
|
|
4. Exit
|
|
"""
|
|
while True:
|
|
print("\n" + f"{Colors.CYAN}{Colors.BOLD}SSH Config Manager Menu{Colors.RESET}")
|
|
print("1. List Hosts")
|
|
print("2. Add a Host")
|
|
print("3. Edit a Host")
|
|
print("4. Exit")
|
|
|
|
choice = input("Select an option (1-4): ").strip()
|
|
if choice == '1':
|
|
asyncio.run(list_hosts())
|
|
elif choice == '2':
|
|
add_host(CONF_DIR)
|
|
elif choice == '3':
|
|
edit_existing_host(CONF_DIR)
|
|
elif choice == '4':
|
|
print_info("Exiting...")
|
|
break
|
|
else:
|
|
print_error("Invalid choice. Please select 1, 2, 3, or 4.")
|
|
|
|
def main():
|
|
ensure_ssh_setup()
|
|
interactive_menu()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|