DeepsetParallelExecutor
Use DeepsetParallelExecutor to run a component on multiple inputs in parallel instead of one by one. This reduces processing time when a component needs to handle several inputs, such as running an LLM over a list of documents.
Key Features
- Wraps any Haystack component and runs it in parallel across a list of inputs
- Configurable number of parallel workers for throughput tuning
- Supports automatic retries when the wrapped component fails
- Optional output flattening to normalize nested list outputs
- Returns outputs in the same order as inputs
Configuration
- Drag the DeepsetParallelExecutor component onto the canvas from the Component Library.
- Click the component to open the configuration panel.
- On the General tab:
  - Set component to the component you want to run in parallel. Specify it by type and init parameters, the same way you would define any component in a pipeline YAML.
- Go to the Advanced tab to configure max_workers, max_retries, progress_bar, raise_on_failure, and flatten_output.
Connections
DeepsetParallelExecutor connects to components with input and output types compatible with the wrapped component. Each input to the executor must be a list of the wrapped component's individual inputs. The executor passes one element from each list to the wrapped component per parallel invocation and returns the results as lists in the same order.
For example, if you wrap a PromptBuilder that takes a single document, the executor takes a List[Document] and returns a List[str] of prompts.
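The list-in, list-out behavior described above can be sketched in plain Python. This is an illustration only, not the executor's actual implementation: run_over_lists and build_prompt are hypothetical names, and the standard library's ThreadPoolExecutor is used here as an assumption about how the work might be distributed.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_over_lists(
    func: Callable[..., Dict[str, Any]],
    max_workers: int = 4,
    **list_inputs: List[Any],
) -> Dict[str, List[Any]]:
    """Call func once per position across the input lists and collect each output as a list."""
    names = list(list_inputs)
    # One kwargs dict per invocation: element i of every input list.
    calls = [dict(zip(names, values)) for values in zip(*list_inputs.values())]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in submission order, not completion order.
        results = list(pool.map(lambda kwargs: func(**kwargs), calls))
    collected: Dict[str, List[Any]] = {}
    for result in results:
        for key, value in result.items():
            collected.setdefault(key, []).append(value)
    return collected


def build_prompt(document: str, file: str) -> Dict[str, str]:
    # Hypothetical stand-in for the wrapped component's run() method.
    return {"prompt": f"document: {file}\npassage: {document}"}


prompts = run_over_lists(
    build_prompt,
    max_workers=2,
    document=["chunk 1", "chunk 2", "chunk 3"],
    file=["file A", "file A", "file B"],
)
```

Each entry in prompts["prompt"] corresponds to the document and file at the same position in the input lists, regardless of which call finished first.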
Usage Example
Using the Component in an Index
This is an example of an index that adds LLM-generated metadata to processed documents. It uses DeepsetParallelExecutor to run PromptBuilder and generate one prompt per document. It then uses another DeepsetParallelExecutor to invoke AmazonBedrockGenerator for each prompt.
components:
  file_classifier:
    type: haystack.components.routers.file_type_router.FileTypeRouter
    init_parameters:
      mime_types:
        - text/plain
        - application/pdf
        - text/markdown
        - text/html
        - application/vnd.openxmlformats-officedocument.wordprocessingml.document
        - application/vnd.openxmlformats-officedocument.presentationml.presentation
        - application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
        - text/csv
  text_converter:
    type: haystack.components.converters.txt.TextFileToDocument
    init_parameters:
      encoding: utf-8
  pdf_converter:
    type: haystack.components.converters.pdfminer.PDFMinerToDocument
    init_parameters:
      line_overlap: 0.5
      char_margin: 2
      line_margin: 0.5
      word_margin: 0.1
      boxes_flow: 0.5
      detect_vertical: true
      all_texts: false
      store_full_path: false
  markdown_converter:
    type: haystack.components.converters.txt.TextFileToDocument
    init_parameters:
      encoding: utf-8
  html_converter:
    type: haystack.components.converters.html.HTMLToDocument
    init_parameters:
      # A dictionary of keyword arguments to customize how you want to extract content from your HTML files.
      # For the full list of available arguments, see
      # the [Trafilatura documentation](https://trafilatura.readthedocs.io/en/latest/corefunctions.html#extract).
      extraction_kwargs:
        output_format: markdown # Extract text from HTML. You can also choose "txt"
        target_language: # You can define a language (using the ISO 639-1 format) to discard documents that don't match that language.
        include_tables: true # If true, includes tables in the output
        include_links: true # If true, keeps links along with their targets
  docx_converter:
    type: haystack.components.converters.docx.DOCXToDocument
    init_parameters: {}
  pptx_converter:
    type: haystack.components.converters.pptx.PPTXToDocument
    init_parameters: {}
  xlsx_converter:
    type: deepset_cloud_custom_nodes.converters.xlsx.XLSXToDocument
    init_parameters: {}
  csv_converter:
    type: haystack.components.converters.csv.CSVToDocument
    init_parameters:
      encoding: utf-8
  joiner:
    type: haystack.components.joiners.document_joiner.DocumentJoiner
    init_parameters:
      join_mode: concatenate
      sort_by_score: false
  joiner_xlsx: # merge split documents with non-split xlsx documents
    type: haystack.components.joiners.document_joiner.DocumentJoiner
    init_parameters:
      join_mode: concatenate
      sort_by_score: false
  splitter:
    type: haystack.components.preprocessors.document_splitter.DocumentSplitter
    init_parameters:
      split_by: word
      split_length: 250
      split_overlap: 30
      respect_sentence_boundary: true
      language: en
  document_file_adapter:
    # An adapter to create a list of corresponding files for split documents.
    # The result will be of the same size and order as the input documents.
    #
    # Inputs:
    # - files: List[Document] before splitting
    # - documents: List[Document] after splitting
    #
    # Outputs:
    # - output: List[Document] same size as input documents
    type: haystack.components.converters.output_adapter.OutputAdapter
    init_parameters:
      output_type: List[haystack.Document]
      unsafe: true
      template: |
        {%- set file_dict = {} -%}
        {%- for file in files -%}
        {%- set _ = file_dict.update({file.meta['file_id']: file}) -%}
        {%- endfor -%}
        {%- set files = [] -%}
        {%- for doc in documents -%}
        {%- set _ = files.append(file_dict[doc.meta['file_id']]) -%}
        {%- endfor -%}
        {{ files }}
  prompt_builder:
    # The prompt to create the context
    #
    # This template uses DeepsetParallelExecutor to create one prompt per document.
    # DeepsetParallelExecutor requires each input to be a list of the wrapped component's inputs.
    # It iterates through the lists and passes each individual input to the wrapped component.
    # Here it receives a list of documents and a list of corresponding files and invokes PromptBuilder for each document-file pair.
    #
    # Component run: PromptBuilder
    # Component inputs:
    # - document: Document
    # - file: Document
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - document: List[Document]
    # - file: List[Document]
    #
    # Outputs:
    # - output: List[str]
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 1 # disable parallelism
      component:
        # here goes any usual component definition
        type: haystack.components.builders.prompt_builder.PromptBuilder
        init_parameters:
          template: |-
            Give a short concise context of max 100 words to situate this passage within the overall document for the purposes of improving search retrieval of the passage.
            Answer only with the concise context and nothing else.
            document:
            {{ file.content }}
            passage:
            {{ document.content }}
            context:
          required_variables: "*"
  llm:
    # Runs LLM-based extraction in parallel
    #
    # We invoke AmazonBedrockGenerator for each prompt.
    # For lower latency we run up to 20 requests in parallel.
    #
    # Component run in parallel: AmazonBedrockGenerator
    # Component inputs:
    # - prompt: str
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - prompt: List[str]
    #
    # Outputs:
    # - replies: List[str] (note: flatten_output)
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 20 # 20 LLM requests in parallel
      # max_retries: 3 # in case the underlying component does not offer retries itself
      raise_on_failure: false # don't fail whole batch if one request fails after retry. Failed context will be None.
      flatten_output: true # ensure we get List[str] instead of List[List[str]]
      component:
        type: haystack_integrations.components.generators.amazon_bedrock.generator.AmazonBedrockGenerator
        init_parameters:
          model: "us.anthropic.claude-sonnet-4-20250514-v1:0"
          max_length: 650
          model_max_length: 200000
          temperature: 0
          boto3_config:
            region_name: us-west-2
            read_timeout: 120
            retries:
              total_max_attempts: 3
              mode: standard
  document_meta_updater:
    # Updates the documents' metadata by adding the context.
    #
    # We have two list-type inputs (of the same length) and want to process each of their entries together.
    # Writing a jinja2 template for OutputAdapter that has to deal with (multiple) lists can be challenging.
    # DeepsetParallelExecutor iterates through the lists using OutputAdapter to add the context to each document's metadata.
    # As such, the resulting jinja2 code for OutputAdapter becomes trivial.
    #
    # Component run in parallel: OutputAdapter
    # Component inputs:
    # - context: str
    # - document: Document
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - context: List[str]
    # - document: List[Document]
    #
    # Outputs:
    # - output: List[Document]
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 1 # disable parallelism as this is a cheap operation
      component:
        type: haystack.components.converters.output_adapter.OutputAdapter
        init_parameters:
          unsafe: true
          output_type: haystack.Document
          # sets context to document's meta and returns the document
          template: |
            {%- set _ = document.meta.update({'context': context}) -%}
            {{ document }}
  document_embedder:
    type: deepset_cloud_custom_nodes.embedders.nvidia.document_embedder.DeepsetNvidiaDocumentEmbedder
    init_parameters:
      model: intfloat/e5-base-v2
      normalize_embeddings: true
      meta_fields_to_embed:
        - context
  writer:
    type: haystack.components.writers.document_writer.DocumentWriter
    init_parameters:
      document_store:
        type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
        init_parameters:
          hosts:
          index: ''
          max_chunk_bytes: 104857600
          embedding_dim: 768
          return_embedding: false
          method:
          mappings:
          settings:
          create_index: true
          http_auth:
          use_ssl:
          verify_certs:
          timeout:
          search_fields:
            - content
            - context
      policy: OVERWRITE
connections: # Defines how the components are connected
  - sender: file_classifier.text/plain
    receiver: text_converter.sources
  - sender: file_classifier.application/pdf
    receiver: pdf_converter.sources
  - sender: file_classifier.text/markdown
    receiver: markdown_converter.sources
  - sender: file_classifier.text/html
    receiver: html_converter.sources
  - sender: file_classifier.application/vnd.openxmlformats-officedocument.wordprocessingml.document
    receiver: docx_converter.sources
  - sender: file_classifier.application/vnd.openxmlformats-officedocument.presentationml.presentation
    receiver: pptx_converter.sources
  - sender: file_classifier.application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
    receiver: xlsx_converter.sources
  - sender: file_classifier.text/csv
    receiver: csv_converter.sources
  - sender: text_converter.documents
    receiver: joiner.documents
  - sender: pdf_converter.documents
    receiver: joiner.documents
  - sender: markdown_converter.documents
    receiver: joiner.documents
  - sender: html_converter.documents
    receiver: joiner.documents
  - sender: docx_converter.documents
    receiver: joiner.documents
  - sender: pptx_converter.documents
    receiver: joiner.documents
  - sender: joiner.documents
    receiver: splitter.documents
  # pass documents to PromptBuilder
  - sender: joiner.documents
    receiver: document_file_adapter.files
  - sender: splitter.documents
    receiver: document_file_adapter.documents
  - sender: document_file_adapter.output
    receiver: prompt_builder.file
  - sender: splitter.documents
    receiver: prompt_builder.document
  # pass prompts to llm
  - sender: prompt_builder.prompt
    receiver: llm.prompt
  # pass generated context to meta updater
  - sender: llm.replies
    receiver: document_meta_updater.context
  # pass documents to meta updater
  - sender: splitter.documents
    receiver: document_meta_updater.document
  - sender: document_meta_updater.output
    receiver: joiner_xlsx.documents
  - sender: xlsx_converter.documents
    receiver: joiner_xlsx.documents
  - sender: csv_converter.documents
    receiver: joiner_xlsx.documents
  - sender: joiner_xlsx.documents
    receiver: document_embedder.documents
  - sender: document_embedder.documents
    receiver: writer.documents
inputs: # Define the inputs for your pipeline
  files: # This component will receive the files to index as input
    - file_classifier.sources
max_runs_per_component: 100
metadata: {}
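The two Jinja templates in this index (document_file_adapter and document_meta_updater) may be easier to follow as plain Python. The sketch below mirrors their logic under one simplifying assumption: documents are represented as plain dictionaries with content and meta keys instead of Haystack Document objects, and align_files and add_context are hypothetical helper names.

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]  # simplified stand-in for a Haystack Document


def align_files(files: List[Doc], documents: List[Doc]) -> List[Doc]:
    """Return, for each split document, the unsplit file it came from."""
    # Same idea as the document_file_adapter template: index files by file_id,
    # then look up the file for every split document, preserving document order.
    file_by_id = {f["meta"]["file_id"]: f for f in files}
    return [file_by_id[d["meta"]["file_id"]] for d in documents]


def add_context(document: Doc, context: str) -> Doc:
    """Store the generated context in the document's metadata and return the document."""
    # Same idea as the document_meta_updater template, applied to one pair at a time.
    document["meta"]["context"] = context
    return document


files = [
    {"content": "full text of A", "meta": {"file_id": "a"}},
    {"content": "full text of B", "meta": {"file_id": "b"}},
]
documents = [
    {"content": "A, part 1", "meta": {"file_id": "a"}},
    {"content": "A, part 2", "meta": {"file_id": "a"}},
    {"content": "B, part 1", "meta": {"file_id": "b"}},
]

aligned = align_files(files, documents)
contexts = ["intro of A", "end of A", "intro of B"]  # made-up LLM replies
updated = [add_context(d, c) for d, c in zip(documents, contexts)]
```

Because aligned has the same length and order as documents, the two lists can be passed side by side to an executor that pairs them element by element.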
Parameters
Inputs
| Parameter | Type | Default | Description |
|---|---|---|---|
| kwargs | Any | | The inputs to the component. Each input must be a list of the wrapped component's individual inputs. |
Outputs
| Parameter | Type | Default | Description |
|---|---|---|---|
| The names of the component's outputs | Dict[str, List[str \| Dict[str, Any]]] | | The outputs of the component. Each output is a list. DeepsetParallelExecutor returns outputs in the same order as inputs. |
Init Parameters
These are the parameters you can configure in Pipeline Builder:
| Parameter | Type | Default | Description |
|---|---|---|---|
| component | Component | | The component to run in parallel. Specify it by type and its init parameters. |
| max_workers | int | 4 | The maximum number of workers to use in the thread pool executor. |
| max_retries | int | 3 | The maximum number of retries to attempt if the component fails. |
| progress_bar | bool | False | Whether to show a progress bar while running the component in parallel. |
| raise_on_failure | bool | True | Whether to raise an exception if the component fails. |
| flatten_output | bool | False | Whether to flatten the output of the component. |
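To make the interplay of max_retries, raise_on_failure, and flatten_output more concrete, here is a rough standard-library sketch. It is not the component's code and rests on assumptions: max_retries is treated as the number of extra attempts after the first failure, a failed call is represented by None when raise_on_failure is off (as the index example's comments suggest), and flattening simply unwraps the per-call lists. The names call_with_retries, flatten, flaky, and always_fails are hypothetical.

```python
from typing import Any, Callable, List, Optional


def call_with_retries(
    func: Callable[[Any], Any],
    arg: Any,
    max_retries: int = 3,
    raise_on_failure: bool = True,
) -> Optional[Any]:
    """Call func(arg); on failure, retry up to max_retries more times (assumed semantics)."""
    last_error: Optional[Exception] = None
    for _attempt in range(max_retries + 1):
        try:
            return func(arg)
        except Exception as error:
            last_error = error
    if raise_on_failure and last_error is not None:
        raise last_error
    return None  # placeholder for a call that never succeeded


def flatten(outputs: List[Any]) -> List[Any]:
    """Turn [["a"], ["b"]] into ["a", "b"], keeping None placeholders as they are."""
    flat: List[Any] = []
    for item in outputs:
        if isinstance(item, list):
            flat.extend(item)
        else:
            flat.append(item)
    return flat


attempts = {"count": 0}


def flaky(prompt: str) -> List[str]:
    # Fails twice, then succeeds, to exercise the retry loop.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("temporary failure")
    return [f"reply to {prompt}"]


def always_fails(prompt: str) -> List[str]:
    raise RuntimeError("permanent failure")


ok = call_with_retries(flaky, "p1", max_retries=3)
failed = call_with_retries(always_fails, "p2", max_retries=1, raise_on_failure=False)
replies = flatten([ok, failed])
```

With raise_on_failure turned off, downstream components should be prepared to handle the placeholder for failed items.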
Run Method Parameters
These are the parameters you can configure for the component's run() method. This means you can pass these parameters at query time through the API, in Playground, or when running a job. For details, see Modify Pipeline Parameters at Query Time.
| Parameter | Type | Default | Description |
|---|---|---|---|
| kwargs | Any | | The inputs to the component. Each input must be a list of the wrapped component's individual inputs. |