DeepsetParallelExecutor
Run a component in parallel with multiple inputs to process them simultaneously instead of one by one. This approach is useful if you have a component that requires handling several inputs.
Basic Information
- Pipeline type: Query and indexing.
- Type:
deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
- Components it can connect with:
- It connects to components with compatible input and output types for the component you run with DeepsetParallelExecutor. For example, if you run a Generator, it will connect to components like PromptBuilder and AnswerBuilder.
Inputs
Name | Type | Description |
---|---|---|
kwargs | Any sequence or collection of items that can be looped over (like a list, set, or tuple), where the items can be of any type. | The inputs to the component. Each input must be a list of component inputs. Required. |
Outputs
Name | Type | Description |
---|---|---|
The names of the component's outputs. | Dictionary with string keys and list values. Each value is a list that can contain either strings or nested dictionaries. | The outputs of the component. Each output is a list. DeepsetParallelExecutor returns outputs in the same order as inputs. |
Overview
Use this component to run multiple outputs of another component in parallel instead of one by one to save time. For example, you can use it to run a Generator with multiple LLM invocations.
Usage Example
This is an example of an indexing pipeline designed for PDF documents with Optical Character Recognition (OCR) capabilities. Here is how it works:
- The pipeline receives files as input, and
file_classifier
(FileTypeRouter) filters for PDF files only. pdf_converter
(PyPDFToDocument) converts PDFs to Document objects and sends them tosplitter
(DeepsetDocumentSplitter), which breaks them down by page (one page per split).image_downloader
(DeepsetFileDownloader) downloads each page, and thenpdf_to_image
(DeepsetPDFDocumentToBase64Image) converts each page to an image.ocr_prompt_builder
(PromptBuilder) creates a prompt for the LLM-based OCR system (DeepsetOpenAIVisionGenerator), instructing it to extract text and tables.ocr_prompt_adapter
(OutputAdapter) andocr_images_adapter
(OutputAdapter) prepare the data structure for parallel processing.ocr_prompt_adapter
turns the output ofocr_prompt_builder
into a list of strings, whileocr_images_adapter
turns the output ofpdf_to_image
into a list of images. This is because DeepsetParallelExecutor accepts only lists of outputs.ocr
(DeepsetParallelExecutor) runs DeepsetOpenAIVisionGenerator and processes up to 20 pages in parallel using the instructions from PromptBuilder's prompt.document_builder
(DeepsetParallelExecutor) creates Document objects from the OCR results and sends them tojoiner
(DocumentJoiner) to join the processed documents together.writer
(DocumentWriter) writes the processed documents to an OpenSearch document store.
components:
file_classifier:
type: haystack.components.routers.file_type_router.FileTypeRouter
init_parameters:
mime_types:
- application/pdf
pdf_converter:
type: haystack.components.converters.pypdf.PyPDFToDocument
init_parameters:
converter:
type: haystack.components.converters.pypdf.DefaultConverter
splitter:
# Split each document by page: we want one image per page
type: deepset_cloud_custom_nodes.preprocessors.document_splitter.DeepsetDocumentSplitter
init_parameters:
split_by: page
split_length: 1
split_overlap: 0
respect_sentence_boundary: False
language: en
image_downloader:
# Downloads each file for each page
type: deepset_cloud_custom_nodes.augmenters.deepset_file_downloader.DeepsetFileDownloader
init_parameters:
file_extensions:
- ".pdf"
pdf_to_image:
# Converts each page into an image
type: deepset_cloud_custom_nodes.converters.pdf_to_image.DeepsetPDFDocumentToBase64Image
init_parameters:
detail: "high"
ocr_prompt_builder:
# The prompt to use for LLM-based OCR
type: haystack.components.builders.prompt_builder.PromptBuilder
init_parameters:
template: |-
Your task is to extract text and tables from documents.
Once the text and tables are extracted, evaluate any tables to see if they have merged cells. If there are merged cells within the tables, you must unmerge these cells so that each relevant row contains the value from the merged cells.
Make sure to structure the extracted information exactly as it appears in the document and format it using Markdown.
It is essential you exclude any irrelevant information, such as the footer.
It's important not to elaborate on your response or include any additional text. The objective is to maintain limited word count without any additions.
ocr_prompt_adapter:
# Adapter which converts one prompt into a list of prompts.
#
# DeepsetParallelExecutor takes a list of each component input:
# DeepsetOpenAIVisionGenerator takes input `prompt` of type str
#
# So we have to transform one prompt into multiple prompts: str -> List[str]
# by duplicating prompt into a list of the same length as our images / pages.
type: haystack.components.converters.output_adapter.OutputAdapter
init_parameters:
output_type: typing.List[str]
template: |
{% set prompt_list = [] %}
{% for image in images %}
{% set _ = prompt_list.append(prompt) %}
{% endfor %}
{{ prompt_list }}
ocr_images_adapter:
# Adapter which converts each image into a list of images.
#
# DeepsetParallelExecutor takes a list of each component input:
# DeepsetOpenAIVisionGenerator takes input `images` of type List[Base64Image]
#
# So we have to transform List[Base64Image] -> List[List[Base64Image]]
# such that each image will be processed isolated: the inner list having only one entry.
type: haystack.components.converters.output_adapter.OutputAdapter
init_parameters:
output_type: typing.List[typing.List[deepset_cloud_custom_nodes.dataclasses.chat_message_with_images.Base64Image]]
unsafe: true
template: "{% set image_list = [] %}{% for image in images %}{% set _ = image_list.append([image]) %}{% endfor %}{{ image_list }}"
ocr:
# Runs LLM-based OCR per page in parallel
#
# Component run in parallel: DeepsetOpenAIVisionGenerator
# Component inputs:
# - prompt: str
# - images: List[Base64Image]
#
# DeepsetParallelExecutor takes a list of each component input:
# - prompt: List[str]
# - images: List[List[Base64Image]]
#
# Outputs:
# - replies: List[str] (note: flatten_output)
type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
init_parameters:
max_workers: 20 # 20 llm requests in parallel
flatten_output: true # ensure we get List[str] instead of List[List[str]]
component:
type: deepset_cloud_custom_nodes.generators.openai_vision.DeepsetOpenAIVisionGenerator
init_parameters:
model: "gpt-4o"
timeout: 120 # Timeout for AzureOpenAI client
generation_kwargs:
max_tokens: 4096 # Maximum number of tokens to generate
temperature: 0.0 # Controls the randomness and creativity of the text generated by the model.
# lower value
document_builder:
# Builds a haystack Document from ocr reply and meta.
# Note that this uses DeepsetParallelExecutor as well to facilitate List-type handling.
#
# Component run in parallel: OutputAdapter
# Component inputs:
# - content: str
# - base64_image: Base64Image
#
# DeepsetParallelExecutor takes a list of each component input:
# - content: List[str]
# - base64_image: List[Base64Image]
#
# Outputs:
# - output: List[Document]
type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
init_parameters:
max_workers: 1
component:
type: haystack.components.converters.output_adapter.OutputAdapter
init_parameters:
custom_filters:
Document: haystack.Document
unsafe: true
output_type: haystack.Document
template: |
{{ '' | Document(content, None, None, base64_image.meta) }}
joiner:
# This is a no-op or deepset Cloud to recognize written documents (requires output slot 'documents')
type: haystack.components.joiners.document_joiner.DocumentJoiner
init_parameters: {}
writer:
type: haystack.components.writers.document_writer.DocumentWriter
init_parameters:
document_store:
type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
init_parameters:
embedding_dim: 1024
similarity: cosine
policy: OVERWRITE
connections: # Defines how the components are connected
- sender: file_classifier.application/pdf
receiver: pdf_converter.sources
- sender: pdf_converter.documents
receiver: splitter.documents
- sender: splitter.documents
receiver: image_downloader.documents
- sender: image_downloader.documents
receiver: pdf_to_image.documents
- sender: pdf_to_image.base64_images
receiver: ocr_prompt_adapter.images
- sender: ocr_prompt_builder.prompt
receiver: ocr_prompt_adapter.prompt
- sender: ocr_images_adapter.output
receiver: ocr.images
- sender: ocr_prompt_adapter.output
receiver: ocr.prompt
- sender: ocr.replies
receiver: document_builder.content
- sender: pdf_to_image.base64_images
receiver: document_builder.base64_image
- sender: document_builder.output
receiver: joiner.documents
- sender: joiner.documents
receiver: writer.documents
max_loops_allowed: 100
inputs: # Define the inputs for your pipeline
files: "file_classifier.sources" # This component will receive the files to index as input
Init Parameters
Parameter | Type | Possible Values | Description |
---|---|---|---|
component | Component | The component to run in parallel. You specify the component by type and its init parameters, for example:component: type: haystack.components.generators.openai.OpenAIGenerator init_paramteres: {} Required. | |
max_workers | Integer | Default: 4 | The maximum number of workers to use in the thread pool executor. Required. |
max_retries | Integer | Default: 3 | The maximum number of retries to attempt if the component fails. Required. |
progress_bar | Boolean | Default: False | Shows a progress bar while running the component. Required. |
raise_on_failure | Boolean | Default: True | If set to True , raises an exception and stops the process if the last call fails.If set to False , shows a warning and returns None for the failed call. The warning contains all the input parameters used in the failed call.Required. |
flatten_output | Boolean | Default: False | Flattens the output of the component. Required. |
Updated 25 days ago