DeepsetParallelExecutor

Run a component in parallel with multiple inputs to process them simultaneously instead of one by one. This is useful when a component must handle many inputs, such as a Generator that is called once per document.

Basic Information

  • Type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
  • Components it can connect with:
    • DeepsetParallelExecutor connects to components whose input and output types are compatible with the component it wraps. For example, if it wraps a Generator, it can connect to components like PromptBuilder and AnswerBuilder.

Inputs

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `kwargs` | `Any` | | The inputs to the component. Each input must be a list of component inputs. |

Outputs

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| The names of the component's outputs | `Dict[str, List[str \| Dict[str, Any]]]` | | The wrapped component's outputs, collected into lists with one entry per input. |

Overview

Use this component to run another component with multiple inputs in parallel instead of one by one to save time. For example, you can use it to run a Generator that needs multiple LLM invocations, one per input.
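For instance, here is a minimal sketch of wrapping a Generator. The component name, the choice of OpenAIGenerator, and the model are illustrative assumptions, not part of the index example further below:

```yaml
components:
  parallel_generator:
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 10 # run up to 10 LLM requests concurrently (assumed value)
      component:
        # Any regular component definition goes here; OpenAIGenerator is an assumed example
        type: haystack.components.generators.openai.OpenAIGenerator
        init_parameters:
          model: gpt-4o-mini
```

Connected this way, `parallel_generator` expects its `prompt` input as a `List[str]` with one prompt per invocation and returns each of the generator's outputs as a list with one entry per input.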

Usage Example

Initializing the Component

```yaml
components:
  DeepsetParallelExecutor:
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      component:
        # The definition of the component to run in parallel goes here (type and init_parameters)
```
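The `component` parameter is required: nest the full definition of the component to run in parallel, including its `type` and `init_parameters`, under it. Every input you then connect to the executor must be a list with one entry per invocation of the wrapped component.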

Using the Component in an Index

This is an example of an index that adds LLM-generated metadata to processed documents. It uses the DeepsetParallelExecutor to run PromptBuilder and generate one prompt per document. It then uses another DeepsetParallelExecutor to invoke AmazonBedrockGenerator for each prompt.


```yaml
components:
  file_classifier:
    type: haystack.components.routers.file_type_router.FileTypeRouter
    init_parameters:
      mime_types:
        - text/plain
        - application/pdf
        - text/markdown
        - text/html
        - application/vnd.openxmlformats-officedocument.wordprocessingml.document
        - application/vnd.openxmlformats-officedocument.presentationml.presentation
        - application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
        - text/csv

  text_converter:
    type: haystack.components.converters.txt.TextFileToDocument
    init_parameters:
      encoding: utf-8

  pdf_converter:
    type: haystack.components.converters.pdfminer.PDFMinerToDocument
    init_parameters:
      line_overlap: 0.5
      char_margin: 2
      line_margin: 0.5
      word_margin: 0.1
      boxes_flow: 0.5
      detect_vertical: true
      all_texts: false
      store_full_path: false

  markdown_converter:
    type: haystack.components.converters.txt.TextFileToDocument
    init_parameters:
      encoding: utf-8

  html_converter:
    type: haystack.components.converters.html.HTMLToDocument
    init_parameters:
      # A dictionary of keyword arguments to customize how you want to extract content from your HTML files.
      # For the full list of available arguments, see
      # the [Trafilatura documentation](https://trafilatura.readthedocs.io/en/latest/corefunctions.html#extract).
      extraction_kwargs:
        output_format: markdown # Extract text from HTML. You can also choose "txt"
        target_language: # You can define a language (using the ISO 639-1 format) to discard documents that don't match that language.
        include_tables: true # If true, includes tables in the output
        include_links: true # If true, keeps links along with their targets

  docx_converter:
    type: haystack.components.converters.docx.DOCXToDocument
    init_parameters: {}

  pptx_converter:
    type: haystack.components.converters.pptx.PPTXToDocument
    init_parameters: {}

  xlsx_converter:
    type: deepset_cloud_custom_nodes.converters.xlsx.XLSXToDocument
    init_parameters: {}

  csv_converter:
    type: haystack.components.converters.csv.CSVToDocument
    init_parameters:
      encoding: utf-8

  joiner:
    type: haystack.components.joiners.document_joiner.DocumentJoiner
    init_parameters:
      join_mode: concatenate
      sort_by_score: false

  joiner_xlsx: # merge split documents with non-split xlsx documents
    type: haystack.components.joiners.document_joiner.DocumentJoiner
    init_parameters:
      join_mode: concatenate
      sort_by_score: false

  splitter:
    type: haystack.components.preprocessors.document_splitter.DocumentSplitter
    init_parameters:
      split_by: word
      split_length: 250
      split_overlap: 30
      respect_sentence_boundary: true
      language: en

  document_file_adapter:
    # An adapter that creates a list of corresponding files for split documents.
    # The result will be of the same size and order as the input documents.
    #
    # Inputs:
    # - files: List[Document] before splitting
    # - documents: List[Document] after splitting
    #
    # Outputs:
    # - output: List[Document] same size as input documents
    type: haystack.components.converters.output_adapter.OutputAdapter
    init_parameters:
      output_type: List[haystack.Document]
      unsafe: true
      template: |
        {%- set file_dict = {} -%}
        {%- for file in files -%}
        {%- set _ = file_dict.update({file.meta['file_id']: file}) -%}
        {%- endfor -%}
        {%- set files = [] -%}
        {%- for doc in documents -%}
        {%- set _ = files.append(file_dict[doc.meta['file_id']]) -%}
        {%- endfor -%}
        {{ files }}

  prompt_builder:
    # The prompt to create the context.
    #
    # This component uses DeepsetParallelExecutor to create one prompt per document.
    # DeepsetParallelExecutor requires each input to be a list of the wrapped component's inputs.
    # It iterates through the lists and passes each individual input to the wrapped component.
    # Here, it receives a list of documents and a list of corresponding files and invokes PromptBuilder for each document-file pair.
    #
    # Component run: PromptBuilder
    # Component inputs:
    # - document: Document
    # - file: Document
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - document: List[Document]
    # - file: List[Document]
    #
    # Outputs:
    # - prompt: List[str]
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 1 # disable parallelism
      component:
        # here goes any usual component definition
        type: haystack.components.builders.prompt_builder.PromptBuilder
        init_parameters:
          template: |-
            Give a short concise context of max 100 words to situate this passage within the overall document for the purposes of improving search retrieval of the passage.
            Answer only with the concise context and nothing else.

            document:
            {{ file.content }}

            passage:
            {{ document.content }}

            context:
          required_variables: "*"

  llm:
    # Runs LLM-based extraction in parallel.
    #
    # We invoke AmazonBedrockGenerator for each prompt.
    # For lower latency, we run up to 20 requests in parallel.
    #
    # Component run in parallel: AmazonBedrockGenerator
    # Component inputs:
    # - prompt: str
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - prompt: List[str]
    #
    # Outputs:
    # - replies: List[str] (note: flatten_output)
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 20 # 20 LLM requests in parallel
      # max_retries: 3 # in case the underlying component does not offer retries itself
      raise_on_failure: false # don't fail the whole batch if one request fails after retrying; the failed context will be None
      flatten_output: true # ensure we get List[str] instead of List[List[str]]
      component:
        type: haystack_integrations.components.generators.amazon_bedrock.generator.AmazonBedrockGenerator
        init_parameters:
          model: "us.anthropic.claude-sonnet-4-20250514-v1:0"
          max_length: 650
          model_max_length: 200000
          temperature: 0
          boto3_config:
            region_name: us-west-2
            read_timeout: 120
            retries:
              total_max_attempts: 3
              mode: standard

  document_meta_updater:
    # Updates the documents' metadata by adding the context.
    #
    # We have two list-type inputs (of the same length) and want to process each of their entries together.
    # Writing a Jinja2 template for OutputAdapter that has to deal with (multiple) lists can be challenging.
    # DeepsetParallelExecutor iterates through the lists, using OutputAdapter to add the context to each document's metadata.
    # As a result, the Jinja2 code for OutputAdapter becomes trivial.
    #
    # Component run in parallel: OutputAdapter
    # Component inputs:
    # - context: str
    # - document: Document
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - context: List[str]
    # - document: List[Document]
    #
    # Outputs:
    # - output: List[Document]
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 1 # disable parallelism as this is a cheap operation
      component:
        type: haystack.components.converters.output_adapter.OutputAdapter
        init_parameters:
          unsafe: true
          output_type: haystack.Document
          # sets the context in the document's meta and returns the document
          template: |
            {%- set _ = document.meta.update({'context': context}) -%}
            {{ document }}

  document_embedder:
    type: deepset_cloud_custom_nodes.embedders.nvidia.document_embedder.DeepsetNvidiaDocumentEmbedder
    init_parameters:
      model: intfloat/e5-base-v2
      normalize_embeddings: true
      meta_fields_to_embed:
        - context

  writer:
    type: haystack.components.writers.document_writer.DocumentWriter
    init_parameters:
      document_store:
        type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
        init_parameters:
          hosts:
          index: ''
          max_chunk_bytes: 104857600
          embedding_dim: 768
          return_embedding: false
          method:
          mappings:
          settings:
          create_index: true
          http_auth:
          use_ssl:
          verify_certs:
          timeout:
          search_fields:
            - content
            - context
      policy: OVERWRITE

connections: # Defines how the components are connected
  - sender: file_classifier.text/plain
    receiver: text_converter.sources
  - sender: file_classifier.application/pdf
    receiver: pdf_converter.sources
  - sender: file_classifier.text/markdown
    receiver: markdown_converter.sources
  - sender: file_classifier.text/html
    receiver: html_converter.sources
  - sender: file_classifier.application/vnd.openxmlformats-officedocument.wordprocessingml.document
    receiver: docx_converter.sources
  - sender: file_classifier.application/vnd.openxmlformats-officedocument.presentationml.presentation
    receiver: pptx_converter.sources
  - sender: file_classifier.application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
    receiver: xlsx_converter.sources
  - sender: file_classifier.text/csv
    receiver: csv_converter.sources
  - sender: text_converter.documents
    receiver: joiner.documents
  - sender: pdf_converter.documents
    receiver: joiner.documents
  - sender: markdown_converter.documents
    receiver: joiner.documents
  - sender: html_converter.documents
    receiver: joiner.documents
  - sender: docx_converter.documents
    receiver: joiner.documents
  - sender: pptx_converter.documents
    receiver: joiner.documents
  - sender: joiner.documents
    receiver: splitter.documents
  # pass documents to PromptBuilder
  - sender: joiner.documents
    receiver: document_file_adapter.files
  - sender: splitter.documents
    receiver: document_file_adapter.documents
  - sender: document_file_adapter.output
    receiver: prompt_builder.file
  - sender: splitter.documents
    receiver: prompt_builder.document
  # pass prompts to the LLM
  - sender: prompt_builder.prompt
    receiver: llm.prompt
  # pass the generated context to the meta updater
  - sender: llm.replies
    receiver: document_meta_updater.context
  # pass documents to the meta updater
  - sender: splitter.documents
    receiver: document_meta_updater.document
  - sender: document_meta_updater.output
    receiver: joiner_xlsx.documents
  - sender: xlsx_converter.documents
    receiver: joiner_xlsx.documents
  - sender: csv_converter.documents
    receiver: joiner_xlsx.documents
  - sender: joiner_xlsx.documents
    receiver: document_embedder.documents
  - sender: document_embedder.documents
    receiver: writer.documents

inputs: # Define the inputs for your pipeline
  files: # This component will receive the files to index as input
    - file_classifier.sources

max_runs_per_component: 100

metadata: {}
```
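Note the pattern this index relies on: all inputs to a DeepsetParallelExecutor are equal-length lists in the same order. `document_file_adapter` builds a list of files that matches the split documents one-to-one, so `prompt_builder` and `document_meta_updater` can process their two list inputs pair by pair, while `llm` fans out up to 20 concurrent requests. Tune `max_workers` per wrapped component: set it high for I/O-bound LLM calls and to 1 for cheap local operations where threading brings no benefit.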


Parameters

Init Parameters

These are the parameters you can configure in Pipeline Builder:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `component` | `Component` | | The component to run in parallel. You specify the component by type and its init parameters. |
| `max_workers` | `int` | `4` | The maximum number of workers to use in the thread pool executor. |
| `max_retries` | `int` | `3` | The maximum number of retries to attempt if the component fails. |
| `progress_bar` | `bool` | `False` | Whether to show a progress bar while running the component in parallel. |
| `raise_on_failure` | `bool` | `True` | Whether to raise an exception if the component fails. |
| `flatten_output` | `bool` | `False` | Whether to flatten the output of the component. |
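`flatten_output` matters when the wrapped component itself returns a list per invocation. A Generator, for example, returns `replies: List[str]` for each prompt, so the executor's collected output would be `List[List[str]]`. The following fragment (mirroring the `llm` component in the index example above) flattens it back into a single list:

```yaml
init_parameters:
  max_workers: 20
  raise_on_failure: false # a failed invocation yields None instead of failing the whole batch
  flatten_output: true # replies: List[List[str]] -> List[str]
```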

Run Method Parameters

These are the parameters you can configure for the component's run() method. This means you can pass these parameters at query time through the API, in Playground, or when running a job. For details, see Modify Pipeline Parameters at Query Time.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `kwargs` | `Any` | | The inputs to the component. Each input must be a list of component inputs. |