Pipelines
Pipelines are powerful and highly flexible systems that form the engine for your app. They consist of components that process your data and perform various tasks on it. Each component in a pipeline passes its output to the next component as input.
How Do Pipelines Work?
Pipelines are composed of connected components. Each component processes a piece of your data, such as documents, and performs specific tasks before passing the output to the next component. For example, a basic RAG pipeline may include:
- A TextEmbedder that takes the user query and turns it into a vector.
- A Retriever that receives the vector from the TextEmbedder and uses it to fetch relevant documents from the document store.
- A PromptBuilder that injects the user query and the documents from the Retriever into the prompt.
- A Generator that uses the prompt from the PromptBuilder, including the documents, to generate the response.
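The data flow in this list can be sketched in plain Python. This is a toy illustration, not Haystack code: the four functions, the two-dimensional vectors, and the in-memory DOCUMENT_STORE are assumptions made up for the example, and the generator is a placeholder rather than a real LLM call.

```python
from typing import List, Tuple

# Toy document store: text mapped to a made-up two-dimensional embedding.
DOCUMENT_STORE = {
    "Pipelines connect components.": [1.0, 0.0],
    "Bananas are yellow.": [0.0, 1.0],
}


def text_embedder(query: str) -> List[float]:
    """Turn the query into a (toy) vector."""
    return [1.0, 0.0] if "pipeline" in query.lower() else [0.0, 1.0]


def retriever(query_embedding: List[float], top_k: int = 1) -> List[str]:
    """Rank stored documents by dot product with the query vector."""
    def score(item: Tuple[str, List[float]]) -> float:
        return sum(a * b for a, b in zip(query_embedding, item[1]))

    ranked = sorted(DOCUMENT_STORE.items(), key=score, reverse=True)
    return [text for text, _ in ranked[:top_k]]


def prompt_builder(question: str, documents: List[str]) -> str:
    """Inject the question and the retrieved documents into a prompt."""
    context = "\n".join(documents)
    return f"Documents:\n{context}\nQuestion: {question}\nAnswer:"


def generator(prompt: str) -> str:
    """Placeholder for an LLM call; it only reports the prompt size."""
    return f"(LLM reply to a {len(prompt)}-character prompt)"


def run_rag(query: str) -> str:
    """Each step receives the previous step's output as its input."""
    embedding = text_embedder(query)
    documents = retriever(embedding)
    prompt = prompt_builder(query, documents)
    return generator(prompt)
```

Each function receives only the data it needs, which mirrors how components in a pipeline are wired output to input.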
Components function like modular building blocks that you can mix, match, and replace to form various pipeline configurations, such as loops, branches, or simultaneous flows. When connecting the components, it's crucial to ensure the output type of one component matches the input type of the next. Each component receives only the data it needs, which speeds up the pipeline and makes it easier to debug.
For component compatibility, check individual pages under Pipeline Components. When building in Pipeline Builder, you can check compatible connections if you hover your mouse over the connection point of a component.
deepset Cloud uses Haystack pipelines, which means pipelines accept Haystack objects as input and output. For more information, see the Haystack documentation.
Connection Types
Pipelines in deepset Cloud V2 are flexible and multifunctional. While you can still create simple pipelines performing one function, like answering queries, you can also use them to build complex workflows.
Connection Validation
When you connect components, the pipeline validates that the connected outputs and inputs are explicitly specified and that their types match. If they don't, it produces a detailed error.
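As a rough sketch of what such a check involves, the snippet below compares the declared type of a sender's output with the declared type of a receiver's input. The OUTPUT_TYPES and INPUT_TYPES tables and the validate_connection function are hypothetical and exist only for this illustration. The actual validation in the pipeline may work differently.

```python
from typing import Dict

# Hypothetical socket declarations for illustration only.
# Real components declare their own input and output types.
OUTPUT_TYPES: Dict[str, Dict[str, str]] = {
    "query_embedder": {"embedding": "List[float]"},
    "ranker": {"documents": "List[Document]"},
}
INPUT_TYPES: Dict[str, Dict[str, str]] = {
    "embedding_retriever": {"query_embedding": "List[float]"},
    "prompt_builder": {"documents": "List[Document]"},
}


def validate_connection(sender: str, receiver: str) -> None:
    """Raise ValueError with a descriptive message if the connection is invalid.

    Both arguments use the "component.socket" notation.
    """
    sender_name, output_name = sender.split(".", 1)
    receiver_name, input_name = receiver.split(".", 1)

    outputs = OUTPUT_TYPES.get(sender_name, {})
    inputs = INPUT_TYPES.get(receiver_name, {})
    if output_name not in outputs:
        raise ValueError(f"'{sender_name}' has no output named '{output_name}'")
    if input_name not in inputs:
        raise ValueError(f"'{receiver_name}' has no input named '{input_name}'")
    if outputs[output_name] != inputs[input_name]:
        raise ValueError(
            f"Cannot connect '{sender}' ({outputs[output_name]}) "
            f"to '{receiver}' ({inputs[input_name]}): types differ"
        )
```

Calling validate_connection("query_embedder.embedding", "embedding_retriever.query_embedding") passes silently, while connecting the embedding to prompt_builder.documents raises an error that names both sockets and their types.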
Branches
Pipelines can branch to process data simultaneously. For example, each pipeline branch can have a different converter, each dedicated to a specific file type, allowing for efficient parallel processing.
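The branching idea can be illustrated with a small routing function that groups files by type so that each group can go to its own converter. This is a simplified sketch: the SUFFIX_TO_MIME table and the route_by_type function are assumptions for the example, and a real router may detect file types differently.

```python
from collections import defaultdict
from pathlib import Path
from typing import Dict, List

# Assumed mapping from file suffix to MIME type, for illustration only.
SUFFIX_TO_MIME = {
    ".txt": "text/plain",
    ".pdf": "application/pdf",
    ".md": "text/markdown",
    ".html": "text/html",
}


def route_by_type(paths: List[str]) -> Dict[str, List[str]]:
    """Group file paths into one branch per MIME type."""
    branches: Dict[str, List[str]] = defaultdict(list)
    for path in paths:
        mime = SUFFIX_TO_MIME.get(Path(path).suffix.lower(), "unclassified")
        branches[mime].append(path)
    return dict(branches)
```

Each key in the returned dictionary corresponds to one branch, which could then be handed to the converter dedicated to that file type.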
Loops
In loops, components operate iteratively, with a set limit on repetitions. This is useful in scenarios such as self-correcting loops, where a validator component checks the generator's output and potentially cycles it back for correction until it meets the quality standard and can be sent further down the pipeline.
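A self-correcting loop of this kind can be sketched as follows. The example assumes the quality standard is "the reply is valid JSON". The is_valid_json validator, the run_with_correction function, and the max_loops parameter are hypothetical names chosen for the illustration.

```python
import json
from typing import Callable, Optional


def is_valid_json(text: str) -> bool:
    """Validator: accept the reply only if it parses as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


def run_with_correction(
    generate: Callable[[str], str], prompt: str, max_loops: int = 3
) -> Optional[str]:
    """Call the generator until its reply passes validation or the limit is hit."""
    for _ in range(max_loops):
        reply = generate(prompt)
        if is_valid_json(reply):
            return reply  # Good enough: send it further down the pipeline.
        # Cycle the failed reply back to the generator for correction.
        prompt = f"{prompt}\nThe previous reply was not valid JSON: {reply!r}. Try again."
    return None  # Repetition limit reached without a valid reply.
```

The repetition limit keeps a generator that never produces valid output from looping forever.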
How Do Pipelines Use Your Files?
Pipelines run on the files you uploaded to deepset Cloud. These files must be preprocessed, that is, cleaned and chunked into smaller passages called Documents. Documents are stored in a database called a Document Store, from which the pipeline retrieves them at query time. A document store is an interface to a database, such as OpenSearch, Pinecone, Weaviate, and more. For details, see Document Stores.
One file may produce multiple documents. Documents inherit metadata from files. Your indexing pipeline defines the exact steps for preprocessing the files.
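To make this concrete, here is a minimal sketch of word-based chunking with overlap, in which every resulting document carries a copy of the file's metadata. It is not the behavior of any specific splitter component. The split_into_documents function and the dictionary layout of a document are assumptions for the example. The default values mirror the split_length and split_overlap settings used in the indexing pipeline example on this page.

```python
from typing import Dict, List


def split_into_documents(
    text: str,
    meta: Dict[str, str],
    split_length: int = 250,
    split_overlap: int = 30,
) -> List[dict]:
    """Split text into overlapping word windows that inherit the file metadata."""
    step = split_length - split_overlap
    if step <= 0:
        raise ValueError("split_overlap must be smaller than split_length")

    words = text.split()
    documents = []
    for start in range(0, len(words), step):
        chunk = words[start:start + split_length]
        # Each document gets its own copy of the file's metadata.
        documents.append({"content": " ".join(chunk), "meta": dict(meta)})
        if start + split_length >= len(words):
            break  # The last window already reached the end of the text.
    return documents
```

With these defaults, a 500-word file yields three documents, and the last 30 words of one document repeat as the first 30 words of the next.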
Your files are indexed once when you deploy your pipeline; they aren't indexed every time a pipeline runs. New files uploaded after you deploy your pipeline are indexed individually and included in the files the pipeline runs on without you having to redeploy the pipeline. The same is true for conversion. If you're using a converter component that converts your files into documents, it does so only once; it doesn't convert them every time you run your search.
For some databases, you must manage the index yourself, as deepset Cloud doesn't have access to it. The same may be true for the indexing status of your files. To check the document stores this applies to, see Document Stores.
Indexing and Query Pipelines
Pipelines in deepset Cloud v2 are very flexible. A single pipeline configuration can contain both indexing and query steps, and many components can be used in all types of pipelines. However, keeping the indexing steps separate from the query steps is often convenient. When you create a pipeline, it opens in two tabs: indexing and query.
- The indexing pipeline defines how you want to preprocess your files before running a search on them.
  An indexing pipeline takes file paths as input. Its goal is to convert the files into smaller passages of text (Documents) and store them in the DocumentStore, where the query pipeline can access them at query time. It does this using configurable components that clean and chunk the files.
  Indexing pipelines are optional. In most cases, you'll need them when writing files to a document store. However, if you're handling tasks like summarization and passing all the content directly in the query, you can skip the indexing pipeline.
- The query pipeline contains a recipe for how to execute a query. It runs on the Documents stored in the DocumentStore and created from the files you uploaded to deepset Cloud. It takes the user query, retrieves the relevant documents, and processes them through the components to arrive at an answer.
Example of an indexing pipeline
components:
  file_classifier:
    type: haystack.components.routers.file_type_router.FileTypeRouter
    init_parameters:
      mime_types:
        - text/plain
        - application/pdf
        - text/markdown
        - text/html
  text_converter:
    type: haystack.components.converters.txt.TextFileToDocument
    init_parameters:
      encoding: utf-8
  pdf_converter:
    type: haystack.components.converters.pypdf.PyPDFToDocument
    init_parameters:
      converter_name: default
  markdown_converter:
    type: haystack.components.converters.markdown.MarkdownToDocument
    init_parameters:
      table_to_single_line: false
  html_converter:
    type: haystack.components.converters.html.HTMLToDocument
    init_parameters:
      # A dictionary of keyword arguments to customize how you want to extract content from your HTML files.
      # For the full list of available arguments, see
      # the [Trafilatura documentation](https://trafilatura.readthedocs.io/en/latest/corefunctions.html#extract).
      extraction_kwargs:
        output_format: txt # Extract text from HTML. You can also choose "markdown"
        target_language: null # You can define a language (using the ISO 639-1 format) to discard documents that don't match that language.
        include_tables: true # If true, includes tables in the output
        include_links: false # If true, keeps links along with their targets
  joiner:
    type: haystack.components.joiners.document_joiner.DocumentJoiner
    init_parameters:
      join_mode: concatenate
      sort_by_score: false
  splitter:
    type: haystack.components.preprocessors.document_splitter.DocumentSplitter
    init_parameters:
      split_by: word
      split_length: 250
      split_overlap: 30
  document_embedder:
    type: haystack.components.embedders.sentence_transformers_document_embedder.SentenceTransformersDocumentEmbedder
    init_parameters:
      model: "intfloat/e5-base-v2"
      device: null
  writer:
    type: haystack.components.writers.document_writer.DocumentWriter
    init_parameters:
      document_store:
        type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
        init_parameters:
          embedding_dim: 768
          similarity: cosine
      policy: OVERWRITE
connections: # Defines how the components are connected
  - sender: file_classifier.text/plain
    receiver: text_converter.sources
  - sender: file_classifier.application/pdf
    receiver: pdf_converter.sources
  - sender: file_classifier.text/markdown
    receiver: markdown_converter.sources
  - sender: file_classifier.text/html
    receiver: html_converter.sources
  - sender: text_converter.documents
    receiver: joiner.documents
  - sender: pdf_converter.documents
    receiver: joiner.documents
  - sender: markdown_converter.documents
    receiver: joiner.documents
  - sender: html_converter.documents
    receiver: joiner.documents
  - sender: joiner.documents
    receiver: splitter.documents
  - sender: splitter.documents
    receiver: document_embedder.documents
  - sender: document_embedder.documents
    receiver: writer.documents
max_loops_allowed: 100
inputs: # Define the inputs for your pipeline
  files: "file_classifier.sources" # This component will receive the files to index as input
Example of a query pipeline
components:
  bm25_retriever: # Selects the most similar documents from the document store
    type: haystack_integrations.components.retrievers.opensearch.bm25_retriever.OpenSearchBM25Retriever
    init_parameters:
      document_store:
        init_parameters:
          use_ssl: True
          verify_certs: False
          hosts:
            - ${OPENSEARCH_HOST}
          http_auth:
            - "${OPENSEARCH_USER}"
            - "${OPENSEARCH_PASSWORD}"
        type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
      top_k: 20 # The number of results to return
  query_embedder:
    type: haystack.components.embedders.sentence_transformers_text_embedder.SentenceTransformersTextEmbedder
    init_parameters:
      model: "intfloat/e5-base-v2"
      device: null
  embedding_retriever: # Selects the most similar documents from the document store
    type: haystack_integrations.components.retrievers.opensearch.embedding_retriever.OpenSearchEmbeddingRetriever
    init_parameters:
      document_store:
        init_parameters:
          use_ssl: True
          verify_certs: False
          http_auth:
            - "${OPENSEARCH_USER}"
            - "${OPENSEARCH_PASSWORD}"
        type: haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore
      top_k: 20 # The number of results to return
  document_joiner:
    type: haystack.components.joiners.document_joiner.DocumentJoiner
    init_parameters:
      join_mode: concatenate
  ranker:
    type: haystack.components.rankers.transformers_similarity.TransformersSimilarityRanker
    init_parameters:
      model: "intfloat/simlm-msmarco-reranker"
      top_k: 8
      device: null
      model_kwargs:
        torch_dtype: "torch.float16"
  prompt_builder:
    type: haystack.components.builders.prompt_builder.PromptBuilder
    init_parameters:
      template: |-
        You are a technical expert.
        You answer questions truthfully based on provided documents.
        Ignore typing errors in the question.
        For each document check whether it is related to the question.
        Only use documents that are related to the question to answer it.
        Ignore documents that are not related to the question.
        If the answer exists in several documents, summarize them.
        Only answer based on the documents provided. Don't make things up.
        Don't reference documents.
        Just output the structured, informative and precise answer and nothing else.
        If the documents can't answer the question, say so.
        These are the documents:
        {% for document in documents %}
        Document[{{ loop.index }}]:
        {{ document.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
  llm:
    type: haystack.components.generators.openai.OpenAIGenerator
    init_parameters:
      api_key: {"type": "env_var", "env_vars": ["OPENAI_API_KEY"], "strict": False}
      model: "gpt-4o"
      generation_kwargs:
        max_tokens: 400
        temperature: 0.0
        seed: 0
  answer_builder:
    init_parameters: {}
    type: haystack.components.builders.answer_builder.AnswerBuilder
connections: # Defines how the components are connected
  - sender: bm25_retriever.documents
    receiver: document_joiner.documents
  - sender: query_embedder.embedding
    receiver: embedding_retriever.query_embedding
  - sender: embedding_retriever.documents
    receiver: document_joiner.documents
  - sender: document_joiner.documents
    receiver: ranker.documents
  - sender: ranker.documents
    receiver: prompt_builder.documents
  - sender: ranker.documents
    receiver: answer_builder.documents
  - sender: prompt_builder.prompt
    receiver: llm.prompt
  - sender: llm.replies
    receiver: answer_builder.replies
max_loops_allowed: 100
inputs: # Define the inputs for your pipeline
  query: # These components will receive the query as input
    - "bm25_retriever.query"
    - "query_embedder.text"
    - "ranker.query"
    - "prompt_builder.question"
    - "answer_builder.query"
  filters: # These components will receive a potential query filter as input
    - "bm25_retriever.filters"
    - "embedding_retriever.filters"
outputs: # Defines the output of your pipeline
  documents: "ranker.documents" # The output of the pipeline is the retrieved documents
  answers: "answer_builder.answers" # The output of the pipeline is the generated answer
Inputs and Outputs
Indexing Pipelines
Inputs
Indexing pipelines always take files as input. In Pipeline Builder, this means adding the FilesInput component at the beginning of an indexing pipeline.
When working in the YAML editor, you must explicitly specify the inputs and the components that receive them:
inputs: # Define the inputs for your pipeline
  files: "file_classifier.sources" # This component will receive the files to index as input
Outputs
Indexing pipelines always return a list of Document objects as output, usually written into the document store by DocumentWriter, which is often the last component of an indexing pipeline.
Query Pipelines
Inputs
Query pipelines always take the query as the required input. Optionally, they can also take filters. When working in Pipeline Builder, drag Query and Filters from the Inputs group onto the canvas and then connect them to the components that should receive them.
In the YAML editor, you must explicitly list the inputs and the components that receive them:
inputs: # Define the inputs for your pipeline
  query: # These components will receive the query as input
    - "bm25_retriever.query"
    - "query_embedder.text"
    - "ranker.query"
    - "prompt_builder.question"
    - "answer_builder.query"
  filters: # These components will receive a potential query filter as input
    - "bm25_retriever.filters"
    - "embedding_retriever.filters"
Filters are documents' metadata keys by default. This means that if your documents have the following metadata: {"category": "news"}, users searching in Playground can narrow down the search to the documents matching this category. You can also pass filters at search time with the Search endpoint. For more information about filters, see Working with Metadata.
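Conceptually, a metadata filter narrows the set of documents before they are searched. The sketch below shows only the simplest case, an exact match on metadata values. The apply_filters function and the dictionary layout of a document are assumptions for the illustration, and the filter syntax accepted by deepset Cloud is richer than this.

```python
from typing import Any, Dict, List


def apply_filters(
    documents: List[Dict[str, Any]], filters: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Keep documents whose metadata equals every key/value pair in filters."""
    return [
        doc
        for doc in documents
        if all(doc.get("meta", {}).get(key) == value for key, value in filters.items())
    ]


documents = [
    {"content": "Election results are in.", "meta": {"category": "news"}},
    {"content": "How to bake bread.", "meta": {"category": "recipes"}},
]
news_only = apply_filters(documents, {"category": "news"})
```

Here, news_only contains just the first document. An empty filter dictionary leaves the list unchanged.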
Outputs
The output of query pipelines matches the output of the last component. However, it must be one of the following data classes:
- List of Document objects (usually document search pipelines)
- List of Answer objects, including the following subclasses:
  - List of ExtractedAnswer objects (usually extractive question answering pipelines)
  - List of ExtractedTableAnswer objects (table question answering pipelines)
  - List of GeneratedAnswer objects (usually RAG pipelines or pipelines that use large language models to generate answers)
The output can be a list of documents, a list of answers, or both. Ensure the last component in your pipeline produces one or both of these outputs. For example, you may need to add an AnswerBuilder after a Generator to produce a list of GeneratedAnswer objects. For more information, see Pipeline Components. To learn more about data classes, see Haystack data classes.
In Pipeline Builder, to finalize the pipeline with the correct output, drag the Output component onto the canvas and connect it to the components that produce the documents or answers you want to include.
When working in the YAML editor, you must explicitly specify the outputs and the components that provide them:
outputs: # Defines the output of your pipeline
  documents: "ranker.documents" # The output of the pipeline is the retrieved documents
  answers: "answer_builder.answers" # The output of the pipeline is the generated answers
Pipeline Service Levels
To save costs and meet your infrastructure and service requirements, your pipelines are assigned service levels. There are three service levels available:
- Draft: This is a service level automatically assigned to new and undeployed pipelines, so that you can easily distinguish them from the deployed ones.
- Development: Pipelines at this level are designed for testing and running experiments. They have no replicas by default, and their time to standby is short, so they can save resources whenever these pipelines aren’t used. When you deploy a draft pipeline, it becomes a development pipeline.
- Production: This level is recommended for business-critical scenarios where you need the pipeline to be scalable and reliable. Pipelines at this level include one replica by default and a longer time-to-standby period than other service levels. With heavy traffic, the number of replicas grows up to 10.
This table gives an overview of the settings that come with each service level:
Service level | Description | Time to standby | Scaling (replicas) | How to enable |
---|---|---|---|---|
Production | Designed for critical business scenarios that require reliable and scalable pipelines. | 30 days | 1 at all times, scales up to 10 if traffic is heavy | - In deepset Cloud, on the Pipelines page - Through the Update Pipeline REST API endpoint |
Development | Designed for testing and experimenting purposes. | 12 hours | 0 | - By switching off the Production service level for a deployed production pipeline in deepset Cloud - Through the Update Pipeline REST API endpoint - By deploying a draft pipeline |
Draft | Indicates an undeployed pipeline. | n/a | 0 | - By undeploying a production or development pipeline - All new pipelines are automatically classified as drafts |
Time to standby is the time after which an unused pipeline enters a standby mode to save resources. Inactive pipelines don’t use up the pipeline hours included in your plan.
To use a pipeline that is on standby, activate it either on the Pipelines page or by initiating a search using that pipeline.
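The standby rule can be expressed as a small check against the time-to-standby values from the table above. This is an illustrative sketch only: the TIME_TO_STANDBY table and the is_on_standby function are hypothetical, and treating a pipeline as on standby exactly when the idle time reaches the limit is an assumption.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional

# Time-to-standby values taken from the service level table.
# Drafts are not deployed, so the setting does not apply to them.
TIME_TO_STANDBY: Dict[str, Optional[timedelta]] = {
    "production": timedelta(days=30),
    "development": timedelta(hours=12),
    "draft": None,
}


def is_on_standby(service_level: str, last_used: datetime, now: datetime) -> bool:
    """Return True if the pipeline has been idle for its time to standby."""
    limit = TIME_TO_STANDBY[service_level]
    if limit is None:
        return False
    return now - last_used >= limit
```

For example, a development pipeline last used 13 hours ago would be on standby, while a production pipeline idle for two days would still be active.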
Replicas are the number of duplicate versions of a pipeline that are available. In case there is a spike in demand, deepset Cloud seamlessly switches to a functioning replica to maintain uninterrupted service. Pricing plans with production pipelines automatically include one replica. The pipeline hours (the hours your pipeline is deployed) of this replica are not billed separately.
For heavy pipelines, we can increase the maximum number of replicas on request. Contact your deepset Cloud representative to change this setting.
You can change the service level of your pipeline at any time. For details, see Change the Pipeline's Service Level.
The Pipelines Page
All the pipelines created by your organization are listed on the Pipelines page. The pipelines listed under Deployed are the ones that you can run your search with. The pipelines under In Development are drafts you must deploy before you can use them for your search.
Clicking a pipeline opens Pipeline Details, where you can check all the information about your pipeline, including pipeline logs.
Pipeline Status
When you deploy a pipeline, it changes its status as follows:
- Not indexed: The pipeline is being deployed, but the files have not yet been indexed.
- Indexing: Your files are being indexed. You can see how many files have already been indexed if you hover your mouse over the Indexing label.
- Indexed: Your pipeline is deployed, all the files are indexed, and you can use your pipeline for search.
- Partially indexed: At least one of the files wasn't indexed. This may be caused by an NLP-related problem, a problem with your file, or a problem with a component in the pipeline. You can still run a search if at least some files were indexed.
- Failed to deploy: This is a fatal state. Your pipeline was not deployed, and your files are not indexed. For ideas on how to fix it, see Troubleshoot Pipeline Deployment.