POST https://api.cloud.deepset.ai/api/v1/workspaces//pipelines//chat-stream
Run a chat query and return the answer as a stream. Chat pipelines are based on the chat template, which uses a search session to include search history in the chat. You can specify how many search history items (query and answer) from a given search session to display in the chat. You need a search session ID to run the query; use the search session endpoints to list or create search sessions.
If include_result=True, the full result is sent as the last stream message.
Event data format, where delta, result, and error are mutually exclusive:
{
  "query_id": UUID,
  "type": Literal["delta", "result", "error"],
  "delta": Optional[StreamDelta],
  "result": Optional[DeepsetCloudQueryResponse],
  "error": Optional[str],
}
StreamDelta format:
{
  "text": str,
}
Example code to consume the stream in Python:
import asyncio
import json

import httpx
from httpx_sse import EventSource

TOKEN = "MY_TOKEN"
PIPELINE_URL = "https://api.cloud.deepset.ai/api/v1/workspaces/MY_WORKSPACE/pipelines/MY_PIPELINE"
SEARCH_SESSION_ID = "MY_SEARCH_SESSION_ID"


async def main():
    query = {
        "query": "How does streaming work with deepset Cloud?",
        "include_result": True,
        "search_session_id": SEARCH_SESSION_ID,
    }
    headers = {"Authorization": f"Bearer {TOKEN}"}
    async with httpx.AsyncClient(base_url=PIPELINE_URL, headers=headers) as client:
        async with client.stream("POST", "/chat-stream", json=query) as response:
            # Check whether the request was successful
            if response.status_code != 200:
                await response.aread()
                print(f"An error occurred with status code: {response.status_code}")
                print(response.json()["errors"][0])
                return
            event_source = EventSource(response)
            # Stream the response
            async for event in event_source.aiter_sse():
                event_data = json.loads(event.data)
                chunk_type = event_data["type"]
                # Check the type of the chunk and print the data accordingly
                match chunk_type:
                    # A delta chunk contains the next text chunk of the answer
                    case "delta":
                        token: str = event_data["delta"]["text"]
                        print(token, flush=True, end="")
                    # The result chunk contains the final pipeline result
                    case "result":
                        print("\n\nPipeline result:")
                        print(json.dumps(event_data["result"]))
                    # An error chunk contains the error message
                    case "error":
                        print("\n\nAn error occurred while streaming:")
                        print(event_data["error"])


asyncio.run(main())
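If you want the complete answer text rather than printing tokens as they arrive, the delta chunks can be concatenated. A minimal sketch, where the events list stands in for the parsed event dicts received in the loop above:

```python
def collect_answer(events: list[dict]) -> str:
    """Concatenate the text of all delta chunks into the full answer."""
    return "".join(e["delta"]["text"] for e in events if e["type"] == "delta")


# Illustrative event sequence in the format documented above
events = [
    {"type": "delta", "delta": {"text": "Hello"}},
    {"type": "delta", "delta": {"text": ", world"}},
    {"type": "result", "result": {}},
]
print(collect_answer(events))  # → Hello, world
```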