GridRPC Tutorial

Hidemoto Nakada(ETL), Satoshi Matsuoka(TIT), Mitsuhisa Sato(RWCP), Satoshi Sekiguchi(ETL)

Abstract

GridRPC is middleware that provides remote library access and a task-parallel programming model on the Grid. Representative systems include Ninf and Netsolve. We use Ninf to show how to program Grid applications: in particular, how to “Gridify” a numerical library for remote RPC execution, and how to perform a parallel parameter sweep survey using multiple servers on the Grid.

1. Introduction

GridRPC is middleware that provides remote library access and a task-parallel programming model on the Grid. Representative systems include Ninf and Netsolve. This tutorial uses Ninf to show how Grid applications can be programmed with GridRPC.

GridRPC can be effectively used as a programming abstraction in situations including the following:

Section 2 covers the basic “Gridifying” of a given numerical library using GridRPC. Section 3 covers how binary executables with command-line interfaces and file I/O can be wrapped as GridRPC components. Section 4 exemplifies parallel programming for the parameter sweep case. (A section on more general task-parallel programming is in the works.)

2. “Gridifying” a Numerical Library with GridRPC

We first cover the simple case where the library to be Gridified is defined as a linkable library function. Below is sample code for a simple matrix multiply. The first scalar argument specifies the size of the matrices (n by n), parameters a and b are references to the matrices to be multiplied, and c is the reference to the result matrix. Notice that 1) the matrices (defined as arrays) do not themselves embody their size as type information, and 2) as a result there is a dependency between n and a, b, c. In fact, since array arguments are passed by reference, one must assume the contents of the arrays are implicitly shared by the caller and the callee, with arbitrary choices as to using them as input, output, or temporary data structures.
void  mmul(int n, double * a, double * b, double * c){
  double t;
  int i, j, k;
  /* c = a * b for n-by-n matrices stored in row-major order */
  for (i = 0; i < n; i++) {
    for (j = 0; j < n; j++) {
      t = 0;
      for (k = 0; k < n; k++){
        t += a[i * n + k] * b[k * n + j];
      }
      c[i * n + j] = t;
    }
  }
}
The main routine which calls mmul() might be as follows:
main(){
  double A[N*N], B[N*N], C[N*N];
  initMatA(N, A);  /* initialize */
  initMatB(N, B);  /* initialize */

  mmul(N, A, B, C);
}
In order to “Gridify”, or more precisely, allow mmul to be called remotely via GridRPC, we must describe the interface of the function so that information not embodied in the language type system becomes sufficiently available to the GridRPC system to make the remote call. Although future standardization is definitely conceivable, currently each GridRPC system has its own IDL (Interface Description Language); for example, Ninf has its own NinfIDL definition. Below we give the interface of mmul() defined by the NinfIDL syntax:
1: Module mmul;
2: 
3: Define mmul(IN int    N,      IN  double A[N*N], 
4:             IN double B[N*N], OUT double C[N*N])
5: "matmul"
6: Required "mmul_lib.o"
7: Calls "C" mmul(N, A, B, C);
Line 1 declares the module name to be defined. There is a one-to-one correspondence between a module and an IDL file, and each module can have multiple entries to gridify multiple functions. Lines 3-7 are the definition for a particular entry mmul/mmul. Here, lines 3 and 4 declare the interface of the entry. The difference between a NinfIDL entry definition and the C prototype definition is that there are no return values (the return value of the Ninf call is used to return status info), argument input/output modes are specified, and array sizes are described in terms of the scalar arguments.

We note here that NinfIDL has special features to support efficient gridifying of a library (similar features are found in the Netsolve IDL). In contrast to standard procedure calls within a shared memory space, GridRPC needs to transfer data over the network. Transferring the entire contents of every array is naturally very costly, especially for the huge matrices that appear in real applications. One quickly observes that a surprising number of numerical libraries take for granted that the address space of data structures, arrays in particular, is shared between caller and callee, and (a) use only subarrays of the passed arrays, (b) write results back into the passed arrays, and (c) use passed arrays as scratchpad temporaries. The portion of each array that is actually operated on is determined by the semantics of the operation and the input parameters passed to the function. For example, in mmul the whole arrays must be passed, and their sizes are all N by N, where N is the first scalar parameter; A and B need only be passed as input parameters and their contents do not change, while C is used as a return argument and thus need not be shipped to the server, although the result must be shipped back to the client. In general, in order to determine and minimize the size of the transfer, NinfIDL allows a flexible description of the array shape used by the remote library. One can specify leading dimensions, subarrays, and strides. In fact, arbitrary arithmetic expressions involving constants and scalar arguments can be used in the array size expressions.
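
To make the effect of the IN and OUT modes concrete, the following sketch estimates the payload moved by one mmul call under the interface above. It is only back-of-the-envelope arithmetic: the struct and function names are hypothetical and not part of Ninf, and the figures ignore protocol headers and whatever wire encoding the system actually uses.

```c
#include <stddef.h>

/* Hypothetical helper, not part of Ninf: payload bytes for one mmul call
   given the IDL modes (N, A, B are IN; C is OUT). */
typedef struct {
    size_t to_server;   /* bytes shipped client -> server */
    size_t to_client;   /* bytes shipped server -> client */
} transfer_size;

transfer_size mmul_transfer_size(size_t n)
{
    transfer_size t;
    size_t matrix_bytes = n * n * sizeof(double);

    t.to_server = sizeof(int) + 2 * matrix_bytes;  /* N, A[N*N], B[N*N] */
    t.to_client = matrix_bytes;                    /* C[N*N] only */
    return t;
}
```

With 8-byte doubles and N = 1000, this comes to roughly 16 MB sent to the server and 8 MB returned, compared with roughly 24 MB in each direction if all three arrays were copied both ways.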

Line 5 is the comment describing the entry, while line 6 specifies the necessary object file when the executable for the particular file is to be linked. Line 7 gives the actual library function to be called, and the calling sequence; here “C” denotes C-style (row-major) array layout.

The user compiles this IDL file using the Ninf IDL compiler, and generates the stub code and its makefile. By executing this makefile a Ninf executable is generated. The user will subsequently register the executable to the server using the registry tool.

Now the client is ready to make the call over the network. In order to make a GridRPC call, the user modifies the original main program in the following manner. Note that only the function call is modified: there is no need to restructure the program to fit a skeleton produced by the IDL generator, as is the case with typical RPC systems such as CORBA. Moreover, the IDL, the stub files, and the executables reside only on the server side, and the client only needs to link the program with a generic Ninf client library.

main(){
  double A[N*N], B[N*N], C[N*N];

  initMatA(N, A);  /* initialize */
  initMatB(N, B);  /* initialize */

  /* report the error only when the remote call fails */
  if (Ninf_call("mmul/mmul", N, A, B, C) == NINF_ERROR)
    Ninf_perror("mmul");
}

3. Gridifying Programs that use Files

The above example assumes that the numerical routine is supplied as a library with a well-defined function API, or at least that its source is available in a form that could easily be converted into a library. In practice, many numerical routines are only available as non-library executables and/or in binary form, with input/output interfaces using files. In order to gridify such “canned” applications, GridRPC systems typically support remote files and their automatic management and transfer.

We take gnuplot as an example. Gnuplot in non-interactive mode reads its script from a specified file and writes the resulting graph to the standard output. Below is an example gnuplot script.

set terminal postscript
set xlabel "x"
set ylabel "y"
plot f(x) = sin(x*a), a = .2, f(x), a = .4, f(x)
If this script is saved under a filename “gplot”:
> gnuplot gplot > graph.ps
will store the postscript representation of the graph to the file graph.ps. In order to execute gnuplot remotely, we must package it appropriately, and moreover must automatically transfer the input (gplot) and output (graph.ps) files between the client and the server.

NinfIDL provides a type filename to specify that the particular argument is a file. Below is an example of using gnuplot via GridRPC.

Module plot;
Define plot(IN filename plotfile, OUT filename psfile )
"invoke gnuplot"
{
    char buffer[1000];
    sprintf(buffer, "gnuplot %s > %s", plotfile, psfile);
    system(buffer);
}
The body of this IDL entry writes the command string that invokes gnuplot into the variable buffer[], and then runs it through the system() library function. The file specified as an input is automatically transferred to the temporary directory of the server, and its temporary file name is passed to the stub function. For the output file, only a temporary file name is created and passed to the stub function. After the stub program is executed, the files in output mode as specified in the IDL are automatically transferred to the client, and saved there under the name given in the argument.
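
The sprintf() call in the stub above writes into a fixed 1000-byte buffer. As a hedged variation (not what the original stub does), the command string could instead be built with snprintf() and checked for truncation before it is handed to system(). The helper name build_plot_command is hypothetical.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper: build the gnuplot command line into buf.
   Returns 0 on success, -1 if the command would not fit in len bytes
   or if formatting failed. */
int build_plot_command(char *buf, size_t len,
                       const char *plotfile, const char *psfile)
{
    int n = snprintf(buf, len, "gnuplot %s > %s", plotfile, psfile);

    if (n < 0 || (size_t)n >= len)
        return -1;              /* error or truncated output */
    return 0;
}
```

Inside the stub, system(buffer) would then be called only when the helper returns 0. The sketch still passes the file names to the shell unquoted, as the original does.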

Below is an example of how this function might be called via GridRPC.

#include <stdio.h>
#include <stdlib.h>   /* exit() */
#include "ninf.h"

main(int argc, char ** argv){
  argc = Ninf_parse_arg(argc, argv);

  if (argc < 3) {
    fprintf(stderr, "USAGE: plot_main INPUT PSFILE\n");
    exit(2);
  }
  /* argv[1]: local script file to send, argv[2]: local name for the result */
  if (Ninf_call("plot/plot", argv[1], argv[2]) == NINF_ERROR)
    Ninf_perror("Ninf_call plot:");
}
We also note that, by combining this feature with the technique of using multiple servers simultaneously described in the next section, we can process large amounts of data at once.

4. Using Multiple Servers for Parallel Programming on the Grid --- The Parameter Sweep Survey Example.

GridRPC can serve as a task-parallel programming abstraction whose programs can scale from local workstations to the Grid. Here, we take a simple parameter sweep survey as an example, and investigate how it can be easily programmed using GridRPC.

4.1 Sample Program --- Calculating PI using a simple Monte Carlo Method

As an example, we compute the value of PI using a simple Monte Carlo method. We generate a large number of random points within the unit square, which exactly encloses one quarter of the unit circle. The probability that a point falls within the quarter circle equals the ratio of the two areas, PI/4, so PI can be estimated as 4 times the fraction of points that land inside. The program below shows the original sequential version.
#include <stdio.h>
#include <stdlib.h>

long pi_trial(int seed, long times){
  long l, counter = 0;
  srandom(seed);
  for (l = 0; l < times; l++){
    /* random point in the unit square */
    double x = (double)random() / RAND_MAX;
    double y = (double)random() / RAND_MAX;
    if (x * x + y * y < 1.0)   /* inside the quarter circle */
      counter++;
  }
  return counter;
}

main(int argc, char ** argv){
  double pi;
  long count;
  long times = atol(argv[1]);
  count = pi_trial(10, times);
  pi = 4.0 * (count / (double) times);
  printf("PI = %f\n", pi);
}

4.2 Gridifying the PI program.

First, we rewrite the program so that it does the appropriate GridRPC calls. The following steps are needed:
  1. Separate out the pi_trial() function into a separate file (say, pi_trial.c), and create its object file pi_trial.o using a standard C compiler.
  2. Describe the interface of pi_trial in an IDL file.
    Module pi;
    
    Define pi_trial(IN int seed, IN long times, OUT long * count)
    "monte carlo pi computation"
    Required "pi_trial.o"
    {
      long counter;
      counter = pi_trial(seed, times);
      *count = counter;
    }
    
  3. Rewrite the main program so that it makes a GridRPC call.
    main(int argc, char ** argv){
      double pi;
      long times, count;
      argc = Ninf_parse_arg(argc, argv);
      times = atol(argv[1]);
    
      if (Ninf_call("pi/pi_trial", 10, times, &count) == NINF_ERROR){
        Ninf_perror("pi_trial");
        exit(2);
      }
      pi = 4.0 * ( count / (double) times);
      printf("PI = %f\n", pi);
    }
    
We now have made the body of the computation remote. The next phase is to parallelise it.

4.3 Employing Multiple Servers for Task Parallel Computation.

We next rewrite the main program so that parallel tasks are distributed to multiple servers. Although tasks can be distributed using metaserver scheduling with Ninf (and Agents with Netsolve), it is sometimes better to specify a host explicitly, both to keep overhead low and to control load balancing directly. Ninf allows explicit specification of servers through a URI format.

The standard Ninf_call() RPC is synchronous in that the client waits until the completion of the computation on the server side. For task-parallel execution, Ninf provides several asynchronous call APIs. For example, the most basic asynchronous call, Ninf_call_async, is almost identical to Ninf_call except that it returns immediately after all the arguments have been sent. The return value is the session ID of the asynchronous call; the ID is used for various synchronizations such as waiting for the return value of the call.

There are several calls for synchronization. The most basic is Ninf_wait(int ID), which waits for the result of the asynchronous call with the supplied session ID. Ninf_wait_all() waits for all preceding asynchronous invocations. Here, we employ Ninf_wait_all() to parallelize the above PI client so that it uses multiple simultaneous remote server calls:

 1: #define NUM_HOSTS 16
 2: char * hosts[] =
 3: {"wiz00", "wiz01", "wiz02", "wiz03", "wiz04", "wiz05", "wiz06", "wiz07",
 4:  "wiz08", "wiz09", "wiz10", "wiz11", "wiz12", "wiz13", "wiz14", "wiz15"
 5: };
 6: int port = 4000;
 7: 
 8: main(int argc, char ** argv){
 9:   double pi;
10:   long times, count[NUM_HOSTS], sum;
11:   int i;
12:   times = atol(argv[1]) / NUM_HOSTS;
13: 
14:   for (i = 0; i < NUM_HOSTS; i++){
15:     char entry[100];
16:     sprintf(entry, "ninf://%s:%d/pi/pi_trial", hosts[i], port);
17:     if (Ninf_call_async(entry, i, times, &count[i]) == NINF_ERROR){
18:       Ninf_perror("pi_trial");
19:       exit(2);
20:     }
21:   }
22:   Ninf_wait_all();
23:   for (i = 0, sum = 0; i < NUM_HOSTS; i++)
24:     sum += count[i];
25:   pi = 4.0 * ( sum / ((double) times * NUM_HOSTS));
26:   printf("PI = %f\n", pi);
27: }
We specify the number of server hosts and their names in lines 1 and 2-5, respectively. Line 6 is the port number used, and line 12 divides the number of Monte Carlo trials by the number of servers, determining the number of trials per server. The for loop in lines 14-21 invokes the servers asynchronously: line 16 generates the URI of the server, and line 17 calls the server asynchronously. Line 22 waits for all the servers to finish, lines 23-24 aggregate the results returned from all the servers, and line 25 computes the estimate of PI.
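
The partitioning and aggregation arithmetic can be tried without any Grid setup. The sketch below is a local, sequential stand-in for the client above: it makes no Ninf calls, the function names are hypothetical, and it uses the standard rand() in place of random(). Seeds start at 1 rather than 0 on the assumption that some C libraries treat seed 0 the same as seed 1.

```c
#include <stdlib.h>

/* Local stand-in for the remote pi_trial entry: counts random points
   that fall inside the quarter circle. */
long pi_trial_local(unsigned int seed, long times)
{
    long l, counter = 0;

    srand(seed);
    for (l = 0; l < times; l++) {
        double x = (double)rand() / RAND_MAX;
        double y = (double)rand() / RAND_MAX;
        if (x * x + y * y < 1.0)
            counter++;
    }
    return counter;
}

/* Same split and aggregation as the parallel client, but the "servers"
   are visited one after another. Assumes total_times >= num_hosts. */
double estimate_pi_partitioned(int num_hosts, long total_times)
{
    long times = total_times / num_hosts;   /* trials per host */
    long sum = 0;
    int i;

    for (i = 0; i < num_hosts; i++)
        sum += pi_trial_local((unsigned int)(i + 1), times);

    /* divide by the trials actually run, not by total_times */
    return 4.0 * (sum / ((double)times * num_hosts));
}
```

As in line 12 of the listing, the integer division drops any remainder of the trial count, which is why the final estimate divides by times * num_hosts rather than by the requested total.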

In this manner, we can easily write a parallel parameter sweep survey program using the task parallel primitives of GridRPC. We next modify the program to perform dynamic load balancing.

4.4 Dynamically Load Balancing Multiple Servers

The previous program assumed that the loads of the servers are more or less balanced. In other words, we partitioned the work evenly, assuming that each server has equivalent compute power. This is not always the case: in a Grid environment, servers are typically heterogeneous; also, for some programs the load cannot be evenly distributed in a predictable manner. For those cases, a dynamic load balancing scheme is required.

Although GridRPC systems (both Ninf and Netsolve) offer system level scheduling infrastructure to select lightly loaded servers for load balancing, for dynamically load balancing parallel programs the mechanism may not be entirely appropriate, as the load information may not propagate fast enough to serve the needs of fast, repetitive GridRPC invocations.

Instead, here we employ the asynchronous GridRPC APIs to balance the load explicitly. For the above PI example, we further subdivide the load (the number of trials) and invoke each server multiple times. Servers with rapid task turnaround are automatically assigned more computation, achieving an overall balance of the load. Below is the modified code that balances the load:

 1: #define NUM_HOSTS 16
 2: char * hosts[] =
 3: {"wiz00", "wiz01", "wiz02", "wiz03", "wiz04", "wiz05", "wiz06", "wiz07",
 4:  "wiz08", "wiz09", "wiz10", "wiz11", "wiz12", "wiz13", "wiz14", "wiz15"
 5: };
 6: int port = 4000;
 7: #define DIV 5
 8:
 9: main(int argc, char ** argv){
10:    double pi;
11:    long times, whole_times, count[NUM_HOSTS], sum = 0;
12:    int i, done = 0;
13:    char entry[NUM_HOSTS][100];
14:    int ids[NUM_HOSTS];
15:  
16:    whole_times = atol(argv[1]);
17:    times = (whole_times / NUM_HOSTS) / DIV ;
18:    for (i = 0; i < NUM_HOSTS; i++){
19:      sprintf(entry[i], "ninf://%s:%d/pi/pi_trial", hosts[i], port);
20:      if ((ids[i] =
21:           Ninf_call_async(entry[i], rand(), times, &count[i])) == NINF_ERROR){
22:        Ninf_perror("pi_trial");
23:        exit(2);
24:      }
25:    }
26:    while (1) {
27:      int id = Ninf_wait_any();        /* WAIT FOR ANY HOST */
28:      if (id == NINF_OK)
29:        break;
30:      for (i = 0; i < NUM_HOSTS; i++)  /* FIND HOST */
31:        if (ids[i] == id) break;
32:  
33:      sum += count[i];
34:      done += times;
35:      if (done >= whole_times)
36:        continue;
37:      if ((ids[i] =
38:           Ninf_call_async(entry[i], rand(), times, &count[i])) == NINF_ERROR){
39:        Ninf_perror("pi_trial");
40:        exit(2);
41:      }
42:    }
43:    pi = 4.0 * ( sum / (double)done);
44:    printf("PI = %f\n", pi);
45:  }
In line 7, DIV determines how many times, on average, each server is called. In lines 13-14 we also define the array entry[], which holds the URI of each server, and the array ids[], which holds the current GridRPC session ID for each host.

In line 17, we divide the parameter given in argv[1] by the number of hosts and by DIV. Lines 18-25 perform the initial asynchronous Ninf calls to each server. Lines 26-42 issue a new call to each host whose previous call has just finished. Ninf_wait_any in line 27 returns the session ID of a call that has finished. When no preceding asynchronous Ninf call remains outstanding, Ninf_wait_any merely returns NINF_OK and the loop terminates. The loop in lines 30-31 finds the server that corresponds to the session ID. Lines 33 and 34 aggregate the result from that server, and lines 37-38 make the next asynchronous GridRPC call to the server.
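
The effect of this self-scheduling can be illustrated with a small simulation that involves no Ninf calls at all. In the sketch below, the function name, the per-chunk cost values, and the fixed MAX_SERVERS limit are all hypothetical; picking the server with the earliest finish time plays the role that Ninf_wait_any plays in the listing. Unlike the listing, the simulation hands out an exact number of chunks.

```c
#define MAX_SERVERS 16

/* Simulate handing out `chunks` equal tasks, one at a time, to whichever
   server finishes first. cost[i] is the assumed time server i needs per
   chunk; done[i] receives the number of chunks server i completed.
   Returns the total elapsed time, or -1.0 for an invalid server count. */
double simulate_self_scheduling(int servers, const double *cost,
                                int chunks, int *done)
{
    double finish[MAX_SERVERS];   /* finish time of the outstanding chunk */
    int busy[MAX_SERVERS];
    int issued = 0, i;
    double now = 0.0;

    if (servers <= 0 || servers > MAX_SERVERS)
        return -1.0;

    for (i = 0; i < servers; i++) {          /* initial round of calls */
        done[i] = 0;
        busy[i] = 0;
        if (issued < chunks) {
            finish[i] = cost[i];
            busy[i] = 1;
            issued++;
        }
    }
    for (;;) {
        int next = -1;
        for (i = 0; i < servers; i++)        /* earliest outstanding call */
            if (busy[i] && (next < 0 || finish[i] < finish[next]))
                next = i;
        if (next < 0)
            break;                           /* nothing outstanding */
        now = finish[next];
        done[next]++;
        if (issued < chunks) {               /* reissue to the same server */
            finish[next] = now + cost[next];
            issued++;
        } else {
            busy[next] = 0;
        }
    }
    return now;
}
```

For two servers with costs 1.0 and 2.0 and six chunks, the faster server ends up with four chunks and the slower one with two, finishing at time 4; an even three-and-three split would be held up until time 6 by the slower server.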

More complex task-parallel interactions between the calls are possible, and will be the subject of a future tutorial document.

5. Summary

This document gave a tutorial on how to use GridRPC systems for programming on the Grid, using the Ninf system as a representative. Other GridRPC systems provide a very similar set of features, and essentially the same programming techniques can be used. Simplicity of programming is the most beneficial aspect of GridRPC systems, and we hope that users will be able to gridify their programs easily after reading this document.

Ninf as well as other GridRPC systems have rich sets of features not covered in this rather short and simple tutorial; we strongly suggest that users read the manual and other documents before attempting to use GridRPC extensively. We of course hope to extend this tutorial further to cover more advanced GridRPC features.

For details, readers are referred to http://ninf.etl.go.jp/.