Added login token persistence. Optimized update process

This commit is contained in:
katboi01 2025-01-30 02:06:13 +01:00
parent 50144307ac
commit ed77fc6b44
2 changed files with 94 additions and 46 deletions

View File

@ -2,17 +2,27 @@ import os
import sys
import json
import hashlib
import pathlib
import argparse
import requests
import subprocess
import urllib.parse
from uuid import getnode
from datetime import datetime, timedelta
parser = argparse.ArgumentParser(description='DMM bypass script')
parser.add_argument('-g', '--game', type=str, help="DMM code name of the game", default="kfp2g")
parser.add_argument('-u', '--update', help="Check for game update before launching", action='store_true')
args = parser.parse_args()
def load_config(config_name):
with open(config_name, "rt", encoding="utf-8") as f:
return json.load(f)
def save_config(config_name, config):
with open(config_name, 'wt', encoding="utf-8") as f:
json.dump(config, f, indent=1, ensure_ascii=False)
config_name = args.game + '.cfg'
if not os.path.exists(config_name):
@ -24,11 +34,9 @@ if not os.path.exists(config_name):
"dmm_password" : input('DMM Password: '),
"use_proxy" : input('Your login tokens will be sent through my VPN machine.\nIs that okay? (yes/no): ').lower() == "yes"
}
with open(config_name, 'wt', encoding="utf-8") as f:
json.dump(config, f, indent=1, ensure_ascii=False)
save_config(config_name, config)
else:
with open(config_name, "rt", encoding="utf-8") as f:
config = json.load(f)
config = load_config(config_name)
print(f'Loaded settings from {config_name}')
config["update_game"] = args.update
@ -47,6 +55,20 @@ def get_mac():
mac = getnode()
return ':'.join(("%012X" % mac)[i:i+2] for i in range(0, 12, 2)).lower()
def get_game_root_path(user_path, game_dir, exe_name):
"""Return valid game install directory"""
path = pathlib.Path(user_path)
parts = list(path.parts)
if path.name == exe_name:
return str(path.parent.resolve())
elif game_dir in parts:
#get index of last occurence of game_dir in user path (+1)
dir_idx = len(parts) - parts[::-1].index(game_dir)
parts = parts[:dir_idx]
return str(pathlib.Path(*parts).resolve())
else:
raise Exception(f"Path to game files is incorrect! Ensure it points to {exe_name} or contains {game_dir}!")
def retrieve_login_token(session : requests.Session):
try:
print("Retrieving login form")
@ -79,7 +101,7 @@ def retrieve_auth_keys(login, password, token, captcha, session : requests.Sessi
except Exception as e:
print("Failed to log in:", e)
return None, None
def agree_to_game_terms(game_id, use_proxy):
try:
print("Accepting updated game terms of use")
@ -121,52 +143,82 @@ def retrieve_launch_params(game_id, mac_addr, hdd_serial, motherboard, login_sec
elif result_json["result_code"] != 100:
raise Exception(f'{result_json["result_code"]}: {result_json["error"]}')
data = result_json["data"]
if update_game:
game_version = data["latest_version"]
print("Latest version:", game_version)
file_list_params = "?" + data["sign"].replace(";", "&").replace("CloudFront-", "")
return data["execute_args"], data["file_list_url"], file_list_params
else:
return data["execute_args"], None, None
return data
except Exception as e:
print("Failed to retrieve launch arguments:", e)
def main(config):
game_id = config["game_id"]
#required arguments
game_id = config["game_id"]
exe_location = config["file_path"]
login = urllib.parse.quote_plus(config["dmm_login"])
password = urllib.parse.quote_plus(config["dmm_password"])
update_game = config["update_game"] if "update_game" in config else False
use_proxy = config["use_proxy"] if "use_proxy" in config else False
login = urllib.parse.quote_plus(config["dmm_login"])
password = urllib.parse.quote_plus(config["dmm_password"])
#optional arguments
update_game = config.get("update_game", False)
use_proxy = config.get("use_proxy", False)
saved_login = config.get("saved_login", None)
#dmm requires these values
mac_addr = get_mac()
hdd_serial = get_hash('')
#actual moterboard serial is unknown, but this works
motherboard = get_hash(getnode())
hdd_serial = get_hash('') #DMM sends an empty hash as well
motherboard = get_hash(getnode()) #actual moterboard serial is unknown, but this works
current_time = datetime.now()
with requests.Session() as session:
token = retrieve_login_token(session)
captcha = retrieve_captcha_token()
if saved_login is not None:
login_expiration = datetime.fromtimestamp(saved_login["expiration"])
if login_expiration < current_time:
print("Login data has expired")
saved_login = None
if token == None or captcha == None:
return
if saved_login is not None:
login_secure, login_session = saved_login["login_secure"], saved_login["login_session"]
print("Loaded login data from previous session")
else:
token = retrieve_login_token(session)
captcha = retrieve_captcha_token()
login_secure, login_session = retrieve_auth_keys(login, password, token, captcha, session)
if login_secure == None or login_session == None:
return
if token == None or captcha == None:
return
login_secure, login_session = retrieve_auth_keys(login, password, token, captcha, session)
if login_secure == None or login_session == None:
return
login_expiration = int((current_time + timedelta(days=364)).timestamp()) #expire in less than a year
config["saved_login"] = saved_login = {"login_secure" : login_secure, "login_session" : login_session, "expiration": login_expiration}
del(config["update_game"])
save_config(config_name, config)
if not use_proxy: input("Enable VPN now and press Enter")
execute_args, file_list_url, file_access_params = retrieve_launch_params(game_id, mac_addr, hdd_serial, motherboard, login_secure, login_session, use_proxy, update_game)
if execute_args == None or (update_game and (file_list_url == None or file_access_params == None)):
#execute_args, file_list_url, file_access_params
launch_data : dict = retrieve_launch_params(game_id, mac_addr, hdd_serial, motherboard, login_secure, login_session, use_proxy, update_game)
execute_args : str = launch_data.get("execute_args", None)
if execute_args == None:
return
if update_game:
file_list_url = launch_data.get("file_list_url", None)
file_list_params : str = launch_data.get("sign", None)
if file_list_url == None or file_list_params == None:
return
print("Latest version:", launch_data["latest_version"])
file_list_params = "?" + file_list_params.replace(";", "&").replace("CloudFront-", "")
exec_name = launch_data["exec_file_name"]
install_dir = launch_data["install_dir"]
game_root_path = get_game_root_path(exe_location, install_dir, exec_name)
if not use_proxy: input("Disable VPN now and press Enter")
if update_game:
dmmUpdater.update_game(os.path.dirname(exe_location), file_list_url, file_access_params)
dmmUpdater.update_game(game_root_path, file_list_url, file_list_params)
print("Starting game")
args = [exe_location] + execute_args.split()
args = [os.path.join(game_root_path, exec_name)] + execute_args.split()
print(args)
subprocess.Popen(args, start_new_session=True)
input("Done. Press enter to exit (this will close the game)")

View File

@ -4,6 +4,7 @@ import hashlib
import requests
import urllib.parse
from urllib.request import urlretrieve
from pathlib import Path
def get_file_list(url):
url = "https://apidgp-gameplayer.games.dmm.com" + url
@ -27,33 +28,28 @@ def get_file_hash(file_path):
def update_game(game_path, files_url, files_param):
print("Updating game")
server_url, server_files = get_file_list(files_url)
server_file_dict = {file["local_path"]: file for file in server_files}
local_files = [os.path.join(dp, f).replace("\\", "/") for dp, dn, filenames in os.walk(game_path) for f in filenames]
local_file_dict = {"/" + os.path.relpath(r, game_path).replace("\\", "/"): {"abs_path":r, "hash":""} for r in local_files}
server_file_dict = {str(Path(game_path, file["local_path"].lstrip('/')).resolve()): file for file in server_files}
local_file_dict = {str(Path(dp, f).resolve()): "" for dp, dn, filenames in os.walk(game_path) for f in filenames}
files_to_download = []
files_to_delete = []
for server_file_key in server_file_dict.keys():
server_file = server_file_dict[server_file_key]
for abs_file_path in server_file_dict.keys():
server_file = server_file_dict[abs_file_path]
if server_file_key in local_file_dict:
local_file = local_file_dict[server_file_key]
if abs_file_path in local_file_dict:
if server_file["force_delete_flg"]:
files_to_delete.append(local_file["abs_path"])
files_to_delete.append(abs_file_path)
else:
local_file["hash"] = get_file_hash(local_file["abs_path"])
local_file_hash = get_file_hash(abs_file_path)
if server_file["check_hash_flg"] and local_file["hash"] == server_file["hash"]:
if server_file["check_hash_flg"] and local_file_hash == server_file["hash"]:
continue
download_url = urllib.parse.urljoin(server_url, server_file["path"]) + files_param
download_path = game_path.replace("\\", "/") + server_file_key
files_to_download.append({"url":download_url, "path":download_path})
files_to_download.append({"url":download_url, "path":abs_file_path})
else:
download_url = urllib.parse.urljoin(server_url, server_file["path"]) + files_param
download_path = game_path.replace("\\", "/") + server_file_key
files_to_download.append({"url":download_url, "path":download_path})
files_to_download.append({"url":download_url, "path":abs_file_path})
print("Files to download:", len(files_to_download))