Skip to main content

BranchJoiner

A component that merges multiple input branches of a pipeline into a single output stream.

Basic Information

  • Type: haystack_integrations.joiners.branch.BranchJoiner

Inputs

ParameterTypeDefaultDescription
**kwargsAnyThe input data. Must be of the type declared by type_ during initialization.

Outputs

ParameterTypeDefaultDescription

Overview

Work in Progress

Bear with us while we're working on adding pipeline examples and most common components connections.

A component that merges multiple input branches of a pipeline into a single output stream.

BranchJoiner receives multiple inputs of the same data type and forwards the first received value to its output. This is useful for scenarios where multiple branches need to converge before proceeding.

Common Use Cases:

  • Loop Handling: BranchJoiner helps close loops in pipelines. For example, if a pipeline component validates or modifies incoming data and produces an error-handling branch, BranchJoiner can merge both branches and send (or resend in the case of a loop) the data to the component that evaluates errors. See "Usage example" below.

  • Decision-Based Merging: BranchJoiner reconciles branches coming from Router components (such as ConditionalRouter, TextLanguageRouter). Suppose a TextLanguageRouter directs user queries to different Retrievers based on the detected language. Each Retriever processes its assigned query and passes the results to BranchJoiner, which consolidates them into a single output before passing them to the next component, such as a PromptBuilder.

Example Usage:

import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

# Define a schema for validation
person_schema = {
"type": "object",
"properties": {
"first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
},
"required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('generator', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage], unsafe=True))

# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "generator")
pipe.connect("generator.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(
data={
"generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
"adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}
)

print(json.loads(result["validator"]["validated"][0].text))

>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}

Note that BranchJoiner can manage only one data type at a time. In this case, BranchJoiner is created for passing List[ChatMessage]. This determines the type of data that BranchJoiner will receive from the upstream connected components and also the type of data that BranchJoiner will send through its output.

In the code example, BranchJoiner receives a looped back List[ChatMessage] from the JsonSchemaValidator and sends it down to the OpenAIChatGenerator for re-generation. We can have multiple loopback connections in the pipeline. In this instance, the downstream component is only one (the OpenAIChatGenerator), but the pipeline could have more than one downstream component.

Usage Example

components:
BranchJoiner:
type: components.joiners.branch.BranchJoiner
init_parameters:

Parameters

Init Parameters

These are the parameters you can configure in Pipeline Builder:

ParameterTypeDefaultDescription
type_TypeThe expected data type of inputs and outputs.

Run Method Parameters

These are the parameters you can configure for the component's run() method. This means you can pass these parameters at query time through the API, in Playground, or when running a job. For details, see Modify Pipeline Parameters at Query Time.

ParameterTypeDefaultDescription
**kwargsAnyThe input data. Must be of the type declared by type_ during initialization.