Search Stream

Run a search using a pipeline and return the answer as a stream.

Options:

  • The full result can be accessed as the last stream message if include_result=True.
  • Tool calls are streamed as tool_call_delta events if include_tool_calls=True. By default, tool calls are rendered instead, meaning they are converted to Markdown text deltas.
  • Tool call results are streamed if include_tool_call_result=True.
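With include_result=True, the last message of the stream carries the full pipeline result. The following is a minimal sketch of how you might pick it out once the stream has been collected. It assumes each event has already been decoded from JSON into a dict with the fields described below. The helper name `final_result` is hypothetical.

```python
from typing import Any, Optional


def final_result(events: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Return the pipeline result carried by the last stream message, if any.

    Hypothetical helper: expects decoded event dicts in arrival order.
    Returns None when the stream is empty or did not end with a "result"
    message (for example, include_result was not set or an error occurred).
    """
    if not events:
        return None
    last = events[-1]
    if last.get("type") != "result":
        return None
    return last.get("result")
```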

Event data format. The delta, result, error, tool_call_delta, and tool_call_result fields are mutually exclusive: each event sets only the field named by its type.

{
    "query_id": UUID,
    "type": Literal["delta", "result", "error", "tool_call_delta", "tool_call_result"],
    "delta": Optional[StreamDelta],
    "result": Optional[DeepsetCloudQueryResponse],
    "error": Optional[str],
    "tool_call_delta": Optional[ToolCallDelta],
    "tool_call_result": Optional[ToolCallResult],
    "index": Optional[int],
    "start": Optional[bool],
    "finish_reason": Optional[str],
}
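Because each type value matches the name of the payload field it populates, a consumer can look up the payload by type. The following is a minimal sketch that also enforces the mutual exclusivity described above. The function `event_payload` and the choice to raise ValueError are assumptions, not part of the API.

```python
from typing import Any

# Each event type doubles as the name of the payload field it populates.
PAYLOAD_FIELDS = ("delta", "result", "error", "tool_call_delta", "tool_call_result")


def event_payload(event: dict[str, Any]) -> Any:
    """Return the single payload that matches the event's type.

    Raises ValueError if the type is unknown or if any other payload field
    is also set, since the fields are documented as mutually exclusive.
    """
    event_type = event.get("type")
    if event_type not in PAYLOAD_FIELDS:
        raise ValueError(f"unknown event type: {event_type!r}")
    # Any non-null payload field other than the one named by the type is a violation.
    others = [f for f in PAYLOAD_FIELDS if f != event_type and event.get(f) is not None]
    if others:
        raise ValueError(f"{event_type} event also sets {others}")
    return event.get(event_type)
```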

StreamDelta format:

{
    "text": str,
    "meta": Optional[dict[str, Any]],
}
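The example consumer prints a newline when finish_reason is set, which suggests that a non-empty finish_reason marks the last delta of a generated message. The following is a minimal sketch that reassembles the streamed text into messages under that assumption. The name `collect_messages` is hypothetical.

```python
from typing import Any


def collect_messages(events: list[dict[str, Any]]) -> list[str]:
    """Join the text of "delta" events into messages, closing one at finish_reason."""
    messages: list[str] = []
    current: list[str] = []
    for event in events:
        if event.get("type") != "delta":
            continue  # results, errors, and tool call events carry no answer text
        current.append(event["delta"]["text"])
        # Assumption: a non-empty finish_reason marks the last delta of a message.
        if event.get("finish_reason"):
            messages.append("".join(current))
            current = []
    if current:  # stream ended without a finish_reason on the last delta
        messages.append("".join(current))
    return messages
```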

ToolCallDelta format:

{
    "index": int,
    "tool_name": Optional[str],
    "id": Optional[str],
    "arguments": Optional[str],
}
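Tool call deltas for the same call share an index, so a consumer can merge them into complete calls. The following is a minimal sketch. It assumes that tool_name and id arrive in some delta for each index, and that concatenating the arguments fragments yields a JSON object (ToolCallResult reports arguments as a dict). The function `assemble_tool_calls` is hypothetical.

```python
import json
from typing import Any


def assemble_tool_calls(deltas: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge ToolCallDelta fragments into complete tool calls, ordered by index."""
    calls: dict[int, dict[str, Any]] = {}
    for delta in deltas:
        call = calls.setdefault(delta["index"], {"tool_name": None, "id": None, "arguments": ""})
        if delta.get("tool_name"):
            call["tool_name"] = delta["tool_name"]
        if delta.get("id"):
            call["id"] = delta["id"]
        if delta.get("arguments"):
            # Arguments arrive as string fragments; append them in arrival order.
            call["arguments"] += delta["arguments"]
    assembled = []
    for index in sorted(calls):
        call = calls[index]
        args = call["arguments"]
        assembled.append({
            "index": index,
            "tool_name": call["tool_name"],
            "id": call["id"],
            # Assumption: the joined fragments form a JSON object; empty means no arguments.
            "arguments": json.loads(args) if args else {},
        })
    return assembled
```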

ToolCallResult format:

{
    "result": str,
    "origin": {
        "tool_name": str,
        "arguments": dict[str, Any],
        "id": Optional[str],
    },
    "error": bool,
}
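If you prefer typed objects over raw dicts, you can model the ToolCallResult payload with dataclasses that mirror the format above. The class names, `from_dict`, and `summary` below are illustrative assumptions, not types shipped by deepset.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToolCallOrigin:
    tool_name: str
    arguments: dict[str, Any]
    id: Optional[str] = None  # optional in the documented format


@dataclass
class ToolCallResult:
    result: str
    origin: ToolCallOrigin
    error: bool

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ToolCallResult":
        """Build a typed object from a decoded tool_call_result payload."""
        origin = data["origin"]
        return cls(
            result=data["result"],
            origin=ToolCallOrigin(
                tool_name=origin["tool_name"],
                arguments=origin["arguments"],
                id=origin.get("id"),
            ),
            error=data["error"],
        )

    def summary(self) -> str:
        """Return a one-line status description of the tool call."""
        status = "failed" if self.error else "succeeded"
        return f"Tool call {self.origin.tool_name} with id {self.origin.id} {status}"
```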

Example code to consume the stream in Python:

import httpx
import json
from httpx_sse import EventSource
import asyncio

TOKEN = "MY_TOKEN"
PIPELINE_URL = "https://api.cloud.deepset.ai/api/v1/workspaces/MY_WORKSPACE/pipelines/MY_PIPELINE"


async def main():
    query = {
        "query": "How does streaming work with deepset?",
        "include_result": True,
        "include_tool_calls": True,
        "include_tool_call_results": True,
    }
    headers = {
        "Authorization": f"Bearer {TOKEN}"
    }
    async with httpx.AsyncClient(base_url=PIPELINE_URL, headers=headers, timeout=httpx.Timeout(300.0)) as client:
        async with client.stream("POST", "/search-stream", json=query) as response:
            # Check if the response is successful
            if response.status_code != 200:
                await response.aread()
                print(f"An error occurred with status code: {response.status_code}")
                print(response.json()["errors"][0])
                return

            event_source = EventSource(response)
            # Stream the response
            async for event in event_source.aiter_sse():
                event_data = json.loads(event.data)
                chunk_type = event_data["type"]
                # Check the type of the chunk and print the data accordingly
                match chunk_type:
                    # Delta chunk contains the next text chunk of the answer
                    case "delta":
                        token: str = event_data["delta"]["text"]
                        print(token, flush=True, end="\n" if event_data.get("finish_reason") else "")
                    # Result chunk contains the final pipeline result
                    case "result":
                        print("\n\nPipeline result:")
                        print(json.dumps(event_data["result"]))
                    # Error chunk contains the error message
                    case "error":
                        print("\n\nAn error occurred while streaming:")
                        print(event_data["error"])
                    # Tool call delta chunk: a delta with a tool name starts a new call,
                    # subsequent deltas carry fragments of its arguments string
                    case "tool_call_delta":
                        tool_call_delta = event_data["tool_call_delta"]
                        if tool_call_delta.get("tool_name"):
                            tool_id = tool_call_delta.get("id")
                            tool_name = tool_call_delta["tool_name"]
                            print(f"\n\nTool call {tool_id} started {tool_name} with arguments: ")
                        elif tool_call_delta.get("arguments"):
                            print(tool_call_delta["arguments"], flush=True, end="")
                    # Tool call result chunk contains the output of a finished tool call
                    case "tool_call_result":
                        tool_call_result = event_data["tool_call_result"]
                        tool_id = tool_call_result["origin"].get("id")
                        tool_name = tool_call_result["origin"]["tool_name"]
                        if tool_call_result["error"]:
                            print(f"\n\nTool call {tool_name} with id {tool_id} failed.")
                        else:
                            print(f"\n\nTool call {tool_name} with id {tool_id} result:")
                            print(tool_call_result["result"])


if __name__ == "__main__":
    asyncio.run(main())