import json
import requests
# -------------------------- 配置信息 --------------------------
API_KEY = "sk-******************************************"
API_URL = "https://ai.hcrzx.com/v1/chat/completions"
MODEL = "qwen3.6-plus"
# -------------------------- 发起 SSE 流式请求 --------------------------
def chat_stream(prompt: str):
payload = {
"model": MODEL,
"messages": [
{"role": "system", "content": "你是一个专业、简洁的助手。"},
{"role": "user", "content": prompt}
],
"stream": True, # 开启 SSE 流式输出
"enable_thinking": False, # 关闭思考过程(关键)
"max_tokens": 2000 # 限制回答长度
}
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"Accept": "text/event-stream" # SSE 标准请求头
}
# 发起流式请求
with requests.post(
url=API_URL,
json=payload,
headers=headers,
stream=True, # 必须开启流式
timeout=(10, 60) # 连接超时/读取超时
) as response:
# 检查HTTP状态
response.raise_for_status()
# 逐行解析SSE数据流
for line in response.iter_lines():
# 跳过空行(SSE协议中空行用于分隔事件)
if not line.strip():
continue
# 解码数据
line = line.decode("utf-8").strip()
# SSE标准:只处理 data: 开头的消息
if not line.startswith("data: "):
continue
# 提取消息体
data = line[6:]
# 结束标志
if data == "[DONE]":
print("\n[输出完成]")
break
# 解析JSON
try:
chunk = json.loads(data)
if "choices" in chunk and len(chunk["choices"]) > 0 and "delta" in chunk["choices"][0]:
delta = chunk["choices"][0]["delta"]
content = delta.get("content", "")
# 实时输出(不换行、强制刷新)
if content:
print(content, end="", flush=True)
except json.JSONDecodeError:
continue
except KeyError:
continue
# -------------------------- 调用示例 --------------------------
if __name__ == "__main__":
chat_stream("曹植是曹操的儿子吗?请用简洁的语言回答")