KemoFureApi/modules/Archive/database.py

153 lines
6.5 KiB
Python

import json
from flask import Flask
from .databaseController import DatabaseController
from .endpoints.query import Query, QueryOld
from .endpoints.command import Command, CommandOld
from .endpoints.commands import Commands, CommandsOld
from .endpoints.post import GetPost, GetPostOld
from .endpoints.posts import GetPosts, GetPostsOld
from .endpoints.new_query import NewQuery, NewQueryOld
from .endpoints.set_action import SetAction, SetActionOld
from .endpoints.posts_count import GetPostsCount, GetPostsCountOld
from .endpoints.account_stats import AccountStats, AccountStatsOld
class Database:
db : DatabaseController = None
app : Flask = None
def __init__(self, api, database_name, database_path) -> None:
self.app = api.app
if database_name in self.app.databases:
del self.app.databases[database_name]
self.reload_data(database_path)
self.app.databases[database_name] = self
if database_name == "Archive":
api.add_resource(Query, "/Archive/Query")
api.add_resource(NewQuery, "/Archive/NewQuery")
api.add_resource(Command, "/Archive/Command")
api.add_resource(Commands, "/Archive/Commands")
api.add_resource(GetPost, "/Archive/GetPost/<id>")
api.add_resource(GetPosts, "/Archive/GetPosts")
api.add_resource(GetPostsCount, "/Archive/GetPosts/Count")
api.add_resource(AccountStats, "/Archive/AccountStats")
api.add_resource(SetAction, "/Archive/SetAction")
elif database_name == "ArchiveOld":
api.add_resource(QueryOld, "/ArchiveOld/Query")
api.add_resource(NewQueryOld, "/ArchiveOld/NewQuery")
api.add_resource(CommandOld, "/Archive/Command")
api.add_resource(CommandsOld, "/Archive/Commands")
api.add_resource(GetPostOld, "/ArchiveOld/GetPost/<id>")
api.add_resource(GetPostsOld, "/ArchiveOld/GetPosts")
api.add_resource(GetPostsCountOld, "/ArchiveOld/GetPosts/Count")
api.add_resource(AccountStatsOld, "/ArchiveOld/AccountStats")
api.add_resource(SetActionOld, "/ArchiveOld/SetAction")
def get_accounts(self):
query = f'''
SELECT x_handle, id FROM accounts
ORDER BY x_handle ASC'''
return self.db.run_query(query)
def get_account_stats(self):
query = '''SELECT x_handle,
SUM(IIF(x_posts.action_taken = 0, 1, 0)) as undecided,
SUM(IIF(x_posts.action_taken = 1, 1, 0)) as approved,
SUM(IIF(x_posts.action_taken = 2, 1, 0)) as deleted,
SUM(IIF(x_posts.action_taken = 3, 1, 0)) as hidden,
SUM(IIF(x_post_details.id is NULL, 1, 0)) as invalid
FROM accounts
LEFT JOIN x_posts on accounts.id = x_posts.account_id
LEFT JOIN x_post_details on x_post_details.id = x_posts.id
WHERE accounts.download_mode != 4
GROUP BY account_id
ORDER BY x_handle'''
return self.db.run_query(query)
def get_post(self, id):
query = f'''SELECT x_posts.id, cast(x_posts.id as TEXT) as id_str, x_posts.error_id, x_posts.action_taken, x_post_details.text, x_post_details.files, x_post_details.date, accounts.x_handle, accounts.rating from x_posts
LEFT JOIN x_post_details
ON x_posts.id = x_post_details.id
LEFT JOIN accounts
ON x_posts.account_id = accounts.id WHERE x_posts.id = {id}
LIMIT 1'''
result = self.db.run_query(query)
if len(result) == 0:
return None
else:
#return most recent post
return result[-1]
def build_where_query(self, artist, actions_taken = [0, 1, 2, 3], last_id = -1, include_ratings = ["SFW", "NSFW"]):
where_query = "WHERE x_post_details.id NOT NULL AND x_posts.error_id = 0"
if last_id != -1:
where_query += f" AND x_posts.id < {last_id}"
if actions_taken != [0, 1, 2, 3]:
where_query += " AND (" + " OR ".join([f"x_posts.action_taken = {action}" for action in actions_taken]) + ")"
if include_ratings != ["SFW", "NSFW", "NSFL"]:
where_query += " AND (" + " OR ".join([f'accounts.rating = "{rating}"' for rating in include_ratings]) + ")"
if artist is not None and artist != "":
where_query += f' AND accounts.x_handle = "{artist}"'
return where_query
def get_posts_count(self, artist, actions_taken = [0, 1, 2, 3], last_id = -1, include_ratings = ["SFW", "NSFW"]):
where_query = self.build_where_query(artist, actions_taken, last_id, include_ratings)
query = f'''
SELECT count(*) as count FROM x_posts
LEFT JOIN x_post_details
ON x_posts.id = x_post_details.id
LEFT JOIN accounts
ON x_posts.account_id = accounts.id
{where_query}'''
result = self.db.run_query(query)
return result[0]["count"]
def get_posts(self, num_posts, artist, actions_taken = [0, 1, 2, 3], last_id = -1, offset = 0, include_ratings = ["SFW", "NSFW"], order = "DESC"):
num_posts = max(1, min(num_posts, 100))
order_by = "RANDOM()" if order == "RAND" else "x_posts.id ASC" if order == "ASC" else "x_posts.id DESC"
where_query = self.build_where_query(artist, actions_taken, last_id, include_ratings)
query = f'''
SELECT x_posts.id, cast(x_posts.id as TEXT) as id_str, x_posts.action_taken, x_post_details.text, x_post_details.files, x_post_details.date, accounts.x_handle, accounts.rating FROM x_posts
LEFT JOIN x_post_details
ON x_posts.id = x_post_details.id
LEFT JOIN accounts
ON x_posts.account_id = accounts.id
{where_query}
ORDER BY {order_by}
LIMIT {num_posts} OFFSET {offset}'''
return self.db.run_query(query)
def wrap_query_response(self, result, mode = "json"):
if result is None:
response = self.app.response_class(status=400)
else:
if mode == "json":
response = self.app.response_class(
response=json.dumps(result, ensure_ascii=False, indent=1),
status=200,
mimetype='application/json'
)
elif mode == "text":
response = self.app.response_class(
response=str(result),
status=200,
mimetype='text/plain'
)
response.headers.add("Access-Control-Allow-Origin", "*")
return response
def reload_data(self, database_path):
self.db = DatabaseController(database_path)