from __future__ import annotations from typing import TYPE_CHECKING import aiohttp import requests from Classifier.classifyHelper import classify_all from Database.x_classes import AccountRating, ActionTaken, DownloadMode, PostRating, x_posts, x_posts_images from Database.db_schema import x_posts as schema_x_posts, x_accounts as schema_x_accounts from Discord import discordHelper from Twitter import tweetHelper from exceptions import NO_CHANNEL, OTHER_ERROR from helpers import get_rating_value from helpers_adv import create_x_account if TYPE_CHECKING: from runtimeBotData import RuntimeBotData import importlib import nextcord from nextcord.ext import commands from nextcord import application_command class Commands(commands.Cog): def __init__(self, botData : RuntimeBotData): self.botData = botData self._last_member = None @application_command.slash_command() @commands.has_permissions(administrator=True) async def order(self, interaction: nextcord.Interaction): guild = interaction.guild categories = list(filter(lambda cat: cat.name.startswith("TWITTER"), guild.categories)) channels = list(filter(lambda chan: not isinstance(chan, nextcord.CategoryChannel) and chan.category.name.startswith("TWITTER"), guild.channels)) channelsOrdered = sorted(channels, key=lambda chan: chan.name) for idx,category in enumerate(categories): await category.edit(name=f"temp-{idx}") last_id = -1 target_category = None for idx, channel in enumerate(channelsOrdered): id = idx//50 if id > last_id: last_id = id target_category = await guild.create_category(f"TWITTER-{id}") print("moving", channel.name, "to", target_category.name) await channel.edit(category=target_category) for category in categories: print("deleting", category.name) await category.delete() @application_command.slash_command(description="Downloads single post from (domain)/(account_name)/status/(post_id) format") async def download_post(self, interaction: nextcord.Interaction, full_link : str, new_account_rating : str = nextcord.SlashOption(choices= [AccountRating.NSFW, AccountRating.SFW, AccountRating.NSFL], default = AccountRating.NSFW), download_mode : int = nextcord.SlashOption(choices={"No":DownloadMode.NO_DOWNLOAD, "Yes":DownloadMode.DOWNLOAD, "More Yes":DownloadMode.DOWNLOAD_ALL}, default=DownloadMode.NO_DOWNLOAD)): botData = self.botData try: handle, post_id = tweetHelper.parse_x_url(full_link) except: await interaction.response.send_message("Invalid url. Format should be (domain)/(account_name)/status/(post_id)") return await interaction.response.defer() query_result = botData.db.x_get_post_from_id(post_id) if query_result is not None: if query_result.discord_post_id == 0: await interaction.followup.send(content=f'Post was previously deleted. This is not implemented yet.' ) return else: await interaction.followup.send(content=f'Post was previously downloaded')#: https://discord.com/channels/{client.guilds[0].id}/{query_result.discord_channel_id}/{query_result["discord_post_id"]}' ) return else: try: tweet = await botData.twApi.get_tweet(f'x.com/{handle}/status/{post_id}') x_post = x_posts(id = tweet.id, account_id = tweet.author.id, date = tweet.date, text = tweet.text) artist = botData.db.x_get_account_by_name(handle) if artist is None: artist = await create_x_account(handle, botData, new_account_rating, download_mode = download_mode) await discordHelper.ensure_has_channel_or_thread(artist, interaction.guild, botData.db) image_containers : list[x_posts_images] = [] media = await tweetHelper.GetTweetMediaUrls(tweet) image_containers = [x_posts_images(tweet.id, idx, file = url) for idx, url in enumerate(media)] async with aiohttp.ClientSession() as session: downloaded_media = await tweetHelper.DownloadMedia(tweet.id, tweet.author.id, tweet.author.username, media, session) vox_labels = [] final_filtered_tags = {} duplicates = [] for idx, attachment in enumerate(downloaded_media): container = image_containers[idx] container.saved_file = attachment.file_name container.vox_label, container.rating, container.tags, filtered_tags, container.phash, container.dhash, container.error_id = await classify_all(attachment.file_bytes, botData.classifier, botData.vox) if container.vox_label not in vox_labels: vox_labels.append(container.vox_label) if container.phash != None: duplicate = botData.db.x_search_duplicate(user_id=x_post.account_id, max_id = x_post.id, phash=container.phash) if duplicate != None: container.duplicate_id = duplicate.post_id container.duplicate_index = duplicate.index if duplicate.post_id not in duplicates: duplicates.append(duplicate.post_id) x_post.tags = list(set(x_post.tags + container.tags)) x_post.rating = container.rating if get_rating_value(container.rating) > get_rating_value(x_post.rating) else x_post.rating final_filtered_tags = final_filtered_tags | filtered_tags try: discord_post = await discordHelper.send_x_post(tweet, artist, interaction.guild, [artist.name], downloaded_media, False, rating=x_post.rating, tags=final_filtered_tags, auto_approve=True, vox_labels = vox_labels, duplicate_posts=duplicates, xView=botData.xView, yView=botData.yView) except (NO_CHANNEL, OTHER_ERROR) as e: x_post.error_id = e.code x_post.discord_post_id = 0 else: x_post.discord_post_id = discord_post.id x_post.action_taken = ActionTaken.Accepted try: if not botData.db.x_insert_post(x_post, commit = False): raise Exception("Transaction error") for image in image_containers: botData.db.x_insert_image(image, False) except Exception as ex: botData.db.conn.rollback() raise ex else: botData.db.conn.commit() if discord_post is not None: embeds = discordHelper.build_secondary_embed(discord_post, artist.name, tweet) await discordHelper.edit_existing_embed_color(discord_post, nextcord.Colour.green()) await interaction.followup.send(embeds=embeds) return else: await interaction.followup.send(content=f'Posting failed: {x_post.error_id}') return except Exception as e: await interaction.followup.send(content=e) return @application_command.slash_command() @commands.has_permissions(administrator=True) async def query(self, interaction: nextcord.Interaction, query: str): self.botData.db.cursor.execute(query) self.botData.db.conn.commit() await interaction.response.send_message("ok") @application_command.slash_command() @commands.has_permissions(administrator=True) async def ignore_tag(self, interaction: nextcord.Interaction, tag: str): self.botData.classifier.add_ignored_tag(tag) await interaction.response.send_message("ok", ephemeral=True) @application_command.slash_command() async def add_x_account(self, interaction: nextcord.Interaction, handle: str, rating : str = nextcord.SlashOption(choices= [AccountRating.NSFW, AccountRating.SFW, AccountRating.NSFL], default = AccountRating.NSFW), download_mode : int = nextcord.SlashOption(choices={"No":DownloadMode.NO_DOWNLOAD, "Yes":DownloadMode.DOWNLOAD, "Yes (check all posts)":DownloadMode.DOWNLOAD_ALL}, default=DownloadMode.DOWNLOAD_ALL)): await interaction.response.defer() if "x.com" in handle: handle = handle.split('/') handle = handle[handle.index('x.com')+1] try: print(handle) result = requests.get(f"https://api.vxtwitter.com/{handle}") if result.status_code != 200: raise Exception("Failed to get user id") id = result.json()["id"] existing_account = self.botData.db.x_get_account_by_id(id) if existing_account != None: if existing_account.download_mode == download_mode: raise Exception("Account is already on download list") self.botData.db.x_update_account_properties(id, updates=[(schema_x_accounts.download_mode, download_mode)]) if download_mode == DownloadMode.DOWNLOAD_ALL: await interaction.followup.send("Added " + handle + " to download list (with full check)") elif download_mode == DownloadMode.NO_DOWNLOAD: await interaction.followup.send("Removed " + handle + " from download list") elif download_mode == DownloadMode.DOWNLOAD: await interaction.followup.send("Added " + handle + " to download list") else: await interaction.followup.send("huh??") else: account = await create_x_account(handle, self.botData, rating, download_mode=download_mode) await interaction.followup.send("Added " + handle) except Exception as ex: print(ex) await interaction.followup.send(str(ex)) @application_command.message_command(guild_ids=[1043267878851457096]) async def archive_set_KF(self, interaction: nextcord.Interaction, message: nextcord.Message): if not discordHelper.check_permission(interaction): await interaction.response.send_message("No permission", ephemeral=True) return query =f'SELECT {schema_x_posts.id}, {schema_x_posts.account_id}, {schema_x_posts.action_taken} FROM {schema_x_posts.table} WHERE {schema_x_posts.discord_post_id} = %s ' result = self.botData.db.query_get(query, ([message.id]), count = 1) if result is None: await interaction.response.send_message("post not found in database", ephemeral=True) return if result[schema_x_posts.action_taken] == ActionTaken.Accepted: await discordHelper.edit_existing_embed_color(message, nextcord.Colour.green()) await interaction.response.send_message("post was already KF", ephemeral=True) return await discordHelper.edit_existing_embed_color(message, nextcord.Colour.green()) result = self.botData.db.x_update_post(result[schema_x_posts.id], message.id, 0, ActionTaken.Accepted) await interaction.response.send_message("post approved", ephemeral=True) @application_command.message_command(guild_ids=[1043267878851457096]) async def archive_set_nonKF(self, interaction: nextcord.Interaction, message: nextcord.Message): if not discordHelper.check_permission(interaction): await interaction.response.send_message("No permission", ephemeral=True) return query =f'SELECT {schema_x_posts.id}, {schema_x_posts.account_id}, {schema_x_posts.action_taken} FROM {schema_x_posts.table} WHERE {schema_x_posts.discord_post_id} = %s ' result = self.botData.db.query_get(query, ([message.id]), count = 1) if result is None: await interaction.response.send_message("post not found in database", ephemeral=True) return if result[schema_x_posts.action_taken] == ActionTaken.Hidden: await discordHelper.edit_existing_embed_color(message, nextcord.Colour.yellow()) await interaction.response.send_message("post was already NonKF", ephemeral=True) return await discordHelper.edit_existing_embed_color(message, nextcord.Colour.yellow()) result = self.botData.db.x_update_post(result[schema_x_posts.id], message.id, 0, ActionTaken.Hidden) await interaction.response.send_message("post hidden", ephemeral=True) @application_command.message_command(guild_ids=[1043267878851457096]) async def archive_delete(self, interaction: nextcord.Interaction, message: nextcord.Message): if not discordHelper.check_permission(interaction): await interaction.response.send_message("No permission", ephemeral=True) return query =f'SELECT {schema_x_posts.id} FROM {schema_x_posts.table} WHERE {schema_x_posts.discord_post_id} = %s' result = self.botData.db.query_get(query, ([message.id]), count = 1) if result is None: await interaction.response.send_message("post not found in database", ephemeral=True) return await message.delete() result = self.botData.db.x_update_post(result["id"], 0, 0, ActionTaken.Rejected) await interaction.response.send_message("post deleted", ephemeral=True) def setup(bot: commands.Bot, botData: RuntimeBotData): bot.add_cog(Commands(botData)) print("Loaded Commands")