use requests package instead of urllib

This commit is contained in:
otavepto 2024-06-16 01:32:06 +03:00
parent fa59283f1c
commit b1217657db
3 changed files with 35 additions and 25 deletions

View File

@ -1,4 +1,6 @@
import os import os
import sys
import traceback
import json import json
import queue import queue
import threading import threading
@ -20,13 +22,15 @@ def __downloader_thread(q : queue.Queue[tuple[str, str]]):
for download_trial in range(3): for download_trial in range(3):
try: try:
r = requests.get(url) r = requests.get(url)
r.raise_for_status()
if r.status_code == requests.codes.ok: # if download was successfull if r.status_code == requests.codes.ok: # if download was successfull
with open(path, "wb") as f: with open(path, "wb") as f:
f.write(r.content) f.write(r.content)
break break
except Exception: except Exception as e:
pass print(f"Error downloading from '{url}'", file=sys.stderr)
traceback.print_exception(e, file=sys.stderr)
time.sleep(0.1) time.sleep(0.1)

View File

@ -14,8 +14,7 @@ from steam.core.msg import MsgProto
import os import os
import sys import sys
import json import json
import urllib.request import requests
import urllib.error
import threading import threading
import queue import queue
import shutil import shutil
@ -357,16 +356,16 @@ def download_achievement_images(game_id : int, image_names : set[str], output_fo
for u in ["https://cdn.akamai.steamstatic.com/steamcommunity/public/images/apps/", "https://cdn.cloudflare.steamstatic.com/steamcommunity/public/images/apps/"]: for u in ["https://cdn.akamai.steamstatic.com/steamcommunity/public/images/apps/", "https://cdn.cloudflare.steamstatic.com/steamcommunity/public/images/apps/"]:
url = "{}{}/{}".format(u, game_id, name) url = "{}{}/{}".format(u, game_id, name)
try: try:
with urllib.request.urlopen(url) as response: response = requests.get(url, allow_redirects=True)
image_data = response.read() response.raise_for_status()
image_data = response.content
with open(os.path.join(output_folder, name), "wb") as f: with open(os.path.join(output_folder, name), "wb") as f:
f.write(image_data) f.write(image_data)
succeeded = True succeeded = True
break break
except urllib.error.HTTPError as e: except Exception as e:
print("HTTPError downloading", url, e.code) print("HTTPError downloading", url, file=sys.stderr)
except urllib.error.URLError as e: traceback.print_exception(e, file=sys.stderr)
print("URLError downloading", url, e.code)
if not succeeded: if not succeeded:
print("error could not download", name) print("error could not download", name)
@ -422,9 +421,9 @@ def generate_achievement_stats(client, game_id : int, output_directory, backup_d
if not os.path.exists(achievement_images_dir): if not os.path.exists(achievement_images_dir):
os.makedirs(achievement_images_dir) os.makedirs(achievement_images_dir)
if copy_default_unlocked_img: if copy_default_unlocked_img:
shutil.copy("steam_default_icon_unlocked.jpg", achievement_images_dir) shutil.copy(os.path.join(get_exe_dir(), "steam_default_icon_unlocked.jpg"), achievement_images_dir)
if copy_default_locked_img: if copy_default_locked_img:
shutil.copy("steam_default_icon_locked.jpg", achievement_images_dir) shutil.copy(os.path.join(get_exe_dir(), "steam_default_icon_locked.jpg"), achievement_images_dir)
download_achievement_images(game_id, images_to_download, achievement_images_dir) download_achievement_images(game_id, images_to_download, achievement_images_dir)
return achievements return achievements
@ -462,11 +461,16 @@ def download_published_file(client, published_file_id, backup_directory):
f.write(str(ugc_info.body)) f.write(str(ugc_info.body))
if len(file_details.file_url) > 0: if len(file_details.file_url) > 0:
with urllib.request.urlopen(file_details.file_url) as response: try:
data = response.read() response = requests.get(file_details.file_url, allow_redirects=True)
response.raise_for_status()
data = response.content
with open(os.path.join(backup_directory, file_details.filename.replace("/", "_").replace("\\", "_")), "wb") as f: with open(os.path.join(backup_directory, file_details.filename.replace("/", "_").replace("\\", "_")), "wb") as f:
f.write(data) f.write(data)
return data return data
except Exception as e:
print(f"Error downloading from '{file_details.file_url}'", file=sys.stderr)
traceback.print_exception(e, file=sys.stderr)
return None return None
else: else:
print("Could not download file", published_file_id, "no url (you can ignore this if the game doesn't need a controller config)") print("Could not download file", published_file_id, "no url (you can ignore this if the game doesn't need a controller config)")
@ -485,12 +489,12 @@ def generate_inventory(client, game_id):
url = f"https://api.steampowered.com/IGameInventory/GetItemDefArchive/v0001?appid={game_id}&digest={inventory.body.digest}" url = f"https://api.steampowered.com/IGameInventory/GetItemDefArchive/v0001?appid={game_id}&digest={inventory.body.digest}"
try: try:
with urllib.request.urlopen(url) as response: response = requests.get(url, allow_redirects=True)
return response.read() response.raise_for_status()
except urllib.error.HTTPError as e: return response.content
print("HTTPError getting", url, e.code) except Exception as e:
except urllib.error.URLError as e: print(f"Error downloading from '{url}'", file=sys.stderr)
print("URLError getting", url, e.code) traceback.print_exception(e, file=sys.stderr)
return None return None
def get_dlc(raw_infos): def get_dlc(raw_infos):

View File

@ -1,2 +1,4 @@
steam[client] steam[client]
pyinstaller pyinstaller
requests
certifi