# ssh_manager/list_hosts.py
"""List the hosts found in per-directory SSH config files, with reachability info."""

import asyncio
import glob
import ipaddress
import os
import socket
from collections import OrderedDict

from tabulate import tabulate

from .utils import print_warning, print_error, Colors


async def check_ssh_port(ip_address, port):
    """
    Attempt to open a TCP connection to ``ip_address:port``.

    Returns True if the connection is established within one second,
    False otherwise. Cancellation and KeyboardInterrupt are NOT swallowed,
    so the coroutine can still be cancelled by its caller.
    """
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(ip_address, port),
            timeout=1,
        )
    except (OSError, asyncio.TimeoutError, OverflowError, ValueError):
        # Refused / unreachable / timed out / port out of range: not open.
        return False

    # The connection succeeded, so the port is open; an error while tearing
    # the socket down must not turn that into a negative result.
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass
    return True


def _is_concrete_host(label):
    """Return True if a ``Host`` label is a plain name, not a pattern."""
    # ssh_config patterns use '*' and '?' as wildcards and '!' for negation.
    return not label.startswith('!') and '*' not in label and '?' not in label


def load_config_file(file_path):
    """
    Parse a single SSH config file and return a list of host blocks.

    Each block is an OrderedDict whose first key is 'Host' (the first
    non-pattern label on the ``Host`` line) followed by the options of that
    section, keyed exactly as they are spelled in the file.

    Sections introduced by a pattern-only ``Host`` line (e.g. ``Host *``) or
    by ``Match`` are skipped entirely, so their options are not attributed
    to the preceding host. On a read error the problem is reported through
    print_error and an empty list is returned.
    """
    blocks = []

    try:
        with open(file_path, 'r') as f:
            lines = f.readlines()
    except (OSError, UnicodeError) as e:
        print_error(f"Error reading SSH config file {file_path}: {e}")
        return blocks

    host_data = None
    for line in lines:
        stripped_line = line.strip()

        # Skip empty lines and comments
        if not stripped_line or stripped_line.startswith('#'):
            continue

        # Split on any whitespace so tab-separated entries are recognised too.
        parts = stripped_line.split(None, 1)
        keyword = parts[0].lower()

        # 'Host' and 'Match' both start a new section and end the current one.
        if keyword in ('host', 'match'):
            if host_data:
                blocks.append(host_data)
            host_data = None

            if keyword == 'host' and len(parts) == 2:
                label = next(
                    (
                        candidate
                        for candidate in parts[1].split()
                        if _is_concrete_host(candidate)
                    ),
                    None,
                )
                if label is not None:
                    host_data = OrderedDict({'Host': label})
            continue

        # Option line: only recorded while inside a concrete Host section.
        if host_data is not None and len(parts) == 2:
            host_data[parts[0]] = parts[1].strip()

    if host_data:
        blocks.append(host_data)

    return blocks


def _get_option(host, name, default='N/A'):
    """
    Look up an option in a host block, ignoring keyword case.

    ssh_config keywords are case-insensitive, so 'Hostname' and 'HostName'
    must be treated as the same option. The first matching key in insertion
    order wins.
    """
    wanted = name.lower()
    for key, value in host.items():
        if key.lower() == wanted:
            return value
    return default


async def check_host(host):
    """
    Given a host block, resolve its IP and probe its SSH port.

    Returns a tuple of:
      1) Host label
      2) User
      3) Port (green if open, red otherwise)
      4) HostName
      5) IP Address (green if resolved, red 'N/A' otherwise)
      6) Conf Directory (green if the block has an IdentityFile)
      7) raw_ip (uncolored string for sorting; 'N/A' if unresolved)

    A malformed Port value or an unresolvable HostName never raises; the
    affected column is simply shown in red.
    """
    host_label = host.get('Host', 'N/A')
    hostname = _get_option(host, 'HostName')
    user = _get_option(host, 'User')
    port_text = _get_option(host, 'Port', '22')
    identity_file = _get_option(host, 'IdentityFile')

    # A non-numeric Port must not abort the whole listing.
    try:
        port = int(port_text)
    except (TypeError, ValueError):
        port = None

    # Resolve IP. gethostbyname blocks, so run it in the default executor to
    # keep the event loop free for the other hosts' lookups and probes.
    loop = asyncio.get_running_loop()
    try:
        raw_ip = await loop.run_in_executor(None, socket.gethostbyname, hostname)
    except (OSError, UnicodeError):
        raw_ip = "N/A"

    if raw_ip == "N/A":
        colored_ip = f"{Colors.RED}N/A{Colors.RESET}"
    else:
        colored_ip = f"{Colors.GREEN}{raw_ip}{Colors.RESET}"

    # Check port (only meaningful with a resolved IP and a valid port number)
    port_open = False
    if raw_ip != "N/A" and port is not None:
        port_open = await check_ssh_port(raw_ip, port)

    port_display = port_text if port is None else port
    port_color = Colors.GREEN if port_open else Colors.RED
    colored_port = f"{port_color}{port_display}{Colors.RESET}"

    # Conf Directory = ~/.ssh/conf/<host label>
    # NOTE(review): this path is built from the host label, not from the
    # directory the config file was actually loaded from - confirm intended.
    conf_path = f"~/.ssh/conf/{host_label}"

    # If there's an IdentityFile, color the conf path green
    if identity_file != 'N/A':
        conf_path_display = f"{Colors.GREEN}{conf_path}{Colors.RESET}"
    else:
        conf_path_display = conf_path

    # Return the data plus the uncolored IP for sorting
    return (
        host_label,
        user,
        colored_port,
        hostname,
        colored_ip,
        conf_path_display,
        raw_ip,  # for sorting
    )


def _ip_sort_key(row):
    """Sort key for a check_host() row: ascending IP, unresolved rows last."""
    try:
        ip_obj = ipaddress.ip_address(row[-1])
    except ValueError:
        # "N/A" (or anything unparsable) sorts after every real address.
        return (1, 0, 0)
    # Compare plain ints so IPv4 and IPv6 addresses can never raise TypeError.
    return (0, ip_obj.version, int(ip_obj))


async def list_hosts(conf_dir):
    """
    List all hosts found in ``<conf_dir>/*/config``, sorted by IP ascending.

    Columns: No., Host, User, Port, HostName, IP Address, Conf Directory.
    Hosts whose IP could not be resolved are listed last. The table is
    printed to stdout; nothing is returned.
    """
    pattern = os.path.join(conf_dir, "*", "config")

    all_host_blocks = []
    for conf_file in sorted(glob.glob(pattern)):
        all_host_blocks.extend(load_config_file(conf_file))

    headers = ["No.", "Host", "User", "Port", "HostName", "IP Address", "Conf Directory"]

    if not all_host_blocks:
        print_warning("No hosts found. The server list is empty.")
        print("\nSSH Conf Subdirectory Host List")
        print(tabulate([], headers=headers, tablefmt="grid"))
        return

    # Resolve and probe every host concurrently.
    results = await asyncio.gather(*(check_host(h) for h in all_host_blocks))

    # Each row is (host_label, user, colored_port, hostname, colored_ip,
    # conf_path, raw_ip); raw_ip is only used for ordering and is dropped
    # from the displayed table.
    final_data = [
        [idx, *row[:-1]]
        for idx, row in enumerate(sorted(results, key=_ip_sort_key), start=1)
    ]

    print("\nSSH Conf Subdirectory Host List (Sorted by IP Ascending)")
    print(tabulate(final_data, headers=headers, tablefmt="grid"))