前提条件
如果要在公众号中集成ChatGPT功能,拥有一个openai的api key是必不可少的,这可能需要会魔法,去官网注册一个账号,买也行,不过容易被封(貌似最近风控比较严)。其次,需要一个能够直接访问openai接口的服务器作为跳板(没办法,国区无法直连)。
搭建跳板服务
这里给大家介绍个最简单的方法,利用nginx的反向代理功能,0代码实现openai接口的代理服务。
那这里的问题就是怎么拥有海外节点的服务器,可以购买海外区的vps(比如godaddy),或者在AMZ(亚马逊云)、AZURE(微软云)注册领取一个一年试用的云服务器,节点选择美国。
然后就是配置nginx了,简单一点直接安装一个宝塔面板,安装过程可自行百度,这里就不讲解了,基本就是从官网获取安装脚本一键安装,没啥可讲的。
新建站点,输入一个域名(IP应该也可以)
配置反代
按截图配置即可,然后再编辑一下配置文件
将第一个红框中的代码注释掉,不要转发原始IP,加上第二个红框中的内容,开启https的代理,否则会出现502报错。
然后,访问 http://服务器IP地址:端口/v1/,如果出现下图中的信息即表示代理成功了。
有了这个代理接口,后续就是大家的发挥空间了。
接入微信公众号
这里给大家介绍如何接入微信公众号
我是使用的python,用的FastAPI,总共200来行代码(很多冗余的,大家可自行删减)。
因为公众号回复消息需要在5秒内完成,请求接口生成答案可能会超过这个阈值,但是微信会重试3次,这样总体思路是这样的。
给每次请求计个数, 重试的message_id是一样的,如果能在5秒内返回,那很好,如果重试第3次还是没结果,返回一个提示,这里是必须的,否则微信会提示服务异常。
同时需要将答案缓存起来,如果超时,再次发送相同的问题时,直接从缓存中获取,不再请求OpenAI接口。
附上代码,大家可参考一下:
主要用到的库如下,请自行pip安装
fastapi[all]
wechatpy
aioredis
pycryptodome
aiohttp
缓存是使用的Redis,可直接在宝塔后台安装Redis服务。
redis.py文件内容
from typing import Optional
from contextlib import asynccontextmanager
from aioredis import Redis, ConnectionPool
from fastapi.logger import logger
import json
class MissingFactoryError(Exception):
pass
class MissingDatabaseSettingError(Exception):
pass
class RedisMixin:
redis_client: Redis = None
@asynccontextmanager
async def session(self) -> Redis:
try:
_session = self._make_redis_session()
yield _session
except Exception as e:
logger.error(e)
finally:
if _session: await _session.close()
@property
def redis_conn(self) -> Redis:
return self._make_redis_session()
def _make_redis_session(self) -> Redis:
if not self.redis_client:
raise MissingFactoryError()
redis = self.redis_client
if not redis:
raise MissingDatabaseSettingError()
return redis.session
class AioRedis:
def __init__(
self, pool_options=None
):
self.configure(
pool_options=pool_options,
)
def configure(
self, pool_options=None
):
self.redis_pool = ConnectionPool(**(pool_options or {}))
@property
def session(self):
return self.get_session(pool=self.redis_pool)
def get_session(self, pool=None):
if not pool:
raise MissingDatabaseSettingError()
return Redis(connection_pool=pool)
class BaseRedisModel(RedisMixin):
def __init__(self, redis_sess:AioRedis=None):
if redis_sess is not None: self.redis_client = redis_sess
"""
取值(字符串)
"""
async def get(self, name):
async with self.redis_conn as redis:
try:
value = await redis.get(name=name)
value = value.decode('utf8') if isinstance(value, bytes) else value
if value: value = json.loads(value)
except Exception as e:
logger.error(e)
value = None
finally:
await redis.close()
return value
"""
设值(字符串)
"""
async def set(self, name, value, ex=None, px=None, nx=False, xx=False):
async with self.redis_conn as redis:
try:
value and (await redis.set(name=name, value=json.dumps(value, ensure_ascii=False), ex=ex, px=px, nx=nx, xx=xx))
except Exception as e:
logger.error(e)
finally:
await redis.close()
"""
删值
"""
async def delete(self, names):
async with self.redis_conn as redis:
try:
await redis.delete(names)
except Exception as e:
logger.error(e)
finally:
await redis.close()
"""
取值(键值对)
"""
async def hgetall(self, name):
async with self.redis_conn as redis:
try:
data = dict()
values = await redis.hgetall(name)
for key, value in values.items():
value = value.decode('utf8') if isinstance(value, bytes) else value
if value: data[key] = json.loads(value)
except Exception as e:
logger.error(e)
data = dict()
finally:
await redis.close()
return data
"""
设值(键值对)
"""
async def hset(self, name, key, value, ex=None):
async with self.redis_conn as redis:
try:
value = json.dumps(value, ensure_ascii=False)
value and (await redis.hset(name, key, value))
ex and (await redis.expire(name, ex))
except Exception as e:
logger.error(e)
finally:
await redis.close()
"""
批量设值(键值对)
"""
async def hmset(self, name, data, ex=None):
async with self.redis_conn as redis:
try:
for key, value in data.items():
data[key] = json.dumps(value, ensure_ascii=False)
data and (await redis.hmset(name, data))
ex and (await redis.expire(name, ex))
except Exception as e:
logger.error(e)
finally:
await redis.close()
"""
取一个值(键值对)
"""
async def hget(self, name, key):
async with self.redis_conn as redis:
try:
data = await redis.hget(name, key)
data = data.decode('utf8') if isinstance(data, bytes) else data
if data: data = json.loads(data)
except Exception as e:
data = None
logger.error(e)
finally:
await redis.close()
return data
"""
删一个值(键值对)
"""
async def hdel(self, name, key):
async with self.redis_conn as redis:
try:
await redis.hdel(name, key)
except Exception as e:
logger.error(e)
finally:
await redis.close()
"""
设值(列表)
"""
async def lpush(self, name, value, ex=None):
async with self.redis_conn as redis:
try:
if not isinstance(value, list): value = [value]
if value:
for v in value:
await redis.lpush(name, json.dumps(v, ensure_ascii=False))
ex and (await redis.expire(name, ex))
except Exception as e:
logger.error(e)
finally:
await redis.close()
"""
取值(列表)
"""
async def lgetall(self, name):
async with self.redis_conn as redis:
try:
end = (await redis.llen(name))
data = (await redis.lrange(name, 0, end))
for v in data:
if v: v = json.loads(v)
except Exception as e:
logger.error(e)
data = []
finally:
await redis.close()
return data
"""
清空并设值(列表)
"""
async def lnpush(self, name, value, ex=None):
async with self.redis_conn as redis:
try:
await redis.delete(name)
await self.lpush(name=name, value=value, ex=ex)
except Exception as e:
logger.error(e)
finally:
await redis.close()
service.py 文件内容
from fastapi import FastAPI, applications
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.exceptions import RequestValidationError, HTTPException, WebSocketRequestValidationError
from fastapi.encoders import jsonable_encoder
from fastapi.logger import logger
from fastapi.responses import PlainTextResponse, JSONResponse, HTMLResponse, Response
from pydantic import BaseModel
from starlette.requests import Request
from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_401_UNAUTHORIZED
from uuid import uuid4
from typing import Any, List, Union
from wechatpy import parse_message, create_reply
from wechatpy.utils import check_signature
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import (
InvalidSignatureException,
InvalidAppIdException,
)
from redis import AioRedis, BaseRedisModel
import asyncio
import json
import time
import uvicorn
import aiohttp
import json
import time
class XMLResponse(Response):
media_type = "text/xml"
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
return JSONResponse(status_code=exc.status_code, content=dict(success=False, message=jsonable_encoder(exc.detail)), headers=exc.headers)
async def request_validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
return JSONResponse(status_code=HTTP_422_UNPROCESSABLE_ENTITY, content=dict(success=False, message='请求参数错误!', data=jsonable_encoder(exc.errors())))
async def on_exception_404(request: Request, exc: HTTPException) -> JSONResponse:
return JSONResponse(status_code=HTTP_404_NOT_FOUND, content=dict(success=False, message='访问的路径不存在!', data=jsonable_encoder(exc.detail)), headers=exc.headers)
async def on_exception_403(request: Request, exc: HTTPException) -> JSONResponse:
return JSONResponse(status_code=HTTP_403_FORBIDDEN, content=dict(success=False, message='没有访问权限!', data=jsonable_encoder(exc.detail)), headers=exc.headers)
async def on_exception_401(request: Request, exc: HTTPException) -> JSONResponse:
return JSONResponse(status_code=HTTP_401_UNAUTHORIZED, content=dict(success=False, message='登录超时, 请重新登录!', data=jsonable_encoder(exc.detail)), headers=exc.headers)
exception_handlers = {
404: on_exception_404,
403: on_exception_403,
401: on_exception_401
}
def swagger_monkey_patch(*args, **kwargs) -> HTMLResponse:
return get_swagger_ui_html(*args, **kwargs)
def redoc_monkey_patch(*args, **kwargs) -> HTMLResponse:
return get_redoc_html(*args, **kwargs)
applications.get_swagger_ui_html = swagger_monkey_patch
applications.get_redoc_html = redoc_monkey_patch
api_prefix = '/api'
app = FastAPI(title='ChatGPT', description='ChatGPT API', version='1.0.0', debug=False,
swagger_ui_oauth2_redirect_url=f'{api_prefix}/docs/oauth2-redirect', openapi_url=f'{api_prefix}/openapi.json',
docs_url=f'{api_prefix}/docs', redoc_url=f'{api_prefix}/redoc', exception_handlers=exception_handlers)
app.add_exception_handler(HTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, request_validation_exception_handler)
app.add_exception_handler(WebSocketRequestValidationError, request_validation_exception_handler)
# 微信公众号和redis配置
settings = dict(wechat_mp_token='', wechat_mp_aes_key='', wechat_mp_appid='', rds_host='127.0.0.1', rds_port=6379, rds_db=1, rds_user='default', rds_password='', rds_pool_size=20)
curd_redis = BaseRedisModel(AioRedis(pool_options=dict(host=settings.get('rds_host'), port=settings.get('rds_port'), db=settings.get('rds_db'), username=settings.get('rds_user'), password=settings.get('rds_password'), max_connections=settings.get('rds_pool_size'), decode_responses=True)))
async def openai_chat(text: str, from_user: str, message_id: str):
try:
if text == '新对话':
await curd_redis.delete(f'openai_chat:{from_user}')
return '已启用新的对话进程, 现在可以跟我聊天啦!'
if len(text) > 500:
return '单次提问文本长度不能超过500个字符, 请重新输入。'
# 这里改成代理地址和配置api key
url = 'https://api.openai.com/v1/chat/completions'
api_key = ''
# 在这里设定系统角色
messages = [{
"role": "system",
"content": "你是一个友好的,活泼开朗的,有着正向价值观的,拥有丰富的软件测试知识的,不回答任何涉及政治、色情等不良信息的问题,是公众号【贝克街的捉虫师】的AI智能小助手冰冰"
}]
count = await curd_redis.get(f"openai_chat:{from_user}:{message_id}")
count = int(count) if count else 0
count += 1
history_messages = await curd_redis.hgetall(name=f'openai_chat:{from_user}')
message = None
for key in sorted(history_messages, reverse=True)[:5]:
msg = history_messages.get(key, {})
if not isinstance(msg, dict):
msg = json.loads(msg)
if msg.get('qusetion', {}).get('content') == text:
message = msg.get('anwser', {}).get('content')
break
if message: return message
if count != 1 and message_id not in history_messages.keys():
await asyncio.sleep(4)
history_messages = await curd_redis.hgetall(name=f'openai_chat:{from_user}')
await curd_redis.set(f"openai_chat:{from_user}:{message_id}", count, ex=30)
if message_id in history_messages.keys():
content = history_messages.get(message_id, {})
if not isinstance(content, dict):
content = json.loads(content)
return content.get('anwser').get('content')
elif count == 2:
await asyncio.sleep(3)
return ''
elif count == 3:
return '答案生成中...请稍后(3~15秒)重发原问题获取答案吧。'
else:
for key in sorted(history_messages):
content = history_messages.get(key, {})
if not isinstance(content, dict):
content = json.loads(content)
if content.get('qusetion'): messages.append(content.get('qusetion'))
if content.get('anwser'): messages.append(content.get('anwser'))
qusetion = {
"role": "user",
"content": text,
}
messages.append(qusetion)
data = dict(model='gpt-3.5-turbo', messages=messages)
headers = {'Authorization': f'Bearer {api_key}'}
async with aiohttp.ClientSession() as session:
async with session.request('POST', url, json=data, headers=headers) as resp:
resp = await resp.json()
if resp.get('choices'):
message = resp.get("choices")[0].get('message', {}) if resp.get("choices") else {}
await curd_redis.hset(name=f'openai_chat:{from_user}', key=message_id, value=dict(qusetion=qusetion, anwser=message), ex=60*60*24)
return message.get('content')
else:
logger.error(resp.get('error'))
if resp.get('error', {}).get('message', '').find('maximum context length') < 0:
return 'AI智能助手已暂停服务, 敬请谅解!'
return 'Sorry, 暂不支持长文本或长时间持续性对话, 请回复「新对话」, 发起新的对话进程, 谢谢。'
except Exception as e:
logger.error(e)
return ''
async def reply_messages(message: Any):
'''
解析回复微信消息
'''
msg = parse_message(message)
if msg.type == "text":
content = await openai_chat(msg.content, msg.source, msg.id)
reply = create_reply(content, msg)
elif msg.type == "event" and msg.event == "subscribe":
# 配置关注后自动回复的内容
reply = create_reply("终于等到你, 聚合软件测试类精华, 尽在「贝克街的捉虫师」。我是「贝克街的捉虫师」家的AI智能小助手冰冰, 现在可以跟我聊天啦, 祝你开心每一天!", msg)
else:
reply = create_reply("Sorry, 暂不支持此消息类型, 仅支持文本消息。", msg)
return reply
# 微信公众号回调接口
@app.get(f'{api_prefix}/open/mp/callback', summary='微信公众号回调接口', operation_id='verify_token', response_class=PlainTextResponse)
async def verify_token(request: Request):
"""
使用微信sdk回复文本消息
"""
signature = request.query_params.get("signature", "")
timestamp = request.query_params.get("timestamp", "")
nonce = request.query_params.get("nonce", "")
encrypt_type = request.query_params.get("encrypt_type", "raw")
msg_signature = request.query_params.get("msg_signature", "")
echostr = request.query_params.get("echostr", "")
try:
check_signature(settings.get('wechat_mp_token'), signature, timestamp, nonce)
except InvalidSignatureException:
return 'Invalid Signature'
else:
return echostr
# 微信公众号回调接口
@app.post(f'{api_prefix}/open/mp/callback', summary='微信公众号回调接口', operation_id='mp_callback')
async def mp_callback(request: Request):
"""
使用微信sdk回复文本消息
"""
signature = request.query_params.get("signature", "")
timestamp = request.query_params.get("timestamp", "")
nonce = request.query_params.get("nonce", "")
encrypt_type = request.query_params.get("encrypt_type", "raw")
msg_signature = request.query_params.get("msg_signature", "")
try:
check_signature(settings.get('wechat_mp_token'), signature, timestamp, nonce)
except InvalidSignatureException:
return PlainTextResponse(content='Invalid Signature')
xml_data = await request.body()
# POST request
if encrypt_type == "raw":
# plaintext mode
reply = await reply_messages(message=xml_data)
return XMLResponse(content=reply.render())
else:
# encryption mode
crypto = WeChatCrypto(settings.get('wechat_mp_token'), settings.get('wechat_mp_aes_key'), settings.get('wechat_mp_appid'))
try:
msg = crypto.decrypt_message(xml_data, msg_signature, timestamp, nonce)
except (InvalidSignatureException, InvalidAppIdException):
return PlainTextResponse(content='Decrypt Error')
else:
reply = await reply_messages(message=msg)
return XMLResponse(content=crypto.encrypt_message(reply.render(), nonce, timestamp))
if __name__ == '__main__':
uvicorn.run('service:app', host='0.0.0.0', port=8888, reload=False, proxy_headers=True, workers=2, log_level='info')
将上面两个文件保存在一个目录中,执行下面命令启动服务
python3 service.py
微信公众号的配置如下图,将配置复制到代码的配置中,配置要一致,不然无法保存,服务器地址填:https://你的域名/api/open/mp/callback
这个脚本可以部署在国内的服务器上, 只能使用域名且需要备案的域名,需要先部署好服务才能保存,微信会验证token是否正确。
主要提供一个思路,大家有什么问题可以留言。