Search Stream
POST /api/v1/workspaces/:workspace_name/pipelines/:pipeline_name/search-stream
Run a search using a pipeline and return the answer as a stream.
Options:
- The full result can be accessed as the last stream message if include_result=True.
- Tool calls are streamed if include_tool_calls=True. Defaults to rendered, where tool calls are converted to markdown text deltas.
- Tool call results are streamed if include_tool_call_results=True.
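For example, a request body that turns on all three options could look like this (the query text is only a placeholder; the same body appears again in the full example further down):

query = {
    "query": "How does streaming work with deepset?",
    # Send the full pipeline result as the last stream message
    "include_result": True,
    # Stream tool calls as tool_call_delta events instead of the rendered markdown default
    "include_tool_calls": True,
    # Stream the result of each tool call as a tool_call_result event
    "include_tool_call_results": True,
}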
Event data format where delta, result, tool_call_delta, tool_call_result, and error are mutually
exclusive:
{
    "query_id": UUID,
    "type": Literal["delta", "result", "error", "tool_call_delta", "tool_call_result"],
    "delta": Optional[StreamDelta],
    "result": Optional[DeepsetCloudQueryResponse],
    "error": Optional[str],
    "tool_call_delta": Optional[ToolCallDelta],
    "tool_call_result": Optional[ToolCallResult],
    "index": Optional[int],
    "start": Optional[bool],
    "finish_reason": Optional[str],
}
StreamDelta format:
{
    "text": str,
    "meta": Optional[dict[str, Any]],
}
ToolCallDelta format:
{
    "index": int,
    "tool_name": Optional[str],
    "id": Optional[str],
    "arguments": Optional[str],
}
ToolCallResult format:
{
    "result": str,
    "origin": {
        "tool_name": str,
        "arguments": dict[str, Any],
        "id": Optional[str],
    },
    "error": bool,
}
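When raw tool calls are streamed, the arguments of a single call arrive as string fragments that share the same index, and the matching ToolCallResult carries the call's id in its origin. The helpers below are an illustrative sketch (not part of the API) of one way a client could reassemble the fragments and attach results; they assume the accumulated argument fragments form a JSON object:

import json
from typing import Any


def accumulate_tool_call(calls: dict[int, dict[str, Any]], delta: dict[str, Any]) -> None:
    # Merge one tool_call_delta payload into the calls collected so far, keyed by index.
    call = calls.setdefault(delta["index"], {"id": None, "tool_name": None, "arguments": ""})
    if delta.get("id"):
        call["id"] = delta["id"]
    if delta.get("tool_name"):
        call["tool_name"] = delta["tool_name"]
    if delta.get("arguments"):
        call["arguments"] += delta["arguments"]


def attach_tool_result(calls: dict[int, dict[str, Any]], tool_call_result: dict[str, Any]) -> None:
    # Find the accumulated call whose id matches the result's origin and store the outcome.
    for call in calls.values():
        if call["id"] == tool_call_result["origin"]["id"]:
            # Assumes the concatenated argument fragments parse as a JSON object.
            call["arguments"] = json.loads(call["arguments"] or "{}")
            call["failed"] = tool_call_result["error"]
            call["result"] = tool_call_result["result"]
            return

Inside the streaming loop, you would call accumulate_tool_call for every tool_call_delta event and attach_tool_result for every tool_call_result event.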
Example code to consume the stream in Python:
import asyncio
import json

import httpx
from httpx_sse import EventSource

TOKEN = "MY_TOKEN"
PIPELINE_URL = "https://api.cloud.deepset.ai/api/v1/workspaces/MY_WORKSPACE/pipelines/MY_PIPELINE"


async def main():
    query = {
        "query": "How does streaming work with deepset?",
        "include_result": True,
        "include_tool_calls": True,
        "include_tool_call_results": True,
    }
    headers = {"Authorization": f"Bearer {TOKEN}"}
    async with httpx.AsyncClient(base_url=PIPELINE_URL, headers=headers, timeout=httpx.Timeout(300.0)) as client:
        async with client.stream("POST", "/search-stream", json=query) as response:
            # Check if the response is successful
            if response.status_code != 200:
                await response.aread()
                print(f"An error occurred with status code: {response.status_code}")
                print(response.json()["errors"][0])
                return
            event_source = EventSource(response)
            # Stream the response
            async for event in event_source.aiter_sse():
                event_data = json.loads(event.data)
                chunk_type = event_data["type"]
                # Check the type of the chunk and print the data accordingly
                match chunk_type:
                    # Delta chunk contains the next text chunk of the answer
                    case "delta":
                        token: str = event_data["delta"]["text"]
                        print(token, flush=True, end="\n" if event_data.get("finish_reason") else "")
                    # Result chunk contains the final pipeline result
                    case "result":
                        print("\n\nPipeline result:")
                        print(json.dumps(event_data["result"]))
                    # Error chunk contains the error message
                    case "error":
                        print("\n\nAn error occurred while streaming:")
                        print(event_data["error"])
                    # Tool call delta chunks announce a tool call and stream its arguments
                    case "tool_call_delta":
                        tool_call_delta = event_data["tool_call_delta"]
                        if tool_call_delta["tool_name"]:
                            tool_id = tool_call_delta["id"]
                            tool_name = tool_call_delta["tool_name"]
                            print(f"\n\nTool call {tool_id} started {tool_name} with arguments: ")
                        elif tool_call_delta["arguments"]:
                            print(tool_call_delta["arguments"], flush=True, end="")
                    # Tool call result chunks contain the outcome of a completed tool call
                    case "tool_call_result":
                        tool_call_result = event_data["tool_call_result"]
                        tool_id = tool_call_result["origin"]["id"]
                        tool_name = tool_call_result["origin"]["tool_name"]
                        if tool_call_result["error"]:
                            print(f"\n\nTool call {tool_name} with id {tool_id} failed.")
                        else:
                            print(f"\n\nTool call {tool_name} with id {tool_id} result:")
                            print(tool_call_result["result"])


asyncio.run(main())
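The example needs the httpx and httpx-sse packages (for example, pip install httpx httpx-sse). If you prefer to consume the stream without asyncio, httpx-sse also provides a synchronous entry point, connect_sse. The following is a minimal sketch of the same flow using it; error handling for non-200 responses is omitted for brevity:

import json

import httpx
from httpx_sse import connect_sse

TOKEN = "MY_TOKEN"
PIPELINE_URL = "https://api.cloud.deepset.ai/api/v1/workspaces/MY_WORKSPACE/pipelines/MY_PIPELINE"

query = {"query": "How does streaming work with deepset?", "include_result": True}
headers = {"Authorization": f"Bearer {TOKEN}"}

with httpx.Client(base_url=PIPELINE_URL, headers=headers, timeout=httpx.Timeout(300.0)) as client:
    # connect_sse sends the request and wraps the streaming response as an SSE event source
    with connect_sse(client, "POST", "/search-stream", json=query) as event_source:
        for event in event_source.iter_sse():
            event_data = json.loads(event.data)
            if event_data["type"] == "delta":
                print(event_data["delta"]["text"], flush=True, end="")
            elif event_data["type"] == "result":
                print("\n\nPipeline result:")
                print(json.dumps(event_data["result"]))
            elif event_data["type"] == "error":
                print("\n\nAn error occurred while streaming:")
                print(event_data["error"])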
Responses
- 200: Returns the search results.
- 404: We couldn't find the pipeline. Check if the pipeline name is correct, make sure the pipeline exists in the workspace you specified, and try again.
- 409: The pipeline was in a conflicted state (for example, it wasn't deployed).
- 422: Validation Error
- 429: The rate limit for this pipeline has been exceeded. Please try again later.
- 591: The pipeline was on standby, and is now being activated. Try again in a while.