Merge pull request #75 from detiam/gec_patch

generate_emu_config: Fix credentials
This commit is contained in:
Detanup01 2024-11-05 17:28:49 +01:00 committed by GitHub
commit ecea684f1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 80 additions and 53 deletions

View File

@ -6,8 +6,7 @@ from external_components import (
)
from controller_config_generator import parse_controller_vdf
from steam.client import SteamClient
from steam.client.cdn import CDNClient
from steam.enums import common
from steam.webauth import WebAuth
from steam.enums.common import EResult
from steam.enums.emsg import EMsg
from steam.core.msg import MsgProto
@ -584,6 +583,7 @@ def help():
print(" -aw: generate schemas of all possible languages for Achievement Watcher")
print(" -clean: delete any folder/file with the same name as the output before generating any data")
print(" -anon: login as an anonymous account, these have very limited access and cannot get all app details")
print(" -token: save refresh_token to disk, the logged-on account will be saved")
print(" -de: disable some extra features by generating the corresponding config files in steam_settings folder")
print(" -cve: enable some convenient extra features by generating the corresponding config files in steam_settings folder")
print(" -reldir: generate temp files/folders, and expect input files, relative to the current working directory")
@ -635,6 +635,7 @@ def main():
GENERATE_ACHIEVEMENT_WATCHER_SCHEMAS = False
CLEANUP_BEFORE_GENERATING = False
ANON_LOGIN = False
SAVE_REFRESH_TOKEN = False
RELATIVE_DIR = False
SKIP_ACH = False
SKIP_CONTROLLER = False
@ -668,6 +669,8 @@ def main():
CLEANUP_BEFORE_GENERATING = True
elif f'{appid}'.lower() == '-anon':
ANON_LOGIN = True
elif f'{appid}'.lower() == '-token':
SAVE_REFRESH_TOKEN = True
elif f'{appid}'.lower() == '-de':
DISABLE_EXTRA = True
elif f'{appid}'.lower() == '-cve':
@ -691,11 +694,14 @@ def main():
sys.exit(1)
client = SteamClient()
login_tmp_folder = os.path.join(get_exe_dir(RELATIVE_DIR), "login_temp")
if not os.path.exists(login_tmp_folder):
os.makedirs(login_tmp_folder)
client.set_credential_location(login_tmp_folder)
if ANON_LOGIN:
result = client.anonymous_login()
trials = 5
while result != EResult.OK and trials > 0:
time.sleep(1000)
result = client.anonymous_login()
trials -= 1
else:
# first read the 'my_login.txt' file
my_login_file = os.path.join(get_exe_dir(RELATIVE_DIR), "my_login.txt")
if not ANON_LOGIN and os.path.isfile(my_login_file):
@ -704,9 +710,10 @@ def main():
filedata = f.readlines()
filedata = list(map(lambda s: s.replace("\r", "").replace("\n", ""), filedata))
filedata = [l for l in filedata if l]
if len(filedata) == 2:
if len(filedata) == 1:
USERNAME = filedata[0]
PASSWORD = filedata[1]
elif len(filedata) == 2:
USERNAME, PASSWORD = filedata[0], filedata[1]
# then allow the env vars to override the login details
env_username = os.environ.get('GSE_CFG_USERNAME', None)
@ -716,41 +723,43 @@ def main():
if env_password:
PASSWORD = env_password
# the file to save/load credentials
REFRESH_TOKENS = os.path.join(get_exe_dir(RELATIVE_DIR), "refresh_tokens.json")
refresh_tokens = {}
if os.path.isfile(REFRESH_TOKENS):
with open(REFRESH_TOKENS) as f:
try:
lf = json.load(f)
refresh_tokens = lf if isinstance(lf, dict) else {}
except:
pass
if ANON_LOGIN:
result = client.anonymous_login()
trials = 5
while result != EResult.OK and trials > 0:
time.sleep(1000)
result = client.anonymous_login()
trials -= 1
elif (len(USERNAME) == 0 or len(PASSWORD) == 0):
client.cli_login()
else:
result = client.login(USERNAME, password=PASSWORD)
auth_code, two_factor_code = None, None
# select username from credentials if not already persent
if not USERNAME:
users = {i: user for i, user in enumerate(refresh_tokens, 1)}
if len(users) != 0:
for i, user in users.items():
print(f"{i}: {user}")
while True:
try:
num=int(input("Choose an account to login (0 for add account): "))
except ValueError:
print('Please type a number'); continue
break
USERNAME = users.get(num)
# still no username? ask user
if not USERNAME:
USERNAME = input("Steam user: ")
REFRESH_TOKEN = refresh_tokens.get(USERNAME)
webauth, result = WebAuth(), None
while result in (
EResult.AccountLogonDenied, EResult.InvalidLoginAuthCode,
EResult.AccountLoginDeniedNeedTwoFactor, EResult.TwoFactorCodeMismatch,
EResult.TryAnotherCM, EResult.ServiceUnavailable,
EResult.InvalidPassword,
):
EResult.InvalidPassword, None):
if result == EResult.InvalidPassword:
print("invalid password, the password you set is wrong.")
exit(1)
elif result in (EResult.AccountLogonDenied, EResult.InvalidLoginAuthCode):
prompt = ("Enter email code: " if result == EResult.AccountLogonDenied else
"Incorrect code. Enter email code: ")
auth_code, two_factor_code = input(prompt), None
elif result in (EResult.AccountLoginDeniedNeedTwoFactor, EResult.TwoFactorCodeMismatch):
prompt = ("Enter 2FA code: " if result == EResult.AccountLoginDeniedNeedTwoFactor else
"Incorrect code. Enter 2FA code: ")
auth_code, two_factor_code = None, input(prompt)
elif result in (EResult.TryAnotherCM, EResult.ServiceUnavailable):
if result in (EResult.TryAnotherCM, EResult.ServiceUnavailable):
if prompt_for_unavailable and result == EResult.ServiceUnavailable:
while True:
answer = input("Steam is down. Keep retrying? [y/n]: ").lower()
@ -760,8 +769,26 @@ def main():
if answer == 'n': break
client.reconnect(maxdelay=15)
elif result == EResult.InvalidPassword:
print("invalid password or refresh_token,")
print(f"correct the password or/and delete '{REFRESH_TOKENS}' and try again.")
exit(1)
result = client.login(USERNAME, PASSWORD, None, auth_code, two_factor_code)
if not REFRESH_TOKEN:
try:
webauth.cli_login(USERNAME, PASSWORD)
except Exception as e:
print(f'Unknown exception: {e}, maybe some account info is wrong?')
exit(1)
USERNAME, PASSWORD = webauth.username, webauth.password
REFRESH_TOKEN = webauth.refresh_token
result = client.login(USERNAME, PASSWORD, REFRESH_TOKEN)
if SAVE_REFRESH_TOKEN:
with open(REFRESH_TOKENS, 'w') as f:
refresh_tokens.update({USERNAME: REFRESH_TOKEN})
json.dump(refresh_tokens, f, indent=4)
# read and prepend top_owners_ids.txt
top_owners_file = os.path.join(get_exe_dir(RELATIVE_DIR), "top_owners_ids.txt")

View File

@ -1,4 +1,4 @@
steam[client]
steam[client] @ git+https://github.com/detiam/steam_websocket@b8239912e6a190f490aede529c08b5049096bdc8
pyinstaller
requests
certifi