# 请先安装aiohttp
# pip install -U aiohttp
from aiohttp import web
from PIL import Image, ImageDraw
import io
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def generate_test_image(text: str = "Test Image") -> bytes:
"""生成一个简单的测试图片"""
# 创建一个300x200的白色背景图片
width = 300
height = 200
image = Image.new('RGB', (width, height), 'white')
draw = ImageDraw.Draw(image)
# 绘制一些文本和图形
draw.rectangle([10, 10, width - 10, height - 10], outline='blue', width=2)
draw.text((width // 2 - 50, height // 2), text, fill='black')
# 添加时间戳
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
draw.text((10, height - 20), timestamp, fill='gray')
# 转换为JPEG字节流
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='png')
img_byte_arr = img_byte_arr.getvalue()
return img_byte_arr
class ImageServer:
@staticmethod
async def handle_image(request: web.Request) -> web.Response:
"""处理图片请求并返回JPEG图片"""
try:
# 从请求中获取文本参数
data = await request.json()
text = data.get('text', 'Test Image')
# 生成图片
image_data = await generate_test_image(text)
# 设置响应头
headers = {
'Content-Type': 'image/jpeg',
'Content-Disposition': 'attachment; filename="test.jpg"'
}
logger.info(f"Generating image with text: {text}")
return web.Response(body=image_data, headers=headers, status=403)
except Exception as e:
logger.error(f"Error generating image: {e}")
return web.Response(
status=500,
text=f"Error generating image: {str(e)}"
)
async def on_message(request):
# 它将拉黑一个TG用户名为koipybot的用户
# Get the JSON data from request
data = await request.json()
# Return the data as-is
if str(data['message']['from_user']['username']).startswith('koipybot'):
return web.Response(status=403, text='你已被拉黑!')
return web.json_response()
async def on_pre_send(request):
data = await request.json()
return await ImageServer.handle_image(request)
async def on_result(request):
data = await request.json()
result: dict = data['result']
result["NewKey"] = ["回调新增数据1" for _ in range(len(result["节点名称"]))]
data['result'] = result
return web.json_response(data)
async def init_app():
_app = web.Application()
# Setup routes
_app.router.add_post('/onMessage', on_message)
_app.router.add_post('/onPreSend', on_pre_send)
_app.router.add_post('/onResult', on_result)
return _app
if __name__ == '__main__':
app = init_app()
web.run_app(app, host='127.0.0.1', port=8080)