
Tuesday, January 4, 2011

How fast can you add more worker nodes?

There is a very important, and often overlooked, question when it comes to a system's scalability: how fast does the system scale? The primary question is still whether the system can scale at all. But in an age of Internet-powered applications that must cope with sudden, unexpected traffic peaks, answering that question alone is not good enough. The system doesn't just need to be able to scale; it also needs to scale almost instantaneously.

The producer-consumer programming model is a good example of a software design with a very simple conceptual path to scaling. The model consists of three parts: a producer that creates work, a queue that stores the backlog of pending work the producer has created, and a worker that takes pending work from the queue and executes it.
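To make the three parts concrete, here is a minimal single-process sketch in Python 3 using the standard library's `queue.Queue` and threads. It is an illustration under simplifying assumptions, not the demo's code: the "work" is just recording which worker handled which job, and the `None` sentinel that tells a worker to stop is a convention of this sketch.

```python
import queue
import threading


def run_producer_consumer(num_jobs, num_workers):
    """Producer puts job ids on a queue; workers take and 'execute' them.

    Returns a list of (worker_name, job_id) pairs, one per completed job.
    """
    jobs = queue.Queue()  # the queue: holds the backlog of pending work
    completed = []
    lock = threading.Lock()

    def worker(name):
        while True:
            job_id = jobs.get()  # blocks until work is available
            if job_id is None:  # sentinel: no more work is coming
                return
            # Stand-in for real work (the demo's jobs sleep ~5 seconds).
            with lock:
                completed.append((name, job_id))

    workers = [
        threading.Thread(target=worker, args=("worker-%d" % i,))
        for i in range(num_workers)
    ]
    for t in workers:
        t.start()

    # The producer: create the work and enqueue it.
    for job_id in range(num_jobs):
        jobs.put(job_id)
    # One sentinel per worker, queued after all jobs, so every thread exits.
    for _ in workers:
        jobs.put(None)

    for t in workers:
        t.join()
    return completed


results = run_producer_consumer(num_jobs=20, num_workers=4)
```

Scaling up is just a matter of passing a larger `num_workers`; the producer and the queue do not change, which is the property the rest of this post relies on.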



Take, for example, uploading a video to YouTube. The producer would be the web server that creates the work of "process this video file". A worker takes the next video file from the queue and performs all the codec conversions, formatting, etc. on it; once it is done, it picks the next waiting video file from the queue. Suppose a couple hundred people suddenly upload their videos at once. The work queue would grow huge, and each video would wait longer and longer before being processed. However, more workers can be added to scale the system up to handle the increased load. Essentially, the system's throughput is proportional to the number of workers, as long as the queue itself does not become the bottleneck.
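The proportionality claim can be checked with a little arithmetic. The sketch below assumes every job takes the same time, no new jobs arrive while the backlog drains, and workers never contend with each other; the 200-video, 60-second figures are hypothetical, chosen only for illustration.

```python
import math


def drain_time(backlog, job_seconds, workers):
    """Seconds for `workers` identical workers to clear `backlog` jobs.

    Assumes equal job durations, no new arrivals, and perfectly
    parallel workers, so the work happens in ceil(backlog / workers) rounds.
    """
    rounds = math.ceil(backlog / workers)
    return rounds * job_seconds


# Hypothetical burst: 200 uploaded videos, 60 seconds of processing each.
for n in (1, 10, 20):
    print(n, "workers:", drain_time(200, 60, n), "seconds")
```

Under these assumptions, ten times the workers clears the same backlog in a tenth of the time (12,000 seconds with one worker versus 1,200 seconds with ten).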


Here is a quick video that demonstrates the scalable nature of the producer-consumer model. A producer node creates jobs that each take a worker about 5 seconds to complete, and it submits 2 jobs every second. A single worker finishes only one job every 5 seconds (0.2 jobs per second), so it falls further behind every second; keeping pace with the producer takes at least 2 × 5 = 10 workers, and clearing the backlog that has already built up takes more than that. So more workers are added to the system to keep pace with the producer node and to tackle the backlog of jobs.
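The worker count needed to keep up follows directly from the demo's two rates. Here is a small sketch of that arithmetic, using `fractions.Fraction` to avoid floating-point rounding; the 2 jobs/second and 5 seconds/job figures come from the demo description, and treating every job as taking exactly 5 seconds is a simplifying assumption.

```python
import math
from fractions import Fraction


def workers_needed(arrival_rate, job_seconds):
    """Minimum workers whose combined throughput matches the arrival rate.

    Each worker finishes 1 / job_seconds jobs per second, so we need
    workers / job_seconds >= arrival_rate.
    """
    return math.ceil(Fraction(arrival_rate) * Fraction(job_seconds))


def backlog_growth(arrival_rate, job_seconds, workers):
    """Net change in queue length per second (negative means it shrinks)."""
    return Fraction(arrival_rate) - Fraction(workers, job_seconds)


print(workers_needed(2, 5))      # 10
print(backlog_growth(2, 5, 1))   # 9/5
print(backlog_growth(2, 5, 12))  # -2/5
```

With exactly 10 workers the queue merely stops growing; only past that point does the existing backlog start to shrink.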


There are two main virtual machines in the video: a producer machine and a worker machine. The producer machine runs a Django application, which is used to visualize the system; RabbitMQ, an AMQP message broker that provides the work queue for the producer-consumer model; and a simple shell loop that hits the Django application to submit jobs to the work queue. Here is the code snippet from the Django views.py that handles job submission:

from django.shortcuts import render_to_response
from prdconapp.models import Job, Server

# pika is a simple AMQP library that is used to interact with the
# RabbitMQ instance.
import pika

def give(request, vm_id, num_jobs):
    """
    This gives jobs to the system. It will create num_jobs more
    jobs and submit them to the queue, and update the 
    visualization state with these jobs. The vm_id is just a 
    string to indicate what machine is giving these jobs.
    """
    
    # Connect to the RabbitMQ instance running on the local host
    # and declare the queue called "task_queue".
    connection = pika.AsyncoreConnection(
            pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue')

    # Submit the desired number of jobs to RabbitMQ. Each job is
    # also saved as a Job row so the visualizer can track it.
    for i in range(int(num_jobs)):
        j = Job(owner=vm_id, status='pending')
        j.save()
        message = "Job id: " + str(j.id)
        channel.basic_publish(exchange='',
                routing_key='task_queue',
                body=message,
                properties=pika.BasicProperties(
                    # Mark the message persistent. RabbitMQ only keeps
                    # it across a broker restart if the queue is durable.
                    delivery_mode=2,
                ))

    # Close the connection so each request doesn't leak one.
    connection.close()

    # job_dict() (not shown) builds the template context.
    return render_to_response("index.html", job_dict())


The worker virtual machine simply runs a single Python script that takes the next job from the RabbitMQ queue, updates the Django application's visualization, runs the job, and then loops to take the next job. When the Copper platform live-clones the virtual machine, this same Python script is already running in each of the cloned machines. This is how the new workers are able to connect to the RabbitMQ system and start pulling off the next pending job. Here is the worker code:

import time
import urllib2
import sys

# pika is a simple AMQP library that is used to interact with the
# RabbitMQ instance.
import pika

# The gridcentric library binding used to get some information
# about the virtual machine on which this script is running.
from gridcentric import guest as gc

# The host that has the RabbitMQ instance. It is also the host
# running the Django app we need to update.
host = '192.168.1.80'

# We use the gridcentric library to get the unique vmid of this
# machine. This is used to determine if we are a cloned virtual 
# machine, or the original master machine.
VMID = gc.vmid()

# Start a connection to the RabbitMQ server and listen to the 
# task_queue queue.
connection = pika.AsyncoreConnection(
        pika.ConnectionParameters(host=host),
        True,
        pika.connection.SimpleReconnectionStrategy())
channel = connection.channel()
channel.queue_declare(queue='task_queue')

print ' [*] Waiting for messages. To exit press CTRL+C'

# This is the call back function that gets called when this 
# worker receives a message from the server. It essentially 
# updates the visualizer, and runs the task.
def callback(ch, method, header, body):
    # Use the gridcentric library to get the IP address of 
    # this machine in the cluster's private virtual network. 
    # We will use this to uniquely identify this machine in
    # the visualization.
    ip = gc.my_ip()
    
    print " [%s] Received %r" % (ip,body,)
    job_id = body.split(":")[1].strip()
    try:
        # Update the visualizer to tell it that we have taken
        # the job and are going to be running it.
        urllib2.urlopen("http://%s:8000/prdcon/take/%s/%s" % 
                (host, ip, job_id) )

        # All jobs themselves are just sleep jobs. We could 
        # decode the message, and run the job, or better yet 
        # use something like Celery that does all of this for 
        # us. For the purposes of the demo, sleeping 5 seconds 
        # should be fine.
        time.sleep(5)

        # Update the visualizer to tell it that we have completed
        # the job.
        urllib2.urlopen("http://%s:8000/prdcon/done/%s/%s" % 
                (host, ip, job_id) )
        print " [x] Done"
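    except Exception as e:
        # Don't let a failed visualizer update kill the worker, but
        # report it instead of failing silently.
        print " [!] Job %s failed: %r" % (job_id, e)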

    # Tell the RabbitMQ server that we have processed the message, 
    # and we are ready for another one.
    ch.basic_ack(delivery_tag = method.delivery_tag)

    # If we detect that our VMID has changed, we are running in a
    # freshly cloned machine and want to reconnect to the RabbitMQ
    # server as a new worker. As a bit of a hack for the demo, the
    # worker simply exits; this is why we run it in a while loop in
    # the shell, which restarts it with a new connection.
    if gc.vmid() != VMID:
        sys.exit(0)

# Tell the RabbitMQ server that we only want to receive a single 
# message at a time.
channel.basic_qos(prefetch_count=1)

# Register the callback function.
channel.basic_consume(callback,
                      queue='task_queue')

# Just keep looping and waiting for new messages to process.
pika.asyncore_loop()
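The worker leans on two RabbitMQ behaviours: `basic_qos(prefetch_count=1)` limits each consumer to one unacknowledged message, and a message that was delivered but never acknowledged is requeued when its consumer disconnects. The following in-memory model of those two rules is a hypothetical illustration (the `ToyBroker` class is not part of pika or RabbitMQ), useful for seeing why the worker sends `basic_ack` only after the job's work is over.

```python
from collections import deque


class ToyBroker:
    """Hypothetical in-memory model of a work queue with manual acks.

    Not the pika or RabbitMQ API; it only mimics two rules:
    prefetch_count=1 and requeue-on-disconnect for unacked messages.
    """

    def __init__(self):
        self.ready = deque()  # messages waiting to be delivered
        self.unacked = {}     # consumer name -> message it currently holds

    def publish(self, body):
        self.ready.append(body)

    def deliver(self, consumer):
        # prefetch_count=1: no new message until the held one is acked.
        if consumer in self.unacked or not self.ready:
            return None
        body = self.ready.popleft()
        self.unacked[consumer] = body
        return body

    def ack(self, consumer):
        # The worker finished; the message is removed for good.
        del self.unacked[consumer]

    def disconnect(self, consumer):
        # The worker died before acking: requeue its message. Putting it
        # at the front approximates RabbitMQ's requeue-in-place behaviour.
        body = self.unacked.pop(consumer, None)
        if body is not None:
            self.ready.appendleft(body)


broker = ToyBroker()
for n in range(3):
    broker.publish("Job id: %d" % n)

first = broker.deliver("worker-a")    # "Job id: 0"
blocked = broker.deliver("worker-a")  # None: still holding job 0
broker.disconnect("worker-a")         # crashes before acking
retry = broker.deliver("worker-b")    # job 0 is redelivered
job_id = retry.split(":")[1].strip()  # same parsing as the worker: "0"
```

Had the worker acknowledged each message on receipt instead of after the job, job 0 would simply have been lost when worker-a died.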


The producer-consumer model is a straightforward design with simple semantics that make it easy to reason about how it will scale. But, as I mentioned at the beginning of this post, the mere fact that this model can scale is not good enough. The next, almost equally important, question is: how fast does it scale?

Let's look at how long it has taken to add workers to a producer-consumer system over time:
  • Days: In the 1980s, adding new workers to the system took on the order of days, because it was a very manual process: physically setting up each computer and then configuring it by hand. Taking days to scale an application is simply not good enough.
  • Hours: In the 1990s, disk imaging ("ghosting") let computers be preconfigured, but they still had to be physically set up, and it was still a largely manual process. Scaling in hours is better, but it still leaves a lot of potential uncaptured.
  • Minutes: In the 2000s, virtualization and cloud computing became mainstream, removing much of the manual work of setting up new workers. This has helped enable the current explosion of Internet-based applications, but there is still a window of lost potential while the system scales up.
  • Seconds: GridCentric's Copper platform can add functioning workers to the system an order of magnitude faster than anyone else. With delays of only seconds, the system becomes much more responsive to peaks in demand, and the window of lost potential shrinks from minutes to seconds. As an added bonus, adding workers with the Copper platform is often simpler and easier than on other platforms that take minutes to do the same task.
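To put numbers on the "window of lost potential", here is a sketch that reuses the demo's rates (2 jobs/second arriving, about 5 seconds per job, one worker running) and asks how many jobs pile up while waiting for new workers. The 300-second and 10-second delays are illustrative stand-ins for "minutes" and "seconds", not measurements.

```python
from fractions import Fraction


def backlog_during_delay(arrival_rate, job_seconds, workers, delay_seconds):
    """Jobs that accumulate while waiting `delay_seconds` for new workers.

    Assumes a constant arrival rate and a fixed per-job time; the
    existing `workers` keep processing jobs during the delay.
    """
    capacity = Fraction(workers, job_seconds)  # jobs finished per second
    growth = Fraction(arrival_rate) - capacity
    return max(Fraction(0), growth * delay_seconds)


for label, delay in (("minutes", 300), ("seconds", 10)):
    print(label, backlog_during_delay(2, 5, 1, delay))
```

Under these assumptions a five-minute delay leaves 540 extra jobs queued versus 18 for a ten-second delay, and every one of those jobs must still be worked off after the new workers arrive.
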
Here is the video again. As you watch it, ask yourself: how fast can you add workers to your scalable system, and what are you losing while you scramble to react to a temporary peak in demand?

