Files
ProgressionMods/progression_tui.py
T

1110 lines
49 KiB
Python

#!/usr/bin/env python3
"""
Progression Loader - Terminal User Interface (TUI)
A text-based interface for managing RimWorld Steam Workshop collections
"""
import os
import sys
import time
import threading
import xml.etree.ElementTree as ET
from pathlib import Path
from dotenv import load_dotenv
import webbrowser
import requests
import re
from urllib.parse import urlparse, parse_qs
# Rich imports for TUI
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.layout import Layout
from rich.live import Live
from rich.prompt import Prompt, Confirm
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.align import Align
from rich.columns import Columns
from rich import box
# Import existing modules
from update_checker import UpdateChecker
from update_config import get_update_config
# Load environment variables
# (done before ProgressionTUI is created so the os.getenv() lookups for the
# collection URLs and the ModsConfig path template can see any overrides)
load_dotenv()
# Shared Rich console: ProgressionTUI stores it as self.console and main()
# prints through it directly.
console = Console()
class ProgressionTUI:
def __init__(self):
    """Initialise default state, then load the logo and locate ModsConfig.xml."""
    self.console = console

    # Paths stay empty until the user supplies a RimWorld folder.
    self.rimworld_path = ""
    self.workshop_path = ""
    self.modsconfig_path = ""
    self.is_rimworld_valid = False
    self.expansion_check_results = {}

    # (display name, environment override, default URL) in menu order.
    collection_sources = (
        ("Core Collection", 'CORE_COLLECTION_URL',
         'https://steamcommunity.com/workshop/filedetails/?id=3521297585'),
        ("Content Collection", 'CONTENT_COLLECTION_URL',
         'https://steamcommunity.com/sharedfiles/filedetails/?id=3521319712'),
        ("Cosmetics Collection", 'COSMETICS_COLLECTION_URL',
         'https://steamcommunity.com/sharedfiles/filedetails/?id=3637541646'),
    )

    # Nothing is considered subscribed until the user marks it so.
    self.subscription_status = {label: False for label, _, _ in collection_sources}

    # Collection URLs, each overridable through its environment variable.
    self.collections = {
        label: os.getenv(env_key, fallback)
        for label, env_key, fallback in collection_sources
    }

    # Load ASCII art
    self.load_ascii_art()
    # Initialize ModsConfig path
    self.find_modsconfig_path()
def load_ascii_art(self):
    """Load the logo from ``art/ASCIIArt.txt`` into ``self.progression_logo``.

    The logo is the section of the file that starts at the first line
    containing ``@@@@@@@@%@@@@@@@@`` and runs until a line containing one of
    the end markers (Unicode block art or a run of backticks) or the end of
    the file; trailing blank lines are dropped.  When the file cannot be read
    or the start marker is absent, the plain text ``"PROGRESSION"`` is used.

    ``self.progression_title`` and ``self.hr_systems_logo`` are always set to
    their fixed values.
    """
    # These never depend on the art file, so assign them once up front
    # together with the fallback logo.
    self.progression_title = "PROGRESSION LOADER"
    self.hr_systems_logo = "Hudson Riggs Systems"
    self.progression_logo = "PROGRESSION"

    try:
        with open('art/ASCIIArt.txt', 'r', encoding='utf-8') as f:
            lines = f.read().split('\n')
    except (OSError, UnicodeError):
        # Missing/unreadable/undecodable file: keep the text fallback.
        return

    # Find the second ASCII art section (starts with @@@@@@@@%@@@@@@@@)
    start = next(
        (i for i, line in enumerate(lines) if '@@@@@@@@%@@@@@@@@' in line),
        None,
    )
    if start is None:
        return

    # The section ends at the third art block (Unicode blocks) or at the
    # backtick pattern.
    end_markers = ('█████████████████', '░░░░', '```````')
    art_lines = []
    for line in lines[start:]:
        if any(marker in line for marker in end_markers):
            break
        art_lines.append(line)

    # Remove trailing empty lines
    while art_lines and art_lines[-1].strip() == '':
        art_lines.pop()

    self.progression_logo = '\n'.join(art_lines)
def display_header(self):
    """Return the banner panel: the ASCII logo centred in a double-line box."""
    centred_logo = Align.center(self.progression_logo)
    return Panel(centred_logo, box=box.DOUBLE, style="cyan", padding=(1, 2))
def display_footer(self):
    """Return the centred footer line with the HR Systems name and URL."""
    # (text, style) segments appended in display order.
    segments = (
        ("Hudson Riggs Systems", "dim white"),
        (" - ", "dim white"),
        ("https://hudsonriggs.systems", "dim blue underline"),
    )
    credit = Text()
    for content, segment_style in segments:
        credit.append(content, style=segment_style)
    return Align.center(credit)
def clear_screen(self):
    """Clear the terminal with the shell command for the current OS."""
    # os.name is 'nt' on Windows, where the command is `cls`.
    if os.name == 'nt':
        command = 'cls'
    else:
        command = 'clear'
    os.system(command)
def show_main_menu(self):
    """Run the main menu loop until the user exits.

    Every pass clears the screen and redraws the header, the option table,
    the known paths (if any) and the footer, then dispatches the selected
    option.  Options 4 and 5 only run once a valid RimWorld path is set.
    Ctrl+C leaves the loop; any other exception is reported and the menu is
    shown again.
    """
    # Options that can run at any time.
    open_actions = {
        "1": self.set_rimworld_path,
        "2": self.check_subscriptions,
        "3": self.check_expansions,
        "6": self.check_for_updates,
        "7": self.view_configuration,
    }
    # Options that need a validated RimWorld path first.
    gated_actions = {
        "4": self.load_progression_pack,
        "5": self.merge_with_current_mods,
    }

    while True:
        self.clear_screen()
        self.console.print(self.display_header())
        self.console.print()

        # Status column markup depends on whether the game path is valid.
        path_ok = self.is_rimworld_valid
        path_markup = "[green]Valid[/green]" if path_ok else "[red]Not Set[/red]"
        gated_markup = "Enabled" if path_ok else "[dim]Disabled[/dim]"
        menu_rows = (
            ("1", "Set RimWorld Game Folder", path_markup),
            ("2", "Check Steam Workshop Subscriptions", ""),
            ("3", "Check RimWorld Expansions", ""),
            ("4", "Load Progression Pack Complete", gated_markup),
            ("5", "Merge with Current Mods Config", gated_markup),
            ("6", "Check for Updates", ""),
            ("7", "View Configuration", ""),
            ("0", "Exit", ""),
        )

        options = Table(show_header=False, box=box.SIMPLE, padding=(0, 2))
        options.add_column("Option", style="cyan bold", width=4)
        options.add_column("Description", style="white")
        options.add_column("Status", style="green", width=15)
        for row in menu_rows:
            options.add_row(*row)

        self.console.print(
            Panel(options, title="Main Menu", box=box.ROUNDED, style="white")
        )
        self.console.print()

        # Show the configured paths once at least one of the first two is set.
        if self.rimworld_path or self.workshop_path:
            paths = Table(show_header=False, box=None, padding=(0, 1))
            paths.add_column("Label", style="cyan", width=25)
            paths.add_column("Path", style="white")
            labelled_paths = (
                ("RimWorld Path:", self.rimworld_path),
                ("Workshop Path:", self.workshop_path),
                ("ModsConfig Path:", self.modsconfig_path),
            )
            for label, value in labelled_paths:
                if value:
                    paths.add_row(label, value)
            self.console.print(Panel(paths, title="Current Configuration", style="dim"))
            self.console.print()

        self.console.print(self.display_footer())
        self.console.print()

        try:
            choice = Prompt.ask(
                "Select an option",
                choices=["0", "1", "2", "3", "4", "5", "6", "7"],
                default="1",
            )
            if choice == "0":
                self.console.print("\n[yellow]Thank you for using Progression Loader![/yellow]")
                break

            if choice in gated_actions:
                if self.is_rimworld_valid:
                    gated_actions[choice]()
                else:
                    self.console.print("\n[red]Please set a valid RimWorld path first![/red]")
                    self.console.input("\nPress Enter to continue...")
            else:
                action = open_actions.get(choice)
                if action is not None:
                    action()
        except KeyboardInterrupt:
            self.console.print("\n\n[yellow]Goodbye![/yellow]")
            break
        except Exception as e:
            # Keep the menu alive: report the problem and redraw.
            self.console.print(f"\n[red]Error: {e}[/red]")
            self.console.input("\nPress Enter to continue...")
def set_rimworld_path(self):
    """Prompt for the RimWorld game folder and store it if it validates.

    Shows instructions and the current path (if any), asks for a new path,
    and only when the answer is non-empty and different from the current
    one validates it.  A valid path is stored, marks the installation as
    valid and re-derives the workshop path; an invalid one leaves the stored
    state untouched and prints an explanation.
    """
    self.clear_screen()
    self.console.print(self.display_header())
    self.console.print()

    # How to locate the folder through Steam.
    help_lines = [
        "To find your RimWorld path:",
        "1. Open Steam",
        "2. Right-click RimWorld in your library",
        "3. Select 'Manage' > 'Browse local files'",
        "4. Copy the path from the address bar",
        "",
        "Example: C:\\Steam\\steamapps\\common\\RimWorld",
    ]
    self.console.print(
        Panel(Text.from_markup("\n".join(help_lines)), title="Instructions", style="cyan")
    )
    self.console.print()

    # Show the path currently in use, coloured by its validity.
    if self.rimworld_path:
        if self.is_rimworld_valid:
            current_style = "green"
        else:
            current_style = "red"
        self.console.print(
            Panel(
                f"Current path: {self.rimworld_path}",
                title="Current RimWorld Path",
                style=current_style,
            )
        )
        self.console.print()

    entered = Prompt.ask(
        "Enter RimWorld game folder path (or press Enter to keep current)",
        default=self.rimworld_path,
    )

    # Only validate when the user actually supplied a different path.
    if entered and entered != self.rimworld_path:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=self.console,
        ) as progress:
            spinner = progress.add_task("Validating RimWorld path...", total=None)
            if not self.validate_rimworld_path(entered):
                progress.update(spinner, description="[red]Invalid RimWorld path![/red]")
                time.sleep(1)
                self.console.print(f"\n[red]Invalid RimWorld installation at: {entered}[/red]")
                self.console.print("[red]Please ensure the path contains RimWorld.exe and Data folder[/red]")
            else:
                self.rimworld_path = entered
                self.is_rimworld_valid = True
                # The workshop folder is derived from the game folder.
                self.derive_workshop_path()
                progress.update(spinner, description="[green]Valid RimWorld installation found![/green]")
                time.sleep(1)
                self.console.print("\n[green]RimWorld path set successfully![/green]")
                self.console.print(f"Workshop path: {self.workshop_path}")

    self.console.input("\nPress Enter to continue...")
def validate_rimworld_path(self, path):
    """Return True when ``path`` looks like a RimWorld installation.

    The folder must exist, contain ``RimWorld.exe`` or ``RimWorldWin64.exe``,
    and contain a ``Data`` folder that itself contains ``Core``.
    """
    if not path or not os.path.exists(path):
        return False

    # Either executable name is accepted.
    executables = ("RimWorld.exe", "RimWorldWin64.exe")
    has_executable = any(
        os.path.exists(os.path.join(path, exe_name)) for exe_name in executables
    )
    if not has_executable:
        return False

    # Data/ must exist and hold the Core mod.
    data_dir = os.path.join(path, "Data")
    core_dir = os.path.join(data_dir, "Core")
    return os.path.exists(data_dir) and os.path.exists(core_dir)
def derive_workshop_path(self):
    """Derive ``self.workshop_path`` from ``self.rimworld_path``.

    The workshop folder is ``<...>/steamapps/workshop/content/294100``,
    where ``<...>/steamapps`` is the prefix of the game path up to and
    including its first ``steamapps`` component (matched case-insensitively,
    original spelling kept).  When the game path has no such component,
    ``self.workshop_path`` is set to an explanatory message instead of a
    path.

    The path is normalised first so that alternative separators (e.g.
    forward slashes typed on Windows) and trailing separators do not hide
    the ``steamapps`` component.
    """
    invalid_message = "Invalid RimWorld path - should contain 'steamapps'"

    parts = os.path.normpath(self.rimworld_path).split(os.sep)
    for index, part in enumerate(parts):
        if part.lower() == 'steamapps':
            workshop_parts = parts[:index + 1] + ['workshop', 'content', '294100']
            self.workshop_path = os.sep.join(workshop_parts)
            return

    self.workshop_path = invalid_message
def check_subscriptions(self):
    """Show the workshop collections and let the user track subscriptions.

    Prints the collection table and instructions once, then loops over a
    sub-menu: options 1-3 open a collection in the browser, 4-6 mark one as
    subscribed, 7 reports whether all are marked, 0 returns to the caller.
    Subscription state is whatever the user marks; nothing is queried from
    Steam here.
    """
    self.clear_screen()
    self.console.print(self.display_header())
    self.console.print()

    # Overview of every collection with its current subscription flag.
    overview = Table(show_header=True, box=box.ROUNDED)
    overview.add_column("Collection", style="cyan bold", width=20)
    overview.add_column("URL", style="blue", width=50)
    overview.add_column("Subscribed", style="green", width=12)
    for name, url in self.collections.items():
        if self.subscription_status[name]:
            flag = "[green]Yes[/green]"
        else:
            flag = "[red]No[/red]"
        overview.add_row(name, url, flag)
    self.console.print(Panel(overview, title="Steam Workshop Collections", style="white"))
    self.console.print()

    how_to = "\n".join([
        "To subscribe to collections:",
        "1. Copy the URL for each collection",
        "2. Open it in your web browser",
        "3. Click 'Subscribe' on the Steam Workshop page",
        "4. Wait for Steam to download the collection",
        "5. Return here and mark as subscribed",
        "",
        "[yellow]All collections are required for the complete Progression experience.[/yellow]",
    ])
    self.console.print(Panel(Text.from_markup(how_to), title="Instructions", style="cyan"))
    self.console.print()

    # Menu keys 1-3 open a collection, 4-6 mark the same collections.
    ordered_names = ("Core Collection", "Content Collection", "Cosmetics Collection")
    open_by_choice = dict(zip("123", ordered_names))
    mark_by_choice = dict(zip("456", ordered_names))

    while True:
        menu = Table(show_header=False, box=box.SIMPLE, padding=(0, 2))
        menu.add_column("Option", style="cyan bold", width=4)
        menu.add_column("Description", style="white")
        for key, name in open_by_choice.items():
            menu.add_row(key, f"Open {name} in browser")
        for key, name in mark_by_choice.items():
            menu.add_row(key, f"Mark {name} as subscribed")
        menu.add_row("7", "Check if all subscribed")
        menu.add_row("0", "Back to main menu")
        self.console.print(Panel(menu, title="Subscription Menu", style="white"))
        self.console.print()

        choice = Prompt.ask("Select an option", choices=["0", "1", "2", "3", "4", "5", "6", "7"])
        if choice == "0":
            break

        if choice in open_by_choice:
            name = open_by_choice[choice]
            webbrowser.open(self.collections[name])
            self.console.print(f"[green]Opened {name} in browser[/green]")
        elif choice in mark_by_choice:
            name = mark_by_choice[choice]
            self.subscription_status[name] = True
            self.console.print(f"[green]{name} marked as subscribed[/green]")
        elif choice == "7":
            pending = [name for name, status in self.subscription_status.items() if not status]
            if not pending:
                self.console.print("[green]All collections are subscribed! You can proceed.[/green]")
            else:
                self.console.print(f"[red]Missing subscriptions: {', '.join(pending)}[/red]")
        self.console.print()
def check_expansions(self):
    """Run the expansion check and display which expansions are owned.

    The check itself is delegated to ``check_expansions_thread`` while a
    spinner is shown.  The results are listed in a table followed by either
    a warning naming the missing expansions or a success panel.
    """
    self.clear_screen()
    self.console.print(self.display_header())
    self.console.print()

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=self.console,
    ) as progress:
        spinner = progress.add_task("Checking RimWorld expansions...", total=None)
        self.check_expansions_thread()
        progress.update(spinner, description="[green]Expansion check complete![/green]")
        time.sleep(1)

    results = self.expansion_check_results
    if not results:
        self.console.print("[yellow]Could not check expansions. Please ensure RimWorld path is set correctly.[/yellow]")
    else:
        listing = Table(show_header=True, box=box.ROUNDED)
        listing.add_column("Expansion", style="cyan bold", width=15)
        listing.add_column("Status", style="white", width=10)

        # Collect the missing names while filling the table.
        not_owned = []
        for name, owned in results.items():
            if owned:
                listing.add_row(name, "[green]Owned[/green]")
            else:
                listing.add_row(name, "[red]Missing[/red]")
                not_owned.append(name)

        self.console.print(Panel(listing, title="RimWorld Expansions", style="white"))
        self.console.print()

        if not not_owned:
            self.console.print(
                Panel(
                    "[green]All RimWorld expansions detected![/green]",
                    title="Expansion Check",
                    style="green",
                )
            )
        else:
            warning = Text()
            warning.append("WARNING: ", style="bold red")
            warning.append(f"You don't own {', '.join(not_owned)}!\n", style="red")
            warning.append("Some progression mods may not work without these expansions.", style="yellow")
            self.console.print(Panel(warning, title="Missing Expansions", style="red"))

    self.console.input("\nPress Enter to continue...")
def check_expansions_thread(self):
    """Fill ``self.expansion_check_results`` from ModsConfig.xml.

    When ModsConfig.xml cannot be located, or anything raises while
    checking, every expansion is recorded as not owned; in the error case
    the exception is also printed to the console.
    """
    # Result used whenever ownership cannot be determined.
    nothing_owned = dict.fromkeys(("Royalty", "Ideology", "Biotech", "Anomaly"), False)

    try:
        config_file = self.find_modsconfig_path()
        if config_file and os.path.exists(config_file):
            self.expansion_check_results = self.parse_known_expansions(config_file)
        else:
            self.expansion_check_results = dict(nothing_owned)
    except Exception as e:
        self.console.print(f"[red]Error checking expansions: {e}[/red]")
        self.expansion_check_results = dict(nothing_owned)
def find_modsconfig_path(self):
    """Locate ModsConfig.xml and remember it in ``self.modsconfig_path``.

    The location comes from the ``MODSCONFIG_PATH_TEMPLATE`` environment
    variable (default: the Windows LocalLow RimWorld config folder) with
    environment variables expanded.  Returns the path when the file exists,
    otherwise ``None``; ``self.modsconfig_path`` is only updated on success.
    """
    default_template = r'%USERPROFILE%\AppData\LocalLow\Ludeon Studios\RimWorld by Ludeon Studios\Config\ModsConfig.xml'
    try:
        template = os.environ.get('MODSCONFIG_PATH_TEMPLATE', default_template)
        candidate = os.path.expandvars(template)
        if not os.path.exists(candidate):
            return None
        self.modsconfig_path = candidate
        return candidate
    except Exception:
        return None
def parse_known_expansions(self, modsconfig_path):
    """Return which expansions ModsConfig.xml lists under ``knownExpansions``.

    The result maps "Royalty", "Ideology", "Biotech" and "Anomaly" to a
    bool.  Each ``<li>`` under the first ``<knownExpansions>`` element is
    translated with ``extract_name_from_id``; names outside the four known
    expansions are ignored.  A parse failure is printed and the results
    gathered so far are returned.
    """
    owned = dict.fromkeys(("Royalty", "Ideology", "Biotech", "Anomaly"), False)
    try:
        root = ET.parse(modsconfig_path).getroot()
        section = root.find('knownExpansions')
        entries = section.findall('li') if section is not None else []
        for entry in entries:
            raw_id = entry.text
            if not raw_id:
                # Empty <li/> carries no expansion id.
                continue
            name = self.extract_name_from_id(raw_id.strip())
            if name in owned:
                owned[name] = True
    except Exception as e:
        self.console.print(f"[red]Error parsing ModsConfig.xml: {e}[/red]")
    return owned
def extract_name_from_id(self, expansion_id):
    """Map an expansion package id to a display name.

    Ids containing ``royalty``, ``ideology``, ``biotech`` or ``anomaly``
    (any case, checked in that order) map to the capitalised expansion
    name.  Any other dotted id yields its last segment capitalised; an id
    without dots is returned unchanged.
    """
    lowered = expansion_id.lower()
    for known in ('Royalty', 'Ideology', 'Biotech', 'Anomaly'):
        if known.lower() in lowered:
            return known

    segments = expansion_id.split('.')
    if len(segments) > 1:
        return segments[-1].capitalize()
    return expansion_id
def extract_workshop_id(self, url):
    """Return the workshop item id found in a Steam URL, or ``None``.

    A ``steam://openurl/`` prefix is removed first.  The id is taken from
    the ``id`` query parameter; when the query has none but the path has a
    ``filedetails`` segment, the raw URL is searched for ``?id=<digits>`` /
    ``&id=<digits>`` as a fallback.  Parsing errors yield ``None``.
    """
    if not url:
        return None

    # Clean up steam:// protocol URLs
    steam_prefix = 'steam://openurl/'
    if url.startswith(steam_prefix):
        url = url.replace(steam_prefix, '')

    try:
        pieces = urlparse(url)
        id_values = parse_qs(pieces.query).get('id')
        if id_values is not None:
            return id_values[0]

        # Fallback for URLs whose id is not in the query string proper.
        if 'filedetails' in pieces.path.split('/'):
            found = re.search(r'[?&]id=(\d+)', url)
            if found:
                return found.group(1)
    except Exception:
        pass
    return None
def fetch_collection_items(self, collection_id):
    """Fetch the workshop item ids linked from a Steam Workshop collection page.

    Downloads the collection's public page and collects every
    ``sharedfiles/filedetails/?id=<digits>`` link found in the HTML, except
    the collection's own id.

    Args:
        collection_id: Workshop id of the collection; falsy values return
            ``[]`` without any network access.

    Returns:
        A list of unique id strings in the order they first appear on the
        page.  Any failure (network, HTTP status, ...) is printed to the
        console and yields ``[]``.
    """
    if not collection_id:
        return []

    # Page ids are strings; compare as text so an integer collection id is
    # still excluded from its own results.
    own_id = str(collection_id)

    try:
        # Steam Workshop collection URL
        url = f"https://steamcommunity.com/sharedfiles/filedetails/?id={collection_id}"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        # Extract workshop IDs from the HTML
        id_pattern = r'https://steamcommunity\.com/sharedfiles/filedetails/\?id=(\d+)'
        found_ids = re.findall(id_pattern, response.text)

        # dict.fromkeys removes duplicates while keeping first-seen order,
        # so repeated runs return the ids in a stable order.
        return list(dict.fromkeys(item for item in found_ids if item != own_id))
    except Exception as e:
        self.console.print(f"[red]Error fetching collection {collection_id}: {e}[/red]")
        return []
def is_package_id_valid(self, package_id):
    """Return True unless ``package_id`` is one of the lookup error markers.

    The markers are the fixed error strings that
    ``get_package_info_from_about_xml`` returns in place of a package id,
    plus anything starting with ``"Error:"``.
    """
    failure_markers = (
        "About.xml not found",
        "packageId not found",
        "XML parse error",
        "Workshop path not found",
    )
    if package_id in failure_markers:
        return False
    return not package_id.startswith("Error:")
def get_package_info_from_about_xml(self, workshop_id):
    """Read ``packageId`` and ``steamAppId`` from a mod's About/About.xml.

    Args:
        workshop_id: Name of the mod's folder inside ``self.workshop_path``.

    Returns:
        ``(package_id, steam_app_id)``.  On success ``package_id`` is the
        stripped, lower-cased ``<packageId>`` text and ``steam_app_id`` is
        the stripped ``<steamAppId>`` text, or ``None`` when that element is
        absent or blank.  On failure the first item is one of the error
        markers recognised by ``is_package_id_valid`` ("Workshop path not
        found", "About.xml not found", "packageId not found", "XML parse
        error", or "Error: <details>") and the second item is ``None``.
    """
    try:
        if not self.workshop_path or not os.path.exists(self.workshop_path):
            return "Workshop path not found", None

        about_xml_path = os.path.join(self.workshop_path, workshop_id, "About", "About.xml")
        if not os.path.exists(about_xml_path):
            return "About.xml not found", None

        root = ET.parse(about_xml_path).getroot()

        # Strip before testing so a whitespace-only <packageId> counts as
        # missing instead of being returned as an empty, "valid" id.
        # Lower-cased for consistency.
        package_id = (root.findtext('packageId') or "").strip().lower()
        if not package_id:
            return "packageId not found", None

        # A blank <steamAppId> is reported as None, same as a missing one.
        steam_app_id = (root.findtext('steamAppId') or "").strip() or None

        return package_id, steam_app_id
    except ET.ParseError:
        return "XML parse error", None
    except Exception as e:
        return f"Error: {str(e)}", None
def get_package_name_from_about_xml(self, workshop_id):
    """Return only the package id for ``workshop_id`` (backward compatibility).

    Thin wrapper around ``get_package_info_from_about_xml`` that drops the
    steam app id from its result.
    """
    info = self.get_package_info_from_about_xml(workshop_id)
    name, _steam_app_id = info
    return name
def load_progression_pack(self):
    """Walk through loading the complete Progression pack.

    Flow: require every collection to be marked as subscribed, ask for
    confirmation, fetch the workshop ids of all collections, read each
    mod's package id from its About.xml, and stop with an error report if
    any lookup failed.  Otherwise the loading stages are shown and a
    success panel is printed.

    NOTE(review): in this method the backup step and the per-collection
    loading/finalising stages only sleep and update the spinner text; no
    file is read or written here.
    """
    self.clear_screen()
    self.console.print(self.display_header())
    self.console.print()

    # Prerequisite: every collection must be marked as subscribed.
    unsubscribed = [name for name, done in self.subscription_status.items() if not done]
    if unsubscribed:
        self.console.print(
            Panel(
                f"[red]Please subscribe to all collections first!\nMissing: {', '.join(unsubscribed)}[/red]",
                title="Prerequisites Not Met",
                style="red",
            )
        )
        self.console.input("\nPress Enter to continue...")
        return

    # Explain what is about to happen and let the user back out.
    notice = Text()
    notice.append("This will replace your current ModsConfig.xml with the complete Progression pack.\n", style="yellow")
    notice.append("Your current mod configuration will be backed up.", style="white")
    self.console.print(Panel(notice, title="Confirmation", style="yellow"))
    self.console.print()
    if not Confirm.ask("Do you want to continue?"):
        return

    # (spinner text while running, spinner text when done) per later stage.
    stages = [
        (f"Loading {name}...", f"[green]{name} loaded[/green]")
        for name in ("Core Collection", "Content Collection", "Cosmetics Collection")
    ]
    stages.append(
        ("Finalizing configuration...", "[green]Progression Pack loaded successfully![/green]")
    )

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=self.console,
    ) as progress:
        backup_step = progress.add_task("Backing up current configuration...", total=None)
        time.sleep(1)
        progress.update(backup_step, description="[green]Configuration backed up[/green]")

        fetch_step = progress.add_task("Fetching collection workshop IDs...", total=None)
        collected_ids = []
        try:
            for collection_name, url in self.collections.items():
                progress.update(fetch_step, description=f"Fetching {collection_name}...")
                collected_ids.extend(self.fetch_collection_items(self.extract_workshop_id(url)))
                time.sleep(0.5)
            progress.update(fetch_step, description="[green]Collections fetched[/green]")

            # Split the unique ids into mods with a usable package id and
            # mods whose About.xml lookup failed.
            validate_step = progress.add_task("Validating package IDs...", total=None)
            rejected = []
            accepted = []
            for workshop_id in sorted(set(collected_ids)):
                package_id, steam_app_id = self.get_package_info_from_about_xml(workshop_id)
                if self.is_package_id_valid(package_id):
                    # Prefer the steamAppId from the XML, else the workshop id.
                    accepted.append((steam_app_id or workshop_id, package_id))
                else:
                    rejected.append((workshop_id, package_id))
            progress.update(validate_step, description="[green]Package ID validation complete[/green]")

            if rejected:
                # Stop the spinner before printing the report.
                progress.stop()
                self.console.print()

                report = Table(show_header=True, box=box.ROUNDED)
                report.add_column("Workshop ID", style="red bold", width=15)
                report.add_column("Error", style="red", width=40)
                for workshop_id, problem in rejected:
                    report.add_row(workshop_id, problem)
                self.console.print(
                    Panel(
                        report,
                        title=f"[red]Package ID Validation Failed - {len(rejected)} mod(s) have issues[/red]",
                        style="red",
                    )
                )
                self.console.print()

                summary = Text()
                summary.append(f"{len(rejected)} mods failed validation\n", style="red")
                summary.append(f"{len(accepted)} mods passed validation\n", style="green")
                summary.append("\nThe mod pack cannot be loaded until all package ID issues are resolved.\n", style="yellow")
                summary.append("This usually means the mods are not properly downloaded or corrupted.", style="dim")
                self.console.print(Panel(summary, title="Validation Summary", style="yellow"))
                self.console.input("\nPress Enter to acknowledge and return to main menu...")
                return

            # All mods validated successfully, continue with loading
            for running_text, finished_text in stages:
                step = progress.add_task(running_text, total=None)
                time.sleep(1)
                progress.update(step, description=finished_text)
            time.sleep(1)
        except Exception as e:
            progress.stop()
            self.console.print()
            self.console.print(
                Panel(
                    f"[red]Error during mod pack loading: {str(e)}[/red]",
                    title="Loading Error",
                    style="red",
                )
            )
            self.console.input("\nPress Enter to acknowledge and return to main menu...")
            return

    self.console.print(
        Panel(
            "[green]Progression Pack Complete has been loaded successfully!\n"
            "Your RimWorld is now configured with all progression mods.[/green]",
            title="Success",
            style="green",
        )
    )
    self.console.input("\nPress Enter to continue...")
def merge_with_current_mods(self):
    """Walk through merging the Progression collections into the current mods.

    Flow: require every collection to be marked as subscribed, ask for
    confirmation, fetch the workshop ids of all collections, read each
    mod's package id from its About.xml, and stop with an error report if
    any lookup failed.  Otherwise the merge stages are shown and a success
    panel is printed.

    NOTE(review): in this method the "read current configuration" step and
    the per-collection merge/finalise stages only sleep and update the
    spinner text; no file is read or written here.
    """
    self.clear_screen()
    self.console.print(self.display_header())
    self.console.print()

    # Prerequisite: every collection must be marked as subscribed.
    unsubscribed = [name for name, done in self.subscription_status.items() if not done]
    if unsubscribed:
        self.console.print(
            Panel(
                f"[red]Please subscribe to all collections first!\nMissing: {', '.join(unsubscribed)}[/red]",
                title="Prerequisites Not Met",
                style="red",
            )
        )
        self.console.input("\nPress Enter to continue...")
        return

    # Explain what is about to happen and let the user back out.
    notice = Text()
    notice.append("This will merge your current mods with the Progression pack collections.\n", style="yellow")
    notice.append("Your current mod configuration will be preserved and extended.", style="white")
    self.console.print(Panel(notice, title="Confirmation", style="yellow"))
    self.console.print()
    if not Confirm.ask("Do you want to continue?"):
        return

    # (spinner text while running, spinner text when done) per later stage.
    stages = [
        (f"Merging with {name}...", f"[green]{name} merged[/green]")
        for name in ("Core Collection", "Content Collection", "Cosmetics Collection")
    ]
    stages.append(
        ("Finalizing merged configuration...", "[green]Merge completed successfully![/green]")
    )

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=self.console,
    ) as progress:
        read_step = progress.add_task("Reading current mod configuration...", total=None)
        time.sleep(1)
        progress.update(read_step, description="[green]Current configuration read[/green]")

        fetch_step = progress.add_task("Fetching collection workshop IDs...", total=None)
        collected_ids = []
        try:
            for collection_name, url in self.collections.items():
                progress.update(fetch_step, description=f"Fetching {collection_name}...")
                collected_ids.extend(self.fetch_collection_items(self.extract_workshop_id(url)))
                time.sleep(0.5)
            progress.update(fetch_step, description="[green]Collections fetched[/green]")

            # Split the unique ids into mods with a usable package id and
            # mods whose About.xml lookup failed.
            validate_step = progress.add_task("Validating package IDs...", total=None)
            rejected = []
            accepted = []
            for workshop_id in sorted(set(collected_ids)):
                package_id, steam_app_id = self.get_package_info_from_about_xml(workshop_id)
                if self.is_package_id_valid(package_id):
                    # Prefer the steamAppId from the XML, else the workshop id.
                    accepted.append((steam_app_id or workshop_id, package_id))
                else:
                    rejected.append((workshop_id, package_id))
            progress.update(validate_step, description="[green]Package ID validation complete[/green]")

            if rejected:
                # Stop the spinner before printing the report.
                progress.stop()
                self.console.print()

                report = Table(show_header=True, box=box.ROUNDED)
                report.add_column("Workshop ID", style="red bold", width=15)
                report.add_column("Error", style="red", width=40)
                for workshop_id, problem in rejected:
                    report.add_row(workshop_id, problem)
                self.console.print(
                    Panel(
                        report,
                        title=f"[red]Package ID Validation Failed - {len(rejected)} mod(s) have issues[/red]",
                        style="red",
                    )
                )
                self.console.print()

                summary = Text()
                summary.append(f"{len(rejected)} mods failed validation\n", style="red")
                summary.append(f"{len(accepted)} mods passed validation\n", style="green")
                summary.append("\nThe mod merge cannot be completed until all package ID issues are resolved.\n", style="yellow")
                summary.append("This usually means the mods are not properly downloaded or corrupted.", style="dim")
                self.console.print(Panel(summary, title="Validation Summary", style="yellow"))
                self.console.input("\nPress Enter to acknowledge and return to main menu...")
                return

            # All mods validated successfully, continue with merging
            for running_text, finished_text in stages:
                step = progress.add_task(running_text, total=None)
                time.sleep(1)
                progress.update(step, description=finished_text)
            time.sleep(1)
        except Exception as e:
            progress.stop()
            self.console.print()
            self.console.print(
                Panel(
                    f"[red]Error during mod merge: {str(e)}[/red]",
                    title="Merge Error",
                    style="red",
                )
            )
            self.console.input("\nPress Enter to acknowledge and return to main menu...")
            return

    self.console.print(
        Panel(
            "[green]Mods have been merged successfully!\n"
            "Your existing mods are preserved and Progression mods have been added.[/green]",
            title="Success",
            style="green",
        )
    )
    self.console.input("\nPress Enter to continue...")
def check_for_updates(self):
    """Check for a newer application release and report the outcome.

    Uses ``get_update_config`` and ``UpdateChecker`` to query the latest
    release.  When a newer version exists, its details and release notes
    are shown and the user may open the download page; otherwise the
    spinner text states the outcome.  Any exception is reported in the
    spinner text rather than raised.
    """
    self.clear_screen()
    self.console.print(self.display_header())
    self.console.print()

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=self.console,
    ) as progress:
        status = progress.add_task("Checking for updates...", total=None)
        try:
            config = get_update_config()
            current_version = config["current_version"]
            checker = UpdateChecker(current_version)
            release_info, error = checker.check_for_updates_sync()

            if error:
                progress.update(status, description=f"[red]Update check failed: {error}[/red]")
                time.sleep(2)
            elif not release_info:
                progress.update(status, description="[yellow]No release information available[/yellow]")
                time.sleep(1)
            else:
                latest_version = release_info['version']
                if not checker.is_newer_version(latest_version):
                    progress.update(status, description="[green]You are running the latest version[/green]")
                    time.sleep(1)
                    self.console.print(f"\n[green]You are running the latest version ({current_version})[/green]")
                else:
                    progress.update(status, description=f"[yellow]Update available: {latest_version}[/yellow]")
                    time.sleep(1)

                    # Side-by-side summary of installed and latest release.
                    details = Table(show_header=False, box=box.ROUNDED)
                    details.add_column("Field", style="cyan bold", width=20)
                    details.add_column("Value", style="white")
                    details.add_row("Current Version", current_version)
                    details.add_row("Latest Version", latest_version)
                    details.add_row("Release Date", release_info.get('published_at', 'Unknown'))
                    self.console.print(Panel(details, title="Update Available", style="yellow"))

                    if release_info.get('body'):
                        self.console.print(Panel(release_info['body'], title="Release Notes", style="cyan"))

                    self.console.print()
                    if Confirm.ask("Open download page in browser?"):
                        webbrowser.open(release_info['html_url'])
                        self.console.print("[green]Download page opened in browser[/green]")
        except Exception as e:
            progress.update(status, description=f"[red]Update check failed: {e}[/red]")
            time.sleep(2)

    self.console.input("\nPress Enter to continue...")
def view_configuration(self):
    """Display the current paths, collection URLs and application version.

    The table lists the RimWorld path with its validity, the derived
    workshop path, the ModsConfig.xml path with whether the file exists,
    and each collection URL with its subscription flag.  The version panel
    is best-effort: if the update configuration cannot be read it is simply
    omitted.
    """
    self.clear_screen()
    self.console.print(self.display_header())
    self.console.print()

    # Configuration table
    config_table = Table(show_header=True, box=box.ROUNDED)
    config_table.add_column("Setting", style="cyan bold", width=25)
    config_table.add_column("Value", style="white", width=60)
    config_table.add_column("Status", style="green", width=10)

    # RimWorld path
    if self.is_rimworld_valid:
        rimworld_markup = "[green]Valid[/green]"
    else:
        rimworld_markup = "[red]Invalid[/red]"
    config_table.add_row("RimWorld Path", self.rimworld_path or "Not set", rimworld_markup)

    # Workshop path (always derived from the RimWorld path)
    config_table.add_row("Workshop Path", self.workshop_path or "Not set", "Auto")

    # ModsConfig path: hit the filesystem once and reuse the answer.
    modsconfig_found = bool(self.modsconfig_path) and os.path.exists(self.modsconfig_path)
    if modsconfig_found:
        modsconfig_markup = "[green]Found[/green]"
    else:
        modsconfig_markup = "[red]Not found[/red]"
    config_table.add_row("ModsConfig Path", self.modsconfig_path or "Not found", modsconfig_markup)

    # Collection URLs
    for name, url in self.collections.items():
        if self.subscription_status[name]:
            sub_markup = "[green]Yes[/green]"
        else:
            sub_markup = "[red]No[/red]"
        config_table.add_row(f"{name} URL", url, sub_markup)

    self.console.print(Panel(config_table, title="Current Configuration", style="white"))
    self.console.print()

    # Version info
    try:
        config = get_update_config()
        version_panel = Panel(
            f"Progression Loader TUI v{config['current_version']}",
            title="Version Information",
            style="cyan"
        )
        self.console.print(version_panel)
    except Exception:
        # Deliberately best-effort, but catch Exception rather than using a
        # bare except so Ctrl+C and SystemExit still propagate.
        pass

    self.console.input("\nPress Enter to continue...")
def main():
    """Main entry point for the TUI application.

    Optionally checks for updates first (``check_on_startup``, default on).
    If a newer release exists and ``updates_required`` is set, the user is
    offered the download page and the program returns without starting the
    menu.  Update-check failures are reported and do not block startup.
    Ctrl+C exits quietly; any other exception is reported as fatal.
    """
    try:
        config = get_update_config()
        if config.get("check_on_startup", True):
            console.print("[cyan]Checking for updates...[/cyan]")
            try:
                checker = UpdateChecker(config["current_version"])
                release_info, _error = checker.check_for_updates_sync()
                update_pending = release_info and checker.is_newer_version(release_info['version'])
                if update_pending:
                    console.print(f"[yellow]Update available: {release_info['version']}[/yellow]")
                    if not config.get("updates_required", False):
                        console.print("[dim]You can update later from the main menu.[/dim]")
                    else:
                        console.print("[red]This update is required. Please update before continuing.[/red]")
                        if Confirm.ask("Open download page?"):
                            webbrowser.open(release_info['html_url'])
                        # A required update blocks the TUI either way.
                        return
            except Exception as e:
                console.print(f"[dim]Update check failed: {e}[/dim]")

        # Start the TUI
        ProgressionTUI().show_main_menu()
    except KeyboardInterrupt:
        console.print("\n[yellow]Goodbye![/yellow]")
    except Exception as e:
        console.print(f"[red]Fatal error: {e}[/red]")
        console.print("[dim]Please report this error to the developers.[/dim]")
# Run the TUI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()