Metadata Querying Lab Solution
Familiarize yourself with Ray Datasets, `map_batches`, and the process of generating metadata and providing it to a vector store.
This lab is an opportunity to familiarize yourself with `map_batches`.
If you’re new to LLMs and Ray applications, focus on the core activities. If you’ve worked with LLMs and/or Ray before, you may have time to try the advanced activity.
Throughout this lab, we’re going to work with additional metadata for each of our documents. The metadata we’ll add here is simple: it’s just the length of the document. But we’ll see that having (and using) even trivial metadata like this allows us to improve our search results.
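As a concrete sketch (plain Python, not part of the lab code): the metadata we attach is just each document's character count, stored as one dict per document, which is the shape that Chroma's `metadatas` argument expects. The sample documents here are ours, for illustration only.

```python
docs = [
    "Phileas Fogg lived at No. 7 Saville Row.",
    "A short line.",
]

# One metadata dict per document; here only the character count.
metadatas = [{"length": len(d)} for d in docs]
# metadatas == [{'length': 40}, {'length': 13}]
```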
Core activities:
- Modify the code that uses `.map_batches` to generate embeddings: add to the output of the existing processing operation a column that contains the length of each document. Hint: it will be another key-value pair in the dictionary representing the batch-processing output (or explore the `map_batches` API further).
- Modify the `collection.query` call to handle a `where` condition that filters against the metadata. Hint: since we have length metadata, we can query for shorter or longer documents.

Advanced activity:
In this activity, we're not trying to produce any new output functionality.
But instead of modifying the existing actor class that generates the embeddings to also generate metadata, we’ll leave that code as-is.
Instead, we'll define a separate function, `add_metadata(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]`, and apply it in an additional `map_batches` step (with the new schema including `doc`, `id`, and `vec`). This second `map_batches` call will be simpler than the previous one, since we don't have actors and Ray can handle scaling tasks on its own. We also don't need to worry about GPUs for this operation or specifying batch size. Use `to_numpy_refs` and then `ray.get` one of those chunks of data, inspect it, and verify it has the same structure as the actor-based implementation.
import uuid
import chromadb
import numpy as np
import ray
from InstructorEmbedding import INSTRUCTOR
paras_ds = ray.data.read_text("/mnt/cluster_storage/around.txt", parallelism=4)
class DocEmbedderWithMetadata:
    def __init__(self):
        self._model = INSTRUCTOR('hkunlp/instructor-large')

    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        inputs = batch['text']
        embeddings = self._model.encode(inputs, device='cuda:0')
        # One UUID per document, plus the length metadata column
        ids = np.array([uuid.uuid1().hex for _ in inputs])
        lengths = np.array([len(i) for i in inputs])
        return {'doc': inputs, 'vec': embeddings, 'id': ids, 'length': lengths}
vecs = paras_ds.map_batches(DocEmbedderWithMetadata, compute=ray.data.ActorPoolStrategy(size=4), num_gpus=0.25, batch_size=64)
# Materialize the dataset's blocks and stitch the columns back together
numpy_refs = vecs.to_numpy_refs()
dicts = ray.get(numpy_refs)
vecs = np.vstack([d['vec'] for d in dicts])
ids = np.hstack([d['id'] for d in dicts])
docs = np.hstack([d['doc'] for d in dicts])
# Flatten into one metadata dict per document across all blocks
metadatas = sum([[{'length': int(length)} for length in d['length']] for d in dicts], [])
chroma_client = chromadb.Client()
collection = chroma_client.get_or_create_collection(name="metadata_lab")
collection.upsert(
    embeddings=vecs.tolist(),
    documents=docs.tolist(),
    ids=ids.tolist(),
    metadatas=metadatas
)
model = INSTRUCTOR('hkunlp/instructor-large')
utah_query_vec = model.encode("Describe the body of water in Utah").tolist()
def results_with_and_without_length(query_vec, length):
    where_filter = {"length": {"$gt": length}}
    results_without_length = collection.query(
        query_embeddings=[query_vec],
        n_results=3
    )
    results_with_length = collection.query(
        query_embeddings=[query_vec],
        n_results=3,
        where=where_filter
    )
    return (results_without_length, results_with_length)
Adding this specific metadata filter to this query does not make a difference…
results_with_and_without_length(utah_query_vec, 200)
Adding a metadata filter with this query does make a difference and improves the results:
bank_query = model.encode('bank robbery details').tolist()
results_with_and_without_length(bank_query, 200)
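Chroma evaluates the `where` filter for us, but its effect is easy to make concrete. Here is a plain-Python mimic of the `$gt` operator used above; the helper `passes_where` and the sample records are ours for illustration, not part of the Chroma API.

```python
def passes_where(meta: dict, where: dict) -> bool:
    # Supports only the {field: {"$gt": value}} shape used in this lab.
    for field, cond in where.items():
        for op, value in cond.items():
            if op == "$gt" and not (meta[field] > value):
                return False
    return True

candidates = [
    {"id": "a", "length": 95},   # a short paragraph
    {"id": "b", "length": 412},  # a long paragraph
]

kept = [c["id"] for c in candidates
        if passes_where(c, {"length": {"$gt": 200}})]
# kept == ['b']: only documents longer than 200 characters survive the filter
```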
class DocEmbedder:
    def __init__(self):
        self._model = INSTRUCTOR('hkunlp/instructor-large')

    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        inputs = batch['text']
        embeddings = self._model.encode(inputs, device='cuda:0')
        ids = np.array([uuid.uuid1().hex for _ in range(len(inputs))])
        return {'doc': inputs, 'vec': embeddings, 'id': ids}

def add_metadata(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
    # Adding a column is just adding another key-value pair to the batch dict
    lengths = np.array([len(i) for i in batch['doc']])
    batch['length'] = lengths
    return batch
vecs = paras_ds \
.map_batches(DocEmbedder, compute=ray.data.ActorPoolStrategy(size=4), num_gpus=0.25, batch_size=64) \
.map_batches(add_metadata)
numpy_refs = vecs.to_numpy_refs()
# Inspect one block and verify it matches the actor-based schema
ray.get(numpy_refs[0])
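Because `add_metadata` is a plain function over whole batches, its effect can be checked without Ray at all. Here is a sketch using Python lists in place of NumPy arrays; the sample batch is ours, for illustration only.

```python
def add_metadata(batch: dict) -> dict:
    # Adding a column to a batch is just adding another key-value pair
    batch["length"] = [len(doc) for doc in batch["doc"]]
    return batch

batch = {"doc": ["first doc", "the second document"], "id": ["a", "b"]}
out = add_metadata(batch)
# out["length"] == [9, 19]; the existing columns are untouched
```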