from __future__ import annotations from typing import TYPE_CHECKING from config import Global_Config if TYPE_CHECKING: from runtimeBotData import RuntimeBotData import os import asyncio from io import BytesIO from zipfile import ZipFile from pixivpy3 import AppPixivAPI from Twitter.tweetyapi import TweetyApi from getPixivpyToken.gppt import GetPixivToken from PIL import Image class PixivApi: def init(self): self.api = AppPixivAPI(timeout=60) self.api.auth(refresh_token=self.get_refresh_token()) return self def get_refresh_token(self) -> str: if Global_Config["pixiv_token"] is not None: return Global_Config["pixiv_token"] else: if Global_Config["pixiv_username"] is None or Global_Config["pixiv_password"] is None: raise Exception("Pixiv username and password are required") g = GetPixivToken(headless=True, username=Global_Config["pixiv_username"], password=Global_Config["pixiv_password"]) refresh_token = g.login()["refresh_token"] Global_Config["pixiv_token"] = refresh_token return refresh_token async def search_illust(self, retries = 3, **next_qs): last_error = "" sleep_default = 0.125 for i in range(retries): try: json_result = self.api.search_illust(**next_qs) if json_result.error != None: raise Exception(json_result.error) return json_result except Exception as ex: print(ex) last_error = ex await TweetyApi.sleep_wait(sleep_default, i) self.api = PixivApi().init().api continue raise Exception(last_error) async def illust_detail(self, id, retries = 3): last_error = "" sleep_default = 0.125 for i in range(retries): try: json_result = self.api.illust_detail(id) if json_result.error != None: if json_result.error.user_message == 'Page not found': return None if json_result.error.user_message == 'Artist has made their work private.': return None raise Exception(json_result.error) return json_result except Exception as ex: print(ex) last_error = ex await TweetyApi.sleep_wait(sleep_default, i) self.api = PixivApi().init().api continue raise Exception(last_error) async def ugoira_metadata(self, id, retries = 3): last_error = "" sleep_default = 0.125 for i in range(retries): try: json_result = self.api.ugoira_metadata(id) if json_result.error != None: if json_result.error.user_message == 'Page not found': return None if json_result.error.user_message == 'Artist has made their work private.': return None raise Exception(json_result.error) return json_result except Exception as ex: print(ex) last_error = ex await TweetyApi.sleep_wait(sleep_default, i) self.api = PixivApi().init().api continue raise Exception(last_error) async def download(self, url, retries = 3): for i in range(retries): try: with BytesIO() as io_bytes: def foo(): self.api.download(url, fname=io_bytes) loop = asyncio.get_running_loop() await loop.run_in_executor(None, foo) return io_bytes.getbuffer().tobytes() except Exception as ex: print(ex) if i == 3: raise ex await asyncio.sleep(i * 60) async def get_new_posts(self, botData: RuntimeBotData): posts = [] # get all page: next_qs = {'word': 'けものフレンズ', 'search_target': 'partial_match_for_tags', 'sort': 'date_desc', 'filter': 'for_ios', 'end_date': ''} page = 0 while next_qs: print("Getting pixiv page") has_new_posts = False json_result = await self.search_illust(retries=3, **next_qs) for illust in json_result.illusts: if botData.db.pixiv_get_post(illust.id) == None: has_new_posts = True print(illust.id, illust.title, illust.create_date) posts.append(illust) if has_new_posts == False: break next_qs = self.api.parse_qs(json_result.next_url) if len(json_result.illusts) > 0: oldest_date = json_result.illusts[-1].create_date[0:10] if "end_date" in next_qs and oldest_date != next_qs["end_date"]: next_qs["end_date"] = oldest_date next_qs["offset"] = sum(1 for x in json_result.illusts if x.create_date[0:10] == oldest_date) page += 1 await asyncio.sleep(1) return posts async def download_illust(self, illust, download_path = "Temp/"): print(illust.id) downloaded = [] filepath = os.path.abspath(os.path.join(download_path, str(illust.user.id))) os.makedirs(filepath, exist_ok=True) if illust.type == 'ugoira': filename = str(illust.id) + "_p0.gif" filepath = os.path.join(filepath, filename) bytes = await self.download_ugoira_to_bytes(illust.id) if bytes is None: return [] with open(filepath, "wb") as f: f.write(bytes) downloaded.append({"file":bytes, "name":filename}) else: if len(illust.meta_pages) == 0: _, filename = os.path.split(illust.meta_single_page.original_image_url) filepath = os.path.join(filepath, filename) bytes = await self.download(illust.meta_single_page.original_image_url) with open(filepath, "wb") as f: f.write(bytes) if len(downloaded) < 4: downloaded.append({"file":bytes, "name":filename}) else: for page in illust.meta_pages: filepath = os.path.abspath(os.path.join(download_path, str(illust.user.id))) _, filename = os.path.split(page.image_urls.original) filepath = os.path.join(filepath, filename) bytes = await self.download(page.image_urls.original) with open(filepath, "wb") as f: f.write(bytes) if len(downloaded) < 4: downloaded.append({"file":bytes, "name":filename}) return downloaded async def download_ugoira_to_bytes(self, id): closables = [] metadata = await self.ugoira_metadata(id) if metadata is None: return None if len(metadata.ugoira_metadata.zip_urls) > 1: raise Exception bytes = await self.download(metadata.ugoira_metadata.zip_urls["medium"]) with BytesIO(bytes) as bytes0, ZipFile(bytes0) as input_zip: frames = metadata.ugoira_metadata.frames zip_frames = {} for name in input_zip.namelist(): im = input_zip.read(name) im_bytes = BytesIO(im) im_im = Image.open(im_bytes) closables += [im_bytes, im_im] zip_frames[name] = im_im with BytesIO() as buffer: zip_frames[frames[0].file].save( buffer, format="GIF", save_all=True, append_images=[zip_frames[frame.file] for frame in frames[1:]], duration=[frame.delay for frame in frames], loop=0, optimize=False ) [closable.close() for closable in closables] return buffer.getbuffer().tobytes() if __name__ == "__main__": async def get_new_posts(): api = PixivApi().init() while True: await api.get_new_posts() await asyncio.sleep(30 * 60) asyncio.run(get_new_posts())