DeepsetParallelExecutor

Run a component in parallel over multiple inputs to process them simultaneously instead of one by one. This is useful when a component must handle many independent inputs, such as one LLM call per document page.

Basic Information

  • Pipeline type: Query and indexing.
  • Type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
  • Components it can connect with:
    • DeepsetParallelExecutor connects to components whose input and output types are compatible with the component it runs. For example, if it runs a Generator, it can connect to components like PromptBuilder and AnswerBuilder.

Inputs

| Name | Type | Description |
| --- | --- | --- |
| kwargs | Any iterable of items of any type (for example, a list, set, or tuple) | The inputs to the component. Each input must be a list of component inputs. Required. |

Outputs

| Name | Type | Description |
| --- | --- | --- |
| The names of the component's outputs | Dictionary with string keys and list values; each value is a list of strings or nested dictionaries | The outputs of the component. Each output is a list. DeepsetParallelExecutor returns outputs in the same order as inputs. |

Overview

Use this component to run another component with multiple inputs in parallel instead of one by one to save time. For example, you can use it to run a Generator that makes multiple LLM invocations simultaneously.
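As a minimal sketch, a Generator wrapped in DeepsetParallelExecutor could look like this (the parameter values are illustrative, not defaults you must use):

```yaml
# Illustrative sketch: run an OpenAIGenerator over a list of prompts in parallel.
parallel_generator:
  type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
  init_parameters:
    max_workers: 4          # number of concurrent LLM calls
    flatten_output: true    # return List[str] instead of List[List[str]]
    component:
      type: haystack.components.generators.openai.OpenAIGenerator
      init_parameters: {}
```

The executor then expects each of the wrapped component's inputs as a list: for OpenAIGenerator's `prompt: str` input, you pass `prompt: List[str]`.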

Usage Example

This is an example of an indexing pipeline designed for PDF documents with Optical Character Recognition (OCR) capabilities. Here is how it works:

  1. The pipeline receives files as input, and file_classifier (FileTypeRouter) filters for PDF files only.
  2. pdf_converter (PyPDFToDocument) converts PDFs to Document objects and sends them to splitter (DeepsetDocumentSplitter), which breaks them down by page (one page per split).
  3. image_downloader (DeepsetFileDownloader) downloads each page, and then pdf_to_image (DeepsetPDFDocumentToBase64Image) converts each page to an image.
  4. ocr_prompt_builder (PromptBuilder) creates a prompt for the LLM-based OCR system (DeepsetOpenAIVisionGenerator), instructing it to extract text and tables.
  5. ocr_prompt_adapter (OutputAdapter) and ocr_images_adapter (OutputAdapter) prepare the data structure for parallel processing. ocr_prompt_adapter turns the output of ocr_prompt_builder into a list of strings, while ocr_images_adapter turns the output of pdf_to_image into a list of images. This is because DeepsetParallelExecutor accepts only lists of outputs.
  6. ocr (DeepsetParallelExecutor) runs DeepsetOpenAIVisionGenerator and processes up to 20 pages in parallel using the instructions from PromptBuilder's prompt.
  7. document_builder (DeepsetParallelExecutor) runs an OutputAdapter that creates Document objects from the OCR results and sends them to joiner (DocumentJoiner), which joins the processed documents together.
  8. writer (DocumentWriter) writes the processed documents to an OpenSearch document store.
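The adapter logic in step 5 can be sketched in plain Python (the function names are illustrative, not part of the actual components):

```python
# Sketch of the two OutputAdapter transformations that prepare inputs for
# DeepsetParallelExecutor, which expects a list of each component input.

def duplicate_prompt(prompt: str, images: list) -> list:
    # ocr_prompt_adapter: one prompt -> one prompt per image (str -> List[str])
    return [prompt for _ in images]

def wrap_images(images: list) -> list:
    # ocr_images_adapter: List[Base64Image] -> List[List[Base64Image]],
    # so each page's image is processed in isolation
    return [[image] for image in images]

prompts = duplicate_prompt("Extract text and tables.", ["img1", "img2", "img3"])
batches = wrap_images(["img1", "img2", "img3"])
```

With three pages, `prompts` holds the same prompt three times and `batches` holds three single-image lists, matching the `prompt: List[str]` and `images: List[List[Base64Image]]` inputs the ocr executor expects.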
components:
  file_classifier:
    type: haystack.components.routers.file_type_router.FileTypeRouter
    init_parameters:
      mime_types:
      - application/pdf

  pdf_converter:
    type: haystack.components.converters.pypdf.PyPDFToDocument
    init_parameters:
      converter:
        type: haystack.components.converters.pypdf.DefaultConverter

  splitter:
    # Split each document by page: we want one image per page
    type: deepset_cloud_custom_nodes.preprocessors.document_splitter.DeepsetDocumentSplitter
    init_parameters:
      split_by: page
      split_length: 1
      split_overlap: 0
      respect_sentence_boundary: False
      language: en

  image_downloader:
    # Downloads each file for each page
    type: deepset_cloud_custom_nodes.augmenters.deepset_file_downloader.DeepsetFileDownloader
    init_parameters:
      file_extensions:
        - ".pdf"

  pdf_to_image:
    # Converts each page into an image
    type: deepset_cloud_custom_nodes.converters.pdf_to_image.DeepsetPDFDocumentToBase64Image
    init_parameters:
      detail: "high"

  ocr_prompt_builder:
    # The prompt to use for LLM-based OCR
    type: haystack.components.builders.prompt_builder.PromptBuilder
    init_parameters:
      template: |-
        Your task is to extract text and tables from documents. 
        Once the text and tables are extracted, evaluate any tables to see if they have merged cells. If there are merged cells within the tables, you must unmerge these cells so that each relevant row contains the value from the merged cells.
        Make sure to structure the extracted information exactly as it appears in the document and format it using Markdown. 
        It is essential you exclude any irrelevant information, such as the footer.
        It's important not to elaborate on your response or include any additional text. The objective is to maintain limited word count without any additions.
  
  ocr_prompt_adapter:
    # Adapter which converts one prompt into a list of prompts.
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # DeepsetOpenAIVisionGenerator takes input `prompt` of type str
    #
    # So we have to transform one prompt into multiple prompts: str -> List[str]
    # by duplicating prompt into a list of the same length as our images / pages.
    type: haystack.components.converters.output_adapter.OutputAdapter
    init_parameters:
      output_type: typing.List[str]
      template: |
        {% set prompt_list = [] %}
        {% for image in images %}
          {% set _ = prompt_list.append(prompt) %}
        {% endfor %}
        {{ prompt_list }}

  ocr_images_adapter:
    # Adapter which converts each image into a list of images.
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # DeepsetOpenAIVisionGenerator takes input `images` of type List[Base64Image]
    #
    # So we have to transform List[Base64Image] -> List[List[Base64Image]]
    # such that each image will be processed isolated: the inner list having only one entry.
    type: haystack.components.converters.output_adapter.OutputAdapter
    init_parameters:
      output_type: typing.List[typing.List[deepset_cloud_custom_nodes.dataclasses.chat_message_with_images.Base64Image]]
      unsafe: true
      template: "{% set image_list = [] %}{% for image in images %}{% set _ = image_list.append([image]) %}{% endfor %}{{ image_list }}"

  ocr:
    # Runs LLM-based OCR per page in parallel
    #
    # Component run in parallel: DeepsetOpenAIVisionGenerator
    # Component inputs:
    # - prompt: str
    # - images: List[Base64Image]
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - prompt: List[str]
    # - images: List[List[Base64Image]]
    #
    # Outputs:
    # - replies: List[str] (note: flatten_output)
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 20  # 20 llm requests in parallel
      flatten_output: true  # ensure we get List[str] instead of List[List[str]]
      component:
        type: deepset_cloud_custom_nodes.generators.openai_vision.DeepsetOpenAIVisionGenerator
        init_parameters:
          model: "gpt-4o"
          timeout: 120  # Timeout for AzureOpenAI client
          generation_kwargs:
            max_tokens: 4096  # Maximum number of tokens to generate
            temperature: 0.0  # Controls the randomness and creativity of the generated text;
                              # lower values make the output more deterministic.

  document_builder:
    # Builds a haystack Document from ocr reply and meta.
    # Note that this uses DeepsetParallelExecutor as well to facilitate List-type handling.
    #
    # Component run in parallel: OutputAdapter
    # Component inputs:
    # - content: str
    # - base64_image: Base64Image
    #
    # DeepsetParallelExecutor takes a list of each component input:
    # - content: List[str]
    # - base64_image: List[Base64Image]
    #
    # Outputs:
    # - output: List[Document]
    type: deepset_cloud_custom_nodes.executors.parallel_executor.DeepsetParallelExecutor
    init_parameters:
      max_workers: 1
      component:
        type: haystack.components.converters.output_adapter.OutputAdapter
        init_parameters:
          custom_filters:
            Document: haystack.Document
          unsafe: true
          output_type: haystack.Document
          template: |
            {{ '' | Document(content, None, None, base64_image.meta) }}

  joiner:
    # This is a no-op that deepset Cloud needs to recognize written documents (requires output slot 'documents')
    type: haystack.components.joiners.document_joiner.DocumentJoiner
    init_parameters: {}
       
  writer:
    type: haystack.components.writers.document_writer.DocumentWriter
    init_parameters:
      document_store:
        type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
        init_parameters:
          embedding_dim: 1024
          similarity: cosine
      policy: OVERWRITE

connections:  # Defines how the components are connected
- sender: file_classifier.application/pdf
  receiver: pdf_converter.sources

- sender: pdf_converter.documents
  receiver: splitter.documents 

- sender: splitter.documents
  receiver: image_downloader.documents

- sender: image_downloader.documents
  receiver: pdf_to_image.documents

- sender: pdf_to_image.base64_images
  receiver: ocr_prompt_adapter.images

- sender: ocr_prompt_builder.prompt
  receiver: ocr_prompt_adapter.prompt

- sender: ocr_images_adapter.output
  receiver: ocr.images

- sender: ocr_prompt_adapter.output
  receiver: ocr.prompt

- sender: ocr.replies
  receiver: document_builder.content

- sender: pdf_to_image.base64_images
  receiver: document_builder.base64_image

- sender: document_builder.output
  receiver: joiner.documents

- sender: joiner.documents
  receiver: writer.documents

max_loops_allowed: 100

inputs:  # Define the inputs for your pipeline
  files: "file_classifier.sources"  # This component will receive the files to index as input
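To illustrate the `flatten_output: true` setting used by the ocr component above, here is a small Python sketch (illustrative only; it mimics the executor's output handling rather than calling it):

```python
# Illustrative sketch of what flatten_output does to the executor's results.
# Each parallel run of DeepsetOpenAIVisionGenerator returns replies: List[str];
# collecting N runs therefore yields List[List[str]].

per_run_replies = [["page 1 text"], ["page 2 text"], ["page 3 text"]]

# flatten_output: true merges the nested lists into a single List[str],
# which is the shape document_builder expects on its `content` input.
flattened = [reply for run in per_run_replies for reply in run]
```

Without flattening, downstream components would receive one nested list per page instead of a flat list of OCR results.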

Init Parameters

| Parameter | Type | Possible Values | Description |
| --- | --- | --- | --- |
| component | Component | | The component to run in parallel. Specify the component by its type and init parameters, for example: `component: {type: haystack.components.generators.openai.OpenAIGenerator, init_parameters: {}}`. Required. |
| max_workers | Integer | Default: 4 | The maximum number of workers to use in the thread pool executor. Required. |
| max_retries | Integer | Default: 3 | The maximum number of retries to attempt if the component fails. Required. |
| progress_bar | Boolean | Default: False | Shows a progress bar while running the component. Required. |
| raise_on_failure | Boolean | Default: True | If set to True, raises an exception and stops the process if the last call fails. If set to False, shows a warning and returns None for the failed call; the warning contains all the input parameters used in the failed call. Required. |
| flatten_output | Boolean | Default: False | Flattens the output of the component. Required. |