from __future__ import annotations import os from Database.x_classes import x_accounts from config import Global_Config import downloadHelper from tweety.types.twDataTypes import Tweet from exceptions import ACCOUNT_DEAD, ACCOUNT_SKIP from tweety.exceptions_ import UserNotFound, UserProtected from typing import TYPE_CHECKING if TYPE_CHECKING: from runtimeBotData import RuntimeBotData class TweetMedia: url : str file_name : str def __init__(self, url, file_name): self.url = url self.file_name = file_name class DownloadedMedia: file_bytes : str file_name : str def __init__(self, bytes, file_name): self.file_bytes = bytes self.file_name = file_name async def GetTweetMedia(tweet : Tweet) -> list[TweetMedia]: mediaList : list[TweetMedia] = [] for idx, media in enumerate(tweet.media): if media.file_format == 'mp4': best_stream = await media.best_stream() fileName = f"{tweet.author.screen_name}_{tweet.id}_{idx}.{media.file_format}" mediaList.append(TweetMedia(best_stream.direct_url, fileName)) else: best_stream = await media.best_stream() extension = best_stream.file_format fileName = f"{tweet.author.screen_name}_{tweet.id}_{idx}.{extension}" mediaList.append(TweetMedia(best_stream.direct_url, fileName)) return mediaList async def GetTweetMediaUrls(tweet : Tweet): mediaList = await GetTweetMedia(tweet) return [media.url for media in mediaList] async def DownloadMedia(post_id, account_id, account_name, url_list : list, session) -> list[DownloadedMedia]: result : list[DownloadedMedia] = [] path = f"{Global_Config["x_download_path"]}{account_id}" os.makedirs(path, exist_ok=True) for idx, file_url in enumerate(url_list): file_name = get_file_name(account_name, post_id, idx, file_url) full_path = f"{path}/{file_name}" photo_bytes = await downloadHelper.save_to_file(file_url, full_path, session) result.append(DownloadedMedia(photo_bytes, file_name)) return result def get_file_name(account_name: str, post_id: int, image_index: int, image_url: str, account_id : int = None, base_path : str = None): ''' `account_id` and `base_path` are optional\n In `base_path`, do not include trailing slash\n Example if none are defined:\n `file_name.ext` Example if `base_path` is defined:\n `c:/base_path/file_name.ext` Example if `account_id` is defined:\n `account_id/file_name.ext` Example if both are defined:\n `c:/base_path/account_id/file_name.ext` ''' ext = image_url.split("?")[0].split(".")[-1] file_name = f"{account_name}_{post_id}_{image_index}.{ext}" if account_id != None and base_path != None: return f"{base_path}/{account_id}/{file_name}" elif base_path != None: return f"{base_path}/{file_name}" elif account_id != None: return f"{account_id}/{file_name}" return file_name async def UpdateMediaPosts(account : x_accounts, botData : RuntimeBotData) -> list[Tweet]: all_posts = botData.db.get_all_post_ids(account.id) newest_post = 1 if len(all_posts) == 0 else max(all_posts) posts = [] try: posts = [tweet async for tweet in botData.twApi.get_tweets(user_name = account.name, bottom_id = newest_post, all_posts = all_posts)] except (UserProtected, UserNotFound) as ex: print("User dead: ", account.name, ex) raise ACCOUNT_DEAD(ex) except Exception as ex: print("Error in ", account.name, ex) raise ACCOUNT_SKIP(ex) return posts async def DownloadAllMediaPosts(account : x_accounts, botData : RuntimeBotData) -> list[Tweet]: all_posts = botData.db.get_all_post_ids(account.id) posts = [] try: async for tweet in botData.twApi.get_tweets(user_name = account.name, bottom_id = 1, all_posts = []): if int(tweet.id) not in all_posts: posts.append(tweet) except (UserProtected, UserNotFound) as ex: print("User dead: ", account.name, ex) raise ACCOUNT_DEAD(ex) except Exception as ex: print("Error in ", account.name, ex) raise ACCOUNT_SKIP(ex) return posts def parse_x_url(url : str): "return account (handle, post id) from full X post url" url = url.replace("https://", "").replace("http://", "") split = url.split("?") if len(split) > 0: url = split[0] split = url.split('/') if split[2] != "status": raise Exception("Invalid Format") return split[1], int(split[3])