JapariBypass/dmmBypass.py

254 lines
11 KiB
Python

import os
import sys
import json
import hashlib
import pathlib
import argparse
import subprocess
import urllib.parse
from uuid import getnode
from datetime import datetime, timedelta
# Set to False to allow packet sniffing/capture.
# Set separately for dmmUpdater
SSL_VERIFY = True
parser = argparse.ArgumentParser(description='DMM bypass script')
parser.add_argument('-g', '--game', type=str, help="DMM code name of the game", default="kfp2g")
parser.add_argument('-t', '--type', help="DMM game type (ACL/GCL)", default="GCL")
parser.add_argument('-u', '--update', help="Check for game update before launching", action='store_true')
args = parser.parse_args()
def load_config(game_id):
config_name = game_id + ".cfg"
if not os.path.exists(config_name):
return None
else:
with open(config_name, "rt", encoding="utf-8") as f:
return json.load(f)
def save_config(game_id, config):
with open(game_id + '.cfg', 'wt', encoding="utf-8") as f:
json.dump(config, f, indent=1, ensure_ascii=False)
config = load_config(args.game)
if config is None:
subprocess.check_call([sys.executable, "-m", "pip", "install", "requests", "beautifulsoup4"])
config = {
"game_id" : args.game,
"file_path" : input('Enter full file path to KF3 .exe: ').strip('\"'),
"dmm_login" : input('DMM Login (email): '),
"dmm_password" : input('DMM Password: '),
"use_proxy" : input('Your login data will be sent through my VPN.\nIs that okay? (yes/no): ').lower() == "yes"
}
save_config(args.game, config)
else:
print(f'Loaded settings from {args.game}.cfg')
config["update_game"] = args.update
config["game_type"] = args.type
import requests
import dmmUpdater
from bs4 import BeautifulSoup
def get_hash(data):
sha_obj = hashlib.sha256()
sha_obj.update(str(data).encode())
hash_hex = sha_obj.hexdigest()
return hash_hex
def get_mac():
mac = getnode()
return ':'.join(("%012X" % mac)[i:i+2] for i in range(0, 12, 2)).lower()
def get_game_root_path(user_path, game_dir, exe_name):
"""Return valid game install directory"""
path = pathlib.Path(user_path)
parts = list(path.parts)
if path.name == exe_name:
return str(path.parent.resolve())
elif game_dir in parts:
#get index of last occurence of game_dir in user path (+1)
dir_idx = len(parts) - parts[::-1].index(game_dir)
parts = parts[:dir_idx]
return str(pathlib.Path(*parts).resolve())
else:
raise Exception(f"Path to game files is incorrect! Ensure it points to {exe_name} or contains {game_dir}!")
def retrieve_oauth_code(login, password, session : requests.Session):
try:
print("Retrieving login url")
url = "https://apidgp-gameplayer.games.dmm.com/v5/auth/login/url"
data = {"prompt":"choose"}
result = session.post(url, json=data, verify=SSL_VERIFY)
result.raise_for_status()
url = result.json()["data"]["url"]
encoded_url = urllib.parse.quote_plus(url.replace("https://accounts.dmm.com/service/oauth/select/=/path=", "").replace("oauth/select/", "oauth/").replace("accounts?prompt=choose", "accounts"))
print("Retrieving login form token")
result = session.get(url, verify=SSL_VERIFY)
result.raise_for_status()
page = BeautifulSoup(result.content, 'html.parser')
token = page.find('input', attrs={"name":"token"}).get("value")
print("Retrieving oauth link")
url = "https://accounts.dmm.com/service/oauth/authenticate"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"check_done_login":"true"}
data = f"token={token}&login_id={login}&password={password}&use_auto_login=1&path={encoded_url}&recaptchaToken="
result = session.post(url, data, headers=headers, verify=SSL_VERIFY)
result.raise_for_status()
page = BeautifulSoup(result.content, 'html.parser')
final_url = page.find('input', attrs={"id":"ga-param-service-url"}).get("value")
print("Retrieving oauth token")
result = session.get(final_url, allow_redirects=False, verify=SSL_VERIFY)
redirect_url = result.next.url
oauth_token = redirect_url.split("=")[-1]
return oauth_token
except Exception as e:
print("Failed to retrieve login form:", e)
def retrieve_access_token(oauth_token, session: requests.Session):
url = "https://apidgp-gameplayer.games.dmm.com/v5/auth/accesstoken/issue"
result = session.post(url, json={"code":oauth_token}, verify=SSL_VERIFY)
result.raise_for_status()
data = result.json()["data"]
return data["access_token"], data["expires_in_seconds"]
def agree_to_game_terms(game_id, use_proxy):
try:
print("Accepting updated game terms of use")
headers = {"User-Agent": "DMMGamePlayer5-Win/5.3.12 Electron/32.1.0", "Client-App": "DMMGamePlayer5", "Client-version": "5.3.12", "Content-Type": "application/json"}
url = "https://katworks.sytes.net/proxy/agreement" if use_proxy else "https://apidgp-gameplayer.games.dmm.com/v5/agreement/confirm/client"
data = {"product_id":game_id,"is_notification":False,"is_myapp":False}
result = requests.post(url, headers=headers, json=data)
result.raise_for_status()
result_json = result.json()
if result_json["result_code"] != 100:
raise Exception(f'{result_json["result_code"]}: {result_json["error"]}')
return True
except Exception as e:
print("Failed to accept terms of use:", e)
return False
def retrieve_launch_params(game_id, game_type, mac_addr, hdd_serial, motherboard, access_token, session: requests.Session, use_proxy = False, update_game = False):
try:
print("Retrieving launch arguments")
data = {"product_id":game_id,"game_type":game_type,"game_os":"win","launch_type":"LIB","mac_address":mac_addr,"hdd_serial":hdd_serial,"motherboard":motherboard,"user_os":"win"}
headers = {"User-Agent": "DMMGamePlayer5-Win/5.3.12 Electron/32.1.0",
"Client-App": "DMMGamePlayer5",
"Client-version": "5.3.12",
"Content-Type": "application/json",
"actauth":access_token}
if update_game:
url = "https://katworks.sytes.net/proxy/launchAndUpdate" if use_proxy else "https://apidgp-gameplayer.games.dmm.com/v5/r2/launch/cl"
else:
url = "https://katworks.sytes.net/proxy/launch" if use_proxy else "https://apidgp-gameplayer.games.dmm.com/v5/launch/cl"
result = session.post(url, headers=headers, json=data, verify=SSL_VERIFY)
result.raise_for_status()
result_json = result.json()
if result_json["result_code"] == 308:
if agree_to_game_terms(game_id, use_proxy):
result = session.post(url, headers=headers, json=data, verify=SSL_VERIFY)
result.raise_for_status()
result_json = result.json()
if result_json["result_code"] != 100:
raise Exception(f'{result_json["result_code"]}: {result_json["error"]}')
else:
raise Exception("Failed to agree to updated game terms of use. Use the DMM app to confirm.")
elif result_json["result_code"] != 100:
raise Exception(f'{result_json["result_code"]}: {result_json["error"]}')
data = result_json["data"]
return data
except Exception as e:
if str(e).startswith("203:"):
config = load_config(game_id)
config["saved_login"] = None
save_config(game_id, config)
print("DMM session has expired. Saved login data cleared. Please run the program again.")
else:
print("Failed to retrieve launch arguments:", e)
def main(config):
#required arguments
game_id = config["game_id"]
game_type = config["game_type"]
exe_location = config["file_path"]
login = urllib.parse.quote_plus(config["dmm_login"])
password = urllib.parse.quote_plus(config["dmm_password"])
#optional arguments
update_game = config.get("update_game", False)
use_proxy = config.get("use_proxy", False)
saved_login = config.get("saved_login", None)
#dmm requires these values
mac_addr = get_mac()
hdd_serial = get_hash('') #DMM sends an empty hash as well
motherboard = get_hash(getnode()) #actual moterboard serial is unknown, but this works
current_time = datetime.now()
with requests.Session() as session:
if saved_login is not None:
login_expiration = datetime.fromtimestamp(saved_login["expiration"])
if login_expiration < current_time:
print("Login data has expired")
saved_login = None
if saved_login is not None:
access_token = saved_login["access_token"]
print("Loaded token from previous session")
else:
oauth_code = retrieve_oauth_code(login, password, session)
if oauth_code == None:
return
access_token, expiration_seconds = retrieve_access_token(oauth_code, session)
token_expiration = int((current_time + timedelta(seconds=expiration_seconds)).timestamp())
config["saved_login"] = saved_login = {"access_token" : access_token, "expiration": token_expiration}
del(config["update_game"])
save_config(game_id, config)
if not use_proxy: input("Enable VPN now and press Enter")
#execute_args, file_list_url, file_access_params
launch_data : dict = retrieve_launch_params(game_id, game_type, mac_addr, hdd_serial, motherboard, access_token, session, use_proxy, update_game)
if launch_data == None:
return
execute_args : str = launch_data.get("execute_args", None)
if execute_args == None:
return
if update_game:
file_list_url = launch_data.get("file_list_url", None)
file_list_params : str = launch_data.get("sign", None)
if file_list_url == None or file_list_params == None:
return
print("Latest version:", launch_data["latest_version"])
file_list_params = "?" + file_list_params.replace(";", "&").replace("CloudFront-", "")
exec_name = launch_data["exec_file_name"]
install_dir = launch_data["install_dir"]
game_root_path = get_game_root_path(exe_location, install_dir, exec_name)
if not use_proxy: input("Disable VPN now and press Enter")
if update_game:
dmmUpdater.update_game(game_root_path, file_list_url, file_list_params)
print("Starting game")
args = [os.path.join(game_root_path, exec_name)] + execute_args.split()
subprocess.Popen(args, start_new_session=True)
input("Done. Press enter to exit")
main(config)