import json from flask import Flask from .databaseController import DatabaseController from .endpoints.query import Query, QueryOld from .endpoints.post import GetPost, GetPostOld from .endpoints.posts import GetPosts, GetPostsOld from .endpoints.set_action import SetAction, SetActionOld from .endpoints.posts_count import GetPostsCount, GetPostsCountOld from .endpoints.account_stats import AccountStats, AccountStatsOld def get_post_dictionary(id, error_id, action_taken, text, files, date, x_handle, x_rating): if files == "" or files is None: files = [] else: files = files.split(',') dict = {"id": id, "id_str": str(id), "error_id": error_id, "action_taken": action_taken, "text": text, "files": files, "date": date, "handle": x_handle, "rating": x_rating} if error_id == -1: del(dict["error_id"]) if action_taken == -1: del(dict["action_taken"]) return dict class Database: db : DatabaseController = None app : Flask = None def __init__(self, api, database_name, database_path) -> None: self.app = api.app if database_name in self.app.databases: del self.app.databases[database_name] self.reload_data(database_path) self.app.databases[database_name] = self if database_name == "Archive": api.add_resource(Query, "/Archive/Query") api.add_resource(GetPost, "/Archive/GetPost/") api.add_resource(GetPosts, "/Archive/GetPosts") api.add_resource(GetPostsCount, "/Archive/GetPosts/Count") api.add_resource(AccountStats, "/Archive/AccountStats") api.add_resource(SetAction, "/Archive/SetAction") elif database_name == "ArchiveOld": api.add_resource(QueryOld, "/ArchiveOld/Query") api.add_resource(GetPostOld, "/ArchiveOld/GetPost/") api.add_resource(GetPostsOld, "/ArchiveOld/GetPosts") api.add_resource(GetPostsCountOld, "/ArchiveOld/GetPosts/Count") api.add_resource(AccountStatsOld, "/ArchiveOld/AccountStats") api.add_resource(SetActionOld, "/ArchiveOld/SetAction") def get_accounts(self): query = f''' SELECT x_handle, id FROM accounts ORDER BY x_handle ASC''' results = self.db.run_query(query) accounts = [] if results is not None: accounts = [{"x_handle": result[0], "id": result[1]} for result in results] return accounts def get_account_stats(self): query = '''SELECT x_handle, SUM(IIF(x_posts.action_taken = 0, 1, 0)) as undecided, SUM(IIF(x_posts.action_taken = 1, 1, 0)) as approved, SUM(IIF(x_posts.action_taken = 2, 1, 0)) as deleted, SUM(IIF(x_posts.action_taken = 3, 1, 0)) as hidden, SUM(IIF(x_post_details.id is NULL, 1, 0)) as invalid FROM x_posts LEFT JOIN accounts on accounts.id = x_posts.account_id LEFT JOIN x_post_details on x_post_details.id = x_posts.id GROUP BY account_id ORDER BY x_handle''' results = self.db.run_query(query) return [{"x_handle": result[0], "undecided": result[1], "approved": result[2], "deleted": result[3], "hidden": result[4], "invalid": result[5]} for result in results] def get_post(self, id): query = f'''SELECT x_posts.id, x_posts.error_id, x_posts.action_taken, x_post_details.text, x_post_details.files, x_post_details.date, accounts.x_handle, accounts.rating from x_posts LEFT JOIN x_post_details ON x_posts.id = x_post_details.id LEFT JOIN accounts ON x_posts.account_id = accounts.id WHERE x_posts.id = {id}''' result = self.db.run_query(query) if len(result) == 0: return None else: #return most recent post result = result[-1] return get_post_dictionary(result[0], result[1], result[2], result[3], result[4], result[5], result[6], result[7]) def build_where_query(self, artist, actions_taken = [0, 1, 2, 3], last_id = -1, include_ratings = ["SFW", "NSFW"]): where_query = "WHERE x_post_details.id NOT NULL AND x_posts.error_id = 0" if last_id != -1: where_query += f" AND x_posts.id < {last_id}" if actions_taken != [0, 1, 2, 3]: where_query += " AND (" + " OR ".join([f"x_posts.action_taken = {action}" for action in actions_taken]) + ")" if include_ratings != ["SFW", "NSFW", "NSFL"]: where_query += " AND (" + " OR ".join([f'accounts.rating = "{rating}"' for rating in include_ratings]) + ")" if artist is not None and artist != "": where_query += f' AND accounts.x_handle = "{artist}"' return where_query def get_posts_count(self, artist, actions_taken = [0, 1, 2, 3], last_id = -1, include_ratings = ["SFW", "NSFW"]): where_query = self.build_where_query(artist, actions_taken, last_id, include_ratings) query = f''' SELECT count(*) FROM x_posts LEFT JOIN x_post_details ON x_posts.id = x_post_details.id LEFT JOIN accounts ON x_posts.account_id = accounts.id {where_query}''' result = self.db.run_query(query) return result[0][0] def get_posts(self, num_posts, artist, actions_taken = [0, 1, 2, 3], last_id = -1, offset = 0, include_ratings = ["SFW", "NSFW"], order = "DESC"): num_posts = max(1, min(num_posts, 100)) order_by = "RANDOM()" if order == "RAND" else "x_posts.id ASC" if order == "ASC" else "x_posts.id DESC" where_query = self.build_where_query(artist, actions_taken, last_id, include_ratings) query = f''' SELECT x_posts.id, x_posts.action_taken, x_post_details.text, x_post_details.files, x_post_details.date, accounts.x_handle, accounts.rating FROM x_posts LEFT JOIN x_post_details ON x_posts.id = x_post_details.id LEFT JOIN accounts ON x_posts.account_id = accounts.id {where_query} ORDER BY {order_by} LIMIT {num_posts} OFFSET {offset}''' results = self.db.run_query(query) posts = [] if results is not None: for result in results: posts.append(get_post_dictionary(result[0], -1, result[1], result[2], result[3], result[4], result[5], result[6])) return posts def wrap_query_response(self, result, mode = "json"): if result is None: response = self.app.response_class(status=400) else: if mode == "json": response = self.app.response_class( response=json.dumps(result, ensure_ascii=False, indent=1), status=200, mimetype='application/json' ) elif mode == "text": response = self.app.response_class( response=str(result), status=200, mimetype='text/plain' ) response.headers.add("Access-Control-Allow-Origin", "*") return response def reload_data(self, database_path): self.db = DatabaseController(database_path)