BranchJoiner
A component that merges multiple input branches of a pipeline into a single output stream.
Basic Information
- Type: haystack.components.joiners.branch.BranchJoiner
Inputs
| Parameter | Type | Default | Description |
|---|---|---|---|
| **kwargs | Any | | The input data. Must be of the type declared by type_ during initialization. |
Outputs
| Parameter | Type | Default | Description |
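|---|---|---|---|
| value | Any | | The first value received from the connected input branches, of the type declared by type_. |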
Overview
BranchJoiner receives multiple inputs of the same data type and forwards the first received value
to its output. This is useful for scenarios where multiple branches need to converge before proceeding.
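The forwarding behavior described above can be pictured with a small plain-Python stand-in. This is a sketch for illustration only, not the library's implementation: the class name `FirstValueJoiner` and its `run` signature are hypothetical, and the type check is a simplification that works only for plain classes such as `str`.

```python
from typing import Any, Dict, List


class FirstValueJoiner:
    """Hypothetical stand-in that mimics the behavior described above."""

    def __init__(self, type_: type) -> None:
        # One joiner handles exactly one data type.
        self.type_ = type_

    def run(self, received: List[Any]) -> Dict[str, Any]:
        # `received` holds whatever the upstream branches delivered, in arrival order.
        if not received:
            raise ValueError("No branch delivered a value.")
        first = received[0]
        if not isinstance(first, self.type_):
            raise TypeError(
                f"Expected {self.type_.__name__}, got {type(first).__name__}."
            )
        # Forward the first received value unchanged.
        return {"value": first}


joiner = FirstValueJoiner(str)
print(joiner.run(["query from branch A"]))
```

Whichever branch delivers first decides the output, and values of any other type are rejected, which mirrors the one-type-per-joiner rule.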
Common Use Cases:
- Loop Handling: BranchJoiner helps close loops in pipelines. For example, if a pipeline component validates or modifies incoming data and produces an error-handling branch, BranchJoiner can merge both branches and send (or resend in the case of a loop) the data to the component that evaluates errors. See "Example Usage" below.
- Decision-Based Merging: BranchJoiner reconciles branches coming from Router components (such as ConditionalRouter, TextLanguageRouter). Suppose a TextLanguageRouter directs user queries to different Retrievers based on the detected language. Each Retriever processes its assigned query and passes the results to BranchJoiner, which consolidates them into a single output before passing them to the next component, such as a PromptBuilder.
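The decision-based merging scenario can be sketched in plain Python as a rough mental model. Nothing here uses the Haystack API: `detect_language`, the two retriever functions, `join_branches`, and `build_prompt` are hypothetical stand-ins for a language router, per-language Retrievers, BranchJoiner, and a PromptBuilder, and the language heuristic is a toy.

```python
from typing import Callable, Dict, List


# Hypothetical per-language retrievers: each returns documents for a query.
def retrieve_english(query: str) -> List[str]:
    return [f"EN doc for: {query}"]


def retrieve_german(query: str) -> List[str]:
    return [f"DE doc for: {query}"]


RETRIEVERS: Dict[str, Callable[[str], List[str]]] = {
    "en": retrieve_english,
    "de": retrieve_german,
}


def detect_language(query: str) -> str:
    # Toy heuristic standing in for a language router.
    german_words = {"wie", "was", "und"}
    return "de" if german_words & set(query.lower().split()) else "en"


def join_branches(branch_outputs: Dict[str, List[str]]) -> List[str]:
    # Only the branch chosen by the router produced output; forward it
    # so the next step sees a single stream regardless of the branch taken.
    (documents,) = branch_outputs.values()
    return documents


def build_prompt(query: str, documents: List[str]) -> str:
    return "Context:\n" + "\n".join(documents) + f"\nQuestion: {query}"


def answer(query: str) -> str:
    language = detect_language(query)
    branch_outputs = {language: RETRIEVERS[language](query)}
    return build_prompt(query, join_branches(branch_outputs))


print(answer("Was ist Haystack"))
```

The point of the join step is that `build_prompt` needs only one input, no matter how many language branches exist upstream.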
Example Usage:
import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

# Define a schema for validation
person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('generator', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage], unsafe=True))

# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "generator")
pipe.connect("generator.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(
    data={
        "generator": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
        "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}}
)

print(json.loads(result["validator"]["validated"][0].text))

# >> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# >> 'Superhero', 'age': 23, 'location': 'New York City'}
Note that BranchJoiner can manage only one data type at a time. In this case, BranchJoiner is created for
passing List[ChatMessage]. This determines the type of data that BranchJoiner will receive from the upstream
connected components and also the type of data that BranchJoiner will send through its output.
In the code example, BranchJoiner receives a looped-back List[ChatMessage] from the JsonSchemaValidator and
sends it to the OpenAIChatGenerator for regeneration. A pipeline can have multiple loopback connections.
Here there is only one downstream component (the OpenAIChatGenerator), but the pipeline could
have more than one.
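To make the control flow of the loop easier to follow, here is a plain-Python sketch of the same generate, validate, and retry cycle. It is an analogy under stated assumptions, not how the pipeline is implemented: `fake_generator`, `validate`, `run_loop`, and the `max_attempts` limit are hypothetical, and the generator is a stub that returns an incomplete reply on the first attempt.

```python
import json
from typing import List, Optional

REQUIRED = ("first_name", "last_name", "nationality")


def fake_generator(messages: List[str]) -> str:
    # Hypothetical stand-in for an LLM: the first attempt omits a field,
    # any attempt that follows validation feedback is complete.
    person = {"first_name": "Peter", "last_name": "Parker"}
    if len(messages) > 1:
        person["nationality"] = "American"
    return json.dumps(person)


def validate(reply: str) -> Optional[str]:
    # Return an error message for the loopback branch, or None if valid.
    missing = [key for key in REQUIRED if key not in json.loads(reply)]
    return f"Missing fields: {', '.join(missing)}" if missing else None


def run_loop(initial: List[str], max_attempts: int = 3) -> dict:
    # `messages` plays the joiner's role: it is fed either by the initial
    # branch or by the looped-back validation error branch.
    messages = initial
    for _ in range(max_attempts):
        reply = fake_generator(messages)
        error = validate(reply)
        if error is None:
            return json.loads(reply)
        messages = messages + [reply, error]
    raise RuntimeError("No valid reply within the attempt limit.")


print(run_loop(["Create json from Peter Parker"]))
```

Both the first pass and every retry enter the generator through the same variable, which is the role the joiner plays between the adapter branch and the validation error branch.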
Usage Example
components:
  BranchJoiner:
    type: haystack.components.joiners.branch.BranchJoiner
    init_parameters:
Parameters
Init Parameters
These are the parameters you can configure in Pipeline Builder:
| Parameter | Type | Default | Description |
|---|---|---|---|
| type_ | Type | | The expected data type of inputs and outputs. |
Run Method Parameters
These are the parameters you can configure for the component's run() method. This means you can pass these parameters at query time through the API, in Playground, or when running a job. For details, see Modify Pipeline Parameters at Query Time.
| Parameter | Type | Default | Description |
|---|---|---|---|
| **kwargs | Any | | The input data. Must be of the type declared by type_ during initialization. |