Search Stream

Run a search using a pipeline and return the answer as stream.

The full result can be accessed as the last stream message if include_result=True.

Event data format where delta, result and error are mutually exclusive:

{
    "query_id": UUID,
    "type": Literal["delta", "result", "error"],
    "delta": Optional[StreamDelta],
    "result": Optional[DeepsetCloudQueryResponse],
    "error": Optional[str],
}

StreamDelta format:

{
    "text": str,
}

Example code to consume the stream in Python:

import httpx
import json
from httpx_sse import EventSource
import asyncio

TOKEN = "MY_TOKEN"
PIPELINE_URL = "https://api.cloud.deepset.ai/api/v1/workspaces/MY_WORKSPACE/pipelines/MY_PIPELINE"


async def main():
    query = {
        "query": "How does streaming work with deepset Cloud?",
        "include_result": True
    }
    headers = {
        "Authorization": f"Bearer {TOKEN}"
    }
    async with httpx.AsyncClient(base_url=PIPELINE_URL, headers=headers) as client:
        async with client.stream("POST", "/search-stream", json=query) as response:
            # Check if the response is successful
            if response.status_code != 200:
                await response.aread()
                print(f"An error occured with status code: {response.status_code}")
                print(response.json()["errors"][0])
                return

            event_source = EventSource(response)
            # Stream the response
            async for event in event_source.aiter_sse():
                event_data = json.loads(event.data)
                chunk_type = event_data["type"]
                # Check the type of the chunk and print the data accordingly
                match chunk_type:
                    # Delta chunk contains the next text chunk of the answer
                    case "delta":
                        token: str = event_data["delta"]["text"]
                        print(token, flush=True, end="")
                    # Result chunk contains the final pipeline result
                    case "result":
                        print("\n\nPipeline result:")
                        print(json.dumps(event_data["result"]))
                    # Error chunk contains the error message
                    case "error":
                        print("\n\nAn error occurred while streaming:")
                        print(event_data["error"])


asyncio.run(main())
Language
Authorization
Bearer
Click Try It! to start a request and see the response here!