import html
import re
import psycopg
from psycopg.rows import dict_row
from Database.x_classes import x_posts_images, x_accounts, x_posts
from Database.db_schema import x_accounts as x_accounts_schema, x_posts as x_posts_schema, x_posts_images as x_posts_images_schema
from config import Global_Config
class DatabaseController:
    """Query helpers around a single psycopg connection/cursor pair.

    The connection is opened with ``dict_row``, so every fetched row is a dict
    keyed by column name. Generic helpers come first, followed by the Pixiv and
    X specific sections that build on them.

    Helpers that execute SQL roll the transaction back before propagating a
    ``psycopg.Error`` so that the shared connection stays usable afterwards.
    """

    # Matches t.co short links so they can be stripped from post text.
    _TCO_LINK_RE = re.compile(r"https://t\.co\S+")

    def __init__(self):
        """Connect using ``Global_Config["postgresql_conninfo"]``.

        Raises:
            Exception: if the connection string is ``None``.
        """
        conninfo = Global_Config["postgresql_conninfo"]
        if conninfo is None:
            raise Exception("Database connection string is required")
        self.conn = psycopg.connect(conninfo, row_factory=dict_row)
        self.cursor = self.conn.cursor()

    def query_get_stream(self, query, params=(), count=-1, page_size=1000, offset=0, commit=True):
        """Lazily yield the rows of ``query`` one page at a time.

        ``LIMIT``/``OFFSET`` are appended to ``query``, so it must not carry its
        own; give it an ``ORDER BY`` if stable paging matters.

        Args:
            query: SELECT statement without LIMIT/OFFSET.
            params: Parameters forwarded to every page query.
            count: Maximum number of rows to yield; -1 means no limit.
            page_size: Rows fetched per round trip.
            offset: Number of leading rows to skip.
            commit: Forwarded to ``query_get`` for every page.
        """
        # Coerce before interpolating so only integers ever reach the SQL text.
        page_size = int(page_size)
        offset = int(offset)
        unlimited = count == -1
        remaining = count
        while unlimited or remaining > 0:
            page = self.query_get(f"{query} LIMIT {page_size} OFFSET {offset}", params, commit=commit)
            if not page:
                return
            fetched = len(page)
            if not unlimited:
                # ``count`` limits the rows yielded, independent of ``offset``.
                page = page[:remaining]
                remaining -= len(page)
            yield from page
            if fetched < page_size:
                # A short page means the result set is exhausted.
                return
            offset += fetched

    def query_get(self, query, params=(), count=-1, commit=True):
        """Execute ``query`` and fetch its result.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Values for the placeholders.
            count: -1 fetches all rows (list), 1 fetches a single row (dict or
                None), any other value fetches up to that many rows (list).
            commit: Commit after fetching.

        Raises:
            psycopg.Error: re-raised after rolling the transaction back.
        """
        try:
            self.cursor.execute(query, params)
            if count == -1:
                result = self.cursor.fetchall()
            elif count == 1:
                result = self.cursor.fetchone()
            else:
                result = self.cursor.fetchmany(count)
        except psycopg.Error:
            # Without a rollback the connection would stay in a failed transaction.
            self.conn.rollback()
            raise
        if commit:
            self.conn.commit()
        return result

    def query_set(self, query, params=(), commit=True):
        """Execute a write statement.

        Returns:
            True on success; False when an integrity constraint was violated
            (the error is printed and the transaction rolled back).

        Raises:
            psycopg.Error: any other database error, re-raised after rollback.
        """
        try:
            self.cursor.execute(query, params)
            if commit:
                self.conn.commit()
            return True
        except psycopg.IntegrityError as e:
            print(e)
            self.conn.rollback()
            return False
        except psycopg.Error:
            self.conn.rollback()
            raise

    def insert(self, table_name, updates, where=None, returning=None):
        """Insert one row into ``table_name`` and commit.

        Args:
            table_name: Target table (interpolated into the SQL text).
            updates: Sequence of ``(column, value)`` pairs.
            where: Raw text appended as a WHERE clause.
            returning: Single column name to read back from the inserted row.

        Returns:
            ``(True, value)`` where ``value`` is the ``returning`` column (or
            None when ``returning`` is not given); ``(False, None)`` when an
            integrity constraint was violated.

        Raises:
            psycopg.Error: any other database error, re-raised after rollback.
        """
        columns = ", ".join(update[0] for update in updates)
        placeholders = ", ".join("%s" for _ in updates)
        values = [update[1] for update in updates]
        # Built outside the try block so the except handler can always print it.
        query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
        if where:
            # NOTE(review): PostgreSQL's INSERT ... VALUES takes no WHERE clause,
            # so a non-empty ``where`` presumably fails with a syntax error.
            # Behaviour kept as-is; confirm whether any caller relies on it.
            query += f" WHERE {where}"
        if returning:
            query += f" RETURNING {returning}"
        try:
            self.cursor.execute(query, values)
            row = self.cursor.fetchone() if returning else None
            self.conn.commit()
        except psycopg.IntegrityError as e:
            print(e, query)
            self.conn.rollback()
            return False, None
        except psycopg.Error:
            self.conn.rollback()
            raise
        return True, (row[returning] if returning else None)

    #region Pixiv
    def pixiv_get_tags(self, tags):
        """Return the ``pixiv_tags`` ids for ``tags``, inserting unknown tags.

        Args:
            tags: Iterable of objects exposing ``name`` and ``translated_name``.

        Returns:
            List of tag ids in the same order as ``tags``.

        Raises:
            psycopg.Error: re-raised after rolling the transaction back.
        """
        select_query = "SELECT id FROM pixiv_tags WHERE name=%s"
        insert_query = "INSERT INTO pixiv_tags(name, translated_name) VALUES(%s, %s) RETURNING id"
        tag_ids = []
        try:
            for tag in tags:
                self.cursor.execute(select_query, [tag.name])
                row = self.cursor.fetchone()
                if row is None:
                    self.cursor.execute(insert_query, [tag.name, tag.translated_name])
                    row = self.cursor.fetchone()
                tag_ids.append(row["id"])
            self.conn.commit()
        except psycopg.Error:
            self.conn.rollback()
            raise
        return tag_ids

    def pixiv_get_user(self, id):
        """Return the ``pixiv_users`` row with this id, or None."""
        query = "SELECT * FROM pixiv_users WHERE id=%s"
        return self.query_get(query, [id], count=1)

    def pixiv_get_post(self, id):
        """Return the ``pixiv_posts`` row with this id, or None."""
        query = "SELECT * FROM pixiv_posts WHERE id=%s"
        return self.query_get(query, [id], count=1)

    def pixiv_insert_user(self, json, commit=True):
        """Insert a Pixiv user from an object exposing ``id``, ``name`` and ``account``.

        Returns:
            The ``query_set`` result (False on an integrity violation).
        """
        query = 'INSERT INTO pixiv_users (id, name, account) VALUES (%s,%s,%s)'
        return self.query_set(query, [json.id, json.name, json.account], commit)

    def pixiv_insert_post(self, json, commit=True):
        """Insert a Pixiv post; its tags are resolved to ids via ``pixiv_get_tags``.

        ``is_saved`` is always stored as True.

        Returns:
            The ``query_set`` result (False on an integrity violation).
        """
        query = 'INSERT INTO pixiv_posts (id, title, type, caption, restrict, user_id, tags, create_date, page_count, sanity_level, x_restrict, illust_ai_type, is_saved) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'
        # Note: pixiv_get_tags commits on its own, regardless of ``commit``.
        tags = self.pixiv_get_tags(json.tags)
        values = [
            json.id, json.title, json.type, json.caption, json.restrict,
            json.user.id, tags, json.create_date, json.page_count,
            json.sanity_level, json.x_restrict, json.illust_ai_type, True,
        ]
        return self.query_set(query, values, commit)

    def pixiv_update_post(self, id, date, commit=True):
        """Set ``create_date`` of the Pixiv post with this id."""
        query = 'UPDATE pixiv_posts SET create_date = %s WHERE id = %s'
        return self.query_set(query, [date, id], commit)
    #endregion

    #region X
    def get_all_posts(self, account_id, commit=True):
        """Return every post of ``account_id`` as ``x_posts`` objects, ordered by id."""
        query = f"SELECT * FROM {x_posts_schema.table} WHERE {x_posts_schema.account_id} = %s ORDER BY {x_posts_schema.id}"
        rows = self.query_get(query, [account_id], commit=commit)
        if not rows:
            return []
        return [x_posts(**row) for row in rows]

    def get_post_images(self, account_id, commit=True):
        """Return the posts of ``account_id`` (same result as ``get_all_posts``).

        NOTE(review): despite its name this has always queried the posts table
        and returned ``x_posts`` objects. It looks like a copy of
        ``get_all_posts``; confirm whether it was meant to return images.
        """
        return self.get_all_posts(account_id, commit=commit)

    def get_all_post_ids(self, account_id, commit=True):
        """Return the ids of every post belonging to ``account_id``."""
        query = f"SELECT {x_posts_schema.id} FROM {x_posts_schema.table} WHERE {x_posts_schema.account_id} = %s"
        rows = self.query_get(query, [account_id], commit=commit)
        return [row["id"] for row in rows]

    def get_max_post_id(self, account_id: str, commit=True):
        """Return the highest post id stored for ``account_id``, or 1 if it has none."""
        query = f"SELECT MAX({x_posts_schema.id}) AS max FROM {x_posts_schema.table} WHERE {x_posts_schema.account_id} = %s"
        row = self.query_get(query, [account_id], count=1, commit=commit)
        # An aggregate always yields a row; with no matching posts MAX is NULL.
        if row is None or row["max"] is None:
            return 1
        return row["max"]

    def x_get_post_from_id(self, post_id, commit=True):
        """Return the post with ``post_id`` as an ``x_posts`` object, or None."""
        query = f"SELECT * FROM {x_posts_schema.table} WHERE {x_posts_schema.id} = %s"
        row = self.query_get(query, [post_id], count=1, commit=commit)
        return None if row is None else x_posts(**row)

    def x_insert_post(self, post: "x_posts", commit=True):
        """Insert ``post``; its text is HTML-unescaped and stripped of t.co links.

        Returns:
            The ``query_set`` result (False on an integrity violation).
        """
        text = self._TCO_LINK_RE.sub("", html.unescape(post.text))
        query = f'''INSERT INTO {x_posts_schema.table}
        ({x_posts_schema.id}, {x_posts_schema.account_id}, {x_posts_schema.discord_post_id},
        {x_posts_schema.error_id}, {x_posts_schema.action_taken}, {x_posts_schema.rating},
        {x_posts_schema.tags}, {x_posts_schema.text}, {x_posts_schema.date})
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)'''
        values = [
            post.id, post.account_id, post.discord_post_id,
            post.error_id, post.action_taken, post.rating,
            post.tags, text, post.date,
        ]
        return self.query_set(query, values, commit)

    def x_search_duplicate(self, user_id: int, max_id: int, phash=None, dhash=None, commit=True):
        """Find the earliest image of ``user_id`` whose hash matches exactly.

        Only images of posts with an id below ``max_id`` are considered. When
        both hashes are supplied, ``phash`` takes precedence.

        Returns:
            The matching ``x_posts_images`` object, or None when nothing matches
            or no hash was given.
        """
        if phash is not None:
            hash_column, hash_value = "phash", phash
        elif dhash is not None:
            hash_column, hash_value = "dhash", dhash
        else:
            return None
        # NOTE(review): the hash is still interpolated into the SQL text because
        # its SQL representation (integer vs. bit-string literal) cannot be
        # determined from this file; bind it as a parameter once confirmed.
        query = (
            f"SELECT i.* FROM {x_posts_images_schema.table} i "
            f"JOIN {x_posts_schema.table} p "
            f"ON i.{x_posts_images_schema.post_id} = p.{x_posts_schema.id} "
            f"WHERE i.post_id < %s AND p.account_id = %s "
            f"and bit_count(i.{hash_column} # {hash_value}) < 1 Order by post_id ASC"
        )
        row = self.query_get(query, [max_id, user_id], count=1, commit=commit)
        return None if row is None else x_posts_images(**row)

    def x_insert_image(self, image: "x_posts_images", commit=True):
        """Insert ``image``.

        Returns:
            The ``query_set`` result (False on an integrity violation).
        """
        query = f'''INSERT INTO {x_posts_images_schema.table}
        ({x_posts_images_schema.post_id}, {x_posts_images_schema.index}, {x_posts_images_schema.phash},
        {x_posts_images_schema.dhash}, {x_posts_images_schema.error_id}, {x_posts_images_schema.rating},
        {x_posts_images_schema.tags}, {x_posts_images_schema.file}, {x_posts_images_schema.saved_file},
        {x_posts_images_schema.vox_label})
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'''
        values = [
            image.post_id, image.index, image.phash,
            image.dhash, image.error_id, image.rating,
            image.tags, image.file, image.saved_file,
            image.vox_label,
        ]
        return self.query_set(query, values, commit)

    def x_update_post(self, id, discord_post_id, error_id, action_taken, commit=True):
        """Update the Discord post id, error id and action taken of a post."""
        query = f"UPDATE {x_posts_schema.table} SET {x_posts_schema.discord_post_id} = %s, {x_posts_schema.error_id} = %s, {x_posts_schema.action_taken} = %s WHERE {x_posts_schema.id} = %s"
        return self.query_set(query, [discord_post_id, error_id, action_taken, id], commit=commit)

    def x_get_all_accounts(self):
        """Return every account as ``x_accounts`` objects, ordered by id."""
        rows = self.query_get(f'SELECT * from {x_accounts_schema.table} ORDER BY {x_accounts_schema.id}')
        return [x_accounts(**row) for row in rows]

    def x_get_account_by_name(self, handle: str):
        """Return the ``x_accounts`` object whose name is ``handle``, or None."""
        query = f'SELECT * from {x_accounts_schema.table} where {x_accounts_schema.name} = %s'
        row = self.query_get(query, [handle], count=1)
        if row is None:
            return None
        return x_accounts(**row)

    def x_get_account_by_id(self, id: int):
        """Return the ``x_accounts`` object with this id, or None."""
        query = f'SELECT * from {x_accounts_schema.table} where {x_accounts_schema.id} = %s'
        row = self.query_get(query, [id], count=1)
        if row is None:
            return None
        return x_accounts(**row)

    def x_add_account(self, container: "x_accounts"):
        """Insert ``container`` as a new account.

        Returns:
            The inserted account id, or None on an integrity violation.
        """
        inserted, account_id = self.insert(
            x_accounts_schema.table,
            [
                (x_accounts_schema.id, container.id),
                (x_accounts_schema.name, container.name),
                (x_accounts_schema.rating, container.rating),
                (x_accounts_schema.discord_channel_id, container.discord_channel_id),
                (x_accounts_schema.discord_thread_id, container.discord_thread_id),
                (x_accounts_schema.download_mode, container.download_mode),
            ],
            returning=x_accounts_schema.id,
        )
        return account_id if inserted else None

    def x_update_account(self, container: "x_accounts"):
        """Write the mutable fields of ``container`` to the account with its id."""
        updates = [
            (x_accounts_schema.name, container.name),
            (x_accounts_schema.rating, container.rating),
            (x_accounts_schema.discord_channel_id, container.discord_channel_id),
            (x_accounts_schema.discord_thread_id, container.discord_thread_id),
            (x_accounts_schema.download_mode, container.download_mode),
        ]
        self.x_update_account_properties(container.id, updates)

    def x_update_account_properties(self, account_id, updates):
        """Update selected columns of one account and commit.

        Example: updates = [("name", container.name), ("rating", container.rating)]

        An empty ``updates`` is a no-op.

        Raises:
            psycopg.Error: re-raised after rolling the transaction back.
        """
        if not updates:
            # "UPDATE ... SET WHERE ..." would be invalid SQL.
            return
        assignments = ", ".join(f"{update[0]} = %s" for update in updates)
        query = f"UPDATE {x_accounts_schema.table} SET {assignments} WHERE id = %s"
        values = [update[1] for update in updates]
        values.append(account_id)  # bound as a parameter, not interpolated
        try:
            self.cursor.execute(query, values)
            self.conn.commit()
        except psycopg.Error:
            self.conn.rollback()
            raise
    #endregion

    def close(self):
        """Close the database connection."""
        self.conn.close()