Chat Stream

POST 

/api/v1/workspaces/:workspace_name/pipelines/:pipeline_name/chat-stream

Runs a chat query and returns the answer as a stream. Chat pipelines are based on the chat template, which uses a search session to include search history in the chat. You can specify how many search history items (query and answer) from a given search session to include in the chat. You need a search session ID to run the query; use the search session endpoints to list or create search sessions.

Options:

  • The full result is sent as the last stream message if include_result=True.
  • Tool calls are streamed if include_tool_calls=True. The default is rendered, where tool calls are converted to markdown text deltas.
  • Tool call results are streamed if include_tool_call_results=True.

Event data format where delta, result, tool_call_delta, tool_call_result, and error are mutually exclusive:

{
    "query_id": UUID,
    "type": Literal["delta", "result", "error", "tool_call_delta", "tool_call_result"],
    "delta": Optional[StreamDelta],
    "result": Optional[DeepsetCloudQueryResponse],
    "error": Optional[str],
    "tool_call_delta": Optional[ToolCallDelta],
    "tool_call_result": Optional[ToolCallResult],
    "index": Optional[int],
    "start": Optional[bool],
    "finish_reason": Optional[str],
}
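Because the payload fields are mutually exclusive, a consumer can dispatch on the event type and read the single populated field, which is always named after the type. A minimal sketch over an already-parsed event (extract_payload is an illustrative helper, not part of the API; the sample event data is made up):

```python
import json

# The mutually exclusive payload fields listed in the event data format above.
PAYLOAD_FIELDS = ("delta", "result", "error", "tool_call_delta", "tool_call_result")


def extract_payload(event_data: dict) -> tuple[str, object]:
    """Return the event type and its single populated payload field.

    The payload lives in the field named after the event type; the other
    payload fields are null or absent.
    """
    chunk_type = event_data["type"]
    populated = [f for f in PAYLOAD_FIELDS if event_data.get(f) is not None]
    if populated != [chunk_type]:
        raise ValueError(f"expected exactly one payload field ({chunk_type}), got {populated}")
    return chunk_type, event_data[chunk_type]


# A delta event as it would arrive over the stream (null fields omitted).
event = json.loads('{"query_id": "1b2e4f", "type": "delta", "delta": {"text": "Hi"}}')
chunk_type, payload = extract_payload(event)  # "delta", {"text": "Hi"}
```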

StreamDelta format:

{
    "text": str,
    "meta": Optional[dict[str, Any]],
}
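Delta events carry the answer text in order, so concatenating their text fields reconstructs the full answer. A small sketch over already-parsed events (the sample events are made up):

```python
def join_deltas(events: list[dict]) -> str:
    """Concatenate the text of all delta events, in stream order."""
    return "".join(e["delta"]["text"] for e in events if e["type"] == "delta")


events = [
    {"type": "delta", "delta": {"text": "Hello, "}},
    {"type": "delta", "delta": {"text": "world!"}},
    {"type": "result", "result": {}},  # non-delta events are skipped
]
answer = join_deltas(events)  # "Hello, world!"
```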

ToolCallDelta format:

{
    "index": int,
    "tool_name": Optional[str],
    "id": Optional[str],
    "arguments": Optional[str],
}
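As the example script below also assumes, the first ToolCallDelta for a given index names the tool, and later deltas stream fragments of the JSON arguments string. Fragments for the same call can be merged by index; this is a sketch under the assumption that fragments arrive in order:

```python
import json


def merge_tool_call_deltas(deltas: list[dict]) -> dict[int, dict]:
    """Merge streamed ToolCallDelta fragments into complete calls, keyed by index."""
    calls: dict[int, dict] = {}
    for d in deltas:
        call = calls.setdefault(d["index"], {"tool_name": None, "id": None, "arguments": ""})
        if d.get("tool_name"):  # first fragment names the tool
            call["tool_name"] = d["tool_name"]
            call["id"] = d.get("id")
        if d.get("arguments"):  # later fragments stream pieces of the JSON arguments
            call["arguments"] += d["arguments"]
    return calls


# Made-up fragments for one tool call at index 0.
deltas = [
    {"index": 0, "tool_name": "search", "id": "call_1", "arguments": None},
    {"index": 0, "tool_name": None, "id": None, "arguments": '{"query": '},
    {"index": 0, "tool_name": None, "id": None, "arguments": '"streaming"}'},
]
calls = merge_tool_call_deltas(deltas)
# Once the call is complete, the accumulated arguments string is valid JSON.
args = json.loads(calls[0]["arguments"])  # {"query": "streaming"}
```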

ToolCallResult format:

{
    "result": str,
    "origin": {
        "tool_name": str,
        "arguments": dict[str, Any],
        "id": Optional[str],
    },
    "error": bool,
}
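A ToolCallResult pairs the tool's output with the originating call and an error flag. A minimal display helper mirroring these fields (the helper and sample data are illustrative, not part of the API):

```python
def format_tool_call_result(tcr: dict) -> str:
    """Render a ToolCallResult for display, using the fields above."""
    name = tcr["origin"]["tool_name"]
    call_id = tcr["origin"].get("id")
    if tcr["error"]:  # the error flag marks a failed tool call
        return f"Tool call {name} (id {call_id}) failed: {tcr['result']}"
    return f"Tool call {name} (id {call_id}) returned: {tcr['result']}"


tcr = {
    "result": "3 documents found",
    "origin": {"tool_name": "search", "arguments": {"query": "streaming"}, "id": "call_1"},
    "error": False,
}
line = format_tool_call_result(tcr)  # "Tool call search (id call_1) returned: 3 documents found"
```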

Example code to consume the stream in Python:

import httpx
import json
from httpx_sse import EventSource
import asyncio

TOKEN = "MY_TOKEN"
PIPELINE_URL = "https://api.cloud.deepset.ai/api/v1/workspaces/MY_WORKSPACE/pipelines/MY_PIPELINE"
SEARCH_SESSION_ID = "MY_SEARCH_SESSION_ID"


async def main():
    query = {
        "query": "How does streaming work with deepset?",
        "include_result": True,
        "include_tool_calls": True,
        "include_tool_call_results": True,
        "search_session_id": SEARCH_SESSION_ID,
    }
    headers = {"Authorization": f"Bearer {TOKEN}"}
    async with httpx.AsyncClient(base_url=PIPELINE_URL, headers=headers, timeout=httpx.Timeout(300.0)) as client:
        async with client.stream("POST", "/chat-stream", json=query) as response:
            # Check if the response is successful
            if response.status_code != 200:
                await response.aread()
                print(f"An error occurred with status code: {response.status_code}")
                print(response.json()["errors"][0])
                return

            event_source = EventSource(response)
            # Stream the response
            async for event in event_source.aiter_sse():
                event_data = json.loads(event.data)
                chunk_type = event_data["type"]
                # Check the type of the chunk and print the data accordingly
                match chunk_type:
                    # Delta chunk contains the next text chunk of the answer
                    case "delta":
                        token: str = event_data["delta"]["text"]
                        print(token, flush=True, end="\n" if event_data.get("finish_reason") else "")
                    # Result chunk contains the final pipeline result
                    case "result":
                        print("\n\nPipeline result:")
                        print(json.dumps(event_data["result"]))
                    # Error chunk contains the error message
                    case "error":
                        print("\n\nAn error occurred while streaming:")
                        print(event_data["error"])
                    case "tool_call_delta":
                        tool_call_delta = event_data["tool_call_delta"]
                        if tool_call_delta["tool_name"]:
                            tool_id = tool_call_delta["id"]
                            tool_name = tool_call_delta["tool_name"]
                            print(f"\n\nTool call {tool_id} started {tool_name} with arguments: ")
                        elif tool_call_delta["arguments"]:
                            print(tool_call_delta["arguments"], flush=True, end="")
                    case "tool_call_result":
                        tool_call_result = event_data["tool_call_result"]
                        tool_id = tool_call_result["origin"]["id"]
                        tool_name = tool_call_result["origin"]["tool_name"]
                        if tool_call_result["error"]:
                            print(f"\n\nTool call {tool_name} with id {tool_id} failed.")
                        else:
                            print(f"\n\nTool call {tool_name} with id {tool_id} result:")
                            print(tool_call_result["result"])


asyncio.run(main())

Responses

Returns the search results.