Gridシステムの使用を容易にするミドルウェアとしてGridRPCがある。 本ドキュメントは、GridRPCの一つであるNinfシステムに関して 演算ライブラリをRPC化する場合、バイナリプログラムをRPC化する場合、 RPC化した計算ルーチンを用いて複数のサーバでパラメータサーベイを 行う場合について、具体的にプログラムサンプルを挙げて説明する。
GridRPCが有効な状況は以下の3通りに分類できる。
このような計算は、MPIなどのメッセージパッシングライブラリを用いて実装することも もちろん可能であるが、プログラミングが煩雑になる。 GridRPCを用いると、非常に容易にこの種の並列計算を記述することができる。 また、遠隔地に置かれた複数のクラスタを使用した計算も可能になる。
mmulは、第1引数に行列のサイズ、
第2,3引数に入力行列をとる。第4引数は出力行列である。
void  mmul(int n, double * a, double * b, double * c){
  double t;
  int i, j, k;
  for (i = 0; i < N; i++) {
    for (j = 0; j < N; j++) {
      t = 0;
      for (k = 0; k < N; k++){
        t += a[i * n + k] * b[k * n + j];
      }
      c[i*N+j] = t;
    }
  }
}
この関数を利用するメインルーチンは以下のようになる。
main(){
  double A[N*N], B[N*N], C[N*N];
  initMatA(N, A);  /* initialize */
  initMatB(N, B);  /* initialize */
  mmul(N, A, B, C);
}
1: Module mmul; 2: 3: Define mmul(IN int N, IN double A[N*N], 4: IN double B[N*N], OUT double C[N*N]) 5: "matmul" 6: Required "mmul_lib.o" 7: Calls "C" mmul(N, A, B, C);1行目の
Module mmul;はモジュール名の宣言である。
IDLファイルとモジュールは1対1に対応する。
通常IDLファイルのファイル名はモジュール名.idlとする。
1つのモジュールには任意個のエントリを定義することができる。
3行目から7行目が、エントリmmul/mmulの定義である。
3行目と4行目で、このエントリのインターフェイスを定義している。
通常のCのプロトタイプ宣言との相違は、返り値がないことと、
モードを指定していること、配列のサイズを指定していることである。
メモリ領域を共有している通常の関数呼び出しと異なり、 GridRPCではデータの転送が必要となる。データの転送量を確定するために、 配列のサイズを指定する必要があるのだ。 配列サイズの指定には、他のint型の引数(ここではN)を用いることが でき、さらに四則演算も許される。
5行目には、このエントリに関する説明がかかれている。 6行目では、このエントリを実現する実行ファイルLinkする際に必要な オブジェクトファイルを指定している。 7行目で、実際に呼び出すライブラリ関数と、呼び出しシーケンスを指定している。
Ninfでは、このIDLファイルをIDLコンパイラでコンパイルすることで、
stubとなるメインルーチンのソースコードと、makeファイルが生成される。
このときファイル名はモジュール名.makとなる。
このmakeファイルを用いてmakeを実行することで実行ファイルが生成される。
> ninf_gen mmul.idl > make -f mmul.mak
_stub_mmulという名前の実行ファイルが出来ているはずである。
この実行ファイルをNinfサーバに組み込むには二つの方法がある。
mmul.confという名前で以下の
エントリを持つファイルを作っておく。
.... stubs _stub_mmul ....そして、このファイルを指定してサーバを起動する。 サーバ設定ファイルではこの他にもさまざまな設定が記述できるが、 ここでは割愛する。
> ninf_serv_tcp mmul.conf後者の方法では、登録用のソフトウェアninf_registerを用いる。 この場合、すでにNinfサーバがたっていて、その使用しているポートが わかっていなければならない。ポートを指定して以下のように登録を行う。
> ninf_register -port 4000 _stub_mmul
main(int argc, char ** argv){
  double A[N*N], B[N*N], C[N*N];
  argc = Ninf_parse_arg(argc, argv);
  initMatA(N, A);  /* initialize */
  initMatB(N, B);  /* initialize */
  if (Ninf_call("mmul/mmul", N, A, B, C) != NINF_ERROR)
    Ninf_perror("mmul");
}
本質的に変更されている場所は、関数呼び出しの1行のみである。
このようにGridRPCを使用することは非常に容易である。
もとのプログラムの1行がここではエラーチェックによって2行になっている。 Ninf呼び出しはグローバル環境で実行されるため、 通常のライブラリ関数と比較すると実行に失敗する可能性のある場所が 多くなっている。このため、返り値をチェックし、エラーの場合には Ninf_perrorを用いて原因を出力するようにプログラムすることが重要である。
このプログラムのコンパイルは以下のようにすればよい。
> ninfcc -o mmul_ninf mmul_ninf.cとすればよい。
ninfccはCコンパイラへのラッパである。
また、上のコードではmainの引数にargc, argvを設定し、
Ninf_parse_argを追加している。
このルーチンは、引数列からNinfに関連する引数を抜き出して、
Ninfルーチンの設定を行う関数である。たとえばninf.apgrid.orgのポート4000で
サービスを行っているNinfサーバを用いる場合下のように実行すればよい。
> ./mmul_ninf -server ninf.apgrid.org -port 4000
Ninf_parse_argは、引数列からNinfで解析した引数を除いた値を返す。
argvも破壊的に書き換え、やはりNinf向けの引数は取り除く。したがって
Ninf_parse_arg以降はNinf関連の引数はない場合と同じになる。
本来のアプリケーションの引数解釈は、Ninf_parse_argの後で
行うことが望ましい。
本節ではファイルを使用するプログラムの例として
 gnuplot を用いる。
 gnuplot はグラフを描画するプログラムで、
インタラクティブに使用することもできるが、
引数で指定したファイルからスクリプトを読み込んで、
標準出力にファイルを出力することもできる。
下に示すのはgnuplotに入力するスクリプトである。
set terminal postscript set xlabel "x" set ylabel "y" plot f(x) = sin(x*a), a = .2, f(x), a = .4, f(x)このスクリプトを
gplotという名前でセーブしていたとすると、
> gnuplot gplot > graph.psとすることで、グラフを得ることができる。 これをサーバ側で実行するためには、 入力ファイル(gplot) と出力ファイル(graph.ps)をクライアント・サーバ間で転送する 必要がある。
Ninf IDLでファイルの転送を指定するには、filename という型を用いる。
gnuplotをRPCで用いるためのIDLを下に示す。
Module plot;
Define plot(IN filename plotfile, OUT filename psfile )
"invoke gnuplot"
{
    char buffer[1000];
    sprintf(buffer, "gnuplot %s > %s", plotfile, psfile);
    system(buffer);
}
このIDLではbuffer変数にgnuplotを呼び出すシーケンス文字列を書き込み、
systemライブラリでそれを呼び出している。
入力側のfilename で指定されたファイルは、サーバ側のテンポラリディレクトリ
に適当な名前のファイルとして転送され、そのファイル名がstub関数に渡される。
出力側のファイルは、テンポラリ名だけが生成され、stub関数に渡される。
stubプログラムの実行後、出力属性のファイルは自動的にクライアント側に返送され、
クライアントプログラムで指定したファイル名でセーブされる。
この関数を呼び出すプログラム例を下に示す。
#include <stdio.h>
#include "ninf.h"
main(int argc, char ** argv){
  argc = Ninf_parse_arg(argc, argv);
  if (argc < 3) {
    fprintf(stderr, "USAGE: plot_main INPUT PSFILE\n");
    exit(2);
  }
  if (Ninf_call("plot/plot", argv[1], argv[2]) == NINF_ERROR)
    Ninf_perror("Ninf_call plot:");
}
引数として文字列を与えるだけで、ファイルの転送と計算を一括で
指定している。Ninfにはこの他にすでにfopenされている
ファイルポインタでファイル転送を指定する機能もある。これを
用いると、ファイルの途中までをクライアントで処理し、残りを
サーバ側で処理することが可能になる。
この手法と、次節で述べる複数のサーバを同時に使用する方法を組み合わせることで、 膨大なデータの一括処理が容易に記述できる。
元のプログラム例を下に示す。
long pi_trial(int seed, long times){
  long l, long counter = 0;
  srandom(seed);
  for (l = 0; l < times; l++){
	double x = (double)random() /	RAND_MAX;
	double y = (double)random() /	RAND_MAX;
	if (x * x + y * y < 1.0)
	  counter++;
  }
  return counter;
}
main(int argc, char ** argv){
  double pi;
  long times = atol(argv[1]);
  count = pi_trial(10, times);
  pi = 4.0 * (count / (double) times);
  printf("PI = %f\n", pi);
}
pi_trailの部分を別のファイル(trial_pi.cとする)にし、
trial_pi.o を作成する。
Module pi;
Define pi_trial(IN int seed, IN long times, OUT long * count)
"monte carlo pi computation"
Required "pi_trial.o"
{
  long counter;
  counter = pi_trial(seed, times);
  *count = counter;
}
main(int argc, char ** argv){
  double pi;
  long times, count;
  argc = Ninf_parse_arg(argc, argv);
  times = atol(argv[1]);
  if (Ninf_call("pi/pi_trial", 10, times, &count) == NINF_ERROR){
    Ninf_perror("pi_trial");
    exit(2);
  }
  pi = 4.0 * ( count / (double) times);
  printf("PI = %f\n", pi);
}
また、通常のNinf_callをもちいるとサーバ側の計算が終了するのを 待ってしまい並列実行ができないので、非同期呼び出しを行う Ninf_call_asyncを用いる。この関数は引数の送信が終了した時点で セッションIDを返り値としてリターンしてくる。 このセッションIDを用いて終了待ちや、同期処理を行う。
同期処理機構にはいくつかのバリエーションがあるが、
もっとも基本的なものは、Ninf_wait(int ID)である。
これはセッションIDを引数とし、そのセッションの終了を待つ関数である。
また、よく使うものとしては、先行する全てのセッションの終了を待つ
関数 Ninf_wait_all() がある。
ここでは、 Ninf_wait_all() を用いて
複数サーバを用いた並列実行のクライアントプログラムを書いてみる。
 1: #define NUM_HOSTS 16
 2: char * hosts[] =
 3: {"wiz00", "wiz01", "wiz02", "wiz03", "wiz04", "wiz05", "wiz06", "wiz07",
 4:  "wiz08", "wiz09", "wiz10", "wiz11", "wiz12", "wiz13", "wiz14", "wiz15"
 5: };
 6: int port = 4000;
 7: 
 8: main(int argc, char ** argv){
 9:   double pi;
10:   long times, count[NUM_HOSTS], sum;
11:   int i;
12:   times = atol(argv[1]) / NUM_HOSTS;
13: 
14:   for (i = 0; i < NUM_HOSTS; i++){
15:     char entry[100];
16:     sprintf(entry, "ninf://%s:%d/pi/pi_trial", hosts[i], port);
17:     if (Ninf_call_async(entry, i, times, &count[i]) == NINF_ERROR){
18:       Ninf_perror("pi_trial");
19:       exit(2);
20:     }
21:   }
22:   Ninf_wait_all();
23:   for (i = 0, sum = 0; i < NUM_HOSTS; i++)
24:     sum += count[i];
25:   pi = 4.0 * ( sum / ((double) times * NUM_HOSTS));
26:   printf("PI = %f\n", pi);
27: }
使用するホストの数と名前をそれぞれ1行目と2-5行目で指定している。
6行目のportは、使用するポート名を示している。
12行目で引数で指定される試行回数をホスト数で割り、各ホストで実行する
試行回数を決定している。
14行目から21行目のforループで、各ホストに試行を割り当てている。
16行目でNinfの呼び出しに用いるURIを生成し、
17行目でNinf_call_asyncを用いて各ホストに
非同期呼び出しを行っている。
22行目のNinf_wait_allで全てのホストでの実行が終了するのを
待つ。
23行目では、各ホストでの結果を集計している。
このように、GridRPCを用いるとパラメータサーベイを並列に行う プログラムは非常に容易に記述できる。
ここでは、負荷(試行回数)をある程度細分化し、個々のホストに対して 複数回実行を行う方法を用いる。早く計算が終了したホストに対しては 遅いホストよりも多くの計算が割り当てられ、結果として 負荷の分散が実現される。下にコード例を示す。
 1: #define NUM_HOSTS 16
 2: char * hosts[] =
 3: {"wiz00", "wiz01", "wiz02", "wiz03", "wiz04", "wiz05", "wiz06", "wiz07",
 4:  "wiz08", "wiz09", "wiz10", "wiz11", "wiz12", "wiz13", "wiz14", "wiz15"
 5: };
 6: int port = 4000;
 7: #define DIV 5
 8:
 9: main(int argc, char ** argv){
10:    double pi;
11:    long times, whole_times, count[NUM_HOSTS], sum = 0;
12:    int i, done = 0;
13:    char entry[NUM_HOSTS][100];
14:    int ids[NUM_HOSTS];
15:  
16:    whole_times = atol(argv[1]);
17:    times = (whole_times / NUM_HOSTS) / DIV ;
18:    for (i = 0; i < NUM_HOSTS; i++){
19:      sprintf(entry[i], "ninf://%s:%d/pi/pi_trial", hosts[i], port);
20:      if ((ids[i] =
21:           Ninf_call_async(entry[i], rand(), times, &count[i])) == NINF_ERROR){
22:        Ninf_perror("pi_trial");
23:        exit(2);
24:      }
25:    }
26:    while (1) {
27:      int id = Ninf_wait_any();        /* WAIT FOR ANY HOST */
28:      if (id == NINF_OK)
29:        break;
30:      for (i = 0; i < NUM_HOSTS; i++)  /* FIND HOST */
31:        if (ids[i] == id) break;
32:  
33:      sum += count[i];
34:      done += times;
35:      if (done >= whole_times)
36:        continue;
37:      if ((ids[i] =
38:           Ninf_call_async(entry[i], rand(), times, &count[i])) == NINF_ERROR){
39:        Ninf_perror("pi_trial");
40:        exit(2);
41:      }
42:    }
43:    pi = 4.0 * ( sum / (double)done);
44:    printf("PI = %f\n", pi);
45:  }
7行目のDIVは、各ホストあたり平均何回のNinf呼び出しを行うかを指定している。
各ホストに対して、そのセッションで実行しているセッションIDを収める
ids、第一引数となるURIをおさめたentryを
定義している。
17行でargv[1]で指定される総実行回数を、ホストの数とDIVとで割って求めている。 8行目から24行目のループで各ホストに対して1度づつNinf_callを非同期で 実行している。
26行から42行目までのループでは、呼び出しが終了したホストに対して 再度呼び出しを行っている。 27行目のNinf_wait_any()は、実行が終了したセッションのIDを取得している。 IDがNINF_OKである場合には実行終了待ちのセッションがないことを意味するので、 ループを終了する。 30,31行のループで、IDに対応する、ホストを求めている。 33,34行でそのホストからの結果を集計し、38行でそのホストに対して 再度Ninf呼び出しを実行している。
このように、Ninfを用いると動的に負荷分散を行うコードも比較的容易に 記述できる。
この問題に対する対処の一つとして、RPC発行のコストを低減する ことが考えられる。RPC発行コストは以下のように大きく分けることができる。
このためにNinf_executable_tという構造を導入した。 これはサーバ側のプロセスへの参照を保持する構造体で、 最初にこの構造体を取得してから、この構造体を使用してNinf呼び出しを 行うことで、実行プロセスの共有を実現する。 下にプログラムのメイン部のみを示す。
main(int argc, char ** argv){
  double pi;
  long times, whole_times, count[NUM_HOSTS], sum = 0;
  int i, done = 0;
  char entry[NUM_HOSTS][100];
  int ids[NUM_HOSTS];
  Ninf_executable_t * exe[NUM_HOSTS];
  if (argc < 2){
    fprintf(stderr, "USAGE: pi TIMES \n");
    exit(2);
  }
  whole_times = atol(argv[1]);
  times = (whole_times / NUM_HOSTS) / DIV ;
  for (i = 0; i < NUM_HOSTS; i++){
    sprintf(entry[i], "ninf://%s:%d/pi/pi_trial", hosts[i], port);
    exe[i] = Ninf_get_executable(entry[i]);   /* initialize executable */
    if ((ids[i] =
         Ninf_call_executable_async(exe[i], rand(), times, &count[i])) 
             == NINF_ERROR){
      Ninf_perror("pi_trial");
      exit(2);
    }
  }
  while (1) {
    int id;
    if (Ninf_wait_any(&id) == NINF_ERROR){  /* WAIT FOR ANY HOST */
      Ninf_perror(wait_any);
      break;
    }
    if (id == 0)  /* all sessions are done */
      break;
    for (i = 0; i < NUM_HOSTS; i++)  /* FIND HOST */
      inf (ids[i] == id) break;
    sum += count[i];
    done += times;
    if (done >= whole_times){
      Ninf_executable_finalize(exe[i]);      /* finalize executable */
      continue;
    }
    if ((ids[i] =
         Ninf_call_executable_async(exe[i], rand(), times, &count[i]))
             == NINF_ERROR){
      Ninf_perror("pi_trial");
      exit(2);
    }
  }
  pi = 4.0 * ( sum / (double)done);
  printf("PI = %f\n", pi);
}
前出のプログラムとの相違点は下線を引いた場所のみである。
Ninf_get_executable でNinf_executable_t を取得し、
その後Ninf_call_executable_async で、その
Ninf_executable_tを用いた非同期実行を行っている。
このように書くことで、RPC発行のコストが減少し、動的負荷分散を 行うコードでも、十分な効率で実行することができる。
本ドキュメントで示したとおり、GridRPCは使用が容易なことに特徴がある。 多くのユーザに膨大な計算パワーへのアクセスが可能になることで、 計算科学の分野での進展が期待される。