# ssh_manager/list_hosts.py
"""Build and print a table of the SSH hosts found under a conf directory.

Each ``<conf_dir>/*/config`` file is parsed for ``Host`` blocks; every host is
resolved via DNS and pinged once, and the rows are shown sorted by IP address.
"""

import asyncio
import glob
import ipaddress
import os
import socket
import subprocess
from collections import OrderedDict
from functools import lru_cache
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Union, OrderedDict as OrderedDictType

from tabulate import tabulate

from .utils import print_warning, print_error, Colors
from .config import CONF_DIR

# Upper bound on simultaneous DNS/ping probes.
_MAX_CONCURRENT_PROBES = 10

# Hard stop for a single ping, on top of ping's own "-W" wait, so one stuck
# child process cannot stall the whole listing.
_PING_TIMEOUT_SECONDS = 5


# Cache DNS lookups
@lru_cache(maxsize=128, typed=True)
def resolve_hostname(hostname: str) -> Optional[str]:
    """Return the IPv4 address for *hostname*, or None if it cannot be resolved.

    Results (including failures) are memoized per hostname.
    """
    try:
        return socket.gethostbyname(hostname)
    except (OSError, UnicodeError):
        # OSError covers socket.error/gaierror; UnicodeError is raised when the
        # name cannot be IDNA-encoded (e.g. an over-long or empty label).
        return None


def _split_option(line: str) -> Tuple[str, str]:
    """Split a config line into (keyword, argument).

    The keyword ends at the first whitespace character or '='; a single '='
    with optional surrounding whitespace is accepted as the separator.
    Returns an empty argument when the line holds only a keyword.
    """
    for idx, ch in enumerate(line):
        if ch.isspace() or ch == '=':
            break
    else:
        return line, ''
    keyword = line[:idx]
    argument = line[idx:].lstrip()
    if argument.startswith('='):
        argument = argument[1:].lstrip()
    return keyword, argument.strip()


def _is_host_pattern(label: str) -> bool:
    """Return True if a Host label is a pattern rather than a concrete alias."""
    return label.startswith('!') or '*' in label or '?' in label


def load_config_file(file_path: str) -> List[OrderedDictType]:
    """Parse an SSH config file into one ordered dict per concrete host.

    Each dict starts with a ``'Host'`` key holding the first non-pattern label
    of the ``Host`` line, followed by that section's options as written in the
    file. Sections whose ``Host`` line contains only patterns, and ``Match``
    sections, are skipped entirely so their options are not attributed to the
    preceding host.

    On a read error the problem is reported with ``print_error`` and an empty
    list is returned.
    """
    blocks: List[OrderedDictType] = []

    try:
        with open(file_path, 'r') as f:
            lines = f.readlines()
    except (OSError, UnicodeError) as e:
        print_error(f"Error reading SSH config file {file_path}: {e}")
        return blocks

    host_data: Optional[OrderedDictType] = None
    for line in lines:
        stripped_line = line.strip()
        if not stripped_line or stripped_line.startswith('#'):
            continue

        keyword, argument = _split_option(stripped_line)
        if not argument:
            # A bare keyword carries no information we can record.
            continue

        section = keyword.lower()
        if section in ('host', 'match'):
            # A new section always closes the one before it.
            if host_data:
                blocks.append(host_data)
            host_data = None
            if section == 'host':
                concrete = next(
                    (lbl for lbl in argument.split() if not _is_host_pattern(lbl)),
                    None,
                )
                if concrete is not None:
                    host_data = OrderedDict({'Host': concrete})
        elif host_data is not None:
            host_data[keyword] = argument

    if host_data:
        blocks.append(host_data)
    return blocks


def _get_option(block: OrderedDictType, name: str, default: str) -> str:
    """Look up an option in a host block, ignoring keyword case.

    An exact-case match wins; otherwise the first key equal to *name*
    case-insensitively is used.
    """
    if name in block:
        return block[name]
    wanted = name.lower()
    for key, value in block.items():
        if key.lower() == wanted:
            return value
    return default


def _is_ip_reachable(ip: str) -> bool:
    """Return True if a single ping to *ip* succeeds.

    Any failure to run ping (non-zero exit, timeout, missing binary) counts as
    unreachable rather than an error.
    """
    try:
        subprocess.run(
            ["ping", "-c", "1", "-W", "1", ip],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=_PING_TIMEOUT_SECONDS,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        return False
    return True


async def gather_host_info(all_host_blocks: List[OrderedDictType]) -> List[Tuple]:
    """
    Given a list of host blocks, gather full info:
      - resolved IP (green when it answers a ping)
      - HostName coloring (green if it resolves, red if not, plain if it is
        already an IP literal)
      - 'Conf Directory' coloring if IdentityFile != 'N/A'

    Returns a list of 7-tuples, in the same order as the input:
      (host_label, user, port, colored_hostname, colored_ip,
       conf_path_display, raw_ip)
    """

    async def process_block(h: OrderedDictType) -> Tuple:
        loop = asyncio.get_running_loop()

        host_label: str = _get_option(h, 'Host', 'N/A')
        hostname: str = _get_option(h, 'HostName', 'N/A')
        user: str = _get_option(h, 'User', 'N/A')
        port: str = _get_option(h, 'Port', '22')  # Keep as string since we're not testing it
        identity_file: str = _get_option(h, 'IdentityFile', 'N/A')

        # DNS and ping are blocking calls; run them in the default thread pool
        # so the probes for different hosts really overlap.
        raw_ip = "N/A"
        if hostname != 'N/A':
            resolved = await loop.run_in_executor(None, resolve_hostname, hostname)
            raw_ip = resolved or "N/A"

        reachable = False
        if raw_ip != "N/A":
            reachable = await loop.run_in_executor(None, _is_ip_reachable, raw_ip)

        # Determine IP color
        if reachable:
            colored_ip = f"{Colors.GREEN}{raw_ip}{Colors.RESET}"
        else:
            colored_ip = raw_ip  # No color if not reachable

        # Determine hostname color
        try:
            ipaddress.ip_address(hostname)
            colored_hostname = hostname  # No color if hostname is an IP
        except ValueError:
            if raw_ip != "N/A":
                colored_hostname = f"{Colors.GREEN}{hostname}{Colors.RESET}"
            else:
                colored_hostname = f"{Colors.RED}{hostname}{Colors.RESET}"

        # Conf Directory = ~/.ssh/conf/<host_label>
        conf_path = f"~/.ssh/conf/{host_label}"
        conf_path_display = (
            f"{Colors.GREEN}{conf_path}{Colors.RESET}"
            if identity_file != 'N/A'
            else conf_path
        )

        return (
            host_label,
            user,
            port,  # Port is uncolored
            colored_hostname,
            colored_ip,
            conf_path_display,
            raw_ip,  # uncolored IP for sorting
        )

    # Limit how many hosts are probed at once.
    sem = asyncio.Semaphore(_MAX_CONCURRENT_PROBES)

    async def process_with_semaphore(block):
        async with sem:
            return await process_block(block)

    tasks = [process_with_semaphore(b) for b in all_host_blocks]
    return await asyncio.gather(*tasks)


@lru_cache(maxsize=256)
def parse_ip(ip_str: str) -> Optional[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]:
    """
    Convert a string IP to an ipaddress object for sorting.
    Returns None if invalid or 'N/A'.
    """
    try:
        return ipaddress.ip_address(ip_str)
    except ValueError:
        return None


def sort_by_ip(results: List[Tuple]) -> List[Tuple]:
    """
    Sort the 7-tuples by IP ascending, with 'N/A' (or any unparsable IP) last.

    The raw IP is read from the last element of each row. IPv4 addresses sort
    before IPv6 ones; rows without a valid IP keep their relative order.
    """
    def sort_key(row):
        ip_obj = parse_ip(row[-1])
        if ip_obj is None:
            return (True, 0, 0)
        # Address objects of different versions cannot be compared directly,
        # so order by (version, integer value) instead.
        return (False, ip_obj.version, int(ip_obj))

    return sorted(results, key=sort_key)


async def build_host_list_table(conf_dir: str) -> Tuple[List[str], List[List]]:
    """
    Gathers + sorts all hosts in conf_dir by IP ascending.

    Config files are discovered as ``<conf_dir>/*/config``.
    Returns (headers, final_table_rows); each row is prefixed with a 1-based
    index and omits the raw_ip.
    """
    pattern = os.path.join(conf_dir, "*", "config")

    all_host_blocks: List[OrderedDictType] = []
    for conf_file in sorted(glob.glob(pattern)):
        all_host_blocks.extend(load_config_file(conf_file))

    headers = ["No.", "Host", "User", "Port", "HostName", "IP Address", "Conf Directory"]
    if not all_host_blocks:
        return headers, []

    results = await gather_host_info(all_host_blocks)
    sorted_rows = sort_by_ip(results)

    # Build final table: number the rows and drop the trailing raw IP.
    final_data = [
        [idx] + list(row[:-1])
        for idx, row in enumerate(sorted_rows, start=1)
    ]
    return headers, final_data


async def list_hosts(conf_dir: str) -> None:
    """Display a formatted table of all SSH hosts.

    Prints a warning and an empty table when no hosts are found.
    """
    headers, final_data = await build_host_list_table(conf_dir)

    if not final_data:
        print_warning("No hosts found. The server list is empty.")
        print("\nSSH Conf Subdirectory Host List")
        print(tabulate([], headers=headers, tablefmt="grid"))
        return

    print("\nSSH Conf Subdirectory Host List (Sorted by IP Ascending)")
    print(tabulate(final_data, headers=headers, tablefmt="grid"))