import os import subprocess import asyncio from pathlib import Path from typing import Optional, List, Dict, Tuple, Any from .utils import ( print_info, print_warning, print_error, safe_input ) from .list_hosts import ( build_host_list_table, load_config_file, gather_host_info, sort_by_ip ) def validate_key_path(key_path: str) -> bool: """Validate that the key path and its directory are valid.""" try: key_path = Path(key_path) key_dir = key_path.parent if not key_dir.exists(): key_dir.mkdir(mode=0o700, parents=True) elif not os.access(str(key_dir), os.W_OK): print_error(f"No write permission for directory: {key_dir}") return False return True except Exception as e: print_error(f"Error validating key path: {e}") return False def generate_new_key(key_path: str, user: str, hostname: str, port: int) -> bool: """Generate a new ED25519 SSH key and optionally copy it to the remote server.""" if not validate_key_path(key_path): return False print_info("Generating new ed25519 SSH key...") try: subprocess.check_call([ "ssh-keygen", "-q", "-t", "ed25519", "-N", "", "-f", key_path ]) except subprocess.CalledProcessError as e: print_error(f"Error generating new SSH key: {e}") return False print_info(f"Generated new SSH key at {key_path}") # Ask to copy the key copy_choice = safe_input("Copy new key to remote now? (y/n): ") if not copy_choice or not copy_choice.lower().startswith('y'): return True try: ssh_copy_cmd = ["ssh-copy-id", "-i", key_path] if port != 22: ssh_copy_cmd += ["-p", str(port)] ssh_copy_cmd.append(f"{user}@{hostname}") subprocess.check_call(ssh_copy_cmd) print_info("New key successfully copied to remote server.") return True except subprocess.CalledProcessError as e: print_error(f"Error copying new key: {e}") return False def update_config_with_key(config_path: Path, new_key_path: str) -> bool: """Update the SSH config file with the new identity file.""" try: with open(config_path, 'r') as f: config_lines = [ line.rstrip('\n') for line in f if not line.strip().lower().startswith('identityfile') ] config_lines.append(f" IdentityFile {new_key_path}") with open(config_path, 'w') as f: f.write('\n'.join(config_lines) + '\n') print_info(f"Updated config file with new IdentityFile: {new_key_path}") return True except Exception as e: print_error(f"Failed to update config file: {e}") return False def find_target_host(sorted_rows: List[Tuple], choice: str) -> Optional[Tuple]: """Find the target host based on user choice.""" if not choice: print_error("No choice given.") return None if choice.isdigit(): row_idx = int(choice) if row_idx < 1 or row_idx > len(sorted_rows): print_warning(f"Invalid row number {row_idx}.") return None return sorted_rows[row_idx - 1] # User typed a label for row in sorted_rows: if row[0] == choice: # row[0] is host_label return row print_warning(f"No matching host label '{choice}' found.") return None def remove_key_files(key_paths: List[str]) -> None: """Remove SSH key files.""" for path in key_paths: if os.path.isfile(path): try: os.remove(path) print_info(f"Removed {path}") except Exception as e: print_warning(f"Could not remove {path}: {e}") async def regenerate_key(conf_dir: str) -> bool: """ Regenerate the SSH key for a chosen host by: 1) Displaying the unified table of hosts 2) Letting you pick a row number or host label 3) Reading/deleting any existing local keys 4) Generating a new key 5) Optionally copying it to the remote 6) Removing the old pub key from the remote authorized_keys if present Returns True if key was successfully regenerated, False otherwise. """ print_info("Regenerate Key - Step 1: Show current hosts...\n") # Get host list headers, final_data = await build_host_list_table(conf_dir) if not final_data: print_warning("No hosts found. Cannot regenerate a key.") return False # Display host table from tabulate import tabulate print("\nSSH Conf Subdirectory Host List (Sorted by IP Ascending)") print(tabulate(final_data, headers=headers, tablefmt="grid")) # Get host blocks and sort them all_blocks = [] pattern = os.path.join(conf_dir, "*", "config") for cfile in glob.glob(pattern): all_blocks.extend(load_config_file(cfile)) results = await gather_host_info(all_blocks) sorted_rows = sort_by_ip(results) # Get user choice choice = safe_input("Enter the row number or the Host label to regenerate: ") if choice is None: return False target_tuple = find_target_host(sorted_rows, choice.strip()) if not target_tuple: return False # Get host information host_label, user, _, hostname, *_ = target_tuple # Find config block found_block = next( (b for b in all_blocks if b.get("Host") == host_label), None ) if not found_block: print_warning(f"No config block found for '{host_label}'.") return False port = int(found_block.get("Port", "22")) identity_file = found_block.get("IdentityFile", "") # Handle missing identity file if not identity_file or identity_file == "N/A": print_warning("No existing SSH key found in the configuration.") gen_choice = safe_input("Would you like to generate a new key? (y/n): ") if not gen_choice or not gen_choice.lower().startswith('y'): return False # Set up new key path and generate key host_dir = Path(conf_dir) / host_label host_dir.mkdir(mode=0o700, exist_ok=True) new_key_path = str(host_dir / "id_ed25519") if not generate_new_key(new_key_path, user, hostname, port): return False # Update config with new key config_path = host_dir / "config" return update_config_with_key(config_path, new_key_path) # Handle existing key regeneration expanded_key = os.path.expanduser(identity_file) pub_path = expanded_key + ".pub" old_pub_data = "" # Try to read old public key if os.path.isfile(pub_path): try: old_pub_data = Path(pub_path).read_text().rstrip("\n") except Exception as e: print_warning(f"Could not read old pub key: {e}") # Remove old key files print_info("Removing old key files locally...") remove_key_files([expanded_key, pub_path]) # Generate new key if not generate_new_key(expanded_key, user, hostname, port): return False # Remove old key from remote if we had it if old_pub_data: print_info("Attempting to remove old key from remote authorized_keys...") await remove_old_key_remote(old_pub_data, user, hostname, port) else: print_warning("No old pub key data found locally, skipping remote removal.") return True async def remove_old_key_remote(old_pub_data: str, user: str, hostname: str, port: int) -> bool: """ Remove the old public key from remote authorized_keys file. Returns True if successful, False otherwise. """ # Escape the public key data for shell safety escaped_key = old_pub_data.replace('"', '\\"') # First check if authorized_keys exists check_file_cmd = [ "ssh", "-o", "StrictHostKeyChecking=no", "-p", str(port), f"{user}@{hostname}", "test -f ~/.ssh/authorized_keys && echo 'exists'" ] try: result = subprocess.run(check_file_cmd, capture_output=True, text=True) if result.returncode != 0 or 'exists' not in result.stdout: print_warning("No authorized_keys file found on remote host.") return False except subprocess.CalledProcessError as e: print_error(f"Error checking authorized_keys: {e}") return False # Create a temporary file for the sed script temp_script = """#!/bin/bash set -e KEYS_FILE="$HOME/.ssh/authorized_keys" TEMP_FILE="$HOME/.ssh/authorized_keys.tmp" grep -Fxv "$1" "$KEYS_FILE" > "$TEMP_FILE" mv "$TEMP_FILE" "$KEYS_FILE" chmod 600 "$KEYS_FILE" """ # Create a temporary script on the remote host setup_cmd = [ "ssh", "-o", "StrictHostKeyChecking=no", "-p", str(port), f"{user}@{hostname}", f'cat > ~/.ssh/remove_key.sh << \'EOF\'\n{temp_script}\nEOF\n' f'chmod +x ~/.ssh/remove_key.sh' ] try: subprocess.run(setup_cmd, check=True, capture_output=True) except subprocess.CalledProcessError as e: print_error(f"Failed to create temporary script: {e}") return False # Execute the script with the key remove_cmd = [ "ssh", "-o", "StrictHostKeyChecking=no", "-p", str(port), f"{user}@{hostname}", f'~/.ssh/remove_key.sh "{escaped_key}"' ] try: subprocess.run(remove_cmd, check=True, capture_output=True) print_info("Old public key removed from remote authorized_keys.") # Clean up the temporary script cleanup_cmd = [ "ssh", "-o", "StrictHostKeyChecking=no", "-p", str(port), f"{user}@{hostname}", "rm -f ~/.ssh/remove_key.sh" ] subprocess.run(cleanup_cmd, check=True, capture_output=True) return True except subprocess.CalledProcessError as e: print_error(f"Failed to remove old key: {e}") # Try to clean up even if removal failed try: subprocess.run(cleanup_cmd, check=True, capture_output=True) except: pass return False except Exception as e: print_error(f"Unexpected error removing old key: {e}") return False