Friday, May 27, 2011

Creating VMs with JavaScript and Node

Seriously, JavaScript?

Node is a framework built around the speedy V8 JavaScript interpreter. It enables you to use JavaScript to create back-end services for your web application (or any other application). I/O is based around the same event-driven model that is familiar to front-end JavaScript programmers. Putting aside any claims about the inherent performance benefits of that model, it's powerful to use the same language in all parts of a web application and I have no doubt that Node will grow to stand next to the big players (Rails, Django, Struts) in the world of web frameworks.

Copper is focused on enabling dynamic and flexible services on the back-end. We empower programmers and administrators with a programmatic model for scaling applications, embedded within the control flow of the applications. Given that Node is an up-and-coming web framework, a few weekends ago I embarked on the fun exercise of creating some GridCentric V8 bindings.

When I co-founded a systems company a couple years ago I would never have guessed that I'd write a line of JavaScript, but technology brings us to unexpected places.

This post has two simple goals.

  1. To enable GridCentric API calls in Node applications.
  2. To use those bindings in a nifty demo, creating VMs on-demand to scale an application.

For a nifty demo, I've chosen to take the standard Node chat demo and make it scale automatically as users join chat rooms, using only about a hundred lines of JavaScript.

Building the extension

To skip the details of building the extension, click here to jump straight to the demo.

Starting points for V8

If you want to write a Node extension, without a doubt the best place to start is the useful blog post from cloudkick. Figuring out where to go next once you've exhausted their example, however, is a bit tricky. I found that the V8 embedder's guide was mostly useless, but maybe you'll have a different experience. I think the best way to learn more is by looking at the Node source and the source for other native extensions (a relatively complete list of extensions can be found here).

Synchronous bindings

Let's start with the easy stuff.

Simple, synchronous function bindings for Node are quite straightforward. Many of our API functions can be considered non-blocking (reading a value from the kernel, equivalent to a system call) so they are quite simple to wrap. For example, the C function below will return the current vmid.

#include <gridcentric/gc-guest.h>

int func() {
  return gc_vmid();
}
In the JavaScript world, we really want this to look like:
var gridcentric = require("gridcentric");
var vmid = gridcentric.vmid();
To get those semantics, we can take the C example above and wrap it in some V8 voodoo.
#include <node/v8.h>
#include <node/node.h>

#include <gridcentric/gc-guest.h>

using namespace node;
using namespace v8;

static Handle<Value> VmId(const Arguments& args)
{
    HandleScope scope;
    Local<Integer> result = Integer::New(gc_vmid());
    return scope.Close(result);
}
The V8 function is complete.

Before we have a usable extension however, we must bind this function appropriately within the native module. When Node loads a native extension, it executes the init symbol, passing in a variable representing the scope created for the module. Using a FunctionTemplate wrapper, we define an init† symbol that does the appropriate binding within the module.

extern "C" {
  void init (Handle<Object> target)
  {
    Local<FunctionTemplate> vmid = FunctionTemplate::New(VmId);
    target->Set(String::NewSymbol("vmid"), vmid->GetFunction());
  }
}
The init function must be wrapped in an extern "C" declaration to prevent g++ from name mangling.

Almost done -- we only need the build script. To build the module with our vmid function, we first create a wscript file (assuming that our source file is src/gridcentric.cc) used by the Node build tool node-waf.

def set_options(opt):
  opt.tool_options("compiler_cxx")

def configure(conf):
  conf.check_tool("compiler_cxx")
  conf.check_tool("node_addon")

def build(bld):
  obj = bld.new_task_gen("cxx", "shlib", "node_addon")
  obj.cxxflags = ["-Wall"]
  obj.ldflags = ["-lgridcentric"]
  obj.target = "gridcentric"
  obj.source = "src/gridcentric.cc"
Finally, we run node-waf configure && node-waf build to build our extension.

We now have a basic gridcentric module, and the following code works.

var gridcentric = require("./build/default/gridcentric");
var vmid = gridcentric.vmid();
console.log("My vmid is " + vmid);

Now we can move on to adding some meat to the module.

Understanding non-blocking operations

Node's event-driven semantics require that any function doing significant work (e.g. I/O) be structured using callbacks. Much of the heavy lifting in a binding comes from the need to restructure calls to your library around the callback mechanism provided by Node.

This can be a bit of a pain, but fortunately many of the functions in our guest bindings were well-suited to the asynchronous callback style required by Node (and I think that it's generally not too difficult to find a nice mapping). For example, the request ticket operation may take a few hundred milliseconds to make the round-trip to the scheduler, allocate the requested resources and return the result. Similarly, the clone operation may take seconds, but in a complex control flow you'll likely need to be doing other things during that time.

With our C bindings, the request ticket function call looks like:

#include<gridcentric/gc-guest.h>
...
gc_uuid_t ticket;
if( gc_request_ticket(1, 1, 1, 1000, &ticket) < 0 ) {
   perror("Couldn't request ticket");
}
If we were to translate directly into JavaScript using a synchronous style this might look like:
var gridcentric = require('gridcentric');
...
ticket = gridcentric.request_ticket(1, 1, 1, 1000);
if( ticket ) {
  console.log("Successfully allocated ticket " + ticket + ".");
} else {
  console.log("Unable to allocate ticket in 1000 milliseconds.");
}
But because the request_ticket operation will block up to 1000 milliseconds in this case, this function doesn't conform to the non-blocking semantics required by Node.

Instead, we must structure this function to use an asynchronous callback when the ticket request is completed, as follows:

var gridcentric = require('gridcentric');
...
gridcentric.request_ticket(1, 1, 1, 1000, function(ticket) {
  if( ticket ) {
    console.log("Successfully allocated ticket " + ticket + ".");
  } else {
    console.log("Unable to allocate ticket in 1000 milliseconds.");
  }
});
Notice that we pass in a function as the last parameter. This function will be called asynchronously with the return value of the request_ticket function after it has completed. We don't have any guarantees about when that function will be executed.
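
To make the ordering concrete, here is a minimal sketch that uses a stand-in for the binding. The fakeRequestTicket function and the ticket string it hands back are hypothetical and exist only so the example runs without the gridcentric module; the real call goes through Node's thread pool rather than a timer.

```javascript
// Hypothetical stand-in for gridcentric.request_ticket, for illustration only.
// It ignores the resource arguments and "allocates" a ticket after a short delay.
function fakeRequestTicket(maxcpus, minvms, mincpuspervm, timeout, callback) {
  setTimeout(function () {
    callback("ticket-0001"); // made-up ticket id
  }, 10);
}

var order = [];

order.push("before request");
fakeRequestTicket(1, 1, 1, 1000, function (ticket) {
  order.push("callback got " + ticket);
  console.log(order.join(" -> "));
});
order.push("after request");
```

The call returns immediately, so "after request" is recorded before the callback ever runs. Any code that depends on the ticket has to live inside the callback.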

Once this style is adopted for all functions, we can easily see how to chain operations using closures. For example, in order to fork() the VM we can extend the above:

var gridcentric = require('gridcentric');
...
gridcentric.request_ticket(1, 1, 1, 1000, function(ticket) {
  if( ticket ) {
    console.log("Successfully allocated ticket " + ticket + ".");
    gridcentric.clone(ticket, function(vmid) {
       if( vmid > 0 ) {
           console.log("On a clone VM.");
       } else if( vmid == 0 ) {
           console.log("Still on the master VM.");
       } else {
           console.log("Error during clone operation.");
       }
    });
  } else {
    console.log("Unable to allocate ticket in 1000 milliseconds.");
  }
});

Implementing callbacks

Before implementing these functions, note that my simple example above did not require any arguments. The first thing that I will do is define a number of preprocessor macros to sanity-check passed-in arguments.

#define REQUIRE(I, ISTYPE, CASTTYPE, NAME)                     \
  if( args.Length() <= (I) || !args[I]->Is##ISTYPE() )         \
    return ThrowException(Exception::TypeError(                \
      String::New("Argument " #I " must be a " #ISTYPE "."))); \
  Local<CASTTYPE> NAME = Local<CASTTYPE>::Cast(args[I]);

#define REQUIRE_STRING(I, NAME) \
        REQUIRE(I, String, String, NAME)
#define REQUIRE_INTEGER(I, NAME) \
        REQUIRE(I, Number, Integer, NAME)
#define REQUIRE_FUNCTION(I, NAME) \
        REQUIRE(I, Function, Function, NAME)

We could now implement a no-op request ticket function that takes the appropriate arguments.

static Handle<Value> RequestTicket(const Arguments& args)
{
    REQUIRE_INTEGER(0, maxcpus);
    REQUIRE_INTEGER(1, minvms);
    REQUIRE_INTEGER(2, mincpuspervm);
    REQUIRE_INTEGER(3, timeout);
    REQUIRE_FUNCTION(4, cb);
    return Undefined();
}
It sanity-checks its input. Now it needs to do something.

Node uses libeio as the basis for its thread pool (which, assuming you are not working with raw file descriptors and sockets, you will likely be using). To use libeio, you schedule two functions for future execution: one that does the work and one which will be called when the work is completed. The function called when the work is completed will be executed in the main thread, so it needs to be quick. You are also permitted to pass an opaque pointer, which will be (indirectly) passed to each of the two functions.
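
The control flow can be modelled in a few lines of plain JavaScript. This is only a toy: schedule is a hypothetical helper, not part of libeio or Node, and nothing here runs on a separate thread. It shows just the shape of the contract, a work function and a completion function sharing one opaque data object.

```javascript
// Toy model of the "work function + completion function" contract.
// `schedule` is hypothetical; real libeio runs `work` on a pool thread.
function schedule(work, done, data) {
  setImmediate(function () {
    work(data);   // fills in the result; may take a long time in real life
    setImmediate(function () {
      done(data); // runs back on the main loop, so it must be quick
    });
  });
}

// The "opaque pointer": inputs, result slot and a completion flag.
var request = { timeout: 1000, rval: undefined, finished: false };

schedule(
  function work(data) { data.rval = "ticket for " + data.timeout + "ms"; },
  function done(data) { data.finished = true; console.log(data.rval); },
  request
);
```

The caller gets control back before either function has run, which is the same reason the V8-facing function in the binding can return Undefined() right away.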

For our example below, we will first define a new class that we can use as an opaque pointer. This class will hold all data related to the ticket request, the callback function passed in, and the return value to be given. Since we're going to have three functions involved (the one called by V8, the one scheduled by libeio and the one executed after the work is complete), this class will be used to pass shared information to each of them.

We will also declare two functions ahead of time that we will use for our libeio work, EIO_RequestTicket and EIO_Post.

class CallbackData
{
public:
    Handle<Value> This;      // The this scope we were called in.
    Persistent<Function> cb; // The callback function passed.
    Handle<Value> rval;      // The return value to be given.

    // The parameters required for request_ticket.
    int maxcpus;
    int minvms;
    int mincpuspervm;
    int timeout;
};

static int EIO_RequestTicket(eio_req* req);
static int EIO_Post(eio_req *req);
We use the This variable to track the scope, cb to record the callback the user passes in and rval to store the return value once the work is done. The rest of the parameters are required for the actual ticket request.

Given these declarations, the actual RequestTicket function is straightforward.

static Handle<Value> RequestTicket(const Arguments& args)
{
    REQUIRE_INTEGER(0, maxcpus);
    REQUIRE_INTEGER(1, minvms);
    REQUIRE_INTEGER(2, mincpuspervm);
    REQUIRE_INTEGER(3, timeout);
    REQUIRE_FUNCTION(4, cb);

    // Create the opaque pointer.
    CallbackData *data = new CallbackData();

    // Set the scope variable (in case it's needed).
    data->This = args.This();

    // Set the parameters associated with the ticket request.
    data->maxcpus = maxcpus->Value();
    data->minvms = minvms->Value();
    data->mincpuspervm = mincpuspervm->Value();
    data->timeout = timeout->Value();

    // Save the passed callback.
    data->cb = Persistent<Function>::New(cb);

    // Schedule the EIO functions to be run.
    eio_custom(EIO_RequestTicket, EIO_PRI_DEFAULT, EIO_Post, data);
    ev_ref(EV_DEFAULT_UC);

    return Undefined();
}
As required, it doesn't do any real work. It allocates the opaque data pointer (the CallbackData class we defined), schedules the work in the thread pool (the eio_custom and ev_ref calls), and returns Undefined() immediately.

All that remains is for us to actually implement the missing functions.

The first, EIO_RequestTicket, does the work required (calling gc_request_ticket) and sets the return value (rval) in the opaque pointer.

static int EIO_RequestTicket(eio_req* req)
{
    CallbackData *data = static_cast<CallbackData*>(req->data);
    gc_uuid_t uuid;

    if( gc_request_ticket(
            data->maxcpus, data->minvms, data->mincpuspervm,
            data->timeout, &uuid) < 0 ) {
        // Set the result to undefined.
        data->rval = Undefined();
    } else {
        // Save the resulting ticket as a string.
        data->rval = String::New(uuid.value);
    }

    return 0;
}
The second function, EIO_Post, takes the given opaque pointer, creates a V8 array using the return value and calls the callback function that was passed in as an argument. This will also not block.
static int EIO_Post(eio_req *req)
{
    CallbackData *data = static_cast<CallbackData*>(req->data);
    ev_unref(EV_DEFAULT_UC);
    Local<Value> argv[1] = { *(data->rval) };
    TryCatch try_catch;
    data->cb->Call(Context::GetCurrent()->Global(), 1, argv);
    if (try_catch.HasCaught()) {
        FatalException(try_catch);
    }
    data->cb.Dispose();
    delete data;
    return 0;
}
That's it! All that's left to do is to bind the RequestTicket function appropriately within the extension (see vmid example above), then our asynchronous request ticket function will be working like a charm.

Wrapping objects

Some of the GridCentric API functions return more complex structures. Although I would recommend mapping values to V8 primitives wherever possible, the need may arise to return more complex JavaScript objects.

After negative experiences with wrapped objects in V8, I think that unless you require complex interactions with the JavaScript world, you can return complex structures as simple JavaScript Objects (i.e., no custom prototype). Below is my example for creating a TicketInfo object.

#define SET_VALUE(VAR, NAME, TYPE, VAL) \
    VAR->Set(String::New(NAME), TYPE::New(VAL))
#define SET_INTEGER(VAR, NAME, VAL) \
        SET_VALUE(VAR, NAME, Integer, VAL)
#define SET_STRING(VAR, NAME, VAL) \
        SET_VALUE(VAR, NAME, String, VAL)

class TicketInfo {
public:
    static Handle<Object> Create(gc_ticket_info_t info)
    {
        HandleScope scope;
        Local<Object> obj = Object::New();
        SET_STRING(obj, "id", info.id.value);
        SET_STRING(obj, "status",
           gc_ticket_status_string(info.status));
        SET_INTEGER(obj, "cpus", info.cpus);
        SET_INTEGER(obj, "vms", info.vms);
        SET_INTEGER(obj, "mincpuspervm", info.mincpuspervm);
        return scope.Close(obj);
    }
};
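
On the JavaScript side, an object built this way is just a plain object carrying the five properties set above. Here is a small sketch of consuming one. The describeTicket helper and the sample values (including the "granted" status string) are hypothetical; only the property names come from the C++ code.

```javascript
// Format a ticket-info object for logging. The property names match the
// ones set in TicketInfo::Create; the helper itself is illustrative.
function describeTicket(info) {
  return "ticket " + info.id + " [" + info.status + "]: " +
         info.vms + " VM(s), " + info.cpus + " CPU(s), at least " +
         info.mincpuspervm + " CPU(s) per VM";
}

// Made-up sample data standing in for a value returned by the extension.
var sample = { id: "abc-123", status: "granted", cpus: 4, vms: 2, mincpuspervm: 1 };
console.log(describeTicket(sample));
```

Because there is no wrapped native object behind it, the value can be logged, serialized with JSON.stringify or passed around like any other object literal.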

Pre-processor tricks and gotchas

If you look at the source for my extension on bitbucket, you'll see that I didn't explicitly define separate classes and functions for each of the callbacks. Due to the repetitive nature of the wrapping, I wrapped most of the callback code into hacky pre-processor macros.

I also encountered one annoying gotcha while building the Node extension. During the clone operation, libgridcentric executes the scripts at /etc/gridcentric/pre-clone and /etc/gridcentric/post-clone. This execution is simple. Here's some pseudo-C.

pid_t child = fork();
if( !child ) {
  exec(script);
} else {
  int rc = waitpid(child,...);
}
When executed from within Node, the waitpid fails with return value -1 and causes the clone operation to be aborted if there is an /etc/gridcentric/pre-clone script. Why? Ostensibly, the waitpid fails because a different part of Node gobbles up all child processes and their associated return values. Presumably this is to prevent zombie processes, but it's not a great solution. The workaround for the gridcentric extension is to remove these scripts, but then you lose this functionality.

The application

Enabling the GridCentric API in an application running on our platform allows it to dynamically scale horizontally by requesting resources and cloning itself, much in the same way fork() works in UNIX. The cloning operation is handled transparently from under the VM in seconds, with state magically propagated. With a Node application, our stack will look something like this.

With the bindings I've just built, this operation in JavaScript looks like this.

var gridcentric = require("gridcentric");
gridcentric.request_ticket(1, 1, 1, 1000, function(ticket) {
  if( ticket ) {
    gridcentric.clone(ticket, function(vmid) {
      if( vmid < 0 ) {
        console.log("There was an error.");
      } else if( vmid == 0 ) {
        console.log("I'm on the original VM.");
      } else {
        console.log("I'm on a clone with id " + vmid + ".");
      }
    });
  } else {
    console.log("Unable to allocate resources.");
  }
});

Service structure

More logic is required to scale a service than simply cloning a VM. To scale any service horizontally, you'll need to implement some kind of proxy or load-balancing mechanism.

Using the completed bindings, I created an autoscale.js module which turns the master VM into a proxy (based on this) and routes requests to clones which are created automatically. In other words, it turns a regular Node application into an auto-scaling service.

In this case, we create a new VM for every two active users we have and don't synchronize state across different VMs (think of it as a Node chat roulette -- only with cloning VMs).

More specifically, autoscale.js implements the following simple algorithm.

  • Every second, we fetch the list of clone domains and store their IPs in a global array. This information is used by the proxy to route incoming connections.
  • If there is less than one clone for every two active connections, we create the appropriate number of clones. Obviously, this is kind of a bold (ridiculous) metric for measuring load and scaling the system.
  • When a new connection arrives, it is mapped to the latest clone VM. We could use a number of more reasonable strategies here, such as round robin, least-loaded, random. The last clone heuristic is actually quite silly, but allows for a deterministic demo.
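
The scaling rule and the routing rule boil down to two small functions. This is a sketch of the arithmetic only, not the code from autoscale.js. The function names and the sample IP addresses are made up for illustration, and the clone list is assumed to be ordered oldest first.

```javascript
// One clone is wanted for every two active connections (rounded up).
// Returns how many additional clones to create, never a negative number.
function clonesToCreate(activeConnections, currentClones) {
  var wanted = Math.ceil(activeConnections / 2);
  return Math.max(0, wanted - currentClones);
}

// "Latest clone" routing: a new connection goes to the most recently
// created clone. Returns null when no clone exists yet.
function pickBackend(cloneIps) {
  return cloneIps.length > 0 ? cloneIps[cloneIps.length - 1] : null;
}

console.log(clonesToCreate(5, 0)); // five users, no clones yet
console.log(clonesToCreate(5, 3)); // already enough clones
console.log(pickBackend(["10.0.0.2", "10.0.0.3"]));
```

With five active connections and no clones the rule asks for three new VMs. Swapping pickBackend for round robin or least-loaded would change the routing without touching the scaling rule.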

Integration

To leverage this service, I modified the chat demo to use the auto-scaling module. This required adding the following lines at the bottom of the server.js file:

setTimeout(function() {
  var as = require("./autoscale");
  as.setup(80, PORT);
}, 3000);
I add the 3 second delay so that the service can get started before the first clone operation.

The following video is a quick demo of the service. I log on to the auto-scaling chat service with five different users. Because the service has been configured to create new VMs for every two users, the five users in the demo are routed to three different VMs that are created on-demand, in the span of seconds. Enable annotations for notes during the video.

Caveats

Much like Node chat, the demo described here is not intended to be a serious service. Were you to add auto-scaling to a real application, you'd definitely need to do a better job of tracking active hosts, synchronizing necessary state across slaves and handling errors in general.

It's worth pointing out, however, that this is miles easier with the gridcentric extension than in the case where you have to provision new VMs from scratch. Provisioning from scratch, you'll likely need to involve lots of languages and tools (init scripts, chef or puppet, configuration files, proxies, synchronization servers) before you even touch the application. The semantics of clone() give the programmer a very powerful primitive on top of which they can build reliable distributed services. Plus, it's pretty awesome.

Do-it-yourself

If you have a Copper installation, feel free to install the bindings from NPM and try them out for yourself. There are likely a few bugs, but I'd love to hear feedback or complaints. You will need to have the gc-guest-base package installed in the VM to provide libgridcentric, then the module can be installed simply:

$ npm install gridcentric
gridcentric@0.0.1 ./node_modules/gridcentric

If you want to dig more into the bindings (or steal macros -- please go ahead), the full source is available here. This source also includes the simple autoscale.js module used above.

Enjoy!

Thursday, March 17, 2011

Infrastructure performance

There's a lot of FUD surrounding private cloud. Without getting into definitions, I figured I would add a bit of noise and have some fun with infrastructure performance in the process.

Cloud vs Claude

I'm a virtualization guy -- I know that cloud can be interpreted a lot of ways, but for now I'm using it to describe Infrastructure-as-a-Service (IaaS). Public cloud means public infrastructure providers such as Amazon's EC2 or Rackspace's cloud.

From my perspective, public IaaS offerings fundamentally provide two forms of value:

  1. Outsourcing of infrastructure costs (hardware, management, etc.)
  2. Cloud technology as a platform (APIs, billing, image libraries, etc.)

It's clear that the value of the first item is fully realized by traditional (non-cloud) hosting and co-location services. Yet, based on how these companies are scrambling to release cloud offerings, the cloud providers must be killing these shops in the market. It follows that there must be something more than just vapor to that second point. Hourly billing alone is not driving people to choose cloud providers over traditional hosts.

Private Claude

That brings us to the definition of private cloud. It's what you get when you have cloud technology running on your own infrastructure -- no outsourcing of hardware. But WTF is cloud technology? While virtualizing your infrastructure buys you a lot of agility, there's definitely more to it than just virtualization. Here's my take.

The key difference between vanilla virtualization and cloud is automation.

Automation isn't just one thing -- there are lots of ways to automate. Automated provisioning means that you can deploy virtual machines based on standard templates with the click of a button (or an API call). Automated billing means that resources are accounted for and reports are automatically generated or fed back into a billing system. Automated scaling means that services are architected to automatically grow and shrink their footprints over time, using infrastructure APIs.

To me, automation is a fundamental tenet of cloud technology.

So where does private cloud make sense? In environments where automation makes sense.

Automation sees return on investment when things change often: workloads, requirements, environments, usage. These environments are very common, but not in every business. Technical computing, content creation, service providers and hosting all may have demanding and unpredictable workloads but may also have strict data requirements (legal or bandwidth) that prevent them from using public infrastructure. Private cloud makes immediate sense here.

If you're not using hosted infrastructure today but are faced with dynamic workload and automation needs, then it's likely that cloud is a revolution that will change how you compute in your own datacenter.

Where do we come in?

Our technology transforms how applications interact with infrastructure, simplifying automated provisioning and scaling.

Instead of requiring virtual machine templates to be stored and maintained as complete disk images, Copper enables on-demand cloning of live, running virtual machines in real-time via a powerful API. One second you have a single server, the next you have dozens -- and each server is aware of its unique ID allowing complex, distributed services to be created easily.

You provision virtual machines in seconds. So what? I can deploy a new server on <favorite platform> in just a few minutes. That's good enough, I've never needed anything better.

The above remark is a classic (and ludicrous) response I occasionally get when I explain what our technology does. It's a total straw man. That's good enough for what you do today because you've never been able to do anything more. It's like saying that you'll never need a hover car, or that "640Kb [of RAM] ought to be enough for anybody".

Taking minutes to provision an additional virtual machine means that you need to be able to predict fluctuations in demand and provision in anticipation of load. It also affects large-scale parallel computation. If a job can finish in as little as a few minutes given a few hundred machines, then you'll either wait many, many times the required time, or most of the time and cost in your analysis will go to deploying virtual machines instead of actually running code.
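
A back-of-the-envelope calculation makes the point. The numbers below are invented for illustration and are not measurements; provisioningShare is just a name for the ratio.

```javascript
// Fraction of total wall-clock time spent waiting for machines
// rather than computing.
function provisioningShare(provisionSeconds, computeSeconds) {
  return provisionSeconds / (provisionSeconds + computeSeconds);
}

// Hypothetical job: 3 minutes of computation once the machines exist.
var compute = 3 * 60;
console.log(provisioningShare(10 * 60, compute).toFixed(2)); // 10 minutes to provision
console.log(provisioningShare(10, compute).toFixed(2));      // 10 seconds to provision
```

In the first case roughly three quarters of the elapsed time goes to provisioning; in the second it is about five percent.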

Being able to scale in seconds changes the name of the game. Software can predict and react in response to demand, in real-time. That's the basis for our upcoming hosted cloud offering, RiotCloud, and our powerful grid queueing solution GCQ.

Taking it on the road

If automation is a fundamental tenet of cloud technology, then measuring the speed at which infrastructure software reacts to programmatic automation is a logical next step. I'm a massive fan of Top Gear, despite having a complete lack of interest in cars. After realizing that this was quite normal, I thought it would be fun to do a Top Gear tribute by boasting about our raw infrastructure performance.

The below tests were performed on our test track cluster by our tame race driver infrastructure programmer, the Stig Steve, seen here. Some say that Steve's brain, if hooked up to an IPv6 backbone, is capable of routing over 50GiB per second.

Acceleration

A fun test for infrastructure software is how quickly it allows an application to scale, which is tantamount to how fast new machines can be provisioned and booted. A faster time-to-scale means smoother upgrades, fewer resources wasted transferring state and an overall more dynamic infrastructure. For example, consider scaling an analysis application (like Hadoop) to perform a time-critical computation. Our software requires zero time to create and bundle virtual machine templates (it's not necessary), so for us this is simply a matter of how quickly we can scale a running virtual machine. In fact, everything here is done within the virtual machine being scaled, showing the power of our API.

Before Steve did his thing, we were lucky enough to have Jeremy Clarkson of Top Gear take our software for a spin and give his first impressions.


Don't worry: he's fine now, just a little shaken up.
Also, that's obviously not really Jeremy Clarkson.

In the tests performed by our tame infrastructure programmer, we found a blazing average time of just a little over 9 seconds to scale from a single virtual machine to sixty-one, making repurposing infrastructure a staggeringly fast operation. Remember that when Copper clones virtual machines, they are already running with all the necessary application state -- 9 seconds is everything, there is no boot time.

Cornering

Although acceleration is important, a huge factor in how fast you can make it around the track is how your machine can handle corners. Application cornering is a lot like cornering in a race car: you need to slow down in one direction, then speed up in another.

The below video shows Steve apexing a nearly 180 degree corner. Our infrastructure starts off running one application with over one hundred virtual machines, destroys them all, then scales a second application out to over one hundred virtual machines. This process takes less than 20 seconds.




The fine print

These demos show the power and speed of dynamic infrastructure, but I'm glossing over a few details. When you're scaling applications (in or out), they'd better be ready for it. Although a lot of applications can be adapted, there are plenty of frameworks that fit the bill already, especially in spaces accustomed to dynamic workloads.

To give some concrete examples of what they might be, suppose that application one is for distributed data crunching (like SETI@Home or a batch processing system) and application two does distributed web load testing (like Selenium Grid). In that case, cornering might consist of putting aside the opportunistic analysis that we have our infrastructure doing in order to load test the latest website release.

When provisioning is so simple and takes seconds, we can easily imagine building new infrastructures that react and scale like never before. Applications can scale in response to load or needs in real-time. The example above, where infrastructure is taken over at the flick of a switch, could easily happen every night, every day, every hour or even every time some stupid developer (or umm... CTO) pushes code and breaks the build.

That's it folks!

In other news, we've recently released Copper 1.4.2 and updated all the available guest images, including Debian 6.0, Fedora 12, Fedora 13, and CentOS 5.5. Happy updating.

I also want to add that although we're excited to work every day at changing the world with innovative virtualization software, our team's thoughts and hopes are with those affected by the tragedy in Japan.

Tuesday, January 4, 2011

How fast can you add more worker nodes?

There is a very important, and often overlooked, question when it comes to the scalability of a system: how fast does the system scale? The main question is still how the system will scale in the first place. But in the age of Internet-powered applications that revolve around sudden, unexpected peaks of traffic, answering that question alone is not good enough. The system doesn't just need to be able to scale; it also needs to scale instantaneously.

The producer-consumer programming model is a good example of a software design that has a very simple conceptual path to scaling. The model basically consists of three parts: A Producer that creates work, a Queue that stores a backlog of pending work the producer has created, and finally a Worker that takes the pending work from the queue and executes it.
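
The three parts can be sketched in memory before any real infrastructure is involved. This is a toy model: a plain array stands in for the queue (later in this post that role is played by RabbitMQ), and the function names are made up for illustration.

```javascript
// Queue: the backlog of pending work.
var queue = [];

// Producer: creates work and appends it to the backlog.
function produce(jobId) {
  queue.push({ id: jobId });
}

// Worker: takes the oldest pending job, "executes" it and returns its id.
// Returns null when there is nothing to do.
function workOnce(workerName, completed) {
  var job = queue.shift();
  if (!job) { return null; }
  completed.push(workerName + " finished job " + job.id);
  return job.id;
}

var completed = [];
produce(1); produce(2); produce(3);
workOnce("worker-a", completed);
workOnce("worker-b", completed);
console.log(completed);
console.log("still pending: " + queue.length);
```

The producer never needs to know how many workers exist, which is what makes adding workers such a simple path to scaling.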



Take for example uploading a video to YouTube. The producer would be the web server that creates the work of "process this video file". A worker then takes the next video file to process, and performs all the codec conversions, formatting, etc. on the video file. Once it is done, it picks the next waiting video file from the queue. Suppose suddenly a couple hundred people decide to all upload their video at once. The work queue would get huge, and the system will start to get bogged down. However, more workers can be added to the system in order to scale it up to handle the increased load. Essentially the system's throughput is proportional to the number of workers.


Here is a quick video that demonstrates the scalable nature of the producer-consumer model. There is a producer node that creates jobs that each take a worker about 5 seconds to complete. The producer creates, or submits, 2 jobs every second. Obviously this is more than a single worker can handle, so more workers are added to the system to keep pace with the producer node and to tackle the backlog of jobs.
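
The numbers in the demo imply a simple capacity calculation, sketched below. The function names are made up, and the 5 seconds per job is treated as exact here even though it is only approximate in the demo.

```javascript
// Workers needed so that completions keep up with arrivals:
// arrival rate (jobs/s) times time per job (s), rounded up.
function workersToKeepUp(jobsPerSecond, secondsPerJob) {
  return Math.ceil(jobsPerSecond * secondsPerJob);
}

// Net backlog growth in jobs per second for a given worker count.
// Positive means the queue is growing, negative means it is draining.
function backlogGrowth(jobsPerSecond, secondsPerJob, workers) {
  return jobsPerSecond - workers / secondsPerJob;
}

console.log(workersToKeepUp(2, 5));   // workers needed just to break even
console.log(backlogGrowth(2, 5, 1));  // a single worker falls behind
console.log(backlogGrowth(2, 5, 10)); // ten workers hold the queue steady
```

At 2 jobs per second and 5 seconds per job, ten workers are needed just to hold the queue steady, and more than ten to drain a backlog that has already built up.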


There are two main virtual machines in the video: a producer machine and a worker machine. The producer machine is running a Django application, which is used to visualize the system; RabbitMQ, which is an AMQP system used to support the producer-consumer model; and finally a simple shell loop that hits the Django application to submit jobs to the work queue. Here is the code snippet from the Django view.py that handles the job submission:

from django.shortcuts import render_to_response
from prdconapp.models import Job, Server

# pika is a simple AMQP library that is used to interact with the
# RabbitMQ instance.
import pika

def give(request, vm_id, num_jobs):
    """
    This gives jobs to the system. It will create num_jobs more
    jobs and submit them to the queue, and update the 
    visualization state with these jobs. The vm_id is just a 
    string to indicate what machine is giving these jobs.
    """
    
    # Get a connection of the queue instance called "task_queue" 
    # that is running on the local host.
    connection = pika.AsyncoreConnection(
            pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue')
    
    # Submit the desired number of jobs to RabbitMQ.
    for i in range(int(num_jobs)):
        j = Job(owner=vm_id, status='pending')
        j.save()
        message = "Job id: " + str(j.id)
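        channel.basic_publish(
            exchange='',
            routing_key='task_queue',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))

    # job_dict() is not shown in this snippet; it supplies the
    # template context for the visualization page.
    return render_to_response("index.html", job_dict())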


The worker virtual machine simply runs a single Python script that takes the next job from the RabbitMQ queue, updates the Django application visualization, runs the job, and then loops to take the next job. When the Copper platform live-clones the virtual machine, this same Python script is already running in each of the cloned machines. This is how the new workers are able to connect to the RabbitMQ system and start pulling off the next pending job. Here is the worker code:
import time
import urllib2
import sys

# pika is a simple AMQP library that is used to interact with the
# RabbitMQ instance.
import pika

# The gridcentric library binding used to get some information
# about the virtual machine on which this script is running.
from gridcentric import guest as gc

# The host that has the RabbitMQ instance. It is also the host
# running the Django app we need to update.
host = '192.168.1.80'

# We use the gridcentric library to get the unique vmid of this
# machine. This is used to determine if we are a cloned virtual 
# machine, or the original master machine.
VMID = gc.vmid()

# Start a connection to the RabbitMQ server and listen to the 
# task_queue queue.
connection = pika.AsyncoreConnection(
        pika.ConnectionParameters(host=host),
        True,
        pika.connection.SimpleReconnectionStrategy())
channel = connection.channel()
channel.queue_declare(queue='task_queue')

print ' [*] Waiting for messages. To exit press CTRL+C'

# This is the callback function that gets called when this 
# worker receives a message from the server. It essentially 
# updates the visualizer, and runs the task.
def callback(ch, method, header, body):
    # Use the gridcentric library to get the IP address of 
    # this machine in the cluster's private virtual network. 
    # We will use this to uniquely identify this machine in
    # the visualization.
    ip = gc.my_ip()
    
    print " [%s] Received %r" % (ip,body,)
    job_id = body.split(":")[1].strip()
    try:
        # Update the visualizer to tell it that we have taken
        # the job and are going to be running it.
        urllib2.urlopen("http://%s:8000/prdcon/take/%s/%s" % 
                (host, ip, job_id) )

        # All jobs themselves are just sleep jobs. We could 
        # decode the message, and run the job, or better yet 
        # use something like Celery that does all of this for 
        # us. For the purposes of the demo, sleeping 5 seconds 
        # should be fine.
        time.sleep(5)

        # Update the visualizer to tell it that we have completed
        # the job.
        urllib2.urlopen("http://%s:8000/prdcon/done/%s/%s" % 
                (host, ip, job_id) )
        print " [x] Done"
    except Exception:
        pass

    # Tell the RabbitMQ server that we have processed the message, 
    # and we are ready for another one.
    ch.basic_ack(delivery_tag = method.delivery_tag)

    # If we detect that our VMID has changed, we want to 
    # reconnect to the RabbitMQ server because we are a new 
    # machine. As a bit of a hack to get the demo working, the 
    # worker simply exits. This is why we run the worker in a 
    # while loop in the shell.
    if gc.vmid() != VMID:
        sys.exit(0)

# Tell the RabbitMQ server that we only want to receive a single 
# message at a time.
channel.basic_qos(prefetch_count=1)

# Register the callback function.
channel.basic_consume(callback,
                      queue='task_queue')

# Just keep looping and waiting for new messages to process.
pika.asyncore_loop()
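For readers without a RabbitMQ instance handy, the same take-a-job, run-it, acknowledge-it loop can be sketched with nothing but the Python 3 standard library. This is an in-process stand-in, not the demo's code: threads play the role of worker machines, `queue.Queue` plays the role of the `task_queue`, and `run_jobs` is a name invented for this example.

```python
import queue
import threading
import time


def run_jobs(num_jobs, num_workers, seconds_per_job=0.05):
    """Process num_jobs sleep jobs with num_workers threads.

    Returns (sorted list of completed job ids, elapsed seconds).
    """
    tasks = queue.Queue()
    done = []
    lock = threading.Lock()

    def worker():
        while True:
            job_id = tasks.get()         # take one job at a time
            if job_id is None:           # sentinel: no more work
                tasks.task_done()
                return
            time.sleep(seconds_per_job)  # stand-in for the real job
            with lock:
                done.append(job_id)
            tasks.task_done()            # plays the role of basic_ack

    start = time.perf_counter()
    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for job_id in range(num_jobs):       # the producer
        tasks.put(job_id)
    for _ in threads:                    # one sentinel per worker
        tasks.put(None)
    tasks.join()
    for t in threads:
        t.join()
    return sorted(done), time.perf_counter() - start


finished, elapsed = run_jobs(num_jobs=8, num_workers=4)
print(len(finished), "jobs finished")
```

Calling `run_jobs` with the same number of jobs and a larger `num_workers` should show the elapsed time dropping, which is the whole point of the model.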


The producer-consumer model is a straightforward design with simple semantics that make it easy to reason about how it will scale. But, as I mentioned in the beginning of this post, the mere fact that this model can scale is not good enough. The next, and almost equally important, question is: how fast does it scale?

Let's take a look at how long it has taken to scale the producer-consumer programming model over the years:
  • Days: In the 1980s, adding new workers to the system took on the order of days because it was a very manual process: each computer had to be physically set up and then manually configured. Taking days to scale an application is simply not good enough.
  • Hours: In the 1990s, ghosting computer images enabled computers to be preconfigured. However, they still needed to be physically set up, and it was still a largely manual process. Scaling in hours is better, but it still fails to capture a lot of potential.
  • Minutes: In the 2000s, virtualization and cloud computing started to become mainstream, removing a lot of the manual process in setting up new workers. This has helped enable the current explosion in Internet-based applications, but there is still a window of lost potential as the system scales up.
  • Seconds: GridCentric's Copper platform can add functioning workers to the system an order of magnitude faster than everyone else. With delays of only seconds, the system becomes a lot more responsive, and reactive, to peaks in demand. Now the window of lost potential has gone from minutes down to seconds. The added bonus is that it is often simpler and easier to add workers using the Copper platform than using other platforms that can only accomplish the task in minutes.
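The "window of lost potential" in the list above can be roughed out in a few lines of Python 3. The sketch reuses the demo's figures (2 jobs per second arriving, 5 seconds per job, one worker already running); the specific delays of one day, one hour, one minute and ten seconds are illustrative picks, not measurements, and the function name is made up.

```python
def jobs_queued_while_scaling(delay_seconds, jobs_per_second=2.0,
                              seconds_per_job=5.0, workers=1):
    """Jobs that pile up while waiting delay_seconds for more workers."""
    drained_per_second = workers / seconds_per_job
    growth = max(0.0, jobs_per_second - drained_per_second)
    return growth * delay_seconds


# Illustrative delays only: one day, one hour, one minute, ten seconds.
ILLUSTRATIVE_DELAYS = [("days", 86400), ("hours", 3600),
                       ("minutes", 60), ("seconds", 10)]

for label, delay in ILLUSTRATIVE_DELAYS:
    print(label, round(jobs_queued_while_scaling(delay)))
```

Under these assumptions a one-minute delay leaves about 108 jobs waiting, while a ten-second delay leaves about 18.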
Here is the video again. As you watch it, ask yourself: how fast can you add workers to your scalable system, and what are you losing as you scramble to react to a temporary peak in demand?


Monday, December 20, 2010

Tis the season...

We've been meaning for a while to put up some videos demoing how Copper can be used to implement load-testing environments with minimal setup. Well, here they are:



In retrospect, it's a simple use case that's easy enough to set up and demonstrate, and makes for some cool visuals... so it's about time we got around to it.

There are two videos, one by me and one by Dave. Each of them follows the same basic format: we start off with a single master VM running a simple hitcounter webapp, and then spawn clone VMs that run an HTTP client and ping the hitcounter. The hitcounter is observed via the webapp's main page, which updates itself dynamically. My hitcounter's main page just counts the hits, while Dave's gets all fancy about it ;)

The way our scripts get clones to do their work is also different. In my case, a script running on the master VM initiates the cloning, then branches into the code that runs the HTTP client. In Dave's case, he backgrounds a shell command which runs in an infinite loop pinging the hitcounter, and then just creates clones directly using the 'gc clone' command. His background command continues executing on each clone.

Also.. I know.. this demo is not actually a DDOS or a load test. However, throwing a couple of while loops around the http client would get the behaviour pretty close to it.. ;) It's not particularly hard to topple a Django standalone server running in test mode. Actually, in Dave's video you can see the server start struggling to fulfill requests towards the end..

Anyway, if what you see intrigues you, feel free to download and play with Copper yourself.

Cheers and Happy Holidays

Monday, November 8, 2010

Join The Revolution!

Scaling your services can be a pain -- a big one. Using conventional cloud hosting is easier than building and managing your own infrastructure, but service scaling is still painful. At GridCentric, we think service and infrastructure scaling should be easy and go hand in hand.

For example, current automated cloud scaling typically includes learning new and complex APIs along with specifically prepared server images. There are third party services you can work with – some really good ones – but the increased cost and dependence doesn’t really make things any easier to swallow. For everybody that finds this as frustrating as we do – we’re proud to announce RiotCloud: an easy-to-use, managed hosting service that makes automated scaling trivial.

RiotCloud is designed for simple deployment – users won’t need to learn new APIs or languages, or manage and configure images; they just rent a server, change a few settings, and work away. The servers will scale automatically with the same application, runtime and configuration states without you having to update or manage the system. In fact, the only thing missing is the administrative complexity. RiotCloud employs the revolutionary live-cloning technology of Copper to ensure consistent server state as you scale, exactly as you expect.

The entire team is excited and prepping for launch! How about you? Comment and let us know your thoughts/feedback – we’d love to hear them! 

Join the Revolution: http://riotcloud.com/ - Register now for a free $50 credit!

Tuesday, October 5, 2010

How do I clone thee? Let me count the ways...

Dynamically provisioning stateful VMs with Copper is fast and easy. It occurred to me that we don't often show how fast and easy it is in ways other than using the shell (because it's simple and accessible). Not everyone's workflow is bash-based, so I thought I would show a few examples of our API used with different language bindings.

Our API is powerful and fully featured, allowing for synchronization, resource reservations, and the equivalent of process control for VMs. For the purpose of these examples, I've created four extremely simple "VM fork" programs below. Each one duplicates the running VM (which takes seconds with Copper) and prints the VM's id, à la fork().

C/C++


/* Must be linked with -lgridcentric */
#include <stdio.h>
#include <gridcentric/gc-guest.h>

int main(int argc, char** argv) {
    gc_uuid_t ticket;
    int vmid;
    gc_request_ticket(1, 1, 1, 1000, &ticket);
    vmid = gc_clone(ticket);
    printf("%d\n", vmid);
    return 0;
}

bash


#!/bin/bash
ticket=`gc rt 1 1 1 1000 | awk '{print $2;}'`
gc clone $ticket # Could have pulled the vmid from here.
vmid=`gc vmid`   # But this is much simpler.
echo $vmid

python


#!/usr/bin/env python
import gridcentric.guest

ticket = gridcentric.guest.request_ticket(1,1,1,1000)
vmid = gridcentric.guest.clone(ticket)
print vmid

java


// Be sure to include '/usr/share/gridcentric/gridcentric.jar'
// in the classpath when running this program.
import ca.gridcentric.guest.API;

class VMFork {
    public static void main(String[] args) {
        API api = new API(); 
        API.Uuid ticket = api.requestTicket(1,1,1,1000);
        int vmid = api.clone(ticket);
        System.out.println("" + vmid);
    }
}

Got it? These programs all implement fork() for VMs.

The result of running any of these programs is the same: two nearly-identical VMs, printing out two different vmid values. The operation takes seconds, and can easily be incorporated into complex workflows. It's pretty nuts. Of course, you need not limit yourself to fork with Copper. Say hello to endless possibilities for distributed and scalable applications!
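For anyone who has not used the process-level call these programs imitate, here is a minimal Python 3 sketch of ordinary `fork()` on a Unix-like system. It involves no Copper API at all; it only shows the "one program, two copies, two different ids" behaviour that the VM versions mirror. The function name and the pipe used to report the child's pid are choices made for this example.

```python
import os


def fork_and_report():
    """Fork once and return (parent_pid, child_pid_seen_by_parent,
    pid_reported_by_child). Requires a Unix-like OS."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: an almost exact copy of the parent, resuming right here.
        os.close(read_fd)
        os.write(write_fd, str(os.getpid()).encode())
        os.close(write_fd)
        os._exit(0)
    # Parent: fork() returned the child's pid.
    os.close(write_fd)
    reported = int(os.read(read_fd, 64).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)
    return os.getpid(), pid, reported


parent_pid, child_pid, reported_pid = fork_and_report()
print("parent:", parent_pid, "child:", child_pid)
```

Both copies run the same code after the fork; the only thing that tells them apart is the id each one sees, just as with the vmid in the programs above.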

Tuesday, September 21, 2010

GridCentric: Testing Mincemeat.py MapReduce Scalability

Introduction
The MapReduce programming paradigm has been getting a lot of buzz the past couple of years because developers are, in general, facing larger and larger datasets. Google has shown that MapReduce is very effective at processing extremely large datasets by indexing the entire web using it. As a result of all this buzz, numerous different implementations have been popping up. The front runner is probably Hadoop, and we have a post about how to set up a Hadoop cluster in 5 minutes. But there are many more, and in this post I am using a lightweight Python implementation called mincemeat.py.

Essentially, the promise of MapReduce is that we can easily throw machines at the dataset and process it faster. What I am interested in is the effect on the processing power of the system of adding more machines. In other words, what does the performance function look like in relation to the number of machines in the system? To find out, I am going to take a fixed dataset and measure how long it takes to process using different numbers of worker machines.
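As a reminder of what the paradigm looks like, here is a tiny word count written as a map step and a reduce step in plain Python 3. It runs sequentially in one process and does not use mincemeat.py; the names `mapfn`, `reducefn` and `map_reduce` are chosen for this sketch only. A real MapReduce system farms the map and reduce calls out to the worker machines.

```python
from collections import defaultdict


def mapfn(key, line):
    """Map step: emit (word, 1) for every word in one line of input."""
    for word in line.split():
        yield word.lower(), 1


def reducefn(word, counts):
    """Reduce step: combine all values emitted for one word."""
    return sum(counts)


def map_reduce(datasource, mapfn, reducefn):
    """Sequential stand-in for the cluster: map, group by key, reduce."""
    grouped = defaultdict(list)
    for key, value in datasource.items():
        for out_key, out_value in mapfn(key, value):
            grouped[out_key].append(out_value)
    return {k: reducefn(k, vs) for k, vs in grouped.items()}


data = {0: "the quick brown fox", 1: "the lazy dog", 2: "The end"}
print(map_reduce(data, mapfn, reducefn)["the"])  # 3
```

Because each `mapfn` call depends only on its own input line, the map work can be split across as many machines as are available, which is where the scaling question comes from.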
Test Harness
Being able to dynamically vary the number of workers that I have is an interesting problem. I am a believer in the DRY principle so I don't want to have to configure 10 machines with mincemeat, or synchronize them so that the correct number of them connect when running my tests. Basically I have a very simple experiment I want to run and I want a very simple solution that allows me to easily vary the number of machines in my tests.

Fortunately, the GridCentric Copper virtualization platform allows me to create an arbitrarily sized cluster very easily. Copper provides a very simple, but powerful, API call that allows a virtual machine to live-clone itself. Within seconds, a running virtual machine can create exact replicas of itself that have the same memory loaded, the same disk state and even the same instruction pointer in the CPU. In other words, I can use the Copper API to create clusters with a single node, 5 nodes, 10 nodes, and so on within a matter of seconds without any need for additional configuration, synchronization, or any maintenance on my part. In addition, since this is an API call, I can include it directly within my test harness.

My test harness is composed of two scripts: one that creates my test cluster and executes the MapReduce program, and another that varies the size of the cluster and collects the timing information. The first script takes as a parameter the number of additional workers to add. It uses the Copper API to live-clone the machine to build the cluster with the additional workers, starts up the MapReduce server and then connects all the workers together. It then cleans up the cluster to free up the resources for other tests, or other applications we want to run on our cluster.

This is what my test script looks like that uses Copper to dynamically scale out a test cluster and executes my tests:

############################################################
# 
# Simple test script that will create a cluster of arbitrary
# size and then use mincemeat.py to run a MapReduce job
# on the cluster.
#
############################################################


# This will synchronize the process on the ticket and act
# as a barrier so that the process won't resume until
# everyone has synchronized on the ticket.
#
function sync_ticket {

 TICKET=$1
 IS_MASTER=$2

 if [ $IS_MASTER -eq 0 ]
 then
   gc sync $TICKET 60000
 else
   gc sync
 fi
}



if [ $# -eq 0 ]
then
  echo "Usage: $0 num_workers"
else

  # The number of additional workers to create
  WORKERS=$1
  # The IP address of the server
  SERVER_IP=`gc my-ip`

  if [ $WORKERS -gt 0 ]
  then
    # If we need additional workers then we will request a 
    # ticket for them. This reserves the resources in Copper
    # for our cluster.
    TICKET=`gc rt $WORKERS $WORKERS 1 60000 | awk '{ print $2 }'`
    if [ ${#TICKET} -eq 0 ]
    then 
      # We failed to get a ticket for that number of workers.
      # Basically the resources in our system are in use, so
      # we just exit with an EBUSY error.
      echo "Failed to acquire the resources"
      exit 16
    else
      # We have the resources so we clone out. This will create
      # all the worker machines, and they will be created as
      # exact replicas of this machine, and each one will
      # be executing this script at this point!
      gc clone $TICKET

      # The gc clone API will create the replica virtual machines.
      # Essentially we will now have $WORKERS new machines, all 
      # running this script because they are clones of a virtual
      # machine that is running this script. Moreover, they will
      # all be running this script from this point onwards because
      # gc clone was the last command executed by the virtual 
      # machine prior to cloning.
    fi
  else
    # We just look up the ticket that the master was created with.
    # There is no need to create additional workers.
    TICKET=`gc lt | grep ticket | awk '{ print $2 }'`
  fi

  gc ismaster
  IS_MASTER=$?

  if [ $IS_MASTER -eq 0 ]
  then
    # We want to start up the server on the master
    python example.py > results.out &
    # Give the server some time to start up
    sleep 3
  fi
  
  # This is a barrier and ensures all workers have checked
  # in before we start the process.
  sync_ticket $TICKET $IS_MASTER

  # We start up the mincemeat worker and connect to the server
  python mincemeat.py -p changeme $SERVER_IP
  
  # We have finished processing so we want to destroy the 
  # additional workers that were created.
  if [ $IS_MASTER -ne 0 ]
  then
    # We are a worker so we should join back
    gc join noreport
  fi
fi
############################################################
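The second script of the harness, the one that varies the cluster size and collects the timings, is not shown in this post. The following Python 3 sketch shows one way such a driver could look. It is a guess at the shape, not the script that was actually used: the function names are invented, `./cluster_test.sh` in the usage note is a hypothetical file name for the script above, and the default of five repeats simply matches the number of runs reported in the results.

```python
import subprocess
import time


def average_runtime(command, repeats=5):
    """Run command `repeats` times and return the mean wall-clock seconds."""
    total = 0.0
    for _ in range(repeats):
        start = time.perf_counter()
        subprocess.run(command, check=True)  # raises if the run fails
        total += time.perf_counter() - start
    return total / repeats


def sweep(base_command, worker_counts, repeats=5):
    """Return {extra_workers: mean seconds} for each cluster size.

    The worker count is appended as the last command-line argument,
    matching the "num_workers" parameter of the script above.
    """
    return {
        n: average_runtime(list(base_command) + [str(n)], repeats)
        for n in worker_counts
    }
```

A call such as `sweep(["./cluster_test.sh"], range(0, 10))` would then cover one to ten hosts, assuming that one host means the master alone with no additional workers.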


Results
I executed the above script using 1, 2, 3, 4, 5, 6, 7, 8, 9 and 10 hosts. For each host configuration I ran the script five times and averaged the run times. Here are my results running MapReduce using mincemeat.py on a 28 MB dataset.

The overall MapReduce time decreases as we add more workers, while the overhead of cloning the virtual machines remains constant. The time spent synchronizing the virtual machines, and requesting a ticket, is essentially zero.

We can clearly see a remarkable improvement when moving from 1 worker to 3 workers, and in fact it looks like 3 nodes is a local minimum, performing faster than either 4 or 5 workers. However, things do continue to improve, and once we are in the territory of 8 to 10 workers we start to see the performance improvements leveling off. Obviously this is specific to the 28 MB dataset, and I expect that larger datasets would show significant gains from larger numbers of nodes.
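One simple way to see why the curve flattens is an idealized model in which the total time is a fixed overhead (cloning, startup) plus perfectly divisible work shared among the workers. The Python 3 sketch below uses made-up constants (90 seconds of divisible work, 10 seconds of overhead) purely for illustration; they are not the measured values, and the model does not try to reproduce details such as the local minimum at 3 nodes.

```python
def total_time(workers, parallel_seconds, fixed_seconds):
    """Idealized run time: constant overhead plus evenly divided work."""
    return fixed_seconds + parallel_seconds / workers


def marginal_gain(workers, parallel_seconds, fixed_seconds):
    """Seconds saved by going from `workers` to `workers + 1`."""
    return (total_time(workers, parallel_seconds, fixed_seconds)
            - total_time(workers + 1, parallel_seconds, fixed_seconds))


# Hypothetical constants, chosen only to make the arithmetic easy to follow.
PARALLEL, FIXED = 90.0, 10.0
print(marginal_gain(1, PARALLEL, FIXED))  # 45.0
print(marginal_gain(9, PARALLEL, FIXED))  # 1.0
```

In this model the saving from one extra worker is the divisible work divided by n(n+1), so it shrinks quickly as n grows, and the fixed overhead sets a floor that no number of workers can remove.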

Conclusion
The results presented above highlight that there is a point of diminishing returns, beyond which adding new hosts to the MapReduce system does not provide any significant gains. I feel like this is an intuitive property of the MapReduce paradigm, but it is nice to have the data to support it. However, what is interesting is how easy it was to test this prediction. In fact, any hypothesis about distributed applications can be tested just as easily. This is because GridCentric's Copper virtualization platform makes it extremely simple to build a cluster in which to test the distributed application, and provides the API so that the test harness itself can dynamically create this cluster.

The real result of this post is that Copper can open a doorway into testing and experimenting with distributed applications that was otherwise closed due to the high cost of configuring, maintaining and synchronizing the distributed application.