How to Use the DSU Cluster?

Before we learn how to use the DSU cluster, let’s review some basics of parallel computing.

Sequential and Parallel Programming

In sequential programming, processes are executed consecutively on a single processor. This implies that a process is not started until the preceding process has finished its execution.

In parallel programming, multiple processes can be executed simultaneously using multiple computing resources to solve a computational problem.

The computing resources may include:

  • A single computer with multiple processors
  • Multiple computers that are connected by a network (computer cluster)

Why Use Parallel Computing?

  • Save time and/or money
  • Allocating more resources to a task shortens its execution time. Parallel clusters can also be built from cheap, commodity hardware.

  • Solve larger problems
  • It is impractical to solve very large and/or complex problems using a single machine due to its limited computational power, memory and storage.

  • Provide concurrency
  • Many tasks can be executed simultaneously using multiple computing resources.

  • Take advantage of non-local resources
  • It is possible to use resources over the internet (e.g., cloud computing resources) when local compute resources are scarce.

Shared Memory and Distributed Memory Architecture

In a shared memory system, all processors have access to a pool of shared memory.

In a distributed memory system, each processor has its own local memory. Processors exchange data by passing messages over the interconnection network; message passing is an effective communication mechanism in distributed memory architectures.

Computer Cluster

Some applications need more computational resources than a single computer can provide. There are two ways of overcoming this limitation.

  • Vertical Scaling
  • Adding more memory or processing power to a single node.

  • Horizontal Scaling
  • Networking multiple nodes together so that they work as a single logical unit.

A computer cluster is a horizontally scaled architecture. However, each node in the cluster can be scaled up vertically as well. The networked computers act as a single powerful machine. A computer cluster provides:

  • Higher processing power
  • Scalability
  • High availability of resources
  • Reduced cost

A computer cluster follows a master-worker architecture. One node in the cluster is the master node, and all other nodes are called worker nodes or compute nodes. The master node distributes data among the worker nodes and assigns tasks to them. It also monitors the workers; if a worker fails, the master reassigns that worker’s tasks to another worker.

What is SLURM and why do we need it?

An important aspect of the computer cluster is that multiple users should be able to use the resources concurrently. To do that, we need a resource manager and a job scheduler.

The Simple Linux Utility for Resource Management (SLURM) is an open-source resource management and job scheduling system for submitting, executing, monitoring, and managing jobs on Linux clusters.

SLURM provides three key functions:

  • Allocating exclusive and/or non-exclusive access to resources (compute nodes) to users for some duration of time so they can perform work.
  • Providing a framework for starting, executing, and monitoring work on a set of allocated nodes.
  • Arbitrating contention for resources by managing a queue of pending jobs.

The DSU Cluster uses SLURM as the resource manager and job scheduler.

More information about SLURM and its commands can be found on the official Slurm website.

Creating a Job Script

The preferred way to run a job on the DSU cluster is to set up a job script. This script requests cluster resources and runs your program when the resources are available. If the resources are not available, the job is queued until the required resources become free.

To properly configure a job script, you will need to know the general script format, the commands you wish to use, how to request the resources required for the job to run, and, possibly, some of the Slurm environment variables.
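
As a starting point, a minimal job script generally looks like the sketch below; the job name, output/error file names, and the program are placeholders that you should adapt to your own job.

#!/bin/bash
#SBATCH --job-name=my_job       # name of the job (placeholder)
#SBATCH --ntasks=1              # number of tasks (processes) to create
#SBATCH --output=my_job.out     # file for standard output
#SBATCH --error=my_job.err      # file for standard error

./my_program                    # the program to run (placeholder)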

Common Slurm Commands

The following Slurm commands are used frequently.

Command     Definition
sbatch      Submits a job script to the system for execution (the job is queued)
scancel     Cancels a job
scontrol    Used to view and modify the Slurm configuration and state
sinfo       Displays the state of partitions and nodes
squeue      Displays the state of jobs
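
For example, a job can be submitted, monitored, and cancelled as follows (job_script.sh and the job ID 1234 are placeholders; use your own script name and the ID reported by sbatch):

dsu@master ~$ sbatch job_script.sh
dsu@master ~$ squeue
dsu@master ~$ sinfo
dsu@master ~$ scancel 1234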

Specifying Resources

The following is a list of commonly requested resources and the Slurm syntax used to request them.

Syntax           Meaning
--ntasks=        Controls the number of tasks (processes) to be created for the job
--mem=           The real memory required per node
--mem-per-cpu=   The real memory required per processor
--output=        Path for standard output
--error=         Path for standard error
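
For example, the sketch below (with placeholder values; my_program and the file names are made up) requests one task and 1 GB of real memory per node (--mem=1G), and names the output and error files:

#!/bin/bash
#SBATCH --ntasks=1              # one task (process)
#SBATCH --mem=1G                # 1 GB of memory per node
#SBATCH --output=myjob.out      # standard output file
#SBATCH --error=myjob.err       # standard error file

./my_program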

Slurm Environment Variables

Below is a list of some environment variables that Slurm defines when a job is launched into execution.

Environment Variable    Definition
$SLURM_JOB_ID           ID of the job allocation
$SLURM_SUBMIT_DIR       Directory from which the job was submitted
$SLURM_JOB_NODELIST     List of nodes allocated to the job
$SLURM_NTASKS           Total number of tasks in the job
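
As an illustration, the small sketch below (env_test is a placeholder job name) simply prints these variables so you can see what Slurm sets for your job:

#!/bin/bash
#SBATCH --job-name=env_test
#SBATCH --ntasks=2
#SBATCH --output=env_test.out

echo "Job ID:           $SLURM_JOB_ID"
echo "Submit directory: $SLURM_SUBMIT_DIR"
echo "Allocated nodes:  $SLURM_JOB_NODELIST"
echo "Number of tasks:  $SLURM_NTASKS"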

How to Run Programs on the DSU Cluster

In this section, we demonstrate how to use the DSU cluster to run serial and parallel programs. The following example illustrates how parallel processing differs from sequential processing.

Calculating PI using Numerical Integration

Numerical integration is used to approximate the area under the curve of a function when the integral cannot be solved analytically. The value of pi is given by the integral of f(x) = 4 / (1 + x^2) from 0 to 1, so the area under this curve between 0 and 1 equals pi. To approximate that area, we divide the interval [0, 1] into n rectangles of width 1/n, take the height of each rectangle to be f evaluated at the rectangle’s midpoint x_i = (i + 0.5)/n, and sum the rectangle areas, so pi ≈ (1/n) * [f(x_0) + f(x_1) + ... + f(x_(n-1))]. Increasing the number of rectangles gives a more accurate value for pi.

Running Serial Jobs

Let’s solve this problem sequentially. In this approach, the area of one rectangle is calculated at a time; once it is computed, the area of the next rectangle is calculated, and so on. The areas are accumulated to obtain the approximate value of pi. The C program for this problem is as follows.

#include <stdio.h>
#include <math.h>

/* integrand: f(x) = 4 / (1 + x^2); its integral from 0 to 1 is pi */
double func(double a)
{
	return (4.0 / (1.0 + a*a));
}

int main(int argc, char *argv[])
{
	int n = 10000;          /* number of rectangles */
	double base = 1.0/n;    /* width of each rectangle */
	double sum_area = 0.0;  /* accumulated area (approximation of pi) */
	double x, h;
	int i;

	for (i = 0; i < n; i++)
	{
		x = base*(i + 0.5);   /* midpoint of the i-th rectangle */
		h = func(x);          /* height of the rectangle */
		sum_area += base*h;   /* add the rectangle's area */
	}

	printf("pi = %.16f\n", sum_area);
	return 0;
}


Compiling this C Program:

dsu@master ~$ gcc -o pi_serial pi_serial.c


The usual way to run this program:

dsu@master ~$ ./pi_serial


Let’s run this program using batch mode. To do that, we have to prepare a batch script.

The batch script (test_slurm.sh) is as follows.

#!/bin/bash
#SBATCH --job-name=pi_serial
#SBATCH --output=slurm.out
#SBATCH --error=slurm.err
#SBATCH --ntasks=1

./pi_serial

Let’s submit the batch script. As it is a serial program, it will use only one processor.

dsu@master ~$ sbatch test_slurm.sh


Once a job is submitted, Slurm generates a job ID and displays it on the terminal.

The status of currently running jobs can be viewed using the sview command.

dsu@master ~$ sview


We also can view the full details of a currently running job using “scontrol show job [job_id]”.

Once the job is completed, we can view the output using “cat slurm.out”.

If the program fails, the error log can be viewed using “cat slurm.err”.
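
For example, after submitting the serial job above, the job can be inspected and its output viewed as follows (1234 is a hypothetical job ID; use the one printed by sbatch):

dsu@master ~$ scontrol show job 1234
dsu@master ~$ cat slurm.out
dsu@master ~$ cat slurm.err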

Going Parallel with MPI

Let’s solve the above problem using parallel programming. Earlier, a single processor calculated the area of each rectangle one at a time. In parallel processing, multiple processors are used: each processor calculates the areas of a subset of the rectangles concurrently. The execution time decreases as more processors are used.

What is Message Passing Interface (MPI)?

The Message Passing Interface (MPI) is a library specification for creating parallel applications.

In parallel computing, multiple nodes work on different parts of a computing problem simultaneously. The challenge is to synchronize the actions of each node, exchange data between nodes and provide command and control over the nodes. The message passing interface provides a standard suite of functions for these tasks. The primary purpose of MPI is distributed memory parallelism.

There are several implementations of the MPI standard.

  • Intel MPI
  • OpenMPI
  • MVAPICH2
  • MPICH

Currently, DSU supports the OpenMPI implementation of MPI. OpenMPI supports C, C++, Fortran 77 (F77), and Fortran 90 (F90).
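
If you want to confirm which OpenMPI version is available on the cluster (assuming the OpenMPI tools are already on your PATH), you can check it with:

dsu@master ~$ mpirun --version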

Let’s run the parallel version of the “calculate pi” example using OpenMPI with C.

The source code is as follows (cpi.c).

#include "mpi.h"
#include <stdio.h>
#include <math.h>

double f( double );

double f( double a)
{
    return (4.0 / (1.0 + a*a));
}

int main( int argc, char *argv[] )
{
    int done = 0, n, myid, numprocs, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x;
    double startwtime=0.0, endwtime;
    int  namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);
    MPI_Get_processor_name(processor_name,&namelen);

    fprintf(stderr,"Process %d on %s\n",
	    myid, processor_name);

    n = 1000000;
    
        if (myid == 0)
          startwtime = MPI_Wtime();
        
        MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
        
            h   = 1.0 / (double) n;
            sum = 0.0;
            for (i = myid + 1; i <= n; i += numprocs) {
                x = h * ((double)i - 0.5);
                sum += f(x);
            }
            mypi = h * sum;

            MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

            if (myid == 0)  {
     printf("pi is approximately %.16f, Error is %.16f\n",pi, fabs(pi - PI25DT));
		endwtime = MPI_Wtime();
		printf("wall clock time = %f\n",
		       endwtime-startwtime);	       
	        }

    
    MPI_Finalize();
    return 0;
}

            

Let’s compile this parallel program using mpicc, a compiler wrapper that adds the flags needed to compile and link MPI programs written in C.

dsu@master ~$ mpicc -o cpi cpi.c


Let’s modify the slurm script (test_slurm.sh) to run this program on our cluster using 5 processors.

#!/bin/bash
#SBATCH -n 5

mpirun --mca btl_tcp_if_include 10.40.62.0/16 ./cpi

mpirun is the OpenMPI launcher that executes serial and parallel jobs.

The option --mca btl_tcp_if_include 10.40.62.0/16 restricts OpenMPI’s TCP communication to the 10.40.62.0/16 subnet; it is used so that MPI traffic does not go through our university proxy server.

The progress and output of the program can be viewed as we discussed earlier.

Parallel Programming in Python

As we mentioned earlier, OpenMPI supports C, C++, and Fortran. In order to do parallel programming in Python, we use another library called MPI for Python (MPI4Py). It provides bindings of the MPI standard for the Python programming language.

Besides MPI4Py, there are several modules available for parallel programming in Python. DSU supports only MPI4Py.

Let’s parallelize the “calculate pi” problem using MPI4Py (calculate_pi.py).

from mpi4py import MPI

def compute_pi(n, start=0, step=1):
    """Sum the areas of every step-th rectangle, starting at index start."""
    h = 1.0/n
    s = 0.0
    for i in range(start, n, step):
        x = h * (i + 0.5)
        s += 4.0/(1.0 + x**2)
    return s*h

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()   # total number of processes
myrank = comm.Get_rank()   # rank of this process

if myrank == 0:
    n = 10000000           # number of rectangles
    start_time = MPI.Wtime()
else:
    n = None

# broadcast the number of rectangles to all processes
n = comm.bcast(n, root=0)

# each process sums its share of the rectangles
mypi = compute_pi(n, myrank, nprocs)

# combine the partial sums on process 0
pi = comm.reduce(mypi, op=MPI.SUM, root=0)

if myrank == 0:
    print("pi is approximately", pi)
    end_time = MPI.Wtime()
    print("Execution Time", end_time - start_time)

We do not need to compile this Python code as Python is an interpreted language.


The slurm script to run this program is as follows.

#!/bin/bash
#SBATCH -n 5

mpirun --mca btl_tcp_if_include 10.40.62.0/16 python calculate_pi.py

Parallel Programming in R

There are several packages available for parallel programming in R. Some of them are listed below.

  • Shared memory: parallel, Simple Network of Workstations (snow), foreach, gputools
  • Distributed memory: pbdR, Rmpi, RHadoop, RHIPE, snow via Rmpi

We will be discussing snow and Rmpi in the following examples.

Rmpi is a CRAN package that provides an interface between R and MPI. snow is used to parallelize R programs using multiple cores within a machine; however, snow alone cannot be used to distribute tasks across multiple nodes. In the more common scenario, snow is used together with Rmpi for distributed computing across the cluster.

Below is the Rmpi version of the “calculate pi” example (calculate_pi.R).


library(Rmpi)

n  <- 60000000   # number of rectangles
ns <- 9          # number of slave processes to spawn

start <- Sys.time()
mpi.spawn.Rslaves(nslaves=ns)

# sum the areas of every step-th rectangle, starting at index start
compute.pi <- function(n, start=0, step=1)
{
    h <- 1.0/n
    s <- 0.0
    x <- 0.0
    for(i in seq(start, n, step))
    {
        x <- h*(i+0.5)
        s <- s + 4.0/(1.0 + x*x)
    }
    return (s*h)
}

# each process (master and slaves) determines its own rank and the communicator size
mpi.bcast.cmd(id <- mpi.comm.rank())
mpi.bcast.cmd(ns <- mpi.comm.size())
id <- mpi.comm.rank()
ns <- mpi.comm.size()

# send the problem size and the worker function to the slaves
mpi.bcast.Robj2slave(n)
mpi.bcast.Robj2slave(compute.pi)

# every process computes its share of the rectangles,
# then the partial sums are combined on the master
tm.comp.st <- Sys.time()
mpi.bcast.cmd(pi.approx <- compute.pi(n, id, ns))
pi.approx <- compute.pi(n, id, ns)
mpi.bcast.cmd(mpi.reduce(pi.approx, type=2, op="sum"))
pi.approx <- mpi.reduce(pi.approx, type=2, op="sum")

tm.comp <- as.numeric(Sys.time() - tm.comp.st, units="secs")
tm.tot  <- as.numeric(Sys.time() - start, units="secs")

cat(paste('approximate value of pi is: ', pi.approx, '\n'))
cat('computational time =', tm.comp, '\n')
cat('        total time =', tm.tot, '\n')

mpi.close.Rslaves(dellog=FALSE)
mpi.exit()

The above program uses a master process plus ns slave processes (here ns <- 9, for 10 processes in total). A batch script to submit this job is as follows.

#!/bin/bash
#SBATCH -n 5

mpirun --mca btl_tcp_if_include 10.40.62.0/16 -np 1 Rscript calculate_pi.R

The following code shows how to solve the same problem using snow via Rmpi, distributed across multiple nodes (cal_pi.R).


library(snow)
library(Rmpi)

n <-  6000000 
ns <- 13 

start <- Sys.time() 

cl <- makeCluster(ns, type="MPI")

# function to calculate pi
# s : start point of a slice
# w : width of a slice
# m : middle point of a slice
# h : height of a slice
# a : area of a slice (rectangle)
compute.pi <- function(s)
{
    m <- s + 0.5*w
    h <- 4.0/(1.0 + m*m)
    a <- w*h
    return (a)    
}

# compute width of a slice
w <- 1.0/n
# export width and compute.pi 
clusterExport(cl, c("compute.pi", "w"))

# create an n x 1 matrix of slice start points: 0, w, 2w, ..., (n-1)*w
m <- matrix(c(seq(0, length=n, by=w)), n, 1)

# generate a list where each element is the area of a single slice
slice.list <- parRapply(cl, m, compute.pi)

pi.approx <- sum(slice.list)

cat(paste('approximate value of pi is: ', pi.approx, '\n'))

end <- Sys.time()

total.time <- as.numeric(end - start, units = "secs")

cat('        total time =', total.time, '\n')

stopCluster(cl)
mpi.exit()

A simple batch script to run the above problem is as follows.


#!/bin/bash
#SBATCH -n 5

mpirun --mca btl_tcp_if_include 10.40.62.0/16 -np 1 Rscript cal_pi.R

Useful Resources

More details about the topics we covered can be found on the following websites.