Tutorial: Use AsyncMilvusClient with asyncio

AsyncMilvusClient is an asynchronous MilvusClient that offers a coroutine-based API for non-blocking access to Milvus via asyncio. In this article, you will learn how to call the APIs that AsyncMilvusClient provides and the aspects you need to pay attention to.

Overview

Asyncio is a library for writing concurrent code using the async/await syntax. It serves as the foundation for Milvus’ high-performance asynchronous client, which fits naturally into code that already runs on top of asyncio.

The methods that AsyncMilvusClient provides have identical parameter sets and behaviors as those of MilvusClient. The only difference lies in the way you call them. The following table lists the methods available in AsyncMilvusClient.

Client

- close()

Collection & Partition

- create_collection()
- drop_collection()
- create_partition()
- drop_partition()

Index

- create_index()
- drop_index()
- load_collection()
- release_collection()
- load_partitions()
- release_partitions()

Vector

- insert()
- upsert()
- delete()
- search()
- query()
- hybrid_search()
- get()

If you need the asynchronous version of any other MilvusClient method, you can submit a feature request in the pymilvus repo. Code contributions are also welcome.

Create an event loop

Applications using asyncio typically use the event loop as the orchestrator for managing asynchronous tasks and I/O operations. In this tutorial, we will get an event loop from asyncio and use it as the orchestrator.

import asyncio
import numpy as np
from scipy.sparse import csr_matrix
from pymilvus import MilvusClient, AsyncMilvusClient, DataType, RRFRanker, AnnSearchRequest

loop = asyncio.get_event_loop()
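Note that asyncio.get_event_loop() is deprecated as an entry point in recent Python versions (3.10 and later emit a DeprecationWarning when no loop is running). If you prefer, the same calls can be driven with asyncio.run(), which creates a loop, runs a coroutine to completion, and closes the loop for you. A minimal sketch, with asyncio.sleep(0) standing in for an awaited AsyncMilvusClient call:

```python
import asyncio

async def main():
    # Each AsyncMilvusClient call in this tutorial would be awaited here;
    # asyncio.sleep(0) stands in for such a coroutine.
    await asyncio.sleep(0)
    return "done"

# asyncio.run() creates an event loop, runs the coroutine, then closes the loop
result = asyncio.run(main())
print(result)
```

This tutorial sticks with the explicit event loop so each step can be run piece by piece.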

Connect with AsyncMilvusClient

The following example demonstrates how to connect to Milvus in an asynchronous manner.

# Connect to Milvus server using AsyncMilvusClient
async_client = AsyncMilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

# A synchronous MilvusClient is also needed for the helper calls
# used later in this tutorial, such as has_collection() and get_load_state()
client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

Create schema

We will use create_schema() to build the schema for the collection as follows:

schema = async_client.create_schema(
    auto_id=False,
    description="This is a sample schema",
)

schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("dense_vector", DataType.FLOAT_VECTOR, dim=5)
schema.add_field("sparse_vector", DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("text", DataType.VARCHAR, max_length=512)

AsyncMilvusClient calls the create_schema() method synchronously; therefore, you do not need to orchestrate the call using the event loop.

Create collection

Now we will use the schema to create a collection. Note that you need to prefix the await keyword to any call to the AsyncMilvusClient methods and place the call inside an async function as follows:

async def create_my_collection(collection_name, schema):
    if client.has_collection(collection_name):
        await async_client.drop_collection(collection_name)

    await async_client.create_collection(
        collection_name=collection_name,
        schema=schema
    )

    if client.has_collection(collection_name):
        print("Collection created successfully")
    else:
        print("Failed to create collection")
        
# Call the above function asynchronously 
loop.run_until_complete(create_my_collection("my_collection", schema))

# Output
#
# Collection created successfully

Create index

You also need to create indexes for all vector fields, and optionally for scalar fields. According to the schema defined above, there are two vector fields in the collection, and you will create indexes for them as follows:

async def create_indexes(collection_name):
    index_params = client.prepare_index_params()

    index_params.add_index(field_name="dense_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="sparse_vector", index_type="AUTOINDEX", metric_type="IP")
    index_params.add_index(field_name="text", index_type="AUTOINDEX")

    await async_client.create_index(collection_name, index_params)

# Call the above function asynchronously 
loop.run_until_complete(create_indexes("my_collection"))

Load collection

A collection can be loaded after the necessary fields are indexed. The following code demonstrates how to load the collection asynchronously.

async def load_my_collection(collection_name):
    await async_client.load_collection(collection_name)
    print(client.get_load_state(collection_name))
    
# Call the above function asynchronously 
loop.run_until_complete(load_my_collection("my_collection"))

# Output
#
# {'state': <LoadState: Loaded>}

Insert data

You can use the embedding models available in pymilvus to generate vector embeddings for your texts. For details, refer to Embedding Overview. In this section, we will insert randomly generated data into the collection.

async def insert_sample_data(collection_name):
    # Randomly generated data will be used here
    rng = np.random.default_rng(42)

    def generate_random_text(length):
        seed = "this is a seed paragraph to generate random text, which is used for testing purposes. Specifically, a random text is generated by randomly selecting words from this sentence."
        words = seed.split()
        return " ".join(rng.choice(words, length))
    
    data = [{
        'id': i, 
        'dense_vector': rng.random(5).tolist(), 
        'sparse_vector': csr_matrix(rng.random(5)), 
        'text': generate_random_text(10)
    } for i in range(10000)]

    res = await async_client.insert(collection_name, data)

    print(res)

# Call the above function asynchronously 
loop.run_until_complete(insert_sample_data("my_collection"))

# Output
#
# {'insert_count': 10000, 'ids': [0, 1, 2, 3, ..., 9999]}
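Besides scipy.sparse matrices, pymilvus also accepts sparse vectors as plain Python dicts mapping a dimension index to a float value, which avoids the scipy dependency. A sketch of building a row in that format (the helper name random_sparse_dict is illustrative, not part of pymilvus; the dimension count of 5 matches the schema above):

```python
import random

def random_sparse_dict(dim=5, density=0.6, seed=None):
    """Build a sparse vector as {dimension_index: value}, the dict
    format pymilvus accepts for SPARSE_FLOAT_VECTOR fields."""
    rnd = random.Random(seed)
    vec = {i: rnd.random() for i in range(dim) if rnd.random() < density}
    # Guarantee at least one non-zero entry
    if not vec:
        vec = {0: rnd.random()}
    return vec

row = {
    "id": 0,
    "dense_vector": [random.random() for _ in range(5)],
    "sparse_vector": random_sparse_dict(dim=5, seed=42),
    "text": "example text",
}
print(row["sparse_vector"])
```

Such rows can be passed to insert() exactly like the csr_matrix-based rows above.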

Query

After the collection is loaded and filled with data, you can conduct searches and queries on it. In this section, you are going to count the entities whose text field contains the word random in the collection named my_collection.

async def query_my_collection(collection_name):
    # Count the entities whose `text` field contains the word "random"

    res = await async_client.query(
        collection_name=collection_name,
        filter='text like "%random%"',
        output_fields=["count(*)"]
    )

    print(res)
    
# Call the above function asynchronously   
loop.run_until_complete(query_my_collection("my_collection"))

# Output
#
# data: ["{'count(*)': 6802}"] 

Search

In this section, you will conduct vector searches on the target collection’s dense and sparse vector fields.

async def conduct_vector_search(collection_name, search_type, field):
    # Generate a set of three random query vectors
    rng = np.random.default_rng(42)
    query_vectors = []
    if search_type == "dense":
        query_vectors = [ rng.random(5) for _ in range(3) ]

    if search_type == "sparse":
        query_vectors = [ csr_matrix(rng.random(5)) for _ in range(3) ]

    print(query_vectors)

    res = await async_client.search(
        collection_name=collection_name,
        data=query_vectors,
        anns_field=field,
        output_fields=["text", field]
    )

    print(res)
    
# To search against the dense vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "dense", "dense_vector"))

# To search against the sparse vector field asynchronously 
loop.run_until_complete(conduct_vector_search("my_collection", "sparse", "sparse_vector"))

The search output should list three sets of results corresponding to the specified query vectors.
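Since the dense and sparse searches are independent coroutines, they can also be issued concurrently with asyncio.gather() instead of two separate run_until_complete() calls, which is where the asynchronous client pays off. The pattern, sketched with a placeholder coroutine standing in for the awaited search calls (against a live server you would gather the conduct_vector_search(...) coroutines instead):

```python
import asyncio

async def fake_search(name, delay):
    # Stand-in for an awaited AsyncMilvusClient.search() call
    await asyncio.sleep(delay)
    return f"{name} results"

async def run_both():
    # gather() schedules both coroutines on the loop and waits for all;
    # total wall time is roughly the slowest call, not the sum of both
    return await asyncio.gather(
        fake_search("dense", 0.01),
        fake_search("sparse", 0.01),
    )

results = asyncio.run(run_both())
print(results)  # ['dense results', 'sparse results']
```

gather() returns the results in the order the coroutines were passed, regardless of which one finishes first.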

A hybrid search combines the results of multiple searches and reranks them to get a better recall. In this section, you are going to conduct a hybrid search using the dense and sparse vector fields.

async def conduct_hybrid_search(collection_name):
    # Generate random query vectors for both requests
    rng = np.random.default_rng(42)

    req_dense = AnnSearchRequest(
        data=[ rng.random(5) for _ in range(3) ],
        anns_field="dense_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    req_sparse = AnnSearchRequest(
        data=[ csr_matrix(rng.random(5)) for _ in range(3) ],
        anns_field="sparse_vector",
        param={"metric_type": "IP"},
        limit=10
    )

    reqs = [req_dense, req_sparse]

    ranker = RRFRanker()

    res = await async_client.hybrid_search(
        collection_name=collection_name,
        reqs=reqs,
        ranker=ranker,
        output_fields=["text", "dense_vector", "sparse_vector"]
    )

    print(res)
    
# Call the above function asynchronously  
loop.run_until_complete(conduct_hybrid_search("my_collection"))
