The MapReduce programming paradigm has been getting a lot of buzz over the past couple of years because developers are facing ever larger datasets. Google has shown that MapReduce is very effective at processing extremely large datasets by using it to index the entire web. As a result of all this buzz, numerous implementations have popped up. The front runner is probably Hadoop, and we have a post about how to set up a Hadoop cluster in 5 minutes. But there are many more, and in this post I am using a lightweight Python implementation called mincemeat.py.
Essentially, the promise of MapReduce is that we can simply throw more machines at a dataset and process it faster. What I am interested in is how the processing power of the system changes as additional machines are added. In other words, what does the performance function look like in relation to the number of machines in the system? To find out, I keep the dataset fixed and measure how long it takes to process it using different numbers of worker machines.
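To make "performance function" concrete, one common choice (the post itself does not define one) is to measure speedup and parallel efficiency relative to a single host, where $T(n)$ is the average time to process the fixed dataset with $n$ hosts:

$$
S(n) = \frac{T(1)}{T(n)}, \qquad E(n) = \frac{S(n)}{n} = \frac{T(1)}{n\,T(n)}
$$

Perfect linear scaling would give $S(n) = n$ and $E(n) = 1$; diminishing returns show up as $S(n)$ flattening out and $E(n)$ falling as $n$ grows.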
Test Harness
Being able to dynamically vary the number of workers is an interesting problem. I am a believer in the DRY (Don't Repeat Yourself) principle, so I don't want to have to configure 10 machines with mincemeat, or synchronize them so that the correct number of them connect when running my tests. I have a very simple experiment I want to run, and I want a very simple solution that lets me easily vary the number of machines in my tests.
Fortunately, the GridCentric Copper virtualization platform allows me to create an arbitrarily sized cluster very easily. Copper provides a simple but powerful API call that allows a virtual machine to live-clone itself. Within seconds, a running virtual machine can create exact replicas of itself that have the same memory loaded, the same disk state and even the same instruction pointer in the CPU. In other words, I can use the Copper API to create clusters with a single node, 5 nodes, 10 nodes and so on within a matter of seconds, without any additional configuration, synchronization or maintenance on my part. In addition, since this is an API call, I can include it directly in my test harness.
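The key property of live-cloning is that each replica starts with a copy of the parent's running state. The sketch below is a local analogy only and does not call the Copper API: it uses ordinary background subshells, which also start with a copy of the parent shell's variables, to mimic a master spawning workers that all already know the server address. The function `simulate_clone` and the address `192.0.2.10` (a documentation-range IP) are hypothetical.

```shell
#!/bin/sh
# Local analogy only: background subshells stand in for Copper clones.
# Nothing here calls the Copper API. A background subshell gets a copy of
# the running shell's variables, much as a live-cloned VM gets a copy of
# the parent's memory, so values set before the "clone" are visible to it.

simulate_clone() {
    local workers="$1"
    # Hypothetical server address, captured once before "cloning",
    # like the SERVER_IP variable in the test script.
    local server_ip="192.0.2.10"
    local i=1
    while [ "$i" -le "$workers" ]; do
        # Each background subshell inherits server_ip and i as they are now.
        ( echo "worker $i -> $server_ip" ) &
        i=$((i + 1))
    done
    echo "master -> $server_ip"
    # Block until every simulated replica has finished.
    wait
}
```

Running `simulate_clone 3` prints one master line and three worker lines, in an order that can vary between runs. Unlike a real Copper clone, each subshell here runs only the command it was given rather than resuming the rest of the script, and all the "replicas" share one machine.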
My test harness is composed of two scripts: one that creates the test cluster and executes the MapReduce program, and another that varies the size of the cluster and collects the timing information. The first script takes as a parameter the number of additional workers to add. It uses the Copper API to live-clone the machine, building a cluster with the additional workers, starts the MapReduce server, and then connects all the workers to it. Finally, it cleans up the cluster to free the resources for other tests, or for other applications we want to run on our cluster.
Here is the test script, which uses Copper to dynamically scale out a test cluster and execute my tests:
```shell
#!/bin/bash
############################################################
#
# Simple test script that will create a cluster of arbitrary
# size and then use mincemeat.py to run a MapReduce job
# on the cluster.
#
# Takes one argument: the number of *additional* workers to
# clone (0 runs the job on this machine alone).
#
############################################################

# Synchronize on the ticket. This acts as a barrier: no
# process resumes until every member of the ticket has
# synchronized on it.
function sync_ticket {
    local TICKET=$1
    local IS_MASTER=$2
    if [ "$IS_MASTER" -eq 0 ]
    then
        gc sync "$TICKET" 60000
    else
        gc sync
    fi
}

if [ $# -eq 0 ]
then
    echo "Usage: $0 num_workers" >&2
    exit 1
fi

# The number of additional workers to create
WORKERS=$1

# The IP address of the server. This is read before cloning,
# so every clone inherits the master's address.
SERVER_IP=$(gc my-ip)

if [ "$WORKERS" -gt 0 ]
then
    # Request a ticket for the additional workers. This
    # reserves the resources in Copper for our cluster.
    TICKET=$(gc rt "$WORKERS" "$WORKERS" 1 60000 | awk '{ print $2 }')
    if [ -z "$TICKET" ]
    then
        # We failed to get a ticket for that number of workers:
        # the resources in our system are in use, so exit with
        # an EBUSY (16) error.
        echo "Failed to acquire the resources" >&2
        exit 16
    fi

    # Clone out. This creates $WORKERS new virtual machines,
    # each an exact replica of this one. Because gc clone is
    # the last command this machine ran before cloning, every
    # clone continues executing this script from this point
    # onwards.
    gc clone "$TICKET"
else
    # No additional workers are needed; just look up the
    # ticket that this (master) machine was created with.
    TICKET=$(gc lt | grep ticket | awk '{ print $2 }')
fi

gc ismaster
IS_MASTER=$?

if [ "$IS_MASTER" -eq 0 ]
then
    # Start the mincemeat server on the master only
    python example.py > results.out &
    # Give the server some time to start up
    sleep 3
fi

# Barrier: make sure every worker has checked in before we
# start processing.
sync_ticket "$TICKET" "$IS_MASTER"

# Start a mincemeat worker and connect it to the server
python mincemeat.py -p changeme "$SERVER_IP"

if [ "$IS_MASTER" -ne 0 ]
then
    # Processing is finished and this clone is a worker, so
    # join back to destroy it and release its resources.
    gc join noreport
else
    # On the master, wait for the background server to finish
    # writing results.out before exiting.
    wait
fi
```
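The post does not show the second script, the one that varies the cluster size and collects the timings. The following is a minimal sketch of what it could look like, assuming the test script above is saved as `./run_cluster.sh` (a hypothetical name), that its argument is the number of additional workers (so 1 to 10 hosts means 0 to 9 additional workers), and that GNU `date` is available for sub-second timestamps. The function names `time_cmd`, `mean` and `sweep` are also hypothetical, and the sketch measures only total wall-clock time per run, not the separate cloning and synchronization phases.

```shell
#!/bin/sh
# Hypothetical sweep driver: a sketch, not the author's script.

# Print the wall-clock seconds a command takes, discarding its output.
# Assumes GNU date, whose %N format gives nanoseconds.
time_cmd() {
    local start end
    start=$(date +%s.%N)
    "$@" > /dev/null 2>&1
    end=$(date +%s.%N)
    awk -v s="$start" -v e="$end" 'BEGIN { printf "%.3f\n", e - s }'
}

# Print the arithmetic mean of the numeric arguments to 3 decimals.
mean() {
    printf '%s\n' "$@" | awk '{ sum += $1 } END { printf "%.3f\n", sum / NR }'
}

# For 1..max_hosts hosts, run the cluster script `runs` times and print
# "hosts mean_seconds". The script takes the number of *additional*
# workers, i.e. hosts - 1.
sweep() {
    local script="${1:-./run_cluster.sh}"   # hypothetical file name
    local runs="${2:-5}"
    local max_hosts="${3:-10}"
    local hosts=1 i times
    while [ "$hosts" -le "$max_hosts" ]; do
        times=""
        i=0
        while [ "$i" -lt "$runs" ]; do
            times="$times $(time_cmd "$script" "$((hosts - 1))")"
            i=$((i + 1))
        done
        # $times is intentionally unquoted so each timing is one argument.
        echo "$hosts $(mean $times)"
        hosts=$((hosts + 1))
    done
}
```

Calling `sweep ./run_cluster.sh 5 10 > timings.txt` would write ten lines of the form `hosts mean_seconds`. Note that a run which fails with the script's EBUSY exit code (16) is still timed here; a more careful driver would retry it or discard it.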
Results
I executed the above script using 1, 2, 3, 4, 5, 6, 7, 8, 9 and 10 hosts (that is, with 0 to 9 additional workers, since the master also runs a worker). For each host configuration I ran the script five times and averaged the run times. Here are my results for running MapReduce with mincemeat.py on a 28 MB dataset:
Figure: mincemeat.py MapReduce results on the 28 MB dataset for 1 to 10 hosts.
The overall MapReduce time decreases as we add more workers, while the overhead of cloning the virtual machines remains constant. The time spent synchronizing the virtual machines and requesting a ticket is essentially zero.
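The results above are presented only as a chart. If the per-configuration averages are kept as plain text with one `hosts mean_seconds` pair per line (an assumed format, sorted by host count with the 1-host line first), a short awk filter can turn them into speedup relative to one host, T(1)/T(n), parallel efficiency (speedup divided by hosts), and the seconds saved compared with the previous configuration, which makes a point of diminishing returns easy to spot. The function name `scaling_table` is hypothetical.

```shell
#!/bin/sh
# Read "hosts mean_seconds" lines (assumed format, sorted by hosts, first
# line = 1 host) from files or stdin and print:
#   hosts  speedup  efficiency  seconds_saved_vs_previous_line
scaling_table() {
    awk '
        NR == 1 { t1 = $2; prev = $2 }
        {
            speedup = t1 / $2
            printf "%d %.2f %.2f %.3f\n", $1, speedup, speedup / $1, prev - $2
            prev = $2
        }
    ' "$@"
}
```

For example, `scaling_table timings.txt` prints one row per host count; when the last column shrinks toward zero while efficiency keeps falling, extra hosts are no longer paying for themselves.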
Conclusion
The results presented above highlight that there is a point of diminishing returns, beyond which adding new hosts to the MapReduce system does not provide any significant gains. I feel like this is an intuitive property of the MapReduce paradigm, but it is nice to have the data to support it. What is interesting, however, is how easy it was to test this prediction. Other hypotheses about distributed applications can be tested just as easily, because GridCentric's Copper virtualization platform makes it extremely simple to build a cluster in which to test a distributed application, and provides an API so that the test harness itself can create this cluster dynamically.
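One standard way to see why the curve flattens is Amdahl's law, which the post does not use; it is offered here only as an illustrative model, not fitted to the measurements. Assume each run has a fixed cost $T_s$ that does not shrink with more hosts (for example cloning, the server's three-second startup delay and synchronization) plus a perfectly divisible cost $T_p$ spread evenly across $n$ hosts:

$$
T(n) = T_s + \frac{T_p}{n}, \qquad
T(n) - T(n+1) = \frac{T_p}{n(n+1)}, \qquad
\lim_{n \to \infty} \frac{T(1)}{T(n)} = \frac{T_s + T_p}{T_s}
$$

Under this model, going from 1 to 2 hosts saves $T_p/2$, but going from 9 to 10 hosts saves only $T_p/90$, and the speedup can never exceed $(T_s + T_p)/T_s$ however many hosts are added. The model ignores per-host communication costs, which would flatten the curve further.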
The real takeaway of this post is that Copper opens a door to testing and experimenting with distributed applications that was previously closed by the high cost of configuring, maintaining and synchronizing them.