Tutorial: Use AsyncMilvusClient with asyncio
AsyncMilvusClient is an asynchronous counterpart of MilvusClient that offers a coroutine-based API for non-blocking access to Milvus via asyncio. In this article, you will learn how to call the APIs that AsyncMilvusClient provides and what you need to pay attention to when doing so.
Overview
asyncio is a Python library for writing concurrent code using async/await syntax. It serves as the foundation for Milvus’ high-performance asynchronous client, which fits into any codebase that already runs on top of asyncio.
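To make the async/await model concrete, here is a minimal sketch that uses only the standard library. The fake_io() coroutine is a hypothetical stand-in for a non-blocking network call; it is not part of pymilvus.

```python
import asyncio

async def fake_io(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a non-blocking network call
    await asyncio.sleep(delay)
    return name

async def main() -> list:
    # gather runs both coroutines concurrently and returns
    # their results in the order the coroutines were passed in
    return await asyncio.gather(fake_io("a", 0.05), fake_io("b", 0.01))

results = asyncio.run(main())
print(results)  # ['a', 'b']
```

Although "b" finishes first, gather() keeps the argument order, which is convenient when each coroutine corresponds to one request.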
The methods that AsyncMilvusClient provides take the same parameters and behave the same way as their MilvusClient counterparts. The only difference lies in how you call them. The available methods fall into four groups: Client, Collection & Partition, Index, and Vector. This tutorial calls create_collection(), drop_collection(), create_index(), load_collection(), insert(), query(), search(), and hybrid_search().
If you need an asynchronous version of any other MilvusClient method, you can submit a feature request in the pymilvus repo. Code contributions are also welcome.
Create an event loop
Applications using asyncio typically use the event loop as the orchestrator for managing asynchronous tasks and I/O operations. In this tutorial, we will create an event loop with asyncio and use it as the orchestrator.
import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

# Create the loop explicitly: calling asyncio.get_event_loop() when no loop
# is current is deprecated in recent Python versions
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
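The following standalone sketch shows what using the loop as the orchestrator looks like in practice: run_until_complete() blocks until the coroutine finishes and hands back its return value. The add() coroutine and the demo_loop name are illustrative only.

```python
import asyncio

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # yield control to the loop once
    return a + b

demo_loop = asyncio.new_event_loop()
try:
    # Each call drives one coroutine to completion and returns its result
    first = demo_loop.run_until_complete(add(1, 2))
    second = demo_loop.run_until_complete(add(first, 10))
finally:
    demo_loop.close()  # release the loop's resources when done

print(first, second)  # 3 13
```

The same loop can be reused for several calls, which is the pattern the rest of this tutorial follows.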
Connect with AsyncMilvusClient
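The following example demonstrates how to connect to Milvus asynchronously. It also creates a synchronous MilvusClient named client, which later sections use for helper calls such as has_collection(), prepare_index_params(), and get_load_state().
# Connect to the Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

# Synchronous client for the helper calls made in later sections
client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")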
Create schema
Currently, AsyncMilvusClient does not provide a coroutine version of create_schema(). The call below is therefore an ordinary synchronous call that builds the schema for the collection.
schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)
Because create_schema() runs synchronously, you do not need to await it or orchestrate the call using the event loop.
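If you are unsure whether a given method must be awaited, the standard library can tell you. The sketch below uses a hypothetical DemoClient class as a stand-in (it is not a pymilvus class) and checks each method with inspect.iscoroutinefunction().

```python
import asyncio
import inspect

class DemoClient:
    """Hypothetical stand-in for a client that mixes sync and async methods."""

    def create_schema(self) -> dict:
        # Plain method: call it directly
        return {"fields": []}

    async def create_collection(self, name: str) -> str:
        # Coroutine method: must be awaited
        await asyncio.sleep(0)
        return name

demo = DemoClient()
needs_await = {
    name: inspect.iscoroutinefunction(getattr(demo, name))
    for name in ("create_schema", "create_collection")
}
print(needs_await)  # {'create_schema': False, 'create_collection': True}
```

The same check can be applied to any client object, so you can verify a method's behavior rather than rely on memory.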
Create collection
Now we will use the schema to create a collection. Note that you need to prefix every call to an AsyncMilvusClient method with the await keyword and place the call inside an async function, as follows:
async def create_my_collection(collection_name, schema):
    # Drop any existing collection with the same name so the example starts clean
    if client.has_collection(collection_name):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    # Verify the result with the synchronous client
    if client.has_collection(collection_name):
        print("Collection created successfully")
    else:
        print("Failed to create collection")

# Call the above function asynchronously
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully
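Forgetting the await keyword is a common mistake, and it fails quietly: the call returns a coroutine object instead of a result, and a coroutine object is always truthy. The following sketch illustrates this with a hypothetical has_rows() coroutine.

```python
import asyncio
import inspect

async def has_rows() -> bool:
    await asyncio.sleep(0)
    return False

async def main():
    pending = has_rows()                    # no await: this is a coroutine object
    is_coroutine = inspect.iscoroutine(pending)
    looks_true = bool(pending)              # truthy, even though the result is False
    value = await pending                   # awaiting yields the real result
    return is_coroutine, looks_true, value

outcome = asyncio.run(main())
print(outcome)  # (True, True, False)
```

An if statement that tests the un-awaited call would therefore always take the true branch, regardless of what the coroutine eventually returns.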
Create index
You also need to create indexes for all vector fields and, optionally, for scalar fields. The schema defined above contains two vector fields; the following code creates indexes for both of them and for the scalar text field:
async def create_indexes(collection_name):
    index_params = client.prepare_index_params()
    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously
loop.run_until_complete(create_indexes("my_collection"))
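Both vector indexes above use the IP (inner product) metric, under which a larger score means a closer match. As a rough illustration of what that score is, the sketch below computes inner products in plain Python for a few made-up vectors; it does not reproduce how Milvus evaluates the metric internally.

```python
def inner_product(a, b):
    # Sum of element-wise products; both vectors must have the same length
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    return sum(x * y for x, y in zip(a, b))

query = [1.0, 0.0, 2.0]
candidates = {
    "a": [1.0, 0.0, 2.0],
    "b": [0.0, 3.0, 0.0],
    "c": [0.5, 1.0, 1.0],
}

# Higher inner product first
ranked = sorted(candidates, key=lambda k: inner_product(query, candidates[k]), reverse=True)
print(ranked)  # ['a', 'c', 'b']
```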
Load collection
A collection can be loaded after the necessary fields are indexed. The following code demonstrates how to load the collection asynchronously.
async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))

# Call the above function asynchronously
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}
Insert data
You can use the embedding models available in pymilvus to generate vector embeddings for your texts. For details, refer to Embedding Overview. In this section, we will insert randomly generated data into the collection.
async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))

    data = [{
        'id': i,
        'dense_vector': rng.random(5).tolist(),
        'sparse_vector': csr_matrix(rng.random(5)),
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)
    print(res)

# Call the above function asynchronously
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
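The example above sends all rows in a single call. For larger datasets you may prefer to insert in batches. The following is a minimal sketch of that idea, assuming a client whose insert() coroutine returns a dict with an insert_count key, as in the output above. FakeAsyncClient, chunked(), and insert_in_batches() are hypothetical helpers written for this illustration, not pymilvus APIs.

```python
import asyncio

class FakeAsyncClient:
    """Hypothetical stand-in that only counts the rows it receives."""

    async def insert(self, collection_name, data):
        await asyncio.sleep(0)  # stands in for the network round trip
        return {"insert_count": len(data)}

def chunked(rows, size):
    # Yield consecutive slices of at most `size` rows
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

async def insert_in_batches(client, collection_name, rows, batch_size):
    total = 0
    for batch in chunked(rows, batch_size):
        res = await client.insert(collection_name, batch)  # one batch at a time
        total += res["insert_count"]
    return total

rows = [{"id": i} for i in range(25)]
total = asyncio.run(insert_in_batches(FakeAsyncClient(), "demo", rows, batch_size=10))
print(total)  # 25
```

With a real client you would pass async_client in place of FakeAsyncClient(); a suitable batch size depends on your row size and deployment.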
Query
After the collection is loaded and filled with data, you can conduct searches and queries in it. In this section, you are going to count the entities in the collection named my_collection whose text field contains the string random.
async def query_my_collection(collection_name):
    # Count the entities whose `text` field contains the string "random"
    res = await async_client.query(
        collection_name=collection_name,
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )
    print(res)

# Call the above function asynchronously
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"]
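The position of the % wildcard decides what the filter matches: "%random%" matches the string anywhere in the field, while "random%" matches only at the start. The sketch below mimics this with a regular expression so you can see the difference locally. It models only the % wildcard and is an approximation for illustration, not Milvus's filter implementation.

```python
import re

def like(text: str, pattern: str) -> bool:
    # Translate each % into ".*" and require the whole string to match
    regex = ".*".join(re.escape(part) for part in pattern.split("%"))
    return re.fullmatch(regex, text, re.DOTALL) is not None

texts = [
    "random text is used",
    "a random text",
    "this is a seed",
    "randomly selecting words",
]

prefix = [t for t in texts if like(t, "random%")]
infix = [t for t in texts if like(t, "%random%")]
print(prefix)  # ['random text is used', 'randomly selecting words']
print(infix)   # ['random text is used', 'a random text', 'randomly selecting words']
```

Note that both patterns also match "randomly", because the wildcard matches on substrings rather than on whole words.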
Search
In this section, you will conduct vector searches on the target collection’s dense and sparse vector fields.
async def conduct_vector_search(collection_name, vector_type, field):
    # The rng in insert_sample_data() is local to that function, so create one here
    rng = np.random.default_rng()

    # Generate a set of three random query vectors
    query_vectors = []
    if vector_type == "dense":
        query_vectors = [rng.random(5) for _ in range(3)]
    if vector_type == "sparse":
        query_vectors = [csr_matrix(rng.random(5)) for _ in range(3)]

    print(query_vectors)

    res = await async_client.search(
        collection_name=collection_name,
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )
    print(res)

# To search against the dense vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))
The search output should list three sets of results corresponding to the specified query vectors.
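To work with the three result sets programmatically rather than printing them, you can iterate over the outer list (one entry per query vector) and then over the hits inside it. The sketch below uses hand-written sample data and assumes each hit behaves like a dict with id, distance, and entity keys. The values are made up for illustration and are not actual search output.

```python
# Hypothetical sample shaped like one hit list per query vector
sample_res = [
    [{"id": 12, "distance": 0.93, "entity": {"text": "random text"}},
     {"id": 5, "distance": 0.88, "entity": {"text": "seed paragraph"}}],
    [{"id": 7, "distance": 0.79, "entity": {"text": "testing purposes"}},
     {"id": 12, "distance": 0.75, "entity": {"text": "random text"}}],
    [{"id": 3, "distance": 0.97, "entity": {"text": "this sentence"}},
     {"id": 9, "distance": 0.64, "entity": {"text": "selecting words"}}],
]

# IDs of all hits, grouped by query vector
ids_per_query = [[hit["id"] for hit in hits] for hits in sample_res]

# Text of the first hit for each query vector
top_texts = [hits[0]["entity"]["text"] for hits in sample_res]

print(ids_per_query)  # [[12, 5], [7, 12], [3, 9]]
print(top_texts)      # ['random text', 'testing purposes', 'this sentence']
```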
Hybrid Search
A hybrid search combines the results of multiple searches and reranks them to improve recall. In this section, you are going to conduct a hybrid search using the dense and sparse vector fields.
async def conduct_hybrid_search(collection_name):
    # The rng in insert_sample_data() is local to that function, so create one here
    rng = np.random.default_rng()

    req_dense = AnnSearchRequest(
        data=[rng.random(5) for _ in range(3)],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[csr_matrix(rng.random(5)) for _ in range(3)],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    # Reciprocal Rank Fusion merges the two ranked result lists
    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name=collection_name,
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )
    print(res)

# Call the above function asynchronously
loop.run_until_complete(conduct_hybrid_search("my_collection"))
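RRFRanker reranks with Reciprocal Rank Fusion. As a rough sketch of the general technique, each result list contributes 1 / (k + rank) to an entity's score and the contributions are summed. The code below is a plain-Python illustration that assumes k = 60 and ranks counted from 1; it makes no claim about the exact conventions Milvus uses internally, and the ID lists are made up.

```python
from collections import defaultdict

def rrf_fuse(rankings, k=60):
    # rankings: lists of IDs, each ordered best-first
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for a stable order
    return sorted(scores, key=lambda d: (-scores[d], d))

dense_hits = [3, 1, 2]    # hypothetical IDs from the dense search
sparse_hits = [1, 4, 3]   # hypothetical IDs from the sparse search

fused = rrf_fuse([dense_hits, sparse_hits])
print(fused)  # [1, 3, 4, 2]
```

Entity 1 ranks first because it sits near the top of both lists, which is the behavior that makes rank fusion useful when the two searches return scores on different scales.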