Chat Stream

Run a chat query and return the answer as a stream. Chat pipelines are based on the chat template, which uses a search
session to include search history in the chat. You can specify how many search history items (query and answer pairs)
from a given search session to display in the chat. You need a search session ID to run the query.
Use the search session endpoints to list or create search sessions.

If include_result=True, the full result is sent as the last stream message.

Event data format where delta, result and error are mutually exclusive:

{
    "query_id": UUID,
    "type": Literal["delta", "result", "error"],
    "delta": Optional[StreamDelta],
    "result": Optional[DeepsetCloudQueryResponse],
    "error": Optional[str],
}

StreamDelta format:

{
    "text": str,
}
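
The event format above can be checked on the client side before use. The following is a minimal sketch, not part of the API: StreamEvent and parse_stream_event are hypothetical names, and it assumes the populated field always matches the value of "type".

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StreamEvent:
    """Hypothetical client-side view of one stream event."""
    query_id: str
    type: str  # "delta", "result", or "error"
    delta_text: Optional[str] = None
    result: Optional[dict[str, Any]] = None
    error: Optional[str] = None


def parse_stream_event(raw: str) -> StreamEvent:
    """Decode one event payload and check that only the field named by "type" is set."""
    data = json.loads(raw)
    event_type = data["type"]
    if event_type not in ("delta", "result", "error"):
        raise ValueError(f"Unknown event type: {event_type!r}")
    # delta, result, and error are mutually exclusive (assumption: the set field equals "type")
    present = [key for key in ("delta", "result", "error") if data.get(key) is not None]
    if present != [event_type]:
        raise ValueError(f"Expected only {event_type!r} to be set, got {present}")
    delta = data.get("delta")
    return StreamEvent(
        query_id=str(data["query_id"]),
        type=event_type,
        delta_text=delta["text"] if delta is not None else None,
        result=data.get("result"),
        error=data.get("error"),
    )
```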

Example code to consume the stream in Python:

import httpx
import json
from httpx_sse import EventSource
import asyncio

TOKEN = "MY_TOKEN"
PIPELINE_URL = "https://api.cloud.deepset.ai/api/v1/workspaces/MY_WORKSPACE/pipelines/MY_PIPELINE"
SEARCH_SESSION_ID = "MY_SEARCH_SESSION_ID"


async def main():
    query = {
        "query": "How does streaming work with deepset Cloud?",
        "include_result": True,
        "search_session_id": SEARCH_SESSION_ID
    }
    headers = {
        "Authorization": f"Bearer {TOKEN}"
    }
    async with httpx.AsyncClient(base_url=PIPELINE_URL, headers=headers) as client:
        async with client.stream("POST", "/chat-stream", json=query) as response:
            # Check if the response is successful
            if response.status_code != 200:
                await response.aread()
                print(f"An error occurred with status code: {response.status_code}")
                print(response.json()["errors"][0])
                return

            event_source = EventSource(response)
            # Stream the response
            async for event in event_source.aiter_sse():
                event_data = json.loads(event.data)
                chunk_type = event_data["type"]
                # Check the type of the chunk and print the data accordingly
                match chunk_type:
                    # Delta chunk contains the next text chunk of the answer
                    case "delta":
                        token: str = event_data["delta"]["text"]
                        print(token, flush=True, end="")
                    # Result chunk contains the final pipeline result
                    case "result":
                        print("\n\nPipeline result:")
                        print(json.dumps(event_data["result"]))
                    # Error chunk contains the error message
                    case "error":
                        print("\n\nAn error occurred while streaming:")
                        print(event_data["error"])


asyncio.run(main())
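
If you want the complete answer as one string instead of printing tokens as they arrive, the same events can be accumulated. This is a rough sketch that works on already-decoded event dictionaries; collect_answer is a hypothetical helper, and the content of the sample "result" below is a placeholder, not the real response schema.

```python
from typing import Any, Iterable, Optional


def collect_answer(events: Iterable[dict[str, Any]]) -> tuple[str, Optional[dict[str, Any]]]:
    """Join all delta texts and return them with the final result, if one was sent."""
    chunks: list[str] = []
    result: Optional[dict[str, Any]] = None
    for event in events:
        if event["type"] == "delta":
            chunks.append(event["delta"]["text"])
        elif event["type"] == "result":
            # Only present when the request was sent with include_result set
            result = event["result"]
        elif event["type"] == "error":
            raise RuntimeError(f"Streaming failed: {event['error']}")
    return "".join(chunks), result


# Sample events shaped like the documented format (values are made up)
sample_events = [
    {"query_id": "q1", "type": "delta", "delta": {"text": "Hello"}},
    {"query_id": "q1", "type": "delta", "delta": {"text": " world"}},
    {"query_id": "q1", "type": "result", "result": {"placeholder": True}},
]
answer, final_result = collect_answer(sample_events)
print(answer)
```

In the example above, each json.loads(event.data) value could be appended to a list or yielded from a generator and passed to this helper.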