added additional functions

This commit is contained in:
Arctic 2025-03-07 06:08:30 -06:00
parent 1e55abf811
commit 67a5534246
8 changed files with 654 additions and 107 deletions

View file

@ -1,10 +1,21 @@
# SSH Hosts Configuration (Example) python -m ssh_manager
This table provides an example of SSH host configurations. SSH Conf Subdirectory Host List
+-------+------------+--------+--------+------------+--------------+------------------------+
| No. | Host | User | Port | HostName | IP Address | Conf Directory |
+=======+============+========+========+============+==============+========================+
| 1 | example_1 | user_1 | 22 | 10.0.0.6 | 10.0.0.6 | ~/.ssh/conf/example_1 |
+-------+------------+--------+--------+------------+--------------+------------------------+
| 3 | example_2 | user_1 | 22 | 10.0.0.6 | 10.0.0.6 | ~/.ssh/conf/example_1 |
+-------+------------+--------+--------+------------+--------------+------------------------+
| 3 | example_3 | user_2 | 2222 | 10.0.0.6 | 10.0.0.6 | ~/.ssh/conf/example_1 |
+-------+------------+--------+--------+------------+--------------+------------------------+
| No. | Host | User | Port | HostName | IP Address | IdentityFile | SSH Config Manager Menu
|---- |----------|--------|------|-----------------|--------------|--------------------------------------| 1. List Hosts
| 1 | server1 | user1 | 22 | host1.example.com | 192.168.1.10 | N/A | 2. Add a Host
| 2 | server2 | user1 | 22 | host2.example.com | 203.0.113.10 | N/A | 3. Edit a Host
| 3 | server3 | user1 | 22 | host3.example.com | 10.0.0.5 | ~/.ssh/conf/server3/id_ed25519 | 4. Regenerate Key
| 4 | server4 | user1 | 22 | host4.example.com | 10.0.0.6 | ~/.ssh/conf/server4/id_ed25519 | 5. Remove Host
6. Exit
Select an option (1-6):

View file

@ -2,7 +2,7 @@
import os import os
import subprocess import subprocess
from .utils import print_error, print_warning, print_info from .utils import print_error, print_warning, print_info, safe_input
def add_host(conf_dir): def add_host(conf_dir):
""" """
@ -12,18 +12,31 @@ def add_host(conf_dir):
""" """
print_info("Adding a new SSH host...") print_info("Adding a new SSH host...")
host_label = input("Enter Host label (e.g. myserver): ").strip() host_label = safe_input("Enter Host label (e.g. myserver): ")
if host_label is None:
return # User canceled (Ctrl+C)
host_label = host_label.strip()
if not host_label: if not host_label:
print_error("Host label cannot be empty.") print_error("Host label cannot be empty.")
return return
hostname = input("Enter HostName (IP or domain): ").strip() hostname = safe_input("Enter HostName (IP or domain): ")
if hostname is None:
return
hostname = hostname.strip()
if not hostname: if not hostname:
print_error("HostName cannot be empty.") print_error("HostName cannot be empty.")
return return
user = input("Enter username (default: 'root'): ").strip() or "root" user = safe_input("Enter username (default: 'root'): ")
port = input("Enter SSH port (default: 22): ").strip() or "22" if user is None:
return
user = user.strip() or "root"
port = safe_input("Enter SSH port (default: 22): ")
if port is None:
return
port = port.strip() or "22"
# Create subdirectory: ~/.ssh/conf/<label> # Create subdirectory: ~/.ssh/conf/<label>
host_dir = os.path.join(conf_dir, host_label) host_dir = os.path.join(conf_dir, host_label)
@ -37,26 +50,29 @@ def add_host(conf_dir):
if os.path.exists(config_path): if os.path.exists(config_path):
print_warning(f"Config file already exists: {config_path}; it will be overwritten/updated.") print_warning(f"Config file already exists: {config_path}; it will be overwritten/updated.")
# Ask about generating an SSH key gen_key_choice = safe_input("Generate a new ed25519 SSH key for this host? (y/n): ")
gen_key_choice = input("Generate a new ed25519 SSH key for this host? (y/n): ").lower().strip() if gen_key_choice is None:
identity_file = "" return
gen_key_choice = gen_key_choice.lower().strip()
identity_file = ""
if gen_key_choice == 'y': if gen_key_choice == 'y':
key_path = os.path.join(host_dir, "id_ed25519") key_path = os.path.join(host_dir, "id_ed25519")
if os.path.exists(key_path): if os.path.exists(key_path):
print_warning(f"{key_path} already exists. Skipping generation.") print_warning(f"{key_path} already exists. Skipping generation.")
identity_file = key_path identity_file = key_path
else: else:
# Generate a new SSH key (quietly, suppressing randomart)
cmd = ["ssh-keygen", "-q", "-t", "ed25519", "-N", "", "-f", key_path] cmd = ["ssh-keygen", "-q", "-t", "ed25519", "-N", "", "-f", key_path]
try: try:
subprocess.check_call(cmd) subprocess.check_call(cmd)
print_info(f"Generated new SSH key at {key_path}") print_info(f"Generated new SSH key at {key_path}")
identity_file = key_path identity_file = key_path
# Prompt to copy the key to the server # Prompt to copy the key
copy_key = input("Would you like to copy this key to the server now? (y/n): ").lower().strip() copy_key = safe_input("Would you like to copy this key to the server now? (y/n): ")
if copy_key == 'y': if copy_key is None:
return
if copy_key.lower().strip() == 'y':
ssh_copy_cmd = ["ssh-copy-id", "-i", key_path] ssh_copy_cmd = ["ssh-copy-id", "-i", key_path]
if port != "22": if port != "22":
ssh_copy_cmd += ["-p", port] ssh_copy_cmd += ["-p", port]
@ -69,12 +85,13 @@ def add_host(conf_dir):
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print_error(f"Error generating SSH key: {e}") print_error(f"Error generating SSH key: {e}")
else: else:
# If not generating a new key, optionally ask for an existing path existing_key = safe_input("Enter existing IdentityFile path (or leave empty to skip): ")
existing_key = input("Enter existing IdentityFile path (or leave empty to skip): ").strip() if existing_key is None:
return
existing_key = existing_key.strip()
if existing_key: if existing_key:
identity_file = os.path.expanduser(existing_key) identity_file = os.path.expanduser(existing_key)
# Build the config lines
config_lines = [ config_lines = [
f"Host {host_label}", f"Host {host_label}",
f" HostName {hostname}", f" HostName {hostname}",
@ -84,7 +101,6 @@ def add_host(conf_dir):
if identity_file: if identity_file:
config_lines.append(f" IdentityFile {identity_file}") config_lines.append(f" IdentityFile {identity_file}")
# Write (or overwrite) the config
try: try:
with open(config_path, "w") as f: with open(config_path, "w") as f:
for line in config_lines: for line in config_lines:

32
cli.py
View file

@ -1,31 +1,32 @@
# ssh_manager/cli.py # ssh_manager/cli.py
import os import os
import glob
import asyncio import asyncio
from .utils import print_info, print_error, print_warning, Colors from .utils import print_info, print_error, print_warning, Colors, safe_input
from .config import SSH_DIR, CONF_DIR, SOCKET_DIR, MAIN_CONFIG, DEFAULT_CONFIG_CONTENT from .config import SSH_DIR, CONF_DIR, SOCKET_DIR, MAIN_CONFIG, DEFAULT_CONFIG_CONTENT
from .add_host import add_host from .add_host import add_host
from .edit_host import edit_host from .edit_host import edit_host
from .list_hosts import list_hosts from .list_hosts import list_hosts
from .regen_key import regenerate_key
from .remove_host import remove_host
def ensure_ssh_setup(): def ensure_ssh_setup():
# Make ~/.ssh if missing """
Creates ~/.ssh, ~/.ssh/conf, and ~/.ssh/s if missing,
and writes a default ~/.ssh/config if it doesn't exist.
"""
if not os.path.isdir(SSH_DIR): if not os.path.isdir(SSH_DIR):
os.makedirs(SSH_DIR, mode=0o700, exist_ok=True) os.makedirs(SSH_DIR, mode=0o700, exist_ok=True)
print_info(f"Created directory: {SSH_DIR}") print_info(f"Created directory: {SSH_DIR}")
# Make ~/.ssh/conf if missing
if not os.path.isdir(CONF_DIR): if not os.path.isdir(CONF_DIR):
os.makedirs(CONF_DIR, mode=0o700, exist_ok=True) os.makedirs(CONF_DIR, mode=0o700, exist_ok=True)
print_info(f"Created directory: {CONF_DIR}") print_info(f"Created directory: {CONF_DIR}")
# Make ~/.ssh/s if missing
if not os.path.isdir(SOCKET_DIR): if not os.path.isdir(SOCKET_DIR):
os.makedirs(SOCKET_DIR, mode=0o700, exist_ok=True) os.makedirs(SOCKET_DIR, mode=0o700, exist_ok=True)
print_info(f"Created directory: {SOCKET_DIR}") print_info(f"Created directory: {SOCKET_DIR}")
# Create ~/.ssh/config if not present
if not os.path.isfile(MAIN_CONFIG): if not os.path.isfile(MAIN_CONFIG):
with open(MAIN_CONFIG, "w") as f: with open(MAIN_CONFIG, "w") as f:
f.write(DEFAULT_CONFIG_CONTENT) f.write(DEFAULT_CONFIG_CONTENT)
@ -34,14 +35,23 @@ def ensure_ssh_setup():
def main(): def main():
ensure_ssh_setup() ensure_ssh_setup()
# Display the server list on first load
asyncio.run(list_hosts(CONF_DIR))
while True: while True:
print("\n" + f"{Colors.CYAN}{Colors.BOLD}SSH Config Manager Menu{Colors.RESET}") print("\n" + f"{Colors.CYAN}{Colors.BOLD}SSH Config Manager Menu{Colors.RESET}")
print("1. List Hosts") print("1. List Hosts")
print("2. Add a Host") print("2. Add a Host")
print("3. Edit a Host") print("3. Edit a Host")
print("4. Exit") print("4. Regenerate Key")
print("5. Remove Host")
print("6. Exit")
choice = input("Select an option (1-4): ").strip() choice = safe_input("Select an option (1-6): ")
if choice is None:
continue # User pressed Ctrl+C => safe_input returns None => re-show menu
choice = choice.strip()
if choice == '1': if choice == '1':
asyncio.run(list_hosts(CONF_DIR)) asyncio.run(list_hosts(CONF_DIR))
elif choice == '2': elif choice == '2':
@ -49,9 +59,13 @@ def main():
elif choice == '3': elif choice == '3':
edit_host(CONF_DIR) edit_host(CONF_DIR)
elif choice == '4': elif choice == '4':
asyncio.run(regenerate_key(CONF_DIR))
elif choice == '5':
asyncio.run(remove_host(CONF_DIR))
elif choice == '6':
print_info("Exiting...") print_info("Exiting...")
break break
else: else:
print_error("Invalid choice. Please select 1, 2, 3, or 4.") print_error("Invalid choice. Please select 1 through 6.")
return 0 return 0

View file

@ -1,85 +1,166 @@
# ssh_manager/edit_host.py
import os import os
import asyncio
from collections import OrderedDict from collections import OrderedDict
from .utils import print_error, print_warning, print_info from .utils import print_error, print_warning, print_info, safe_input
from .list_hosts import list_hosts, load_config_file, check_ssh_port
def load_config_file(file_path): async def get_all_host_blocks(conf_dir):
""" """
Parse the given config file into a list of host blocks (OrderedDict). Similar to list_hosts, but returns the list of host blocks + a table of results.
We'll build a table ourselves so we can map row numbers to actual host labels.
""" """
blocks = [] import glob
host_data = None import socket
pattern = os.path.join(conf_dir, "*", "config")
conf_files = sorted(glob.glob(pattern))
all_blocks = []
for conf_file in conf_files:
blocks = load_config_file(conf_file)
all_blocks.extend(blocks)
# If no blocks found, return empty
if not all_blocks:
return []
# We want to do a partial version of check_host to get row data
# so we can display the table right here and keep track of each blocks host label.
# But let's do it similarly to list_hosts:
table_rows = []
for idx, b in enumerate(all_blocks, start=1):
host_label = b.get("Host", "N/A")
hostname = b.get("HostName", "N/A")
user = b.get("User", "N/A")
port = int(b.get("Port", "22"))
identity_file = b.get("IdentityFile", "N/A")
# Identity check
if identity_file != "N/A":
expanded_identity = os.path.expanduser(identity_file)
identity_exists = os.path.isfile(expanded_identity)
else:
identity_exists = False
# IP resolution
try: try:
with open(file_path, 'r') as f: ip_address = socket.gethostbyname(hostname)
lines = f.readlines() except socket.error:
except Exception as e: ip_address = None
print_error(f"Error reading SSH config file {file_path}: {e}")
return blocks
for line in lines: # Port check
stripped_line = line.strip() if ip_address:
if not stripped_line or stripped_line.startswith('#'): port_open = await asyncio.wait_for(check_ssh_port(ip_address, port), timeout=1)
continue else:
port_open = False
if stripped_line.lower().startswith('host '): # Colors for display (optional, or we can keep it simple):
host_labels = stripped_line.split()[1:] ip_display = f"\033[0;32m{ip_address}\033[0m" if ip_address else "\033[0;31mN/A\033[0m"
for label in host_labels: port_display = f"\033[0;32m{port}\033[0m" if port_open else f"\033[0;31m{port}\033[0m"
if '*' not in label: identity_disp= f"\033[0;32m{identity_file}\033[0m" if identity_exists else f"\033[0;31m{identity_file}\033[0m"
if host_data:
blocks.append(host_data)
host_data = OrderedDict({'Host': label})
break
elif host_data:
if ' ' in stripped_line:
key, value = stripped_line.split(None, 1)
host_data[key] = value.strip()
if host_data: row = [
blocks.append(host_data) idx,
return blocks host_label,
user,
port_display,
hostname,
ip_display,
identity_disp
]
table_rows.append(row)
# Print the table
from tabulate import tabulate
headers = ["No.", "Host", "User", "Port", "HostName", "IP Address", "IdentityFile"]
print("\nSSH Conf Subdirectory Host List")
print(tabulate(table_rows, headers=headers, tablefmt="grid"))
return all_blocks
def edit_host(conf_dir): def edit_host(conf_dir):
""" """
Let the user update fields for an existing host in ~/.ssh/conf/<label>/config. Let the user update fields for an existing host in ~/.ssh/conf/<label>/config.
The user may type either the row number OR the actual host label.
""" """
host_label = input("Enter the Host label to edit: ").strip()
if not host_label: # 1) Gather + display the current host list
print_error("Host label cannot be empty.") print_info("Here is the current list of hosts:\n")
all_blocks = asyncio.run(get_all_host_blocks(conf_dir))
if not all_blocks:
print_warning("No hosts found to edit.")
return return
# 2) Prompt for which host to edit (by label or row number)
choice = safe_input("Enter the row number or the Host label to edit: ")
if choice is None:
return # user canceled (Ctrl+C)
choice = choice.strip()
if not choice:
print_error("Host label or row number cannot be empty.")
return
# Check if user typed a digit -> row number
target_block = None
if choice.isdigit():
row_idx = int(choice)
# Validate index
if row_idx < 1 or row_idx > len(all_blocks):
print_warning(f"Invalid row number {row_idx}.")
return
target_block = all_blocks[row_idx - 1] # zero-based
else:
# The user typed a host label
# We must search all_blocks for a matching Host
for b in all_blocks:
if b.get("Host") == choice:
target_block = b
break
if not target_block:
print_warning(f"No matching host label '{choice}' found.")
return
# Now we have a target_block with existing data
host_label = target_block.get("Host", "")
if not host_label:
print_warning("This host block has no label. Cannot edit.")
return
# Derive the config path
host_dir = os.path.join(conf_dir, host_label) host_dir = os.path.join(conf_dir, host_label)
config_path = os.path.join(host_dir, "config") config_path = os.path.join(host_dir, "config")
if not os.path.isfile(config_path): if not os.path.isfile(config_path):
print_warning(f"No config file found at {config_path}; cannot edit this host.") print_warning(f"No config file found at {config_path}; cannot edit this host.")
return return
blocks = load_config_file(config_path)
if not blocks:
print_warning(f"No valid Host blocks found in {config_path}")
return
target_block = None
for b in blocks:
if b.get("Host") == host_label:
target_block = b
break
if not target_block:
print_warning(f"No matching Host '{host_label}' found in {config_path}")
return
old_hostname = target_block.get("HostName", "") old_hostname = target_block.get("HostName", "")
old_user = target_block.get("User", "") old_user = target_block.get("User", "")
old_port = target_block.get("Port", "22") old_port = target_block.get("Port", "22")
old_identity = target_block.get("IdentityFile", "") old_identity = target_block.get("IdentityFile", "")
print_info("Leave a field blank to keep its current value.") print_info("Leave a field blank to keep its current value.")
new_hostname = input(f"Enter new HostName [{old_hostname}]: ").strip()
new_user = input(f"Enter new User [{old_user}]: ").strip() new_hostname = safe_input(f"Enter new HostName [{old_hostname}]: ")
new_port = input(f"Enter new Port [{old_port}]: ").strip() if new_hostname is None:
new_ident = input(f"Enter new IdentityFile [{old_identity}]: ").strip() return
new_hostname = new_hostname.strip()
new_user = safe_input(f"Enter new User [{old_user}]: ")
if new_user is None:
return
new_user = new_user.strip()
new_port = safe_input(f"Enter new Port [{old_port}]: ")
if new_port is None:
return
new_port = new_port.strip()
new_ident = safe_input(f"Enter new IdentityFile [{old_identity}]: ")
if new_ident is None:
return
new_ident = new_ident.strip()
final_hostname = new_hostname if new_hostname else old_hostname final_hostname = new_hostname if new_hostname else old_hostname
final_user = new_user if new_user else old_user final_user = new_user if new_user else old_user

View file

@ -8,6 +8,10 @@ from tabulate import tabulate
from collections import OrderedDict from collections import OrderedDict
from .utils import print_warning, print_error, Colors from .utils import print_warning, print_error, Colors
"""
This file is responsible for listing all SSH hosts discovered in ~/.ssh/conf/<label>/config.
We've modified it to show a "Conf Directory" column instead of "IdentityFile."
"""
async def check_ssh_port(ip_address, port): async def check_ssh_port(ip_address, port):
""" """
@ -63,8 +67,14 @@ def load_config_file(file_path):
async def check_host(host): async def check_host(host):
""" """
Given a host block, resolve IP, check SSH port, identity file existence, etc. Given a host block, resolve IP, check SSH port, etc.
Return a row for tabulate. Returns a row for tabulate containing:
1) Host label
2) User
3) Port (color-coded if open)
4) HostName
5) IP Address (color-coded if resolved)
6) Conf Directory (green if has IdentityFile, else no color)
""" """
host_label = host.get('Host', 'N/A') host_label = host.get('Host', 'N/A')
hostname = host.get('HostName', 'N/A') hostname = host.get('HostName', 'N/A')
@ -72,19 +82,6 @@ async def check_host(host):
port = int(host.get('Port', '22')) port = int(host.get('Port', '22'))
identity_file = host.get('IdentityFile', 'N/A') identity_file = host.get('IdentityFile', 'N/A')
# Identity file check
if identity_file != 'N/A':
expanded_identity = os.path.expanduser(identity_file)
identity_exists = os.path.isfile(expanded_identity)
else:
identity_exists = False
identity_display = (
f"{Colors.GREEN}{identity_file}{Colors.RESET}"
if identity_exists
else f"{Colors.RED}{identity_file}{Colors.RESET}"
)
# Resolve IP # Resolve IP
try: try:
ip_address = socket.gethostbyname(hostname) ip_address = socket.gethostbyname(hostname)
@ -93,7 +90,7 @@ async def check_host(host):
ip_address = None ip_address = None
colored_ip = f"{Colors.RED}N/A{Colors.RESET}" colored_ip = f"{Colors.RED}N/A{Colors.RESET}"
# Check if port is open # Check port
if ip_address: if ip_address:
port_open = await check_ssh_port(ip_address, port) port_open = await check_ssh_port(ip_address, port)
colored_port = ( colored_port = (
@ -104,19 +101,27 @@ async def check_host(host):
else: else:
colored_port = f"{Colors.RED}{port}{Colors.RESET}" colored_port = f"{Colors.RED}{port}{Colors.RESET}"
# Conf Directory = ~/.ssh/conf/<host_label>
conf_path = f"~/.ssh/conf/{host_label}"
# If there's an IdentityFile, we color this path green
if identity_file != 'N/A':
conf_path_display = f"{Colors.GREEN}{conf_path}{Colors.RESET}"
else:
conf_path_display = conf_path
return [ return [
host_label, host_label,
user, user,
colored_port, colored_port,
hostname, hostname,
colored_ip, colored_ip,
identity_display conf_path_display
] ]
async def list_hosts(conf_dir): async def list_hosts(conf_dir):
""" """
List out all hosts found in ~/.ssh/conf/*/config, showing connectivity details. List out all hosts found in ~/.ssh/conf/*/config.
If no hosts are found, print an empty table or a warning message. Shows columns: No., Host, User, Port, HostName, IP Address, Conf Directory
""" """
pattern = os.path.join(conf_dir, "*", "config") pattern = os.path.join(conf_dir, "*", "config")
conf_files = sorted(glob.glob(pattern)) conf_files = sorted(glob.glob(pattern))
@ -126,7 +131,8 @@ async def list_hosts(conf_dir):
blocks = load_config_file(conf_file) blocks = load_config_file(conf_file)
all_host_blocks.extend(blocks) all_host_blocks.extend(blocks)
headers = ["No.", "Host", "User", "Port", "HostName", "IP Address", "IdentityFile"] # Prepare table
headers = ["No.", "Host", "User", "Port", "HostName", "IP Address", "Conf Directory"]
if not all_host_blocks: if not all_host_blocks:
print_warning("No hosts found. The server list is empty.") print_warning("No hosts found. The server list is empty.")
print("\nSSH Conf Subdirectory Host List") print("\nSSH Conf Subdirectory Host List")

221
regen_key.py Normal file
View file

@ -0,0 +1,221 @@
import os
import glob
import subprocess
import asyncio
from collections import OrderedDict
from .utils import (
print_info,
print_warning,
print_error,
safe_input,
Colors
)
from .list_hosts import load_config_file, check_ssh_port
async def get_all_host_blocks(conf_dir):
pattern = os.path.join(conf_dir, "*", "config")
conf_files = sorted(glob.glob(pattern))
all_blocks = []
for conf_file in conf_files:
blocks = load_config_file(conf_file)
all_blocks.extend(blocks)
if not all_blocks:
return []
return all_blocks
async def regenerate_key(conf_dir):
"""
Menu-driven function to regenerate a key for a selected host:
1) Show the host table with row numbers
2) Let user pick row # or label
3) Read old pub data
4) Remove old local key files
5) Generate new key
6) Copy new key
7) Remove old key from remote authorized_keys (improved logic to detect existence)
"""
print_info("Regenerate Key - Step 1: Show current hosts...\n")
# 1) Gather host blocks
all_blocks = await get_all_host_blocks(conf_dir)
if not all_blocks:
print_warning("No hosts found. Cannot regenerate a key.")
return
# Display them in a table (similar to list_hosts):
import socket
from tabulate import tabulate
table_rows = []
for idx, block in enumerate(all_blocks, start=1):
host_label = block.get("Host", "N/A")
hostname = block.get("HostName", "N/A")
user = block.get("User", "N/A")
port = int(block.get("Port", "22"))
identity_file = block.get("IdentityFile", "N/A")
# Check IP
try:
ip_address = socket.gethostbyname(hostname)
port_open = await asyncio.wait_for(check_ssh_port(ip_address, port), timeout=1)
except:
ip_address = None
port_open = False
ip_disp = f"\033[0;32m{ip_address}\033[0m" if ip_address else "\033[0;31mN/A\033[0m"
port_disp = f"\033[0;32m{port}\033[0m" if port_open else f"\033[0;31m{port}\033[0m"
row = [
idx,
host_label,
user,
port_disp,
hostname,
ip_disp,
identity_file
]
table_rows.append(row)
headers = ["No.", "Host", "User", "Port", "HostName", "IP", "IdentityFile"]
print("\nSSH Conf Subdirectory Host List")
print(tabulate(table_rows, headers=headers, tablefmt="grid"))
# 2) Prompt for row # or label
choice = safe_input("Enter the row number or the Host label to regenerate: ")
if choice is None:
return
choice = choice.strip()
if not choice:
print_error("No choice given.")
return
target_block = None
if choice.isdigit():
row_idx = int(choice)
if row_idx < 1 or row_idx > len(all_blocks):
print_warning(f"Invalid row number {row_idx}.")
return
target_block = all_blocks[row_idx - 1]
else:
# user typed a label
for b in all_blocks:
if b.get("Host") == choice:
target_block = b
break
if not target_block:
print_warning(f"No matching host label '{choice}' found.")
return
# 3) Gather info from block
host_label = target_block.get("Host", "")
hostname = target_block.get("HostName", "")
user = target_block.get("User", "root")
port = int(target_block.get("Port", "22"))
identity_file = target_block.get("IdentityFile", "")
if not host_label or not hostname:
print_error("Missing Host or HostName; cannot regenerate.")
return
if not identity_file or identity_file == "N/A":
print_error("No IdentityFile found in config; cannot regenerate.")
return
# Derive local paths
expanded_key = os.path.expanduser(identity_file)
key_dir = os.path.dirname(expanded_key)
pub_path = expanded_key + ".pub"
# 3a) Read old pub key data
old_pub_data = ""
if os.path.isfile(pub_path):
try:
with open(pub_path, "r") as f:
old_pub_data = f.read().strip()
except Exception as e:
print_warning(f"Could not read old pub key: {e}")
else:
print_warning("No old pub key found locally.")
# 4) Remove old local key files
print_info("Removing old key files locally...")
for path in [expanded_key, pub_path]:
if os.path.isfile(path):
try:
os.remove(path)
print_info(f"Removed {path}")
except Exception as e:
print_warning(f"Could not remove {path}: {e}")
# 5) Generate new key
print_info("Generating new ed25519 SSH key...")
new_key_path = expanded_key # Reuse the same path from config
cmd = ["ssh-keygen", "-q", "-t", "ed25519", "-N", "", "-f", new_key_path]
try:
subprocess.check_call(cmd)
print_info(f"Generated new SSH key at {new_key_path}")
except subprocess.CalledProcessError as e:
print_error(f"Error generating new SSH key: {e}")
return
# 6) Copy new key to remote
copy_choice = safe_input("Copy new key to remote now? (y/n): ")
if copy_choice and copy_choice.lower().startswith('y'):
ssh_copy_cmd = ["ssh-copy-id", "-i", new_key_path]
if port != 22:
ssh_copy_cmd += ["-p", str(port)]
ssh_copy_cmd.append(f"{user}@{hostname}")
try:
subprocess.check_call(ssh_copy_cmd)
print_info("New key successfully copied to remote server.")
except subprocess.CalledProcessError as e:
print_error(f"Error copying new key: {e}")
# 7) Remove old key from authorized_keys if old_pub_data is non-empty
if old_pub_data:
print_info("Attempting to remove old key from remote authorized_keys...")
await remove_old_key_remote(old_pub_data, user, hostname, port)
else:
print_warning("No old pub key data found locally, skipping remote removal.")
async def remove_old_key_remote(old_pub_data, user, hostname, port):
"""
1) Check if old_pub_data is present on remote server (grep -q).
2) If found, remove it with grep -v ...
3) Otherwise, print "No old key found on remote."
"""
# 1) Check if old_pub_data exists in authorized_keys
check_cmd = [
"ssh",
"-o", "StrictHostKeyChecking=no",
"-p", str(port),
f"{user}@{hostname}",
f"grep -F '{old_pub_data}' ~/.ssh/authorized_keys"
]
found_key = False
try:
subprocess.check_call(check_cmd)
found_key = True
except subprocess.CalledProcessError:
# grep returns exit code 1 if not found
pass
if not found_key:
print_warning("No old key found on remote authorized_keys.")
return
# 2) Actually remove it
remove_cmd = [
"ssh",
"-o", "StrictHostKeyChecking=no",
"-p", str(port),
f"{user}@{hostname}",
f"grep -v '{old_pub_data}' ~/.ssh/authorized_keys > ~/.ssh/tmp && mv ~/.ssh/tmp ~/.ssh/authorized_keys"
]
try:
subprocess.check_call(remove_cmd)
print_info("Old public key removed from remote authorized_keys.")
except subprocess.CalledProcessError:
print_warning("Failed to remove old key from remote authorized_keys (permission or other error).")

186
remove_host.py Normal file
View file

@ -0,0 +1,186 @@
# ssh_manager/remove_host.py
import os
import glob
import subprocess
import asyncio
from collections import OrderedDict
from .utils import (
print_info,
print_warning,
print_error,
safe_input
)
from .list_hosts import load_config_file, check_ssh_port
async def get_all_host_blocks(conf_dir):
pattern = os.path.join(conf_dir, "*", "config")
conf_files = sorted(glob.glob(pattern))
all_blocks = []
for conf_file in conf_files:
blocks = load_config_file(conf_file)
all_blocks.extend(blocks)
return all_blocks
async def remove_host(conf_dir):
"""
Remove an SSH host by:
1) Listing all hosts
2) Letting user pick row number or label
3) Attempting to remove the old pub key from remote authorized_keys
4) Deleting the subdirectory in ~/.ssh/conf/<host_label>
"""
print_info("Remove Host - Step 1: Show current hosts...\n")
all_blocks = await get_all_host_blocks(conf_dir)
if not all_blocks:
print_warning("No hosts found. Cannot remove anything.")
return
# We'll display them in a table
import socket
from tabulate import tabulate
table_rows = []
for idx, block in enumerate(all_blocks, start=1):
host_label = block.get("Host", "N/A")
hostname = block.get("HostName", "N/A")
user = block.get("User", "N/A")
port = int(block.get("Port", "22"))
identity_file = block.get("IdentityFile", "N/A")
try:
ip_address = socket.gethostbyname(hostname)
port_open = await asyncio.wait_for(check_ssh_port(ip_address, port), timeout=1)
except:
ip_address = None
port_open = False
ip_disp = f"\033[0;32m{ip_address}\033[0m" if ip_address else "\033[0;31mN/A\033[0m"
port_disp = f"\033[0;32m{port}\033[0m" if port_open else f"\033[0;31m{port}\033[0m"
row = [
idx,
host_label,
user,
port_disp,
hostname,
ip_disp,
identity_file
]
table_rows.append(row)
headers = ["No.", "Host", "User", "Port", "HostName", "IP", "IdentityFile"]
print("\nSSH Conf Subdirectory Host List")
print(tabulate(table_rows, headers=headers, tablefmt="grid"))
# Prompt which host to remove
choice = safe_input("Enter the row number or Host label to remove: ")
if choice is None:
return
choice = choice.strip()
if not choice:
print_error("Invalid empty choice.")
return
target_block = None
if choice.isdigit():
idx = int(choice)
if idx < 1 or idx > len(all_blocks):
print_warning(f"Row number {idx} is invalid.")
return
target_block = all_blocks[idx - 1]
else:
# They typed a label
for b in all_blocks:
if b.get("Host") == choice:
target_block = b
break
if not target_block:
print_warning(f"No matching host label '{choice}' found.")
return
host_label = target_block.get("Host", "")
hostname = target_block.get("HostName", "")
user = target_block.get("User", "root")
port = int(target_block.get("Port", "22"))
identity_file = target_block.get("IdentityFile", "")
if not host_label:
print_warning("Target block has no Host label. Cannot remove.")
return
# Check IdentityFile for old pub key
if identity_file and identity_file != "N/A":
expanded_key = os.path.expanduser(identity_file)
pub_path = expanded_key + ".pub"
old_pub_data = ""
if os.path.isfile(pub_path):
try:
with open(pub_path, "r") as f:
# This is the EXACT line that might appear in authorized_keys
old_pub_data = f.read().rstrip("\n")
except Exception as e:
print_warning(f"Could not read old pub key: {e}")
if old_pub_data:
print_info("Attempting to remove old key from remote authorized_keys...")
await remove_old_key_remote(old_pub_data, user, hostname, port)
# Finally, remove the local subdirectory
host_dir = os.path.join(conf_dir, host_label)
if os.path.isdir(host_dir):
confirm = safe_input(f"Are you sure you want to delete local folder {host_dir}? (y/n): ")
if confirm and confirm.lower().startswith('y'):
try:
import shutil
shutil.rmtree(host_dir)
print_info(f"Removed local folder: {host_dir}")
except Exception as e:
print_warning(f"Could not remove folder {host_dir}: {e}")
else:
print_warning("Local folder not removed.")
else:
print_warning(f"Local host folder {host_dir} not found. Nothing to remove.")
async def remove_old_key_remote(old_pub_data, user, hostname, port):
"""
Remove the old key from remote authorized_keys if found, matching EXACT lines.
We'll do:
grep -Fxq for existence
grep -vFx to remove it
"""
# 1) Check if old_pub_data is present in authorized_keys (exact line)
check_cmd = [
"ssh", "-o", "StrictHostKeyChecking=no",
"-p", str(port),
f"{user}@{hostname}",
f"grep -Fxq \"{old_pub_data}\" ~/.ssh/authorized_keys"
]
found_key = False
try:
subprocess.check_call(check_cmd)
found_key = True
except subprocess.CalledProcessError:
pass
if not found_key:
print_warning("No old key found on remote authorized_keys.")
return
# 2) Actually remove it by ignoring EXACT matches to that line
remove_cmd = [
"ssh", "-o", "StrictHostKeyChecking=no",
"-p", str(port),
f"{user}@{hostname}",
f"grep -vFx \"{old_pub_data}\" ~/.ssh/authorized_keys > ~/.ssh/tmp && mv ~/.ssh/tmp ~/.ssh/authorized_keys"
]
try:
subprocess.check_call(remove_cmd)
print_info("Old public key removed from remote authorized_keys.")
except subprocess.CalledProcessError:
print_warning("Failed to remove old key from remote authorized_keys (permissions or other error).")

View file

@ -1,5 +1,7 @@
# ssh_manager/utils.py # ssh_manager/utils.py
import sys
class Colors: class Colors:
GREEN = "\033[0;32m" GREEN = "\033[0;32m"
RED = "\033[0;31m" RED = "\033[0;31m"
@ -16,3 +18,13 @@ def print_warning(message):
def print_info(message): def print_info(message):
print(f"{Colors.GREEN}{Colors.BOLD}[✔] {Colors.RESET}{message}") print(f"{Colors.GREEN}{Colors.BOLD}[✔] {Colors.RESET}{message}")
def safe_input(prompt=""):
"""
A wrapper around input() that exits the entire script on Ctrl+C.
"""
try:
return input(prompt)
except KeyboardInterrupt:
print_info("\nExiting on Ctrl+C.")
sys.exit(130) # Conventionally 130 indicates "terminated by Ctrl+C"