import json from zhipuai import ZhipuAI import requests import flask def test(): url = "https://agentapi.baidu.com/assistant/getAnswer?appId=LiXmltQoTrb8pryis0cm7RZpUTeUYmcM&secretKey=BShZAmOJXdSLo1yizqFtpQ2DDBiLUcPM" payload = json.dumps({ "message": { "content": { "type": "text", "value": { "showText": "你好,理工一怎么走到西门" } } }, "source": "LiXmltQoTrb8pryis0cm7RZpUTeUYmcM", "from": "openapi", "openId": "001" }) headers = { 'Content-Type': 'application/json' } response = requests.post(url, headers=headers, data=payload) print(response) if response.status_code == 200: api_response = response.json() if api_response.get("status") == 0: # 获取内容 content_list = api_response["data"].get("content", []) for content in content_list: if content["dataType"] == "txt": print("返回的文本内容:") print(content["data"]) else: print(f"API 返回失败: {api_response.get('message')}") else: print(f"API 调用失败,状态码: {response.status_code}, 响应: {response.text}") # client = ZhipuAI(api_key="73bdeed728677bc80efc6956478a2315.VerNWJMCwN9L5gTi") # 请填写您自己的APIKey # response = client.chat.completions.create( # model="glm-4", # 请填写您要调用的模型名称 # messages=[ # {"role": "user", "content": "你好"}, # ], # ) # print(response.choices[0].message) app = flask.Flask(__name__) @app.route("/chat", methods=['POST']) def app_chat(): data = json.loads(flask.globals.request.get_data()) uid = data["user_id"] # 检查输入是否以指定符号结尾,否则补充句号 if not data["text"][-1] in ['?', '?', '.', '。', ',', ',', '!', '!']: data["text"] += "。" # 构造 API 请求参数 url = "https://agentapi.baidu.com/assistant/getAnswer?appId=LiXmltQoTrb8pryis0cm7RZpUTeUYmcM&secretKey=BShZAmOJXdSLo1yizqFtpQ2DDBiLUcPM" payload = json.dumps({ "message": { "content": { "type": "text", "value": { "showText": data["text"] # 使用用户输入的内容替换固定文本 } } }, "source": "LiXmltQoTrb8pryis0cm7RZpUTeUYmcM", "from": "openapi", "openId": uid # 使用用户ID }) headers = { 'Content-Type': 'application/json' } # 发起 POST 请求 response = requests.post(url, headers=headers, data=payload) if response.status_code == 200: api_response = response.json() if api_response.get("status") == 0: # 提取 API 返回的内容 content_list = api_response["data"].get("content", []) for content in content_list: if content["dataType"] == "txt": return json.dumps({ "user_id": uid, "text": content["data"], "error": False, "error_msg": "" }) return json.dumps({ "user_id": uid, "text": "", "error": True, "error_msg": "未找到文本内容" }) else: return json.dumps({ "user_id": uid, "text": "", "error": True, "error_msg": f"API 返回错误: {api_response.get('message')}" }) else: return json.dumps({ "user_id": uid, "text": "", "error": True, "error_msg": f"API 调用失败,状态码: {response.status_code}, 响应: {response.text}" }) if __name__ == '__main__': app.run(host="0.0.0.0", port=11111) # test()