Enable Streaming

Streaming means that the text a large language model generates is displayed as it's produced, rather than only after the entire response is ready. It's similar to watching someone type in real time. You can enable streaming for the Generators in your pipelines.

About This Task

Streaming is a technique often used in chat interfaces. It makes the responses seem faster as users can immediately see the output and can start reading while the rest of the text generates. It also makes it possible to interrupt the LLM if needed. This is particularly useful for longer responses where waiting for the generation to complete may take a couple of seconds.

Enabling Streaming

Enabling Streaming for Generators and ChatGenerators

To enable streaming, set the Generator's streaming_callback parameter to deepset_cloud_custom_nodes.callbacks.streaming.streaming_callback.

If your pipeline has multiple Generators, you can enable streaming for each one. The same applies to ChatGenerators.

If no Generator has streaming_callback set, the last Generator in the pipeline streams by default.

Example of a Generator with Streaming

Here is an example of a Generator with streaming enabled:

Image: CohereGenerator with the streaming_callback parameter filled in to enable streaming

YAML configuration:

CohereGenerator:
    type: haystack_integrations.components.generators.cohere.generator.CohereGenerator
    init_parameters:
      api_key:
        type: env_var
        env_vars:
        - COHERE_API_KEY
        - CO_API_KEY
        strict: false
      model: command-r
      streaming_callback: deepset_cloud_custom_nodes.callbacks.streaming.streaming_callback
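To make the role of the callback more concrete, here is a minimal, self-contained sketch of the general pattern: the generator calls the callback once per chunk as soon as that chunk exists, and still returns the full text at the end. The Chunk class and fake_generator function are hypothetical stand-ins written for this illustration. They are not part of Haystack or deepset Cloud, and the real callback you configure in YAML is the one named above.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, List


@dataclass
class Chunk:
    """Stand-in for one streamed chunk (hypothetical, not a Haystack class)."""
    content: str
    meta: dict = field(default_factory=dict)


def fake_generator(tokens: Iterable[str], streaming_callback: Callable[[Chunk], None]) -> str:
    """Pretend LLM: passes each token to the callback as soon as it is 'generated'."""
    parts: List[str] = []
    for token in tokens:
        streaming_callback(Chunk(content=token))  # the consumer sees this token right away
        parts.append(token)
    return "".join(parts)  # the complete reply is still returned at the end


# Collect the chunks in arrival order to show what a consumer would receive.
received: List[str] = []
reply = fake_generator(["Stream", "ing ", "works"], lambda chunk: received.append(chunk.content))
```

In a chat interface, the callback would forward each chunk to the client instead of appending it to a list.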

Enabling Streaming for Agents

To enable streaming for the Agent, set the Agent's streaming_callback parameter to deepset_cloud_custom_nodes.callbacks.streaming.streaming_callback. Note that you enable streaming on the Agent itself, not on the ChatGenerator it uses.

Example of an Agent with Streaming

In Pipeline Builder, expand optional parameters on the Agent component card and configure streaming_callback:

Images: the streaming_callback parameter on the Agent component card, and the same parameter filled in

YAML configuration:

  agent:
    type: haystack.components.agents.agent.Agent
    init_parameters:
      # Chat Generator: Use the OpenAI one from Pipeline A
      chat_generator:
        type: haystack.components.generators.chat.openai.OpenAIChatGenerator
        init_parameters:
          api_key:
            type: env_var
            env_vars:
            - OPENAI_API_KEY
            strict: false
          model: gpt-4o              
          streaming_callback: # this stays empty
          tools:
          api_base_url:
          organization:
          timeout:
          max_retries:
          tools_strict: false
      system_prompt: You are a deep research assistant.
      streaming_callback: deepset_cloud_custom_nodes.callbacks.streaming.streaming_callback # this setting enables streaming for the Agent
      tools:
      ...

Streaming with the API

You can use streaming with the stream API endpoints: Chat Stream and Search Stream. This is an example request to the Search Stream endpoint.

curl --request POST \
     --url https://api.cloud.deepset.ai/api/v1/workspaces/WORKSPACE_NAME/pipelines/PIPELINE_NAME/search-stream \
     --header 'accept: application/json' \
     --header 'authorization: Bearer DEEPSET_API_KEY' \
     --header 'content-type: application/json' \
     --data '
{
  "debug": false,
  "include_result": true,
  "view_prompts": false,
  "query": "who started all-girl bands?"
}
'

The same request in Python:

import requests

url = "https://api.cloud.deepset.ai/api/v1/workspaces/WORKSPACE_NAME/pipelines/PIPELINE_NAME/search-stream"

payload = {
    "debug": False,
    "include_result": True,
    "view_prompts": False,
    "query": "who started all-girl bands?"
}
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "authorization": "Bearer DEEPSET_API_KEY"
}

response = requests.post(url, json=payload, headers=headers)

print(response.text)

Replace:

  • WORKSPACE_NAME: With the name of the workspace containing your pipeline.
  • PIPELINE_NAME: With the name of the pipeline to use for search.
  • DEEPSET_API_KEY: With your deepset API key.
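The Python request above prints the body only after the whole response has arrived. To read chunks while they are still being generated, you can ask requests to stream the response and process it line by line. The sketch below is one way to do that, under an assumption that is not confirmed on this page: that the endpoint sends one JSON event per line, possibly prefixed with "data:" as in server-sent events. The helper names iter_events and stream_search are made up for this example.

```python
import json
from typing import Iterable, Iterator


def iter_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per JSON event line, skipping blank and non-JSON lines."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):  # assumed server-sent-events style prefix
            line = line[len("data:"):].strip()
        if not line.startswith("{"):
            continue  # blank separators, keep-alives, or comments
        yield json.loads(line)


def stream_search(url: str, api_key: str, query: str) -> Iterator[str]:
    """Yield text deltas as they arrive. Requires the third-party requests package."""
    import requests  # imported here so iter_events can be used without requests installed

    payload = {"debug": False, "include_result": True, "view_prompts": False, "query": query}
    headers = {"authorization": f"Bearer {api_key}"}
    # stream=True keeps requests from buffering the whole body before returning.
    with requests.post(url, json=payload, headers=headers, stream=True) as response:
        response.raise_for_status()
        lines = (raw.decode("utf-8") for raw in response.iter_lines())
        for event in iter_events(lines):
            if event.get("type") == "delta":
                yield event["delta"]["text"]
```

To print the answer as it streams, loop over stream_search(url, "DEEPSET_API_KEY", "who started all-girl bands?") and call print(text, end="", flush=True) for each item.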

Determining Which Generator Streamed

If your pipeline includes multiple Generators with streaming enabled, you can determine which Generator streamed a specific chunk of data by checking its name in the API response. This information is available in the delta field, under meta.deepset_cloud.component.

Below is a partial example of a response from the Search Stream endpoint, showing chunks from two streaming-enabled Generators: chat_summary_llm and qa_llm. The // comments are annotations and are not part of the actual response.

{
   "query_id":"290a1f96-57d6-4843-8ed7-2a224142398b",
   "delta":{
      "text":"girl bands?",
      "meta":{
         "index":0,
         "deepset_cloud":{
            "component":"chat_summary_llm" // this is the name of the Generator that streamed
         }
      }
   },
   "type":"delta"
}


{
   "query_id":"290a1f96-57d6-4843-8ed7-2a224142398b",
   "delta":{
      "text":"Base",
      "meta":{
         "index":0,
         "deepset_cloud":{
            "component":"qa_llm" // this is the name of the Generator that streamed
         }
      }
   },
   "type":"delta"
}
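
If you need to keep the output of each Generator separate on the client side, you can group the delta events by their component name. The following is a minimal sketch that assumes the events have already been parsed into Python dictionaries with the shape shown above. The function name text_by_component and the "unknown" fallback label are choices made for this example.

```python
from collections import defaultdict
from typing import Dict, Iterable


def text_by_component(events: Iterable[dict]) -> Dict[str, str]:
    """Concatenate streamed text per Generator, in arrival order."""
    texts: Dict[str, str] = defaultdict(str)
    for event in events:
        if event.get("type") != "delta":
            continue  # ignore any non-delta events
        delta = event["delta"]
        # Fall back to "unknown" if a chunk carries no component name.
        component = delta.get("meta", {}).get("deepset_cloud", {}).get("component", "unknown")
        texts[component] += delta.get("text", "")
    return dict(texts)


# The two chunks from the example response above, reduced to the relevant fields.
events = [
    {"type": "delta", "delta": {"text": "girl bands?", "meta": {"deepset_cloud": {"component": "chat_summary_llm"}}}},
    {"type": "delta", "delta": {"text": "Base", "meta": {"deepset_cloud": {"component": "qa_llm"}}}},
]
print(text_by_component(events))
# prints {'chat_summary_llm': 'girl bands?', 'qa_llm': 'Base'}
```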