Thursday, March 12, 2009

MapReduce Framework (Article 2) : Distributed computing

Distributed computing - CPUs with separate memory. The CPUs are connected by some kind of network (Ethernet, Gigabit Ethernet, WAN, etc.).

The basic step in distributed computing is to identify subproblems that can run simultaneously because they have no data dependency on one another. For example, in an animated movie all frames can be rendered simultaneously, because rendering frame #10 needs no data from frame #11.
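The frame example can be sketched on a single machine, with a parallel stream standing in for separate nodes. This is only an illustration: renderFrame is a hypothetical placeholder, and the one property that matters is that it reads nothing except its own frame number.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class FrameRenderDemo {
    // Hypothetical stand-in for real rendering work.
    // It depends only on its own frame number, never on another frame.
    static String renderFrame(int frameNumber) {
        return "frame-" + frameNumber + "-rendered";
    }

    // Because frames are independent, they can be handed to workers in any order.
    static List<String> renderAll(int frameCount) {
        return IntStream.rangeClosed(1, frameCount)
                .parallel()                              // frames may be processed simultaneously
                .mapToObj(FrameRenderDemo::renderFrame)
                .collect(Collectors.toList());           // results come back in frame order
    }

    public static void main(String[] args) {
        System.out.println(renderAll(12));
    }
}
```

If frame #10 did need data from frame #11, this split would no longer be valid and the frames would have to be ordered or grouped first.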

In a distributed computing environment, multiple processes run on hundreds of CPUs. Each works on a copy of the original input; nobody modifies the original. As the computation proceeds, data is transferred between nodes, mainly over TCP/IP.

Challenges in distributed computing environment
  1. Reliable messaging is a MUST.
  2. Data moves from one node to another, and the intermediate nodes can read your data along with the IP headers. So either you trust all the intermediate nodes, or you build your own protocol in which the data is encrypted but the IP headers are not.
  3. We need to make sure that every data packet really originated from the host machine it claims to come from.
  4. We need to make sure data sits close to the logical computing unit that processes it, so the position of routers is very important.

Hadoop (hadoop.apache.org/core/) is an open source Java implementation of a distributed computing platform for the MapReduce framework that deals with the challenges mentioned above. I will discuss the Hadoop Distributed File System in article #4.
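Challenges 2 and 3 can be illustrated with a small sketch that encrypts only the payload at the application level, so the IP headers stay readable for routing. It assumes both nodes already share a secret AES key, which the article does not discuss, and it uses AES-GCM from the standard javax.crypto package. The class and method names (PayloadCrypto, seal, open, verifies) are made up for this example and are not part of Hadoop.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

final class PayloadCrypto {
    static final int IV_BYTES = 12;   // nonce length commonly used with GCM
    static final int TAG_BITS = 128;  // length of the authentication tag

    // Assumption: in a real cluster this key would be distributed to the nodes somehow.
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Encrypts the payload; the result is the IV followed by ciphertext and tag.
    static byte[] seal(SecretKey key, byte[] payload) {
        try {
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] encrypted = cipher.doFinal(payload);
            byte[] message = new byte[iv.length + encrypted.length];
            System.arraycopy(iv, 0, message, 0, iv.length);
            System.arraycopy(encrypted, 0, message, iv.length, encrypted.length);
            return message;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Decrypts a message from seal(); fails if the message was modified
    // or was produced with a different key.
    static byte[] open(SecretKey key, byte[] message) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, message, 0, IV_BYTES));
            return cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // True only if the message passes the authenticity check.
    static boolean verifies(SecretKey key, byte[] message) {
        try {
            open(key, message);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] sealed = seal(key, "frame 10 pixels".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(open(key, sealed), StandardCharsets.UTF_8));
    }
}
```

An intermediate node that forwards the sealed bytes sees only the headers and ciphertext, and a packet that was altered or forged without the key is rejected by open().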

MapReduce Framework (Article 1) : Parallel computing

Parallel computing - Multiple CPUs in one box. All CPUs share the same memory.

Now the question is how to achieve parallelism in a single box. By using threads. If we call a function foo from the main thread, foo gets a frame on top of main's frame on the same call stack.


So the main function has to wait until foo returns. If you spawn a thread, the thread gets its own stack and does not block execution of the main function. So by using threads we can start multiple flows of execution inside one process, and on a multi-CPU system these threads can actually run simultaneously.
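A minimal Java sketch of the difference: foo runs on a spawned thread while the main thread carries on with its own work and only waits at join(). The body of foo and the class name SpawnDemo are placeholders chosen for this example.

```java
final class SpawnDemo {
    // Placeholder work for foo: the sum 1 + 2 + ... + 1000.
    static long foo() {
        long sum = 0;
        for (int i = 1; i <= 1000; i++) {
            sum += i;
        }
        return sum;
    }

    // Returns { result computed on the main thread, result computed by foo }.
    static long[] run() {
        long[] fooResult = new long[1];
        Thread worker = new Thread(() -> fooResult[0] = foo());
        worker.start();              // returns immediately; foo runs on its own stack

        long mainResult = 6 * 7;     // the main thread is not blocked by foo

        try {
            worker.join();           // wait here only when foo's result is needed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { mainResult, fooResult[0] };
    }

    public static void main(String[] args) {
        long[] results = run();
        System.out.println("main: " + results[0] + ", foo: " + results[1]);
    }
}
```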





Parallelisation pitfalls
  1. How do we assign work units to worker threads?
  2. What if we have more work units than worker threads?
  3. How do we aggregate the results at the end?
  4. How do we know all worker threads have finished?
  5. What if the work cannot be divided into separate tasks?

Each of these problems represents a point at which multiple threads communicate with one another or access a shared resource. Any memory that can be used by multiple threads must be protected by a synchronization mechanism.
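One common way to handle the first four questions in Java is a fixed thread pool from java.util.concurrent. The sketch below is one possible arrangement, not the only one: the pool queues work units when there are more units than threads, each Future carries one result back, and get() blocks until that unit has finished. The work itself (squaring a number) is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class WorkPoolDemo {
    static long sumOfSquares(int workUnits, int workerThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        try {
            // 1 and 2: submit every work unit; the pool assigns units to idle
            // threads and queues the rest when units outnumber threads.
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 1; i <= workUnits; i++) {
                final long n = i;
                Callable<Long> unit = () -> n * n;   // placeholder work unit
                futures.add(pool.submit(unit));
            }
            // 3 and 4: get() waits for each unit, so when the loop ends
            // all work has finished and the results are aggregated.
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 10 work units on 3 worker threads
        System.out.println(sumOfSquares(10, 3));
    }
}
```

The fifth question has no library answer: if the work cannot be split into independent units, a pool like this does not help.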

The most important concept in Synchronization is

Race condition
Each thread is racing to complete, and the outcome differs depending on which one wins. We need to make sure this condition never arises.

Thread 1

void foo() {
    x++;
    y = x;
}

Thread 2

void bar() {
    y++;
    x = y;
}

Here x and y are two shared variables. We don't know how the OS will schedule the threads, and the output differs depending on the execution order. To fix this we need to make sure that only one thread at a time can execute the code that touches x and y. This can be achieved by using a semaphore.
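To see why the order matters, the sketch below enumerates every way the four statements of foo and bar can interleave, starting from x = 0 and y = 0. It makes a simplifying assumption that each statement runs atomically; on real hardware x++ is itself a read, an add and a write, so even more outcomes are possible. No real threads are used, so the result is repeatable.

```java
import java.util.Set;
import java.util.TreeSet;

final class InterleavingDemo {
    // Collects every final (x, y) pair reachable from x = 0, y = 0.
    static Set<String> outcomes() {
        Set<String> results = new TreeSet<>();
        explore(0, 0, 0, 0, results);
        return results;
    }

    // fooPc and barPc count how many statements each thread has already executed.
    static void explore(int x, int y, int fooPc, int barPc, Set<String> results) {
        if (fooPc == 2 && barPc == 2) {
            results.add("x=" + x + ",y=" + y);
            return;
        }
        // Let foo take the next step, if it has one left.
        if (fooPc == 0) {
            explore(x + 1, y, 1, barPc, results);   // foo: x++
        } else if (fooPc == 1) {
            explore(x, x, 2, barPc, results);       // foo: y = x
        }
        // Let bar take the next step, if it has one left.
        if (barPc == 0) {
            explore(x, y + 1, fooPc, 1, results);   // bar: y++
        } else if (barPc == 1) {
            explore(y, y, fooPc, 2, results);       // bar: x = y
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomes());   // prints [x=1,y=1, x=2,y=2]
    }
}
```

When one function runs completely before the other, both variables end at 2. When the statements interleave, both end at 1, so one increment is lost.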

Semaphore

To make an object thread-safe, bind a semaphore to that object. A semaphore has two synchronization primitives (special methods that are guaranteed to be executed by only one thread at a time).
  1. lock() - Each semaphore has a queue associated with it. A call to lock() when the semaphore is already locked makes the calling thread wait, and the thread is added to the queue.
  2. unlock() - releases the semaphore and wakes up a thread waiting in the queue so that it can take the lock.

By using a semaphore, we can modify the above programs so that the race condition never arises.

Thread 1

void foo() {
    sem.lock();
    x++;
    y = x;
    sem.unlock();
}

Thread 2

void bar() {
    sem.lock();
    y++;
    x = y;
    sem.unlock();
}
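In Java, the same idea can be sketched with java.util.concurrent.Semaphore created with a single permit, where acquire plays the role of lock() and release the role of unlock(). The class name SemaphoreDemo and the run helper are made up for this example.

```java
import java.util.concurrent.Semaphore;

final class SemaphoreDemo {
    private final Semaphore sem = new Semaphore(1);  // one permit: one thread at a time
    private int x = 0;
    private int y = 0;

    void foo() {
        sem.acquireUninterruptibly();   // lock()
        try {
            x++;
            y = x;
        } finally {
            sem.release();              // unlock(), even if the block throws
        }
    }

    void bar() {
        sem.acquireUninterruptibly();
        try {
            y++;
            x = y;
        } finally {
            sem.release();
        }
    }

    // Runs foo and bar on two threads, callsPerThread times each, and returns { x, y }.
    static int[] run(int callsPerThread) {
        SemaphoreDemo demo = new SemaphoreDemo();
        Thread t1 = new Thread(() -> { for (int i = 0; i < callsPerThread; i++) demo.foo(); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < callsPerThread; i++) demo.bar(); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { demo.x, demo.y };
    }

    public static void main(String[] args) {
        int[] result = run(1000);
        System.out.println("x=" + result[0] + " y=" + result[1]);
    }
}
```

Because each block now runs as a unit, every call raises both variables by exactly one, so run(1000) ends with x and y both at 2000 no matter how the threads are scheduled.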

The semaphore guarantees that only one thread executes the block at a time. However, it is not sufficient to guarantee a particular order of execution (for example, that foo executes before bar).

This can be achieved by using a condition variable. A condition variable notifies threads that a particular condition has been met. It has two methods:
  1. wait() - waiting on a condition variable puts the thread to sleep.
  2. notify() - notifying a condition variable wakes up a thread that is waiting on it.

Now, to enforce the order, we introduce one boolean fooFinished (safe here because it is only read and written while holding sem) and one condition variable fooFinishedCV.

Thread 1

void foo() {
    sem.lock();
    x++;
    y = x;
    fooFinished = true;
    sem.unlock();
    fooFinishedCV.notify();
}

Thread 2

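void bar() {
    sem.lock();
    // Re-check in a loop: a waiting thread can wake up without foo having finished.
    while (!fooFinished) {
        fooFinishedCV.wait(sem);  // releases sem while asleep, re-acquires it before returning
    }
    y++;
    x = y;
    sem.unlock();
}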

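While waiting on a condition variable the thread must release the lock, which is why sem is passed to wait(). Otherwise bar would sleep while holding sem, foo could never enter its block to set fooFinished, and neither thread would make progress. This situation is called a deadlock.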
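A runnable Java version of this ordering can be sketched with ReentrantLock and Condition from java.util.concurrent.locks, which take the place of sem and fooFinishedCV. Two details differ from the pseudocode: Java requires the signalling thread to hold the lock when it signals, and the wait sits in a loop so the flag is checked again after every wake-up. The order list and the run helper exist only to make the result observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class OrderingDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition fooFinishedCV = lock.newCondition();
    private boolean fooFinished = false;          // only touched while holding lock
    private int x = 0;
    private int y = 0;
    private final List<String> order = new ArrayList<>();

    void foo() {
        lock.lock();
        try {
            x++;
            y = x;
            order.add("foo");
            fooFinished = true;
            fooFinishedCV.signalAll();            // must be called while holding the lock
        } finally {
            lock.unlock();
        }
    }

    void bar() {
        lock.lock();
        try {
            while (!fooFinished) {
                // Releases the lock while sleeping, so foo can get in,
                // and re-acquires it before returning.
                fooFinishedCV.awaitUninterruptibly();
            }
            y++;
            x = y;
            order.add("bar");
        } finally {
            lock.unlock();
        }
    }

    static String run() {
        OrderingDemo demo = new OrderingDemo();
        Thread barThread = new Thread(demo::bar);
        Thread fooThread = new Thread(demo::foo);
        barThread.start();                        // bar is started first on purpose
        fooThread.start();
        try {
            barThread.join();
            fooThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.order + " x=" + demo.x + " y=" + demo.y;
    }

    public static void main(String[] args) {
        System.out.println(run());                // prints [foo, bar] x=2 y=2
    }
}
```

Even though bar's thread is started first, it cannot get past the loop until foo has set fooFinished, so foo's block always runs before bar's.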

So there are many things to look into when using threads to achieve parallelism. In addition, due to physical limitations we cannot attach hundreds of CPUs to a single box. So if you want to process terabytes of data in X hours, parallel computing on a single box will not be enough.

In my next article I will discuss distributed computing, which is the main concept behind Google's MapReduce framework for processing huge datasets.