POST https://api.cloud.deepset.ai/api/v1/workspaces/{workspace_name}/pipelines/{pipeline_name}/search-stream
Run a search using a pipeline and return the answer as a stream.
The full result can be accessed as the last stream message if `include_result=True`.
Event data format, where `delta`, `result`, and `error` are mutually exclusive:
{
"query_id": UUID,
"type": Literal["delta", "result", "error"],
"delta": Optional[StreamDelta],
"result": Optional[DeepsetCloudQueryResponse],
"error": Optional[str],
}
StreamDelta format:
{
"text": str,
}
Example code to consume the stream in Python:
import httpx
import json
from httpx_sse import EventSource
import asyncio
# Placeholder credentials and endpoint: substitute your own API token,
# workspace name, and pipeline name before running the example.
TOKEN = "MY_TOKEN"
PIPELINE_URL = (
    "https://api.cloud.deepset.ai/api/v1"
    "/workspaces/MY_WORKSPACE"
    "/pipelines/MY_PIPELINE"
)
async def main():
query = {
"query": "How does streaming work with deepset Cloud?",
"include_result": True
}
headers = {
"Authorization": f"Bearer {TOKEN}"
}
async with httpx.AsyncClient(base_url=PIPELINE_URL, headers=headers) as client:
async with client.stream("POST", "/search-stream", json=query) as response:
# Check if the response is successful
if response.status_code != 200:
await response.aread()
print(f"An error occured with status code: {response.status_code}")
print(response.json()["errors"][0])
return
event_source = EventSource(response)
# Stream the response
async for event in event_source.aiter_sse():
event_data = json.loads(event.data)
chunk_type = event_data["type"]
# Check the type of the chunk and print the data accordingly
match chunk_type:
# Delta chunk contains the next text chunk of the answer
case "delta":
token: str = event_data["delta"]["text"]
print(token, flush=True, end="")
# Result chunk contains the final pipeline result
case "result":
print("\n\nPipeline result:")
print(json.dumps(event_data["result"]))
# Error chunk contains the error message
case "error":
print("\n\nAn error occurred while streaming:")
print(event_data["error"])
# Start an asyncio event loop and run the example coroutine to completion.
asyncio.run(main())