Files
JapariArchive/Twitter/tweetHelper.py
2026-01-08 22:42:03 +01:00

131 lines
4.5 KiB
Python

from __future__ import annotations

import os
from typing import TYPE_CHECKING
from urllib.parse import parse_qs

from tweety.exceptions_ import UserNotFound, UserProtected
from tweety.types.twDataTypes import Tweet

from Database.x_classes import x_accounts
from config import Global_Config
import downloadHelper
from exceptions import ACCOUNT_DEAD, ACCOUNT_SKIP

if TYPE_CHECKING:
    from runtimeBotData import RuntimeBotData
class TweetMedia:
    """Describes one media item of a tweet: the URL to fetch and the file name to use."""

    url: str  # URL the media can be fetched from
    file_name: str  # file name assigned to this media item

    def __init__(self, url: str, file_name: str):
        """Store the media URL and its file name unchanged."""
        self.url = url
        self.file_name = file_name
class DownloadedMedia:
    """Pairs the content of a downloaded media file with its file name."""

    # NOTE(review): declared as ``str``, but the attribute and parameter names
    # suggest raw bytes are stored here - confirm against what callers pass in.
    file_bytes: str
    file_name: str  # file name of the downloaded media

    def __init__(self, bytes, file_name: str):
        """Store the downloaded content and its file name unchanged."""
        # ``bytes`` shadows the builtin inside this method; the parameter name
        # is kept so existing keyword callers keep working.
        self.file_bytes = bytes
        self.file_name = file_name
async def GetTweetMedia(tweet : Tweet) -> list[TweetMedia]:
    """Build one `TweetMedia` (direct URL + file name) per media item of `tweet`.

    File names have the form `<screen_name>_<tweet id>_<index>.<extension>`,
    where `<index>` is the position of the item in `tweet.media`.
    The extension is the media item's own `file_format` when that is `mp4`,
    otherwise the `file_format` reported by the item's best stream.
    """
    collected : list[TweetMedia] = []
    for position, item in enumerate(tweet.media):
        is_mp4 = item.file_format == 'mp4'
        # Both kinds of media need the best stream for the direct URL.
        stream = await item.best_stream()
        suffix = item.file_format if is_mp4 else stream.file_format
        name = f"{tweet.author.screen_name}_{tweet.id}_{position}.{suffix}"
        collected.append(TweetMedia(stream.direct_url, name))
    return collected
async def GetTweetMediaUrls(tweet : Tweet):
    """Return the URLs of all media items of `tweet`, in the order `GetTweetMedia` produces them."""
    urls = []
    for entry in await GetTweetMedia(tweet):
        urls.append(entry.url)
    return urls
async def DownloadMedia(post_id, account_id, account_name, url_list : list, session) -> list[DownloadedMedia]:
    """Download every URL in `url_list` into the account's download directory.

    The directory is `Global_Config["x_download_path"]` immediately followed by
    `account_id`; it is created if missing. Each file is named by
    `get_file_name(account_name, post_id, <index>, <url>)` and fetched through
    `downloadHelper.save_to_file` using `session`.

    Returns one `DownloadedMedia` per URL, in input order, holding whatever
    `save_to_file` returned together with the file name.
    """
    # Look the config value up outside the f-string: reusing the enclosing
    # quote character inside an f-string is a SyntaxError before Python 3.12.
    download_root = Global_Config["x_download_path"]
    # No separator is inserted here (unchanged behaviour) - presumably the
    # configured path already ends with one; TODO confirm.
    path = f"{download_root}{account_id}"
    os.makedirs(path, exist_ok=True)

    result : list[DownloadedMedia] = []
    for idx, file_url in enumerate(url_list):
        file_name = get_file_name(account_name, post_id, idx, file_url)
        full_path = f"{path}/{file_name}"
        photo_bytes = await downloadHelper.save_to_file(file_url, full_path, session)
        result.append(DownloadedMedia(photo_bytes, file_name))
    return result
def get_file_name(account_name: str, post_id: int, image_index: int, image_url: str, account_id : int = None, base_path : str = None):
'''
`account_id` and `base_path` are optional\n
In `base_path`, do not include trailing slash\n
Example if none are defined:\n `file_name.ext`
Example if `base_path` is defined:\n `c:/base_path/file_name.ext`
Example if `account_id` is defined:\n `account_id/file_name.ext`
Example if both are defined:\n `c:/base_path/account_id/file_name.ext`
'''
ext = image_url.split("?")[0].split(".")[-1]
file_name = f"{account_name}_{post_id}_{image_index}.{ext}"
if account_id != None and base_path != None:
return f"{base_path}/{account_id}/{file_name}"
elif base_path != None:
return f"{base_path}/{file_name}"
elif account_id != None:
return f"{account_id}/{file_name}"
return file_name
async def UpdateMediaPosts(account : x_accounts, botData : RuntimeBotData) -> list[Tweet]:
    """Fetch tweets of `account` through `botData.twApi.get_tweets`.

    The post ids already stored for the account are read from
    `botData.db.get_all_post_ids`; the largest one (or 1 when none are stored)
    is passed as `bottom_id`, and the full id collection as `all_posts`.
    Presumably `bottom_id` makes the API stop at already-known tweets -
    confirm against `twApi.get_tweets`.

    Raises:
        ACCOUNT_DEAD: the account is protected or could not be found.
        ACCOUNT_SKIP: any other error occurred while fetching.
    """
    all_posts = botData.db.get_all_post_ids(account.id)
    newest_post = max(all_posts, default=1)
    try:
        return [tweet async for tweet in botData.twApi.get_tweets(user_name = account.name, bottom_id = newest_post, all_posts = all_posts)]
    except (UserProtected, UserNotFound) as ex:
        print("User dead: ", account.name, ex)
        # Chain explicitly so the original error is recorded as the cause.
        raise ACCOUNT_DEAD(ex) from ex
    except Exception as ex:
        print("Error in ", account.name, ex)
        raise ACCOUNT_SKIP(ex) from ex
async def DownloadAllMediaPosts(account : x_accounts, botData : RuntimeBotData) -> list[Tweet]:
    """Fetch tweets of `account` from the start and keep those not stored yet.

    `botData.twApi.get_tweets` is called with `bottom_id = 1` and an empty
    `all_posts`; every returned tweet whose integer id is not among the ids
    from `botData.db.get_all_post_ids` is included in the result.

    Raises:
        ACCOUNT_DEAD: the account is protected or could not be found.
        ACCOUNT_SKIP: any other error occurred while fetching.
    """
    # Build the lookup set once so each membership test is O(1) instead of a
    # scan over all stored ids (assumes the stored ids are hashable ints).
    known_ids = set(botData.db.get_all_post_ids(account.id))
    try:
        return [
            tweet
            async for tweet in botData.twApi.get_tweets(user_name = account.name, bottom_id = 1, all_posts = [])
            if int(tweet.id) not in known_ids
        ]
    except (UserProtected, UserNotFound) as ex:
        print("User dead: ", account.name, ex)
        # Chain explicitly so the original error is recorded as the cause.
        raise ACCOUNT_DEAD(ex) from ex
    except Exception as ex:
        print("Error in ", account.name, ex)
        raise ACCOUNT_SKIP(ex) from ex
def parse_x_url(url : str):
    """Return `(handle, post_id)` parsed from a full X post URL.

    Accepted shape: `[http(s)://]<host>/<handle>/status/<post id>`, optionally
    followed by further path segments, a `?query` and/or a `#fragment`.

    Raises:
        ValueError: with message "Invalid Format" when the URL does not have
            the `<host>/<handle>/status/<post id>` shape; `int()` raises it as
            well when the post id segment is not numeric.
    """
    url = url.replace("https://", "").replace("http://", "")
    # Query string and fragment are not part of the path; drop both.
    path = url.split("?", 1)[0].split("#", 1)[0]
    segments = path.split('/')
    # Check the length first so short URLs report "Invalid Format" instead of
    # failing with an IndexError.
    if len(segments) < 4 or segments[2] != "status":
        raise ValueError("Invalid Format")
    return segments[1], int(segments[3])