Pipelines

Pipelines are powerful and highly flexible systems that form the engine for your app. They consist of components that process your data and perform various tasks on it. Each component in a pipeline passes its output to the next component as input.

How Do Pipelines Work?

Pipelines are composed of connected components. Each component processes a piece of your data, such as documents, and performs specific tasks before passing the output to the next component. For example, a basic RAG pipeline may include:

  • A text embedder that takes the user query and turns it into a vector.
  • A retriever that uses the vector from the text embedder and uses it to fetch relevant documents from the document store.
  • A prompt builder that injects the user query and the documents from the retriever into the prompt.
  • A generator that uses the prompt from the prompt builder, including the documents, to generate the response.

Components function like modular building blocks that you can mix, match, and replace to form various pipeline configurations, such as loops, branches, or simultaneous flows. When connecting the components, it's crucial to ensure the output type of one component matches the input type of the next. Each component receives only the data it needs, which speeds up the pipeline and makes it easier to debug.

For component compatibility, check individual pages under Pipeline Components.

Connection Types

Pipelines in deepset Cloud V2 are flexible and multifunctional. While you can still create simple pipelines performing one function, like answering queries, you can also use them to build complex workflows.

When you connect components in a pipeline, it validates that their outputs and inputs match and are explicitly indicated and, if needed, produces detailed errors.

Branches

Pipelines can branch to process data simultaneously. For example, each pipeline branch can have a different converter, each dedicated to a specific file type, allowing for efficient parallel processing.

Loops

In loops, components operate iteratively, with a set limit on repetitions. This is useful in scenarios such as self-correcting loops, where a validator component checks the generator's output and potentially cycles it back for correction until it meets the quality standard and can be sent further down the pipeline.

How Do Pipelines Use Your Files?

Pipelines run on the files you uploaded to deepset Cloud. These files must be preprocessed - cleaned, and chunked into smaller passages called Documents. Documents get stored in a database called Document Store, from which the pipeline retrieves them at query time. One file may produce multiple documents. Documents inherit metadata from files. The exact steps for preprocessing the files are defined in your indexing pipeline.

Files uploaded to deepset Cloud are preprocessed using pipeline components. As a result of this preprocessing they're converted into documents and stored in the DocumentStore. That's called indexing. Then, at query time, a query pipeline looks into the document store to search for relevant documents and come up with an answer based on them.

Your files are indexed once when you deploy your pipeline; they aren't indexed every time a pipeline runs. New files uploaded after you deploy your pipeline are indexed individually and included in the files the pipeline runs on without you having to redeploy the pipeline. The same is true for conversion. If you're using a converter component that converts your files into documents, it does so only once; it doesn't convert them every time you run your search.

Indexing and Query Pipelines

Pipelines in deepset Cloud v2 are very flexible. A single pipeline configuration can contain both indexing and query steps, and many components can be used in all types of pipelines. However, keeping the indexing steps separate from the query steps is often convenient. When you create a pipeline, it opens in two tabs: indexing and query.

  • The indexing pipeline defines how you want to preprocess your files before running a search on them.
    An indexing pipeline takes file paths as input. Its goal is to convert the files into smaller passages of text (Documents) and store them in the DocumentStore, where the query pipeline can access them at query time. It does this using configurable components that clean and chunk the files.
  • The query pipeline contains a recipe for how to execute a query. It runs on the Documents stored in the DocumentStore and created from the files you uploaded to deepset Cloud. It takes the user query, retrieves the relevant documents, and processes them through the components to arrive at an answer.
Example of an indexing pipeline

components:
    file_classifier:
      type: haystack.components.routers.file_type_router.FileTypeRouter
      init_parameters:
        mime_types:
        - text/plain
        - application/pdf
        - text/markdown
        - text/html

    text_converter:
      type: haystack.components.converters.txt.TextFileToDocument
      init_parameters:
        encoding: utf-8

    pdf_converter:
      type: haystack.components.converters.pypdf.PyPDFToDocument
      init_parameters:
        converter_name: default

    markdown_converter:
      type: haystack.components.converters.markdown.MarkdownToDocument
      init_parameters:
        table_to_single_line: false

    html_converter:
      type: haystack.components.converters.html.HTMLToDocument
      init_parameters:
        # A dictionary of keyword arguments to customize how you want to extract content from your HTML files.
        # For the full list of available arguments, see
        # the [Trafilatura documentation](https://trafilatura.readthedocs.io/en/latest/corefunctions.html#extract).
        extraction_kwargs:
          output_format: txt  # Extract text from HTML. You can also also choose "markdown"
          target_language: null  # You can define a language (using the ISO 639-1 format) to discard documents that don't match that language.
          include_tables: true  # If true, includes tables in the output
          include_links: false  # If true, keeps links along with their targets

    joiner:
      type: haystack.components.joiners.document_joiner.DocumentJoiner
      init_parameters:
        join_mode: concatenate
        sort_by_score: false

    splitter:
      type: haystack.components.preprocessors.document_splitter.DocumentSplitter
      init_parameters:
        split_by: word
        split_length: 250
        split_overlap: 30

    document_embedder:
      type: haystack.components.embedders.sentence_transformers_document_embedder.SentenceTransformersDocumentEmbedder
      init_parameters:
        model: "intfloat/e5-base-v2"
        device: null

    writer:
      type: haystack.components.writers.document_writer.DocumentWriter
      init_parameters:
        document_store:
          type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
          init_parameters:
            embedding_dim: 768
            similarity: cosine
        policy: OVERWRITE

  connections:  # Defines how the components are connected
  - sender: file_classifier.text/plain
    receiver: text_converter.sources
  - sender: file_classifier.application/pdf
    receiver: pdf_converter.sources
  - sender: file_classifier.text/markdown
    receiver: markdown_converter.sources
  - sender: file_classifier.text/html
    receiver: html_converter.sources
  - sender: text_converter.documents
    receiver: joiner.documents
  - sender: pdf_converter.documents
    receiver: joiner.documents
  - sender: markdown_converter.documents
    receiver: joiner.documents
  - sender: html_converter.documents
    receiver: joiner.documents
  - sender: joiner.documents
    receiver: splitter.documents
  - sender: splitter.documents
    receiver: document_embedder.documents
  - sender: document_embedder.documents
    receiver: writer.documents

  max_loops_allowed: 100

  inputs:  # Define the inputs for your pipeline
    files: "file_classifier.sources"  # This component will receive the files to index as input
An example of a query pipeline
components:
    bm25_retriever: # Selects the most similar documents from the document store
      type: haystack_integrations.components.retrievers.opensearch.bm25_retriever.OpenSearchBM25Retriever
      init_parameters:
        document_store:
          init_parameters:
            use_ssl: True
            verify_certs: False
            hosts:
              - ${OPENSEARCH_HOST}
            http_auth:
              - "${OPENSEARCH_USER}"
              - "${OPENSEARCH_PASSWORD}"
          type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
        top_k: 20 # The number of results to return

    query_embedder:
      type: haystack.components.embedders.sentence_transformers_text_embedder.SentenceTransformersTextEmbedder
      init_parameters:
        model: "intfloat/e5-base-v2"
        device: null

    embedding_retriever: # Selects the most similar documents from the document store
      type: haystack_integrations.components.retrievers.opensearch.embedding_retriever.OpenSearchEmbeddingRetriever
      init_parameters:
        document_store:
          init_parameters:
            use_ssl: True
            verify_certs: False
            http_auth:
              - "${OPENSEARCH_USER}"
              - "${OPENSEARCH_PASSWORD}"
          type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
        top_k: 20 # The number of results to return

    document_joiner:
      type: haystack.components.joiners.document_joiner.DocumentJoiner
      init_parameters:
        join_mode: concatenate

    ranker:
      type: haystack.components.rankers.transformers_similarity.TransformersSimilarityRanker
      init_parameters:
        model: "intfloat/simlm-msmarco-reranker"
        top_k: 8
        device: null
        model_kwargs:
          torch_dtype: "torch.float16"

    prompt_builder:
      type: haystack.components.builders.prompt_builder.PromptBuilder
      init_parameters:
        template: |-
          You are a technical expert.
          You answer questions truthfully based on provided documents.
          Ignore typing errors in the question.
          For each document check whether it is related to the question.
          Only use documents that are related to the question to answer it.
          Ignore documents that are not related to the question.
          If the answer exists in several documents, summarize them.
          Only answer based on the documents provided. Don't make things up.
          Don't reference documents.
          Just output the structured, informative and precise answer and nothing else.
          If the documents can't answer the question, say so.
          These are the documents:
          {% for document in documents %}
          Document[{{ loop.index }}]:
          {{ document.content }}
          {% endfor %}
          Question: {{question}}
          Answer:

    llm:
      type: haystack.components.generators.openai.OpenAIGenerator
      init_parameters:
        api_key: {"type": "env_var", "env_vars": ["OPENAI_API_KEY"], "strict": False}
        model: "gpt-4o"
        generation_kwargs:
          max_tokens: 400
          temperature: 0.0
          seed: 0

    answer_builder:
      init_parameters: {}
      type: haystack.components.builders.answer_builder.AnswerBuilder

  connections:  # Defines how the components are connected
  - sender: bm25_retriever.documents
    receiver: document_joiner.documents
  - sender: query_embedder.embedding
    receiver: embedding_retriever.query_embedding
  - sender: embedding_retriever.documents
    receiver: document_joiner.documents
  - sender: document_joiner.documents
    receiver: ranker.documents
  - sender: ranker.documents
    receiver: prompt_builder.documents
  - sender: ranker.documents
    receiver: answer_builder.documents
  - sender: prompt_builder.prompt
    receiver: llm.prompt
  - sender: llm.replies
    receiver: answer_builder.replies

  max_loops_allowed: 100

  inputs:  # Define the inputs for your pipeline
    query:  # These components will receive the query as input
    - "bm25_retriever.query"
    - "query_embedder.text"
    - "ranker.query"
    - "prompt_builder.question"
    - "answer_builder.query"

    filters:  # These components will receive a potential query filter as input
    - "bm25_retriever.filters"
    - "embedding_retriever.filters"

  outputs:  # Defines the output of your pipeline
    documents: "ranker.documents"  # The output of the pipeline is the retrieved documents
    answers: "answer_builder.answers"  # The output of the pipeline is the generated answer

Pipeline Service Levels

To save costs and meet your infrastructure and service requirements, your pipelines are assigned service levels. There are three service levels available:

  • Draft: This is a service level automatically assigned to new and undeployed pipelines, so that you can easily distinguish them from the deployed ones.
  • Development: Pipelines at this level are designed for testing and running experiments. They have no replicas by default, and their time to standby is short, so they can save resources whenever these pipelines aren’t used. When you deploy a draft pipeline, it becomes a development pipeline.
  • Production: This level is recommended for business-critical scenarios where you need the pipeline to be scalable and reliable. Pipelines at this level include one replica by default and a longer time-to-standby period than other service levels. With heavy traffic, the number of replicas grows up to 10.

This table gives an overview of the settings that come with each service level:

Service levelDescriptionTime to standbyScaling (replicas)How to enable
ProductionDesigned for critical business scenarios that require reliable and scalable pipelines.30 days1 at all times, scales up to 10 if traffic is heavy- In deepset Cloud, on the Pipelines page

- Through the Update Pipeline REST API endpoint
DevelopmentDesigned for testing and experimenting purposes.12 hours0- By switching off the Production service level for a deployed production pipeline in deepset Cloud

- Through the Update Pipeline REST API endpoint
- By deploying a draft pipeline
DraftIndicates an undeployed pipeline.n/a0- By undeploying a production or development pipeline

- All new pipelines are automatically classified as drafts

Time to standby is the time after which an unused pipeline enters a standby mode to save resources. Inactive pipelines don’t use up the pipeline hours included in your plan.
To use a pipeline that is on standby, activate it either on the Pipelines page or by initiating a search using that pipeline.

Replicas are the number of duplicate versions of a pipeline that are available. In case there is a spike in demand, deepset Cloud seamlessly switches to a functioning replica to maintain uninterrupted service. Pricing plans with production pipelines automatically include one replica. The pipeline hours (the hours your pipeline is deployed) of this replica are not billed separately.

For heavy pipelines, we can increase the maximum number of replicas on request. Contact your deepset Cloud representative to change this setting.

You can change the service level of your pipeline at any time. For details, see Change the Pipeline's Service Level.

The Pipelines Page

All the pipelines created by your organization are listed on the Pipelines page. The pipelines listed under Deployed are the ones that you can run your search with. The pipelines under In Development are drafts you must deploy before you can use them for your search.

Clicking a pipeline opens Pipeline Details, where you can check all the information about your pipeline, including pipeline logs.

Pipeline Status

When you deploy a pipeline, it changes its status as follows:

  • Not indexed: The pipeline is being deployed, but the files have not yet been indexed
  • Indexing: Your files are being indexed. You can see how many files have already been indexed if you hover your mouse over the Indexing label.
  • Indexed: Your pipeline is deployed, all the files are indexed, and you can use your pipeline for search.
  • Partially indexed: At least one of the files wasn't indexed. This may be an NLP-related problem, a problem with your file, or a component in the pipeline. You can still run a search if at least some files were indexed.
  • Failed to deploy: It's a fatal state. Your pipeline was not deployed, and your files are not indexed. For ideas on how to fix it, see Troubleshoot Pipeline Deployment.