An asynchronous client for the deepset API.
Module files
This module contains async functions for uploading files and folders to deepset.
list_files
async def list_files(
api_key: Optional[str] = None,
api_url: Optional[str] = None,
workspace_name: str = DEFAULT_WORKSPACE_NAME,
name: Optional[str] = None,
odata_filter: Optional[str] = None,
batch_size: int = 100,
timeout_s: Optional[int] = None) -> AsyncGenerator[List[File], None]
List all files in a workspace.
Arguments:
- `api_key`: deepset API key to use for authentication.
- `api_url`: API URL to use for authentication.
- `workspace_name`: Name of the workspace to list the files from. It uses the workspace from the .ENV file by default.
- `name`: Name of the file to filter for.
- `odata_filter`: The OData filter to apply to the file list.
  For example, `odata_filter="category eq 'news'"` lists files with metadata `{"meta": {"category": "news"}}`.
- `timeout_s`: The timeout in seconds for this API call.
- `batch_size`: Batch size for the listing.
Returns:
List of files.
list_upload_sessions
async def list_upload_sessions(
api_key: Optional[str] = None,
api_url: Optional[str] = None,
workspace_name: str = DEFAULT_WORKSPACE_NAME,
is_expired: Optional[bool] = None,
batch_size: int = 100,
timeout_s: Optional[int] = None
) -> AsyncGenerator[List[UploadSessionDetail], None]
List the details of all upload sessions for a given workspace, including the closed sessions.
Arguments:
- `api_key`: deepset API key to use for authentication.
- `api_url`: API URL to use for authentication.
- `workspace_name`: Name of the workspace to list the upload sessions from. It uses the workspace from the .ENV file by default.
- `is_expired`: Whether to list expired upload sessions.
- `batch_size`: Batch size for the listing.
- `timeout_s`: Timeout in seconds for the API requests.
Returns:
List of upload sessions.
get_upload_session
async def get_upload_session(
session_id: UUID,
api_key: Optional[str] = None,
api_url: Optional[str] = None,
workspace_name: str = DEFAULT_WORKSPACE_NAME) -> UploadSessionStatus
Get the status of an upload session.
Arguments:
- `session_id`: ID of the upload session to get the status for.
- `api_key`: deepset API key to use for authentication.
- `api_url`: API URL to use for authentication.
- `workspace_name`: Name of the workspace that contains the upload session.
Returns:
Status of the upload session.
upload
async def upload(paths: List[Path],
api_key: Optional[str] = None,
api_url: Optional[str] = None,
workspace_name: str = DEFAULT_WORKSPACE_NAME,
write_mode: WriteMode = WriteMode.KEEP,
blocking: bool = True,
timeout_s: Optional[int] = None,
show_progress: bool = True,
recursive: bool = False,
desired_file_types: Optional[List[str]] = None,
enable_parallel_processing: bool = False,
safe_mode: bool = False) -> S3UploadSummary
Upload a folder to deepset.
Arguments:
- `paths`: Path to the folder to upload. If the folder contains unsupported files, they're skipped
  during the upload. Supported file formats are txt, pdf, docx, pptx, xlsx, xml, csv, html, md, json.
- `api_key`: API key to use for authentication.
- `api_url`: API URL to use for authentication.
- `workspace_name`: Name of the workspace to upload the files to. It uses the workspace from the .ENV file by default.
- `write_mode`: Specifies what to do when a file with the same name already exists in the workspace.
  Possible options are:
  - KEEP - uploads the file with the same name and keeps both files in the workspace.
  - OVERWRITE - overwrites the file that is in the workspace.
  - FAIL - fails to upload the file with the same name.
- `blocking`: Whether to wait for the upload to finish.
- `timeout_s`: Timeout in seconds for the upload.
- `show_progress`: Shows the upload progress.
- `recursive`: Uploads files from subdirectories as well.
- `desired_file_types`: A list of allowed file types to upload. If not provided, all
  files are uploaded.
- `enable_parallel_processing`: If `True`, deepset ingests the files in parallel.
  Use this to speed up the upload process. Make sure you are not running concurrent uploads for the same files.
- `safe_mode`: If `True`, deepset does not ingest the files in parallel.
download
async def download(workspace_name: str = DEFAULT_WORKSPACE_NAME,
file_dir: Optional[Union[Path, str]] = None,
name: Optional[str] = None,
odata_filter: Optional[str] = None,
include_meta: bool = True,
batch_size: int = 50,
api_key: Optional[str] = None,
api_url: Optional[str] = None,
show_progress: bool = True,
timeout_s: Optional[int] = None,
safe_mode: bool = False) -> None
Download a folder from deepset.
Downloads all files from a workspace to a local folder.
Arguments:
- `workspace_name`: Name of the workspace to download the files from. It uses the workspace from the .ENV file by default.
- `file_dir`: Path to the local folder to download the files to.
- `name`: Name of the file to filter by.
- `odata_filter`: OData filter to apply to the file metadata.
- `include_meta`: Whether to include the file meta in the folder.
- `batch_size`: Batch size for the listing.
- `api_key`: API key to use for authentication.
- `api_url`: API URL to use for authentication.
- `show_progress`: Shows the download progress.
- `timeout_s`: Timeout in seconds for the download.
- `safe_mode`: If `True`, disables ingesting files in parallel.
upload_texts
async def upload_texts(
files: List[DeepsetCloudFile],
api_key: Optional[str] = None,
api_url: Optional[str] = None,
workspace_name: str = DEFAULT_WORKSPACE_NAME,
write_mode: WriteMode = WriteMode.KEEP,
blocking: bool = True,
timeout_s: Optional[int] = None,
show_progress: bool = True,
enable_parallel_processing: bool = False) -> S3UploadSummary
Upload raw texts to deepset.
Arguments:
- `files`: List of DeepsetCloudFile objects to upload.
- `api_key`: deepset API key to use for authentication.
- `api_url`: API URL to use for authentication.
- `workspace_name`: Name of the workspace to upload the files to. It uses the workspace from the .ENV file by default.
- `write_mode`: Specifies what to do when a file with the same name already exists in the workspace.
  Possible options are:
  - KEEP - uploads the file with the same name and keeps both files in the workspace.
  - OVERWRITE - overwrites the file that is in the workspace.
  - FAIL - fails to upload the file with the same name.
- `blocking`: Whether to wait for the files to be listed and displayed in deepset.
  This may take a couple of minutes.
- `timeout_s`: Timeout in seconds for the `blocking` parameter.
- `show_progress`: Shows the upload progress.
- `enable_parallel_processing`: If `True`, deepset ingests files in parallel.
  Use this to speed up the upload process. Make sure you are not running concurrent uploads for the same files.
Example:
```python
import asyncio
from deepset_cloud_sdk.workflows.async_client.files import upload_texts, DeepsetCloudFile

async def my_async_context() -> None:
    await upload_texts(
        api_key="<deepsetCloud_API_key>",
        workspace_name="<default_workspace>",  # optional, by default the environment variable "DEFAULT_WORKSPACE_NAME" is used
        files=[
            DeepsetCloudFile(
                name="example.txt",
                text="this is text",
                meta={"key": "value"},  # optional
            )
        ],
        blocking=True,  # optional, by default True
        timeout_s=300,  # optional, by default 300
    )

# Run the async function
if __name__ == "__main__":
    asyncio.run(my_async_context())
```
upload_bytes
async def upload_bytes(
files: List[DeepsetCloudFileBytes],
api_key: Optional[str] = None,
api_url: Optional[str] = None,
workspace_name: str = DEFAULT_WORKSPACE_NAME,
write_mode: WriteMode = WriteMode.KEEP,
blocking: bool = True,
timeout_s: Optional[int] = None,
show_progress: bool = True,
enable_parallel_processing: bool = False) -> S3UploadSummary
Upload files in byte format.
Arguments:
- `files`: List of DeepsetCloudFileBytes objects to upload.
- `api_key`: deepset API key to use for authentication.
- `api_url`: API URL to use for authentication.
- `workspace_name`: Name of the workspace to upload the files to. It uses the workspace from the .ENV file by default.
- `write_mode`: Specifies what to do when a file with the same name already exists in the workspace.
  Possible options are:
  - KEEP - uploads the file with the same name and keeps both files in the workspace.
  - OVERWRITE - overwrites the file in the workspace with the file you're uploading.
  - FAIL - fails to upload the file if a file with the same name already exists in the workspace.
- `blocking`: Whether to wait for the files to be listed and displayed in deepset.
  This may take a couple of minutes.
- `timeout_s`: Timeout in seconds for the `blocking` parameter.
- `show_progress`: Shows the upload progress.
- `enable_parallel_processing`: If `True`, deepset ingests files in parallel.
  Use this to speed up the upload process. Make sure you are not running concurrent uploads for the same files.
Module async_pipeline_client
Async pipeline client for importing pipelines and indexes to deepset AI Platform.
AsyncPipelineClient
class AsyncPipelineClient()
Async client for importing Haystack pipelines and indexes to deepset AI platform.
Notes:
When using this client, you need to manage your own event loop.
Example for importing a Haystack pipeline or index to deepset AI platform:
```python
from deepset_cloud_sdk import (
AsyncPipelineClient,
PipelineConfig,
PipelineInputs,
PipelineOutputs,
IndexConfig,
IndexInputs,
)
from haystack import Pipeline
# Initialize the client with configuration from environment variables (after running `deepset-cloud login`)
client = AsyncPipelineClient()
# or initialize the client with explicit configuration
client = AsyncPipelineClient(
api_key="your-api-key",
workspace_name="your-workspace",
api_url="https://api.cloud.deepset.ai/api/v1"
)
# Configure your pipeline
pipeline = Pipeline()
# Configure import
# if importing a pipeline, use PipelineConfig
config = PipelineConfig(
name="my-pipeline",
inputs=PipelineInputs(
query=["prompt_builder.query"],
filters=["bm25_retriever.filters", "embedding_retriever.filters"],
),
outputs=PipelineOutputs(
answers="answers_builder.answers",
documents="ranker.documents",
),
strict_validation=False, # Fail on validation errors (default: False, warnings only)
overwrite=False, # Overwrite existing pipelines with the same name. If True, creates if it doesn't exist (default: False)
)
# if importing an index, use IndexConfig
config = IndexConfig(
name="my-index",
inputs=IndexInputs(files=["file_type_router.sources"]),
strict_validation=False, # Fail on validation errors (default: False, warnings only)
overwrite=False, # Overwrite existing indexes with the same name. If True, creates if it doesn't exist (default: False)
)
# async execution
await client.import_into_deepset(pipeline, config)
```

<a id="async_pipeline_client.AsyncPipelineClient.__init__"></a>
#### AsyncPipelineClient.\_\_init\_\_
```python
def __init__(api_key: str | None = None,
             workspace_name: str | None = None,
             api_url: str | None = None) -> None
```
Initialize the Async Pipeline Client.
The client can be configured in two ways:
1. Using environment variables (recommended):
- Run `deepset-cloud login` to set up the following environment variables:
- `API_KEY`: Your deepset AI platform API key
- `API_URL`: The URL of the deepset AI platform API
- `DEFAULT_WORKSPACE_NAME`: The workspace name to use.
2. Using explicit parameters:
- Provide the values directly to this constructor
- Any missing parameters will fall back to environment variables
**Arguments**:
- `api_key`: Your deepset AI platform API key. Falls back to `API_KEY` environment variable.
- `workspace_name`: The workspace to use. Falls back to `DEFAULT_WORKSPACE_NAME` environment variable.
- `api_url`: The URL of the deepset AI platform API. Falls back to `API_URL` environment variable.
**Raises**:
- `ValueError`: If no api key or workspace name is provided and `API_KEY` or `DEFAULT_WORKSPACE_NAME` is not set in the environment.
<a id="async_pipeline_client.AsyncPipelineClient.import_into_deepset"></a>
#### AsyncPipelineClient.import\_into\_deepset
```python
async def import_into_deepset(pipeline: PipelineProtocol,
                              config: IndexConfig | PipelineConfig) -> None
```
Import a Haystack `Pipeline` or `AsyncPipeline` into deepset AI Platform asynchronously.
The pipeline must be imported as either an index or a pipeline:
- An index: Processes files and stores them in a document store, making them available for
  pipelines to search.
- A pipeline: For other use cases, for example, searching through documents stored by index pipelines.
Arguments:
- `pipeline`: The Haystack `Pipeline` or `AsyncPipeline` to import.
- `config`: Configuration for importing, use either `IndexConfig` or `PipelineConfig`.
  If importing an index, the config argument is expected to be of type `IndexConfig`,
  if importing a pipeline, the config argument is expected to be of type `PipelineConfig`.