Import a Haystack Pipeline

Import Haystack pipelines and indexes into deepset AI Platform using the SDK.

About This Task

You can bring your Haystack pipeline or index into deepset AI Platform using an SDK method. You can specify the workspace where you want the pipeline or index to be imported. After the import, you must deploy the pipeline or enable the index before you can use it for search or file processing.

You can import both Pipeline and AsyncPipeline. For details, see the Haystack documentation.

The SDK uses the Pipeline.dumps() method under the hood to serialize the pipeline to YAML.
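For example, you can call Pipeline.dumps() yourself to preview the YAML that gets imported. This is a minimal sketch; the component setup is purely illustrative:

from haystack import Pipeline
from haystack.components.builders import PromptBuilder

# Build a minimal pipeline just to inspect its YAML representation
pipeline = Pipeline()
pipeline.add_component("prompt_builder", PromptBuilder(template="Answer this question: {{query}}"))

# Pipeline.dumps() returns the same YAML serialization the SDK sends during import
print(pipeline.dumps())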

📘 deepset and Haystack Pipelines

There are some differences between deepset and Haystack pipelines:

  • deepset distinguishes between pipelines and indexes.
  • Both pipelines and indexes take inputs and produce outputs, which you define explicitly when importing.

Make sure you understand how each of them works before importing. For more information, see Pipelines and Indexes.

Secrets

You can import pipelines with environment variable-based secrets. Make sure you add the secrets to deepset. You can do this either on the Connections page, if it's an existing integration, or on the Secrets page. For details, see Using Hosted Models and External Services.

Token-based secrets do not work in imported pipelines.
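For example, a secret created with Secret.from_env_var() only stores the variable name in the serialized YAML and imports cleanly, while a secret created with Secret.from_token() can't be serialized. A minimal sketch:

from haystack.utils import Secret
from haystack.components.generators import OpenAIGenerator

# Works after import: only the environment variable name ends up in the pipeline YAML,
# and deepset resolves it from the secret you added on the Connections or Secrets page
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY", strict=False))

# Does not work: the raw token can't be serialized to YAML,
# so Haystack raises an error when the pipeline is serialized for import
# llm = OpenAIGenerator(api_key=Secret.from_token("sk-..."))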

Custom Components

If the pipeline or index you're importing uses custom components, import them to deepset first. For details, see Working with Custom Components.

Document Stores

For full indexing capabilities, such as detailed indexing status, use the core OpenSearchDocumentStore. If you're using another document store, check the instructions on how to connect deepset to it: Connect to an External Document Store.

To understand how deepset interacts with different document stores, check Document Stores.

Streaming

The streaming_callback function is not serializable. Once you import your pipeline, you can enable streaming directly in deepset AI Platform. For details, see Enable Streaming.
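In practice, this means leaving streaming_callback unset on your generator before importing. A sketch, with an illustrative component and model:

from haystack.utils import Secret
from haystack.components.generators import OpenAIGenerator

# Don't pass streaming_callback here: it's a Python function and can't be serialized to YAML.
# Turn streaming on in deepset AI Platform after the import instead.
llm = OpenAIGenerator(
    api_key=Secret.from_env_var("OPENAI_API_KEY", strict=False),
    model="gpt-4"
)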

Validation

You can choose to validate the YAML of the pipeline or index you're importing by setting the strict_validation parameter of PipelineConfig() or IndexConfig() to True. The import fails if there are validation errors.

By default, strict_validation is set to False, which means validation errors are logged but the import continues.

Output Type

For pipelines, you can set the output type to properly display the answers it returns in Playground. For details, see Set Pipeline Output Type and the Examples section below.

Prerequisites

  • Install the deepset SDK: pip install deepset-cloud-sdk.
  • Connect the SDK to deepset AI Platform, for example by running deepset-cloud login, or have your API key and workspace name ready to pass to PipelineClient().

Import a Pipeline

When importing a pipeline, use the PipelineConfig() class to define the pipeline inputs and outputs.
Here's the code you can use to import a Haystack pipeline:

import asyncio
from haystack import Pipeline
from deepset_cloud_sdk import PipelineClient, PipelineConfig, PipelineInputs, PipelineOutputs

# these components are used in the pipeline
# make sure you import the components for your pipeline
from haystack.utils import Secret
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator


# Define the pipeline
pipeline = Pipeline()

# Add pipeline components
prompt_builder = PromptBuilder(template="Answer this question: {{query}}", required_variables=["query"])
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY", strict=False), model="gpt-4")
answer_builder = AnswerBuilder()

pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("llm", llm)
pipeline.add_component("answer_builder", answer_builder)
pipeline.connect("prompt_builder.prompt", "llm.prompt")
pipeline.connect("llm.replies", "answer_builder.replies")

# Initialize the deepset SDK PipelineClient
# Option 1 - Using environment variables (recommended):
# This assumes you've run `deepset-cloud login` to set up environment variables in your .env file
client = PipelineClient()

# Option 2 - Using explicit parameters:
# Alternatively, you can pass api_key, workspace_name, and api_url explicitly
# client = PipelineClient(
#     api_key="your-api-key",
#     workspace_name="your-workspace",
#     api_url="https://api.cloud.deepset.ai/api/v1"
# )

# Configure the pipeline import
config = PipelineConfig(
    name="my-pipeline-async",  # Name for your pipeline in deepset AI Platform
    inputs=PipelineInputs(
        query=["prompt_builder.query"]  # List the components and their inputs that should receive the query
    ),
    outputs=PipelineOutputs(
        answers="answer_builder.answers"  # List the components and their output names that return answers
    )
    strict_validation=True # Fails the import if the pipeline YAML fails validation, by default set to `False`
    overwrite=True # Overwrites a pipeline with the same name that already exists in deepset
)

# Import the pipeline
# Option 1: Sync import
client.import_into_deepset(pipeline, config)

# Option 2: Async import (use it inside an async function, or wrap it in asyncio.run())
# asyncio.run(client.import_into_deepset_async(pipeline, config))

Import an Index

When importing an index, use the IndexConfig() class to define the index inputs. Here's the code you can use to import an index:

from deepset_cloud_sdk import PipelineClient, IndexConfig, IndexInputs
from haystack import Pipeline
# import all the components your index is using, for example:
from haystack.components.routers.file_type_router import FileTypeRouter
from haystack.components.converters.txt import TextFileToDocument


# Configure your index
index = Pipeline()

# Initialize index components, for example:
file_classifier = FileTypeRouter(mime_types=[
    "text/plain"
])

text_converter = TextFileToDocument(encoding="utf-8")

# Add components to the index, for example:
index.add_component("file_classifier", file_classifier)
index.add_component("text_converter", text_converter)

# Connect the components
index.connect("file_classifier.text/plain", "text_converter.sources")

# Initialize the deepset SDK PipelineClient()
# Option 1 - Using environment variables (recommended):
# This assumes you've run `deepset-cloud login` to set up environment variables in your .env file
client = PipelineClient()

# Option 2 - Using explicit parameters:
# Alternatively, you can pass api_key, workspace_name, and api_url explicitly
# client = PipelineClient(
#     api_key="your-api-key",
#     workspace_name="your-workspace",
#     api_url="https://api.cloud.deepset.ai/api/v1"
# )

# Configure the import
config = IndexConfig(
    name="my-index", # specify the name under which the index will appear in deepset
    inputs=IndexInputs(files=["file_type_router.sources"]), # define the components and their input names that should receive files
    strict_validation=False #this logs YAML validation errors but doesn't fail the import
    overwrite=True # overwrites an index with the same name that already exists in deepset
)


# Import the index using sync method
client.import_into_deepset(index, config)

# Or import the index using the async method (inside an async function, or wrapped in asyncio.run())
# asyncio.run(client.import_into_deepset_async(index, config))

Examples

This is an example of how to import an index into deepset AI Platform:

from haystack import Pipeline
from haystack.components.routers.file_type_router import FileTypeRouter
from haystack.components.converters.txt import TextFileToDocument
from haystack.components.converters.pdfminer import PDFMinerToDocument
from haystack.components.converters.html import HTMLToDocument
from haystack.components.converters.docx import DOCXToDocument
from haystack.components.converters.pptx import PPTXToDocument
from haystack.components.converters.xlsx import XLSXToDocument
from haystack.components.converters.csv import CSVToDocument
from haystack.components.joiners.document_joiner import DocumentJoiner
from haystack.components.preprocessors.document_splitter import DocumentSplitter
from haystack.components.embedders.sentence_transformers_document_embedder import SentenceTransformersDocumentEmbedder
from haystack_integrations.document_stores.opensearch.document_store import OpenSearchDocumentStore
from haystack.components.writers.document_writer import DocumentWriter
from deepset_cloud_sdk import PipelineClient, IndexConfig, IndexInputs
from haystack.document_stores.types import DuplicatePolicy


# Initialize the deepset SDK PipelineClient
pipeline_client = PipelineClient(
    workspace_name="new",
    api_key="api_key",
    api_url="https://api.cloud.deepset.ai/api/v1"
)

# Initialize components
file_classifier = FileTypeRouter(mime_types=[
    "text/plain", 
    "application/pdf", 
    "text/markdown", 
    "text/html", 
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "text/csv"
])

text_converter = TextFileToDocument(encoding="utf-8")
pdf_converter = PDFMinerToDocument(
    line_overlap=0.5,
    char_margin=2,
    line_margin=0.5,
    word_margin=0.1,
    boxes_flow=0.5,
    detect_vertical=True,
    all_texts=False,
    store_full_path=False
)
markdown_converter = TextFileToDocument(encoding="utf-8")
html_converter = HTMLToDocument(
    extraction_kwargs={
        "output_format": "markdown",
        "target_language": None,
        "include_tables": True,
        "include_links": True
    }
)
docx_converter = DOCXToDocument(link_format="markdown")
pptx_converter = PPTXToDocument()
xlsx_converter = XLSXToDocument()
csv_converter = CSVToDocument(encoding="utf-8")

joiner = DocumentJoiner(join_mode="concatenate", sort_by_score=False)
joiner_xlsx = DocumentJoiner(join_mode="concatenate", sort_by_score=False)

splitter = DocumentSplitter(
    split_by="word",
    split_length=250,
    split_overlap=30,
    respect_sentence_boundary=True,
    language="en"
)

document_embedder = SentenceTransformersDocumentEmbedder(
    normalize_embeddings=True,
    model="intfloat/e5-base-v2"
)

opensearchdocumentstore = OpenSearchDocumentStore(
    index="default",
    max_chunk_bytes=104857600,
    embedding_dim=768,
    return_embedding=False,
    create_index=True
)

writer = DocumentWriter(document_store=opensearchdocumentstore, policy=DuplicatePolicy.OVERWRITE)

# Create and configure pipeline
pipeline_index = Pipeline()

# Add components
pipeline_index.add_component("file_classifier", file_classifier)
pipeline_index.add_component("text_converter", text_converter)
pipeline_index.add_component("pdf_converter", pdf_converter)
pipeline_index.add_component("markdown_converter", markdown_converter)
pipeline_index.add_component("html_converter", html_converter)
pipeline_index.add_component("docx_converter", docx_converter)
pipeline_index.add_component("pptx_converter", pptx_converter)
pipeline_index.add_component("xlsx_converter", xlsx_converter)
pipeline_index.add_component("csv_converter", csv_converter)
pipeline_index.add_component("joiner", joiner)
pipeline_index.add_component("joiner_xlsx", joiner_xlsx)
pipeline_index.add_component("splitter", splitter)
pipeline_index.add_component("document_embedder", document_embedder)
pipeline_index.add_component("writer", writer)

# Connect components
pipeline_index.connect("file_classifier.text/plain", "text_converter.sources")
pipeline_index.connect("file_classifier.application/pdf", "pdf_converter.sources")
pipeline_index.connect("file_classifier.text/markdown", "markdown_converter.sources")
pipeline_index.connect("file_classifier.text/html", "html_converter.sources")
pipeline_index.connect("file_classifier.application/vnd.openxmlformats-officedocument.wordprocessingml.document", "docx_converter.sources")
pipeline_index.connect("file_classifier.application/vnd.openxmlformats-officedocument.presentationml.presentation", "pptx_converter.sources")
pipeline_index.connect("file_classifier.application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "xlsx_converter.sources")
pipeline_index.connect("file_classifier.text/csv", "csv_converter.sources")

pipeline_index.connect("text_converter.documents", "joiner.documents")
pipeline_index.connect("pdf_converter.documents", "joiner.documents")
pipeline_index.connect("markdown_converter.documents", "joiner.documents")
pipeline_index.connect("html_converter.documents", "joiner.documents")
pipeline_index.connect("docx_converter.documents", "joiner.documents")
pipeline_index.connect("pptx_converter.documents", "joiner.documents")

pipeline_index.connect("joiner.documents", "splitter.documents")
pipeline_index.connect("splitter.documents", "joiner_xlsx.documents")
pipeline_index.connect("xlsx_converter.documents", "joiner_xlsx.documents")
pipeline_index.connect("csv_converter.documents", "joiner_xlsx.documents")

pipeline_index.connect("joiner_xlsx.documents", "document_embedder.documents")
pipeline_index.connect("document_embedder.documents", "writer.documents")

index_config = IndexConfig(
    name="demo0",
    inputs=IndexInputs(
        files=["file_classifier.sources"],
    ),
)

if __name__ == "__main__":
    pipeline_client.import_into_deepset(pipeline=pipeline_index, config=index_config)
    # asyncio.run(pipeline_client.import_into_deepset_async(pipeline=pipeline_index, config=index_config))
    print(f"index {index_config.name} published") 

With Validation Enabled

Setting strict_validation=True fails the import if there are issues with the pipeline YAML. Here's how to enable validation:

from deepset_cloud_sdk import DeepsetValidationError, PipelineClient, PipelineConfig, PipelineInputs, PipelineOutputs
...
# Here you'd define the pipeline components and connect them

# Enable validation when configuring the import
config = PipelineConfig(
    name="my-simple-pipeline-validated",  # Name for your pipeline in deepset AI Platform
    inputs=PipelineInputs(
        query=["prompt_builder.query"]  # Which component parameter receives the query
    ),
    outputs=PipelineOutputs(
        answers="answer_builder.answers"  # Which component output provides the answers
    ),
    strict_validation=True  # prevents import on validation errors
)

# When importing the pipeline, you may catch validation errors using DeepsetValidationError
try:
    client.import_into_deepset(pipeline, config)
except DeepsetValidationError as err:
    # Manage validation errors
    print(err)
...

Overwriting Existing Pipelines with the Same Name

You can choose to overwrite pipelines that already exist in deepset and have the same name as the pipeline you're importing. Use the overwrite setting to do that:

config = PipelineConfig(
    name="my-pipeline",  # Name for your pipeline in deepset AI Platform
    inputs=PipelineInputs(
        query=["prompt_builder.query"]  # Which component parameter receives the query
    ),
    outputs=PipelineOutputs(
        answers="answer_builder.answers"  # Which component output provides the answers
    ),
    overwrite=True # set to True to overwrite the pipeline if it already exists
)

With Output Type Set

To set the pipeline output type, import PipelineOutputType and set it to one of the options: generative, chat, extractive, or document. This example sets the pipeline type to chat:

from deepset_cloud_sdk import PipelineClient, PipelineConfig, PipelineInputs, PipelineOutputs, PipelineOutputType

config = PipelineConfig(
    name="my-simple-pipeline-output-type",  # Name for your pipeline in deepset AI Platform
    inputs=PipelineInputs(
        query=["prompt_builder.query"]  # Which component parameter receives the query
    ),
    outputs=PipelineOutputs(
        answers="llm.replies"  # Which component output provides the answers
    ),
    pipeline_output_type=PipelineOutputType.CHAT # set pipeline_output_type here
)

What To Do Next

  • Check if you want to swap any of your pipeline components for deepset-specific components. For example, DeepsetAnswerBuilder can format and output references in a way AnswerBuilder can't.
  • Make sure the pipeline or index works. Fix any errors that you may see in the Builder.
  • If you imported a pipeline, you must deploy it to use it for search. For details, see Deploy a Pipeline.
  • If you imported an index, you must enable it to process the files. For details, see Enable an Index.