mirror of
https://github.com/Detanup01/gbe_fork.git
synced 2024-12-02 23:35:37 +08:00
231 lines
7.5 KiB
Python
231 lines
7.5 KiB
Python
|
import os
|
||
|
import json
|
||
|
import queue
|
||
|
import threading
|
||
|
import time
|
||
|
import requests
|
||
|
import urllib.parse
|
||
|
from external_components import (
|
||
|
safe_name
|
||
|
)
|
||
|
|
||
|
def __downloader_thread(q : queue.Queue[tuple[str, str]]):
|
||
|
while True:
|
||
|
url, path = q.get()
|
||
|
if not url:
|
||
|
q.task_done()
|
||
|
return
|
||
|
|
||
|
# try 3 times
|
||
|
for download_trial in range(3):
|
||
|
try:
|
||
|
r = requests.get(url)
|
||
|
if r.status_code == requests.codes.ok: # if download was successfull
|
||
|
with open(path, "wb") as f:
|
||
|
f.write(r.content)
|
||
|
|
||
|
break
|
||
|
except Exception:
|
||
|
pass
|
||
|
|
||
|
time.sleep(0.1)
|
||
|
|
||
|
q.task_done()
|
||
|
|
||
|
def __remove_url_query(url : str) -> str:
|
||
|
url_parts = urllib.parse.urlsplit(url)
|
||
|
url_parts_list = list(url_parts)
|
||
|
url_parts_list[3] = '' # remove query
|
||
|
return str(urllib.parse.urlunsplit(url_parts_list))
|
||
|
|
||
|
def __download_screenshots(
|
||
|
base_out_dir : str,
|
||
|
appid : int,
|
||
|
app_details : dict,
|
||
|
download_screenshots : bool,
|
||
|
download_thumbnails : bool):
|
||
|
if not download_screenshots and not download_thumbnails:
|
||
|
return
|
||
|
|
||
|
screenshots : list[dict[str, object]] = app_details.get(f'{appid}', {}).get('data', {}).get('screenshots', [])
|
||
|
if not screenshots:
|
||
|
print(f'[?] no screenshots or thumbnails are available')
|
||
|
return
|
||
|
|
||
|
screenshots_out_dir = os.path.join(base_out_dir, "screenshots")
|
||
|
if download_screenshots:
|
||
|
print(f"downloading screenshots in: {screenshots_out_dir}")
|
||
|
if not os.path.exists(screenshots_out_dir):
|
||
|
os.makedirs(screenshots_out_dir)
|
||
|
time.sleep(0.025)
|
||
|
|
||
|
thumbnails_out_dir = os.path.join(screenshots_out_dir, "thumbnails")
|
||
|
if download_thumbnails:
|
||
|
print(f"downloading screenshots thumbnails in: {thumbnails_out_dir}")
|
||
|
if not os.path.exists(thumbnails_out_dir):
|
||
|
os.makedirs(thumbnails_out_dir)
|
||
|
time.sleep(0.025)
|
||
|
|
||
|
q : queue.Queue[tuple[str, str]] = queue.Queue()
|
||
|
|
||
|
max_threads = 20
|
||
|
for i in range(max_threads):
|
||
|
threading.Thread(target=__downloader_thread, args=(q,), daemon=True).start()
|
||
|
|
||
|
for scrn in screenshots:
|
||
|
if download_screenshots:
|
||
|
full_image_url = scrn.get('path_full', None)
|
||
|
if full_image_url:
|
||
|
full_image_url_sanitized = __remove_url_query(full_image_url)
|
||
|
image_hash_name = f'{full_image_url_sanitized.rsplit("/", 1)[-1]}'.rstrip()
|
||
|
if image_hash_name:
|
||
|
q.put((full_image_url_sanitized, os.path.join(screenshots_out_dir, image_hash_name)))
|
||
|
else:
|
||
|
print(f'[X] cannot download screenshot from url: "{full_image_url}", failed to get image name')
|
||
|
|
||
|
if download_thumbnails:
|
||
|
thumbnail_url = scrn.get('path_thumbnail', None)
|
||
|
if thumbnail_url:
|
||
|
thumbnail_url_sanitized = __remove_url_query(thumbnail_url)
|
||
|
image_hash_name = f'{thumbnail_url_sanitized.rsplit("/", 1)[-1]}'.rstrip()
|
||
|
if image_hash_name:
|
||
|
q.put((thumbnail_url_sanitized, os.path.join(thumbnails_out_dir, image_hash_name)))
|
||
|
else:
|
||
|
print(f'[X] cannot download screenshot thumbnail from url: "{thumbnail_url}", failed to get image name')
|
||
|
|
||
|
q.join()
|
||
|
|
||
|
for i in range(max_threads):
|
||
|
q.put((None, None))
|
||
|
|
||
|
q.join()
|
||
|
|
||
|
print(f"finished downloading app screenshots")
|
||
|
|
||
|
PREFERED_VIDS = [
|
||
|
'trailer', 'gameplay', 'announcement'
|
||
|
]
|
||
|
|
||
|
def __download_videos(base_out_dir : str, appid : int, app_details : dict):
|
||
|
videos : list[dict[str, object]] = app_details.get(f'{appid}', {}).get('data', {}).get('movies', [])
|
||
|
if not videos:
|
||
|
print(f'[?] no videos were found')
|
||
|
return
|
||
|
|
||
|
videos_out_dir = os.path.join(base_out_dir, "videos")
|
||
|
print(f"downloading app videos in: {videos_out_dir}")
|
||
|
|
||
|
first_vid : tuple[str, str] = None
|
||
|
prefered_vid : tuple[str, str] = None
|
||
|
for vid in videos:
|
||
|
vid_name = f"{vid.get('name', '')}"
|
||
|
webm_url = vid.get('webm', {}).get("480", None)
|
||
|
mp4_url = vid.get('mp4', {}).get("480", None)
|
||
|
|
||
|
ext : str = None
|
||
|
prefered_url : str = None
|
||
|
if mp4_url:
|
||
|
prefered_url = mp4_url
|
||
|
ext = 'mp4'
|
||
|
elif webm_url:
|
||
|
prefered_url = webm_url
|
||
|
ext = 'webm'
|
||
|
else: # no url is found
|
||
|
print(f'[X] no url is found for video "{vid_name}"')
|
||
|
continue
|
||
|
|
||
|
vid_url_sanitized = __remove_url_query(prefered_url)
|
||
|
vid_name_in_url = f'{vid_url_sanitized.rsplit("/", 1)[-1]}'.rstrip()
|
||
|
vid_name = safe_name.create_safe_name(vid_name)
|
||
|
if vid_name:
|
||
|
vid_name = f'{vid_name}.{ext}'
|
||
|
else:
|
||
|
vid_name = vid_name_in_url
|
||
|
|
||
|
if vid_name:
|
||
|
if not first_vid:
|
||
|
first_vid = (vid_url_sanitized, vid_name)
|
||
|
|
||
|
if any(vid_name.lower().find(candidate) > -1 for candidate in PREFERED_VIDS):
|
||
|
prefered_vid = (vid_url_sanitized, vid_name)
|
||
|
|
||
|
if prefered_vid:
|
||
|
break
|
||
|
else:
|
||
|
print(f'[X] cannot download video from url: "{prefered_url}", failed to get vido name')
|
||
|
|
||
|
if not first_vid and not prefered_vid:
|
||
|
print(f'[X] no video url could be found')
|
||
|
return
|
||
|
elif not prefered_vid:
|
||
|
prefered_vid = first_vid
|
||
|
|
||
|
if not os.path.exists(videos_out_dir):
|
||
|
os.makedirs(videos_out_dir)
|
||
|
time.sleep(0.05)
|
||
|
|
||
|
q : queue.Queue[tuple[str, str]] = queue.Queue()
|
||
|
|
||
|
max_threads = 1
|
||
|
for i in range(max_threads):
|
||
|
threading.Thread(target=__downloader_thread, args=(q,), daemon=True).start()
|
||
|
|
||
|
# TODO download all videos
|
||
|
print(f'donwloading video: "{prefered_vid[1]}"')
|
||
|
q.put((prefered_vid[0], os.path.join(videos_out_dir, prefered_vid[1])))
|
||
|
q.join()
|
||
|
|
||
|
for i in range(max_threads):
|
||
|
q.put((None, None))
|
||
|
|
||
|
q.join()
|
||
|
|
||
|
print(f"finished downloading app videos")
|
||
|
|
||
|
|
||
|
def download_app_details(
|
||
|
base_out_dir : str,
|
||
|
info_out_dir : str,
|
||
|
appid : int,
|
||
|
download_screenshots : bool,
|
||
|
download_thumbnails : bool,
|
||
|
download_vids : bool):
|
||
|
|
||
|
details_out_file = os.path.join(info_out_dir, "app_details.json")
|
||
|
print(f"downloading app details in: {details_out_file}")
|
||
|
|
||
|
app_details : dict = None
|
||
|
last_exception : Exception | str = None
|
||
|
# try 3 times
|
||
|
for download_trial in range(3):
|
||
|
try:
|
||
|
r = requests.get(f'http://store.steampowered.com/api/appdetails?appids={appid}&format=json')
|
||
|
if r.status_code == requests.codes.ok: # if download was successfull
|
||
|
result : dict = r.json()
|
||
|
json_ok = result.get(f'{appid}', {}).get('success', False)
|
||
|
if json_ok:
|
||
|
app_details = result
|
||
|
break
|
||
|
else:
|
||
|
last_exception = "JSON success was False"
|
||
|
except Exception as e:
|
||
|
last_exception = e
|
||
|
|
||
|
time.sleep(0.1)
|
||
|
|
||
|
if not app_details:
|
||
|
err = "[X] failed to download app details"
|
||
|
if last_exception:
|
||
|
err += f', last error: "{last_exception}"'
|
||
|
|
||
|
print(err)
|
||
|
return
|
||
|
|
||
|
with open(details_out_file, "wt", encoding='utf-8') as f:
|
||
|
json.dump(app_details, f, ensure_ascii=False, indent=2)
|
||
|
|
||
|
__download_screenshots(base_out_dir, appid, app_details, download_screenshots, download_thumbnails)
|
||
|
|
||
|
if download_vids:
|
||
|
__download_videos(base_out_dir, appid, app_details)
|