diff --git a/tools/generate_emu_config/generate_emu_config.py b/tools/generate_emu_config/generate_emu_config.py index f3286779..2e56350e 100644 --- a/tools/generate_emu_config/generate_emu_config.py +++ b/tools/generate_emu_config/generate_emu_config.py @@ -6,8 +6,7 @@ from external_components import ( ) from controller_config_generator import parse_controller_vdf from steam.client import SteamClient -from steam.client.cdn import CDNClient -from steam.enums import common +from steam.webauth import WebAuth from steam.enums.common import EResult from steam.enums.emsg import EMsg from steam.core.msg import MsgProto @@ -584,6 +583,7 @@ def help(): print(" -aw: generate schemas of all possible languages for Achievement Watcher") print(" -clean: delete any folder/file with the same name as the output before generating any data") print(" -anon: login as an anonymous account, these have very limited access and cannot get all app details") + print(" -token: save refresh_token to disk, the logged-on account will be saved") print(" -de: disable some extra features by generating the corresponding config files in steam_settings folder") print(" -cve: enable some convenient extra features by generating the corresponding config files in steam_settings folder") print(" -reldir: generate temp files/folders, and expect input files, relative to the current working directory") @@ -635,6 +635,7 @@ def main(): GENERATE_ACHIEVEMENT_WATCHER_SCHEMAS = False CLEANUP_BEFORE_GENERATING = False ANON_LOGIN = False + SAVE_REFRESH_TOKEN = False RELATIVE_DIR = False SKIP_ACH = False SKIP_CONTROLLER = False @@ -668,6 +669,8 @@ def main(): CLEANUP_BEFORE_GENERATING = True elif f'{appid}'.lower() == '-anon': ANON_LOGIN = True + elif f'{appid}'.lower() == '-token': + SAVE_REFRESH_TOKEN = True elif f'{appid}'.lower() == '-de': DISABLE_EXTRA = True elif f'{appid}'.lower() == '-cve': @@ -691,32 +694,6 @@ def main(): sys.exit(1) client = SteamClient() - login_tmp_folder = os.path.join(get_exe_dir(RELATIVE_DIR), "login_temp") - if not os.path.exists(login_tmp_folder): - os.makedirs(login_tmp_folder) - client.set_credential_location(login_tmp_folder) - - # first read the 'my_login.txt' file - my_login_file = os.path.join(get_exe_dir(RELATIVE_DIR), "my_login.txt") - if not ANON_LOGIN and os.path.isfile(my_login_file): - filedata = [''] - with open(my_login_file, "r", encoding="utf-8") as f: - filedata = f.readlines() - filedata = list(map(lambda s: s.replace("\r", "").replace("\n", ""), filedata)) - filedata = [l for l in filedata if l] - if len(filedata) == 2: - USERNAME = filedata[0] - PASSWORD = filedata[1] - - # then allow the env vars to override the login details - env_username = os.environ.get('GSE_CFG_USERNAME', None) - env_password = os.environ.get('GSE_CFG_PASSWORD', None) - if env_username: - USERNAME = env_username - if env_password: - PASSWORD = env_password - - if ANON_LOGIN: result = client.anonymous_login() trials = 5 @@ -724,33 +701,65 @@ def main(): time.sleep(1000) result = client.anonymous_login() trials -= 1 - elif (len(USERNAME) == 0 or len(PASSWORD) == 0): - client.cli_login() else: - result = client.login(USERNAME, password=PASSWORD) - auth_code, two_factor_code = None, None + # first read the 'my_login.txt' file + my_login_file = os.path.join(get_exe_dir(RELATIVE_DIR), "my_login.txt") + if not ANON_LOGIN and os.path.isfile(my_login_file): + filedata = [''] + with open(my_login_file, "r", encoding="utf-8") as f: + filedata = f.readlines() + filedata = list(map(lambda s: s.replace("\r", "").replace("\n", ""), filedata)) + filedata = [l for l in filedata if l] + if len(filedata) == 1: + USERNAME = filedata[0] + elif len(filedata) == 2: + USERNAME, PASSWORD = filedata[0], filedata[1] + + # then allow the env vars to override the login details + env_username = os.environ.get('GSE_CFG_USERNAME', None) + env_password = os.environ.get('GSE_CFG_PASSWORD', None) + if env_username: + USERNAME = env_username + if env_password: + PASSWORD = env_password + + # the file to save/load credentials + REFRESH_TOKENS = os.path.join(get_exe_dir(RELATIVE_DIR), "refresh_tokens.json") + refresh_tokens = {} + if os.path.isfile(REFRESH_TOKENS): + with open(REFRESH_TOKENS) as f: + try: + lf = json.load(f) + refresh_tokens = lf if isinstance(lf, dict) else {} + except: + pass + + # select username from credentials if not already persent + if not USERNAME: + users = {i: user for i, user in enumerate(refresh_tokens, 1)} + if len(users) != 0: + for i, user in users.items(): + print(f"{i}: {user}") + while True: + try: + num=int(input("Choose an account to login (0 for add account): ")) + except ValueError: + print('Please type a number'); continue + break + USERNAME = users.get(num) + + # still no username? ask user + if not USERNAME: + USERNAME = input("Steam user: ") + + REFRESH_TOKEN = refresh_tokens.get(USERNAME) + + webauth, result = WebAuth(), None while result in ( - EResult.AccountLogonDenied, EResult.InvalidLoginAuthCode, - EResult.AccountLoginDeniedNeedTwoFactor, EResult.TwoFactorCodeMismatch, EResult.TryAnotherCM, EResult.ServiceUnavailable, - EResult.InvalidPassword, - ): + EResult.InvalidPassword, None): - if result == EResult.InvalidPassword: - print("invalid password, the password you set is wrong.") - exit(1) - - elif result in (EResult.AccountLogonDenied, EResult.InvalidLoginAuthCode): - prompt = ("Enter email code: " if result == EResult.AccountLogonDenied else - "Incorrect code. Enter email code: ") - auth_code, two_factor_code = input(prompt), None - - elif result in (EResult.AccountLoginDeniedNeedTwoFactor, EResult.TwoFactorCodeMismatch): - prompt = ("Enter 2FA code: " if result == EResult.AccountLoginDeniedNeedTwoFactor else - "Incorrect code. Enter 2FA code: ") - auth_code, two_factor_code = None, input(prompt) - - elif result in (EResult.TryAnotherCM, EResult.ServiceUnavailable): + if result in (EResult.TryAnotherCM, EResult.ServiceUnavailable): if prompt_for_unavailable and result == EResult.ServiceUnavailable: while True: answer = input("Steam is down. Keep retrying? [y/n]: ").lower() @@ -760,8 +769,26 @@ def main(): if answer == 'n': break client.reconnect(maxdelay=15) + elif result == EResult.InvalidPassword: + print("invalid password or refresh_token,") + print(f"correct the password or/and delete '{REFRESH_TOKENS}' and try again.") + exit(1) - result = client.login(USERNAME, PASSWORD, None, auth_code, two_factor_code) + if not REFRESH_TOKEN: + try: + webauth.cli_login(USERNAME, PASSWORD) + except Exception as e: + print(f'Unknown exception: {e}, maybe some account info is wrong?') + exit(1) + USERNAME, PASSWORD = webauth.username, webauth.password + REFRESH_TOKEN = webauth.refresh_token + + result = client.login(USERNAME, PASSWORD, REFRESH_TOKEN) + + if SAVE_REFRESH_TOKEN: + with open(REFRESH_TOKENS, 'w') as f: + refresh_tokens.update({USERNAME: REFRESH_TOKEN}) + json.dump(refresh_tokens, f, indent=4) # read and prepend top_owners_ids.txt top_owners_file = os.path.join(get_exe_dir(RELATIVE_DIR), "top_owners_ids.txt") diff --git a/tools/generate_emu_config/requirements.txt b/tools/generate_emu_config/requirements.txt index 8d87fb49..eb7e1ae2 100644 --- a/tools/generate_emu_config/requirements.txt +++ b/tools/generate_emu_config/requirements.txt @@ -1,4 +1,4 @@ -steam[client] +steam[client] @ git+https://github.com/detiam/steam_websocket@b8239912e6a190f490aede529c08b5049096bdc8 pyinstaller requests certifi