如何将ChatGPT接入微信公众号

瞎折腾 · 2023-03-14

前提条件

如果要在公众号中集成ChatGPT功能,拥有一个openai的api key是必不可少的,这可能需要会魔法,去官网注册一个账号,买也行,不过容易被封(貌似最近风控比较严)。其次,需要一个能够直接访问openai接口的服务器作为跳板(没办法,国区无法直连)。

搭建跳板服务

这里给大家介绍个最简单的方法,利用nginx的反向代理功能,0代码实现openai接口的代理服务。

那这里的问题就是怎么拥有海外节点的服务器,可以购买海外区的vps(比如godaddy),或者在AMZ(亚马逊云)、AZURE(微软云)注册领取一个一年试用的云服务器,节点选择美国。

然后就是配置nginx了,简单一点直接安装一个宝塔面板,安装过程可自行百度,这里就不讲解了,基本就是从官网获取安装脚本一键安装,没啥可讲的。

新建站点,输入一个域名(IP应该也可以)

20230403174606

配置反代

20230403175217

按截图配置即可,然后再编辑一下配置文件

20230403175606

将第一个红框中的代码注释掉,不要转发原始IP,加上第二个红框中的内容,开启https的代理,否则会出现502报错。

然后,访问 http://服务器IP地址:端口/v1/,如果出现下图中的信息即表示代理成功了。

20230403175926

有了这个代理接口,后续就是大家的发挥空间了。

接入微信公众号

这里给大家介绍如何接入微信公众号

我是使用的python,用的FastAPI,总共200来行代码(很多冗余的,大家可自行删减)。

因为公众号回复消息需要在5秒内完成,请求接口生成答案可能会超过这个阈值,但是微信会重试3次,这样总体思路是这样的。

给每次请求计个数, 重试的message_id是一样的,如果能在5秒内返回,那很好,如果重试第3次还是没结果,返回一个提示,这里是必须的,否则微信会提示服务异常。
同时需要将答案缓存起来,如果超时,再次发送相同的问题时,直接从缓存中获取,不再请求OpenAI接口。

附上代码,大家可参考一下:

主要用到的库如下,请自行pip安装

fastapi[all]
wechatpy
aioredis
pycryptodome
aiohttp

缓存是使用的Redis,可直接在宝塔后台安装Redis服务。

redis.py文件内容

from typing import Optional
from contextlib import asynccontextmanager
from aioredis import Redis, ConnectionPool
from fastapi.logger import logger
import json


class MissingFactoryError(Exception):
    pass


class MissingDatabaseSettingError(Exception):
    pass


class RedisMixin:
    redis_client: Redis = None

    @asynccontextmanager
    async def session(self) -> Redis:
        try:
            _session = self._make_redis_session()
            yield _session
        except Exception as e:
            logger.error(e)
        finally:
            if _session: await _session.close()

    @property
    def redis_conn(self) -> Redis:
        return self._make_redis_session()

    def _make_redis_session(self) -> Redis:
        if not self.redis_client:
            raise MissingFactoryError()
        redis = self.redis_client
        if not redis:
            raise MissingDatabaseSettingError()
        return redis.session

class AioRedis:
    def __init__(
        self, pool_options=None
    ):
        self.configure(
            pool_options=pool_options,
        )

    def configure(
        self, pool_options=None
    ):
        self.redis_pool = ConnectionPool(**(pool_options or {}))

    @property
    def session(self):
        return self.get_session(pool=self.redis_pool)

    def get_session(self, pool=None):
        if not pool:
            raise MissingDatabaseSettingError()
        return Redis(connection_pool=pool)


class BaseRedisModel(RedisMixin):
    def __init__(self, redis_sess:AioRedis=None):
        if redis_sess is not None: self.redis_client = redis_sess

    """
    取值(字符串)
    """
    async def get(self, name):
        async with self.redis_conn as redis:
            try:
                value = await redis.get(name=name)
                value = value.decode('utf8') if isinstance(value, bytes) else value
                if value: value = json.loads(value)
            except Exception as e:
                logger.error(e)
                value = None
            finally:
                await redis.close()
                return value

    """
    设值(字符串)
    """
    async def set(self, name, value, ex=None, px=None, nx=False, xx=False):
        async with self.redis_conn as redis:
            try:
                value and (await redis.set(name=name, value=json.dumps(value, ensure_ascii=False), ex=ex, px=px, nx=nx, xx=xx))
            except Exception as e:
                logger.error(e)
            finally:
                await redis.close()

    """
    删值
    """
    async def delete(self, names):
        async with self.redis_conn as redis:
            try:
                await redis.delete(names)
            except Exception as e:
                logger.error(e)
            finally:
                await redis.close()

    """
    取值(键值对)
    """
    async def hgetall(self, name):
        async with self.redis_conn as redis:
            try:
                data = dict()
                values = await redis.hgetall(name)
                for key, value in values.items():
                    value = value.decode('utf8') if isinstance(value, bytes) else value
                    if value: data[key] = json.loads(value)
            except Exception as e:
                logger.error(e)
                data = dict()
            finally:
                await redis.close()
                return data

    """
    设值(键值对)
    """
    async def hset(self, name, key, value, ex=None):
        async with self.redis_conn as redis:
            try:
                value = json.dumps(value, ensure_ascii=False)
                value and (await redis.hset(name, key, value))
                ex and (await redis.expire(name, ex))
            except Exception as e:
                logger.error(e)
            finally:
               await redis.close()

    """
    批量设值(键值对)
    """
    async def hmset(self, name, data, ex=None):
        async with self.redis_conn as redis:
            try:
                for key, value in data.items():
                    data[key] = json.dumps(value, ensure_ascii=False)
                data and (await redis.hmset(name, data))
                ex and (await redis.expire(name, ex))
            except Exception as e:
                logger.error(e)
            finally:
                await redis.close()

    """
    取一个值(键值对)
    """
    async def hget(self, name, key):
        async with self.redis_conn as redis:
            try:
                data = await redis.hget(name, key)
                data = data.decode('utf8') if isinstance(data, bytes) else data
                if data: data = json.loads(data)
            except Exception as e:
                data = None
                logger.error(e)
            finally:
                await redis.close()
                return data

    """
    删一个值(键值对)
    """
    async def hdel(self, name, key):
        async with self.redis_conn as redis:
            try:
                await redis.hdel(name, key)
            except Exception as e:
                logger.error(e)
            finally:
                await redis.close()

    """
    设值(列表)
    """
    async def lpush(self, name, value, ex=None):
        async with self.redis_conn as redis:
            try:
                if not isinstance(value, list): value = [value]
                if value:
                    for v in value:
                        await redis.lpush(name, json.dumps(v, ensure_ascii=False))
                ex and (await redis.expire(name, ex))
            except Exception as e:
                logger.error(e)
            finally:
                await redis.close()

    """
    取值(列表)
    """
    async def lgetall(self, name):
        async with self.redis_conn as redis:
            try:
                end = (await redis.llen(name))
                data = (await redis.lrange(name, 0, end))
                for v in data:
                    if v: v = json.loads(v)
            except Exception as e:
                logger.error(e)
                data = []
            finally:
                await redis.close()
                return data

    """
    清空并设值(列表)
    """
    async def lnpush(self, name, value, ex=None):
        async with self.redis_conn as redis:
            try:
                await redis.delete(name)
                await self.lpush(name=name, value=value, ex=ex)
            except Exception as e:
                logger.error(e)
            finally:
                await redis.close()

service.py 文件内容

from fastapi import FastAPI, applications
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.exceptions import RequestValidationError, HTTPException, WebSocketRequestValidationError
from fastapi.encoders import jsonable_encoder
from fastapi.logger import logger
from fastapi.responses import PlainTextResponse, JSONResponse, HTMLResponse, Response
from pydantic import BaseModel
from starlette.requests import Request
from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_401_UNAUTHORIZED
from uuid import uuid4
from typing import Any, List, Union
from wechatpy import parse_message, create_reply
from wechatpy.utils import check_signature
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import (
    InvalidSignatureException,
    InvalidAppIdException,
)
from redis import AioRedis, BaseRedisModel
import asyncio
import json
import time
import uvicorn
import aiohttp
import json
import time

class XMLResponse(Response):
    media_type = "text/xml"


async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    return JSONResponse(status_code=exc.status_code, content=dict(success=False, message=jsonable_encoder(exc.detail)), headers=exc.headers)

async def request_validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
    return JSONResponse(status_code=HTTP_422_UNPROCESSABLE_ENTITY, content=dict(success=False, message='请求参数错误!', data=jsonable_encoder(exc.errors())))

async def on_exception_404(request: Request, exc: HTTPException) -> JSONResponse:
    return JSONResponse(status_code=HTTP_404_NOT_FOUND, content=dict(success=False, message='访问的路径不存在!', data=jsonable_encoder(exc.detail)), headers=exc.headers)

async def on_exception_403(request: Request, exc: HTTPException) -> JSONResponse:
    return JSONResponse(status_code=HTTP_403_FORBIDDEN, content=dict(success=False, message='没有访问权限!', data=jsonable_encoder(exc.detail)), headers=exc.headers)

async def on_exception_401(request: Request, exc: HTTPException) -> JSONResponse:
    return JSONResponse(status_code=HTTP_401_UNAUTHORIZED, content=dict(success=False, message='登录超时, 请重新登录!', data=jsonable_encoder(exc.detail)), headers=exc.headers)

exception_handlers = {
    404: on_exception_404,
    403: on_exception_403,
    401: on_exception_401
}

def swagger_monkey_patch(*args, **kwargs) -> HTMLResponse:
    return get_swagger_ui_html(*args, **kwargs)

def redoc_monkey_patch(*args, **kwargs) -> HTMLResponse:
    return get_redoc_html(*args, **kwargs)

applications.get_swagger_ui_html = swagger_monkey_patch
applications.get_redoc_html = redoc_monkey_patch

api_prefix = '/api'

app = FastAPI(title='ChatGPT', description='ChatGPT API', version='1.0.0', debug=False, 
swagger_ui_oauth2_redirect_url=f'{api_prefix}/docs/oauth2-redirect', openapi_url=f'{api_prefix}/openapi.json', 
docs_url=f'{api_prefix}/docs', redoc_url=f'{api_prefix}/redoc', exception_handlers=exception_handlers)

app.add_exception_handler(HTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, request_validation_exception_handler)
app.add_exception_handler(WebSocketRequestValidationError, request_validation_exception_handler)


# 微信公众号和redis配置
settings = dict(wechat_mp_token='', wechat_mp_aes_key='', wechat_mp_appid='', rds_host='127.0.0.1', rds_port=6379, rds_db=1, rds_user='default', rds_password='', rds_pool_size=20)

curd_redis = BaseRedisModel(AioRedis(pool_options=dict(host=settings.get('rds_host'), port=settings.get('rds_port'), db=settings.get('rds_db'), username=settings.get('rds_user'), password=settings.get('rds_password'), max_connections=settings.get('rds_pool_size'), decode_responses=True)))


async def openai_chat(text: str, from_user: str, message_id: str):
    try:
        if text == '新对话':
            await curd_redis.delete(f'openai_chat:{from_user}')
            return '已启用新的对话进程, 现在可以跟我聊天啦!'
        if len(text) > 500:
            return '单次提问文本长度不能超过500个字符, 请重新输入。'
      
        # 这里改成代理地址和配置api key
        url = 'https://api.openai.com/v1/chat/completions'
        api_key = ''
        # 在这里设定系统角色
        messages = [{
            "role": "system",
            "content": "你是一个友好的,活泼开朗的,有着正向价值观的,拥有丰富的软件测试知识的,不回答任何涉及政治、色情等不良信息的问题,是公众号【贝克街的捉虫师】的AI智能小助手冰冰"
        }]
        count = await curd_redis.get(f"openai_chat:{from_user}:{message_id}")
        count = int(count) if count else 0
        count += 1

        history_messages = await curd_redis.hgetall(name=f'openai_chat:{from_user}')
        message = None
        for key in sorted(history_messages, reverse=True)[:5]:
            msg = history_messages.get(key, {})
            if not isinstance(msg, dict):
                msg = json.loads(msg)
            if msg.get('qusetion', {}).get('content') == text: 
                message = msg.get('anwser', {}).get('content')
                break
        if message: return message

        if count != 1 and message_id not in history_messages.keys():
            await asyncio.sleep(4)
            history_messages = await curd_redis.hgetall(name=f'openai_chat:{from_user}')
        await curd_redis.set(f"openai_chat:{from_user}:{message_id}", count, ex=30)
      
        if message_id in history_messages.keys():
            content = history_messages.get(message_id, {})
            if not isinstance(content, dict):
                content = json.loads(content)
            return content.get('anwser').get('content')
        elif count == 2:
            await asyncio.sleep(3)
            return ''
        elif count == 3:
            return '答案生成中...请稍后(3~15秒)重发原问题获取答案吧。'
        else:
            for key in sorted(history_messages):
                content = history_messages.get(key, {})
                if not isinstance(content, dict):
                    content = json.loads(content)
                if content.get('qusetion'): messages.append(content.get('qusetion'))
                if content.get('anwser'): messages.append(content.get('anwser'))
        qusetion = {
            "role": "user",
            "content": text,
        }
        messages.append(qusetion)
        data = dict(model='gpt-3.5-turbo', messages=messages)
        headers = {'Authorization': f'Bearer {api_key}'}
        async with aiohttp.ClientSession() as session:
            async with session.request('POST', url, json=data, headers=headers) as resp:
                resp = await resp.json()
        if resp.get('choices'):
            message = resp.get("choices")[0].get('message', {}) if resp.get("choices") else {}
            await curd_redis.hset(name=f'openai_chat:{from_user}', key=message_id, value=dict(qusetion=qusetion, anwser=message), ex=60*60*24)
            return message.get('content')
        else:
            logger.error(resp.get('error'))
            if resp.get('error', {}).get('message', '').find('maximum context length') < 0:
                return 'AI智能助手已暂停服务, 敬请谅解!'
            return 'Sorry, 暂不支持长文本或长时间持续性对话, 请回复「新对话」, 发起新的对话进程, 谢谢。'
    except Exception as e:
        logger.error(e)
        return ''


async def reply_messages(message: Any):
    '''
    解析回复微信消息
    '''
    msg = parse_message(message)
    if msg.type == "text":
        content = await openai_chat(msg.content, msg.source, msg.id)
        reply = create_reply(content, msg)
    elif msg.type == "event" and msg.event == "subscribe":
        # 配置关注后自动回复的内容
        reply = create_reply("终于等到你, 聚合软件测试类精华, 尽在「贝克街的捉虫师」。我是「贝克街的捉虫师」家的AI智能小助手冰冰, 现在可以跟我聊天啦, 祝你开心每一天!", msg)
    else:
        reply = create_reply("Sorry, 暂不支持此消息类型, 仅支持文本消息。", msg)
    return reply


# 微信公众号回调接口
@app.get(f'{api_prefix}/open/mp/callback', summary='微信公众号回调接口', operation_id='verify_token', response_class=PlainTextResponse)
async def verify_token(request: Request):
    """
    使用微信sdk回复文本消息
    """
    signature = request.query_params.get("signature", "")
    timestamp = request.query_params.get("timestamp", "")
    nonce = request.query_params.get("nonce", "")
    encrypt_type = request.query_params.get("encrypt_type", "raw")
    msg_signature = request.query_params.get("msg_signature", "")
    echostr = request.query_params.get("echostr", "")
    try:
        check_signature(settings.get('wechat_mp_token'), signature, timestamp, nonce)
    except InvalidSignatureException:
        return 'Invalid Signature'
    else:
        return echostr
  
# 微信公众号回调接口
@app.post(f'{api_prefix}/open/mp/callback', summary='微信公众号回调接口', operation_id='mp_callback')
async def mp_callback(request: Request):
    """
    使用微信sdk回复文本消息
    """
    signature = request.query_params.get("signature", "")
    timestamp = request.query_params.get("timestamp", "")
    nonce = request.query_params.get("nonce", "")
    encrypt_type = request.query_params.get("encrypt_type", "raw")
    msg_signature = request.query_params.get("msg_signature", "")
    try:
        check_signature(settings.get('wechat_mp_token'), signature, timestamp, nonce)
    except InvalidSignatureException:
        return PlainTextResponse(content='Invalid Signature')
    xml_data = await request.body()
    # POST request
    if encrypt_type == "raw":
        # plaintext mode
        reply = await reply_messages(message=xml_data)
        return XMLResponse(content=reply.render())
    else:
        # encryption mode
        crypto = WeChatCrypto(settings.get('wechat_mp_token'), settings.get('wechat_mp_aes_key'), settings.get('wechat_mp_appid'))
        try:
            msg = crypto.decrypt_message(xml_data, msg_signature, timestamp, nonce)
        except (InvalidSignatureException, InvalidAppIdException):
            return PlainTextResponse(content='Decrypt Error')
        else:
            reply = await reply_messages(message=msg)
            return XMLResponse(content=crypto.encrypt_message(reply.render(), nonce, timestamp))
      

if __name__ == '__main__':
    uvicorn.run('service:app', host='0.0.0.0', port=8888, reload=False, proxy_headers=True, workers=2, log_level='info')

将上面两个文件保存在一个目录中,执行下面命令启动服务

python3 service.py 

微信公众号的配置如下图,将配置复制到代码的配置中,配置要一致,不然无法保存,服务器地址填:https://你的域名/api/open/mp/callback

20230403182827

这个脚本可以部署在国内的服务器上, 只能使用域名且需要备案的域名,需要先部署好服务才能保存,微信会验证token是否正确。

主要提供一个思路,大家有什么问题可以留言。

ChatGPT OpenAI 微信公众号
Theme Jasmine by Kent Liao