SSH-key-Manager/list_hosts.py

141 lines
4.2 KiB
Python

# ssh_manager/list_hosts.py
import os
import glob
import socket
import asyncio
from tabulate import tabulate
from collections import OrderedDict
from .utils import print_warning, print_error, Colors
async def check_ssh_port(ip_address, port):
"""
Attempt to open an SSH connection to see if the port is open.
Returns True if successful, False otherwise.
"""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ip_address, port), timeout=1
)
writer.close()
await writer.wait_closed()
return True
except:
return False
def load_config_file(file_path):
"""
Parse a single SSH config file and return a list of host blocks.
Each block is an OrderedDict with keys like 'Host', 'HostName', etc.
"""
blocks = []
host_data = None
try:
with open(file_path, 'r') as f:
lines = f.readlines()
except Exception as e:
print_error(f"Error reading SSH config file {file_path}: {e}")
return blocks
for line in lines:
stripped_line = line.strip()
if not stripped_line or stripped_line.startswith('#'):
continue
if stripped_line.lower().startswith('host '):
host_labels = stripped_line.split()[1:]
for label in host_labels:
if '*' not in label:
if host_data:
blocks.append(host_data)
host_data = OrderedDict({'Host': label})
break
elif host_data:
if ' ' in stripped_line:
key, value = stripped_line.split(None, 1)
host_data[key] = value.strip()
if host_data:
blocks.append(host_data)
return blocks
async def check_host(host):
"""
Given a host block, resolve IP, check SSH port, identity file existence, etc.
Return a row for tabulate.
"""
host_label = host.get('Host', 'N/A')
hostname = host.get('HostName', 'N/A')
user = host.get('User', 'N/A')
port = int(host.get('Port', '22'))
identity_file = host.get('IdentityFile', 'N/A')
# Identity file check
if identity_file != 'N/A':
expanded_identity = os.path.expanduser(identity_file)
identity_exists = os.path.isfile(expanded_identity)
else:
identity_exists = False
identity_display = (
f"{Colors.GREEN}{identity_file}{Colors.RESET}"
if identity_exists
else f"{Colors.RED}{identity_file}{Colors.RESET}"
)
# Resolve IP
try:
ip_address = socket.gethostbyname(hostname)
colored_ip = f"{Colors.GREEN}{ip_address}{Colors.RESET}"
except socket.error:
ip_address = None
colored_ip = f"{Colors.RED}N/A{Colors.RESET}"
# Check if port is open
if ip_address:
port_open = await check_ssh_port(ip_address, port)
colored_port = (
f"{Colors.GREEN}{port}{Colors.RESET}"
if port_open
else f"{Colors.RED}{port}{Colors.RESET}"
)
else:
colored_port = f"{Colors.RED}{port}{Colors.RESET}"
return [
host_label,
user,
colored_port,
hostname,
colored_ip,
identity_display
]
async def list_hosts(conf_dir):
"""
List out all hosts found in ~/.ssh/conf/*/config, showing connectivity details.
If no hosts are found, print an empty table or a warning message.
"""
pattern = os.path.join(conf_dir, "*", "config")
conf_files = sorted(glob.glob(pattern))
all_host_blocks = []
for conf_file in conf_files:
blocks = load_config_file(conf_file)
all_host_blocks.extend(blocks)
headers = ["No.", "Host", "User", "Port", "HostName", "IP Address", "IdentityFile"]
if not all_host_blocks:
print_warning("No hosts found. The server list is empty.")
print("\nSSH Conf Subdirectory Host List")
print(tabulate([], headers=headers, tablefmt="grid"))
return
tasks = [check_host(h) for h in all_host_blocks]
results = await asyncio.gather(*tasks)
table = [[idx + 1] + row for idx, row in enumerate(results)]
print("\nSSH Conf Subdirectory Host List")
print(tabulate(table, headers=headers, tablefmt="grid"))