Ray Serve Architecture
Take a peek under the hood at the architecture of Ray Serve to get a sense of how the internals of this distributed framework work.
from ray import serve
import requests, json
from starlette.requests import Request
from concurrent.futures import ThreadPoolExecutor
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import matplotlib.pyplot as plt
In Ray, user code is executed by worker processes. These workers can run tasks (stateless functions) or actors (stateful class instances).
Ray Serve is built on actors, allowing deployments to load expensive state once (such as an ML model) and reuse it across many service requests.
Although you may never need to code any Ray tasks or actors yourself, your Ray Serve application has full access to those cluster capabilities, and you may wish to use them to implement other functionality (e.g., services or operations that don’t need to accept HTTP traffic). More information is at https://docs.ray.io/en/releases-2.6.1/ray-core/walkthrough.html
Under the hood, a few other actors make up a Serve instance.
Controller: A global actor unique to each Serve instance is responsible for managing other actors. Serve API calls like creating or getting a deployment make remote calls to the Controller.
HTTP Proxy: By default there is one HTTP proxy actor on the head node that accepts incoming requests, forwards them to replicas, and responds once they are completed. For scalability and high availability, you can also run a proxy on each node in the cluster via the location field of http_options.
Deployment Replicas: Actors that execute the code in response to a request. Each replica processes requests from the HTTP proxy.
Incoming requests, once resolved to a particular deployment, are queued. The requests from the queue are assigned round-robin to available replicas as long as capacity is available. This design provides load balancing and elasticity.
Capacity can be managed with the max_concurrent_queries parameter to the deployment decorator. This value defaults to 100 and represents the maximum number of queries that will be sent to a replica of this deployment without receiving a response. Each replica has its own queue to collect and smooth incoming request traffic.
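The assignment rule described above can be illustrated with a small pure-Python simulation. This is only a sketch of the idea (round-robin assignment that skips replicas already at their concurrency limit), not Serve's actual router code; the function name `assign_round_robin` and its return shape are hypothetical, and no request ever completes in this toy model.

```python
from collections import deque


def assign_round_robin(num_requests, num_replicas, max_concurrent_queries):
    """Toy model: hand queued requests to replicas in round-robin order.

    A replica is skipped once it holds `max_concurrent_queries` unanswered
    requests. Returns (assignments, still_queued).
    """
    in_flight = [0] * num_replicas
    assignments = {i: [] for i in range(num_replicas)}
    queue = deque(range(num_requests))
    next_replica = 0
    while queue:
        # Starting from the round-robin position, find a replica with spare capacity.
        for offset in range(num_replicas):
            candidate = (next_replica + offset) % num_replicas
            if in_flight[candidate] < max_concurrent_queries:
                break
        else:
            break  # every replica is full, so the remaining requests stay queued
        request_id = queue.popleft()
        assignments[candidate].append(request_id)
        in_flight[candidate] += 1
        next_replica = (candidate + 1) % num_replicas
    return assignments, list(queue)


assignments, still_queued = assign_round_robin(
    num_requests=7, num_replicas=3, max_concurrent_queries=2
)
print(assignments)   # {0: [0, 3], 1: [1, 4], 2: [2, 5]}
print(still_queued)  # [6]
```

With three replicas and a limit of two in-flight requests each, the seventh request has to wait until some replica responds, which is the backpressure that max_concurrent_queries provides.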
Roadmap: Production features
Each deployment can have its own resource management and autoscaling configuration, with several options for scaling.
By default, if nothing is specified (as in our examples above), a deployment runs a single replica. We can specify a larger, constant number of replicas in the decorator:
@serve.deployment(num_replicas=3)
For autoscaling, instead of num_replicas, we provide an autoscaling_config dictionary. With autoscaling, we can specify a minimum and maximum range for the number of replicas, the initial replica count, a load target, and more.
Here is an example of an extended configuration; see https://docs.ray.io/en/releases-2.6.1/serve/scaling-and-resource-allocation.html#scaling-and-resource-allocation for more details:
@serve.deployment(
    autoscaling_config={
        'min_replicas': 1,
        'initial_replicas': 2,
        'max_replicas': 5,
        'target_num_ongoing_requests_per_replica': 10,
    }
)
min_replicas can also be set to zero to create a “serverless” style design: in exchange for potentially slower startup, no actors (or their CPU/GPU resources) need to be permanently reserved.
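To build intuition for how the load target interacts with the replica range, here is a deliberately simplified calculation. It assumes the autoscaler aims for roughly `target_num_ongoing_requests_per_replica` in-flight requests per replica and clamps the result to the configured range; the real autoscaler also applies smoothing and upscale/downscale delays that are ignored here. The function `desired_replicas` is a hypothetical helper, not part of the Serve API.

```python
import math


def desired_replicas(total_ongoing_requests, autoscaling_config):
    """Simplified estimate: enough replicas to keep each one at the load target."""
    target = autoscaling_config['target_num_ongoing_requests_per_replica']
    needed = math.ceil(total_ongoing_requests / target)
    # Never go below min_replicas or above max_replicas.
    return max(
        autoscaling_config['min_replicas'],
        min(autoscaling_config['max_replicas'], needed),
    )


config = {
    'min_replicas': 1,
    'max_replicas': 5,
    'target_num_ongoing_requests_per_replica': 10,
}

for load in (0, 25, 200):
    print(load, desired_replicas(load, config))
# 0 1
# 25 3
# 200 5
```

With min_replicas set to 0 instead, the same calculation returns 0 for an idle deployment, which corresponds to the “serverless” style described above.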
The LLM-based chat service is a good example for seeing autoscaling in action: LLM inference is relatively expensive, so we can easily build up a queue of requests to the service. The autoscaler responds to the dynamics of queue sizes and launches additional replicas.
@serve.deployment(
    ray_actor_options={'num_gpus': 0.5},
    autoscaling_config={'min_replicas': 1, 'max_replicas': 4},
)
class Chat:
    def __init__(self, model: str):
        # Load the tokenizer and model once per replica; .to(0) places the model on GPU device 0.
        self._tokenizer = AutoTokenizer.from_pretrained(model)
        self._model = AutoModelForSeq2SeqLM.from_pretrained(model).to(0)

    async def __call__(self, request: Request) -> dict:
        # The client posts a JSON-encoded string, so the parsed body is still a
        # string and needs a second decoding step.
        data = await request.json()
        data = json.loads(data)
        return {'response': self.get_response(data['user_input'], data['history'])}

    def get_response(self, user_input: str, history: list[str]) -> str:
        history.append(user_input)
        inputs = self._tokenizer('</s><s>'.join(history), return_tensors='pt').to(0)
        reply_ids = self._model.generate(**inputs, max_new_tokens=500)
        response = self._tokenizer.batch_decode(reply_ids.cpu(), skip_special_tokens=True)[0]
        return response

chat = Chat.bind(model='facebook/blenderbot-400M-distill')
handle = serve.run(chat, name='autoscale_chat')
We can generate a little load and look at the Ray Dashboard.
What do we expect to see?
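def make_request(s):
    # `s` is already a JSON string; passing it via json= encodes it a second time,
    # which is why the deployment calls json.loads() on the parsed body.
    return requests.post("http://localhost:8000/", json=s).json()

sample = '{ "user_input" : "Hello there, chatbot!", "history":[] }'
make_request(sample)

# Send 128 requests from 32 threads to build up a queue.
executor = ThreadPoolExecutor(max_workers=32)
results = executor.map(make_request, [sample] * 128)

# Clean up once you are done watching the dashboard.
serve.delete('autoscale_chat')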
Many services – especially services that rely on neural net models – can produce higher throughput on batches of data.
At the same time, most service interfaces or contracts are based on a single request-response.
Ray Serve enables us to meet both of those goals by automatically applying batching based on a specified batch size and batch timeout.
@serve.deployment()
class Chat:
    def __init__(self):
        self._message = "Chatbot counts the batch size at "

    # Serve groups up to 10 concurrent calls, waiting at most 10 ms for a batch to fill.
    @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.01)
    async def handle_batch(self, request_batch):
        # The method receives a list of inputs and must return one result per input.
        num_requests = len(request_batch)
        return [{'response': self._message + str(num_requests)}] * num_requests

    async def __call__(self, request: Request) -> dict:
        data = await request.json()
        data = json.loads(data)
        # Each caller passes a single item and gets back its own result.
        return await self.handle_batch(data)

chat = Chat.bind()
handle = serve.run(chat, name='batch_chat')
results = executor.map(make_request, ['{ "user_input" : "Hello there, chatbot!", "history":[] }'] * 100)
batches = [int(resp['response'].split(' ')[-1]) for resp in results]
plt.hist(batches)
serve.delete('batch_chat')
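To see how a batch size and a batch timeout interact, here is a minimal asyncio sketch of the same idea that runs without a Ray cluster. It is an illustrative re-implementation under simple assumptions, not the code behind serve.batch; the names `MiniBatcher` and `count_batch` are hypothetical. Callers submit one item each, and a background task flushes a batch when it is full or when the timeout expires, whichever comes first.

```python
import asyncio


class MiniBatcher:
    """Toy batcher: flush when the batch is full or the wait timeout expires."""

    def __init__(self, handler, max_batch_size, batch_wait_timeout_s):
        self._handler = handler
        self._max_batch_size = max_batch_size
        self._timeout = batch_wait_timeout_s
        self._queue = None
        self._worker = None

    async def submit(self, item):
        # Create the queue and the background worker lazily, inside the running loop.
        if self._worker is None:
            self._queue = asyncio.Queue()
            self._worker = asyncio.create_task(self._run())
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((item, future))
        return await future

    async def _run(self):
        loop = asyncio.get_running_loop()
        while True:
            # Block until the first item arrives, then start the timeout clock.
            batch = [await self._queue.get()]
            deadline = loop.time() + self._timeout
            while len(batch) < self._max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self._queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            # Call the handler once for the whole batch and fan the results back out.
            results = await self._handler([item for item, _ in batch])
            for (_, future), result in zip(batch, results):
                future.set_result(result)


async def count_batch(items):
    # Like the Chat example, report the batch size to every caller in the batch.
    return [len(items)] * len(items)


async def demo():
    batcher = MiniBatcher(count_batch, max_batch_size=10, batch_wait_timeout_s=0.1)
    return await asyncio.gather(*(batcher.submit(i) for i in range(25)))


sizes = asyncio.run(demo())
print(sizes)
```

With 25 concurrent callers and a maximum batch size of 10, the first twenty callers land in two full batches, and the last five are flushed as a partial batch once the timeout expires. A longer timeout favors fuller batches at the cost of added latency for requests that arrive when traffic is light.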
Serve provides some fault tolerance features out of the box. End-to-end fault tolerance is available by running Serve on top of KubeRay or Anyscale.
While Ray can start/restart/scale worker processes, KubeRay and Anyscale provide the ability to recover nodes, provision additional nodes from a resource pool, cloud provider, etc.
For use in production, Serve includes a CLI with commands to deploy applications, check their status, update them, and more.
! serve status
handle = serve.run(chat, name='batch_chat')
! ray status
! serve status
serve.shutdown()
While deployments can be reconfigured in-place and hot-redeployed, those updates will trigger an update of all deployments within the application.
In large, complex applications, you may want to share a single Ray cluster and update individual components without redeploying the entire set of services. For those use cases, Ray Serve allows you to define multiple applications.
This collection of applications