Ninfのつかいかた

中田秀基(ETL)、松岡聡(TIT)、佐藤三久(RWCP)、関口智嗣(ETL)

概要

Gridシステムの使用を容易にするミドルウェアとしてGridRPCがある。 本ドキュメントは、GridRPCの一つであるNinfシステムに関して 演算ライブラリをRPC化する場合、バイナリプログラムをRPC化する場合、 RPC化した計算ルーチンを用いて複数のサーバでパラメータサーベイを 行う場合について、具体的にプログラムサンプルを挙げて説明する。

1. はじめに

GridRPCとは、Gridシステムの使用を容易にするミドルウェアである。 本ドキュメントは、GridRPCの一つであるNinfについてその使用法を述べる。

GridRPCが有効な状況は以下の3通りに分類できる。

本稿ではいくつかの具体的なケースを挙げ、その際のNinfの使用法を説明する。 section 2では、計算ライブラリのNinf化手法とその使用法について述べる。 section 3では、ファイルをインターフェイスとして用いるバイナリプログラムの GridRPC化手法について述べる。 section 4では、パラメータサーベイを行う事例を示す。

2. 計算ルーチンのNinf化

まず、Ninf化するライブラリが、リンク可能なライブラリ関数として定義されている場合を 考える。

2.1 ライブラリ関数例

サンプルとして非常に簡単な行列の掛け算を例に取る。 下に示す関数mmulは、第1引数に行列のサイズ、 第2,3引数に入力行列をとる。第4引数は出力行列である。
void  mmul(int n, double * a, double * b, double * c){
  double t;
  int i, j, k;
  for (i = 0; i < N; i++) {
    for (j = 0; j < N; j++) {
      t = 0;
      for (k = 0; k < N; k++){
        t += a[i * n + k] * b[k * n + j];
      }
      c[i*N+j] = t;
    }
  }
}
この関数を利用するメインルーチンは以下のようになる。
main(){
  double A[N*N], B[N*N], C[N*N];
  initMatA(N, A);  /* initialize */
  initMatB(N, B);  /* initialize */

  mmul(N, A, B, C);
}

2.2 ライブラリ関数のNinf化

このライブラリ関数をNinf化するには、この関数のインターフェイス情報を 記述する必要がある。NinfではNinf IDLと呼ばれる記述言語を用いる。 下にこの関数のインターフェイスを示す。
1: Module mmul;
2: 
3: Define mmul(IN int    N,      IN  double A[N*N], 
4:             IN double B[N*N], OUT double C[N*N])
5: "matmul"
6: Required "mmul_lib.o"
7: Calls "C" mmul(N, A, B, C);
1行目のModule mmul;はモジュール名の宣言である。 IDLファイルとモジュールは1対1に対応する。 通常IDLファイルのファイル名はモジュール名.idlとする。 1つのモジュールには任意個のエントリを定義することができる。 3行目から7行目が、エントリmmul/mmulの定義である。 3行目と4行目で、このエントリのインターフェイスを定義している。 通常のCのプロトタイプ宣言との相違は、返り値がないことと、 モードを指定していること、配列のサイズを指定していることである。

メモリ領域を共有している通常の関数呼び出しと異なり、 GridRPCではデータの転送が必要となる。データの転送量を確定するために、 配列のサイズを指定する必要があるのだ。 配列サイズの指定には、他のint型の引数(ここではN)を用いることが でき、さらに四則演算も許される。

5行目には、このエントリに関する説明がかかれている。 6行目では、このエントリを実現する実行ファイルLinkする際に必要な オブジェクトファイルを指定している。 7行目で、実際に呼び出すライブラリ関数と、呼び出しシーケンスを指定している。

Ninfでは、このIDLファイルをIDLコンパイラでコンパイルすることで、 stubとなるメインルーチンのソースコードと、makeファイルが生成される。 このときファイル名はモジュール名.makとなる。 このmakeファイルを用いてmakeを実行することで実行ファイルが生成される。

> ninf_gen mmul.idl
> make -f mmul.mak

2.3 サーバへの登録

_stub_mmulという名前の実行ファイルが出来ているはずである。 この実行ファイルをNinfサーバに組み込むには二つの方法がある。 前者の場合、たとえばmmul.confという名前で以下の エントリを持つファイルを作っておく。
....
stubs _stub_mmul
....
そして、このファイルを指定してサーバを起動する。 サーバ設定ファイルではこの他にもさまざまな設定が記述できるが、 ここでは割愛する。
> ninf_serv_tcp mmul.conf
後者の方法では、登録用のソフトウェアninf_registerを用いる。 この場合、すでにNinfサーバがたっていて、その使用しているポートが わかっていなければならない。ポートを指定して以下のように登録を行う。
> ninf_register -port 4000 _stub_mmul

2.4 クライアントプログラムの変更

このライブラリをRPC呼び出しするには、以下のようにメインプログラムを 変更する。
main(int argc, char ** argv){
  double A[N*N], B[N*N], C[N*N];

  argc = Ninf_parse_arg(argc, argv);
  initMatA(N, A);  /* initialize */
  initMatB(N, B);  /* initialize */

  if (Ninf_call("mmul/mmul", N, A, B, C) != NINF_ERROR)
    Ninf_perror("mmul");
}
本質的に変更されている場所は、関数呼び出しの1行のみである。 このようにGridRPCを使用することは非常に容易である。

もとのプログラムの1行がここではエラーチェックによって2行になっている。 Ninf呼び出しはグローバル環境で実行されるため、 通常のライブラリ関数と比較すると実行に失敗する可能性のある場所が 多くなっている。このため、返り値をチェックし、エラーの場合には Ninf_perrorを用いて原因を出力するようにプログラムすることが重要である。

このプログラムのコンパイルは以下のようにすればよい。

> ninfcc -o mmul_ninf mmul_ninf.c 
とすればよい。ninfccはCコンパイラへのラッパである。

また、上のコードではmainの引数にargc, argvを設定し、 Ninf_parse_argを追加している。 このルーチンは、引数列からNinfに関連する引数を抜き出して、 Ninfルーチンの設定を行う関数である。たとえばninf.apgrid.orgのポート4000で サービスを行っているNinfサーバを用いる場合下のように実行すればよい。

> ./mmul_ninf -server ninf.apgrid.org -port 4000
Ninf_parse_argは、引数列からNinfで解析した引数を除いた値を返す。 argvも破壊的に書き換え、やはりNinf向けの引数は取り除く。したがって Ninf_parse_arg以降はNinf関連の引数はない場合と同じになる。 本来のアプリケーションの引数解釈は、Ninf_parse_argの後で 行うことが望ましい。

3. ファイルを使用するプログラムのNinf化

前節で示した手法は、計算ルーチンがソースまたはライブラリ関数として 提供されており、通常の関数インターフェイスで呼び出すことができることを 前提にしている。しかし、実際には使用したい計算ルーチンがバイナリプログラム でしか提供されておらず、入出力インターフェイスもファイルを使用する ように設計されている場合も多い。 このようなプログラムをRPC化するために、 GridRPCの多くはファイルの転送機能をサポートしている。

本節ではファイルを使用するプログラムの例として gnuplot を用いる。 gnuplot はグラフを描画するプログラムで、 インタラクティブに使用することもできるが、 引数で指定したファイルからスクリプトを読み込んで、 標準出力にファイルを出力することもできる。 下に示すのはgnuplotに入力するスクリプトである。

set terminal postscript
set xlabel "x"
set ylabel "y"
plot f(x) = sin(x*a), a = .2, f(x), a = .4, f(x)
このスクリプトをgplotという名前でセーブしていたとすると、
> gnuplot gplot > graph.ps
とすることで、グラフを得ることができる。 これをサーバ側で実行するためには、 入力ファイル(gplot) と出力ファイル(graph.ps)をクライアント・サーバ間で転送する 必要がある。

Ninf IDLでファイルの転送を指定するには、filename という型を用いる。 gnuplotをRPCで用いるためのIDLを下に示す。

Module plot;
Define plot(IN filename plotfile, OUT filename psfile )
"invoke gnuplot"
{
    char buffer[1000];
    sprintf(buffer, "gnuplot %s > %s", plotfile, psfile);
    system(buffer);
}
このIDLではbuffer変数にgnuplotを呼び出すシーケンス文字列を書き込み、 systemライブラリでそれを呼び出している。

入力側のfilename で指定されたファイルは、サーバ側のテンポラリディレクトリ に適当な名前のファイルとして転送され、そのファイル名がstub関数に渡される。 出力側のファイルは、テンポラリ名だけが生成され、stub関数に渡される。 stubプログラムの実行後、出力属性のファイルは自動的にクライアント側に返送され、 クライアントプログラムで指定したファイル名でセーブされる。

この関数を呼び出すプログラム例を下に示す。

#include <stdio.h>
#include "ninf.h"

main(int argc, char ** argv){
  argc = Ninf_parse_arg(argc, argv);

  if (argc < 3) {
    fprintf(stderr, "USAGE: plot_main INPUT PSFILE\n");
    exit(2);
  }
  if (Ninf_call("plot/plot", argv[1], argv[2]) == NINF_ERROR)
    Ninf_perror("Ninf_call plot:");
}
引数として文字列を与えるだけで、ファイルの転送と計算を一括で 指定している。Ninfにはこの他にすでにfopenされている ファイルポインタでファイル転送を指定する機能もある。これを 用いると、ファイルの途中までをクライアントで処理し、残りを サーバ側で処理することが可能になる。

この手法と、次節で述べる複数のサーバを同時に使用する方法を組み合わせることで、 膨大なデータの一括処理が容易に記述できる。

4. パラメータサーベイプログラムの実行

本節ではGridRPCを用いたパラメータサーベイの例を示す。

4.1 サンプルプログラム

例としてモンテカルロ法を用いた円周率の計算を行う。 この手法は、円に外接する正方形内にランダムに点を大量に生成し、 それらの点が円の中に収まる確率から円の面積を逆算するものである。

元のプログラム例を下に示す。

long pi_trial(int seed, long times){
  long l, long counter = 0;
  srandom(seed);
  for (l = 0; l < times; l++){
	double x = (double)random() /	RAND_MAX;
	double y = (double)random() /	RAND_MAX;
	if (x * x + y * y < 1.0)
	  counter++;
  }
  return counter;
}

main(int argc, char ** argv){
  double pi;
  long times = atol(argv[1]);
  count = pi_trial(10, times);
  pi = 4.0 * (count / (double) times);
  printf("PI = %f\n", pi);
}

4.2 プログラムのGridRPC化

まず、このプログラムをGridRPC対応に書き直す。それには以下のステップが必要である。
  1. 関数pi_trailの部分を別のファイル(trial_pi.cとする)にし、 trial_pi.o を作成する。
  2. IDLファイルを記述する。
    Module pi;
    
    Define pi_trial(IN int seed, IN long times, OUT long * count)
    "monte carlo pi computation"
    Required "pi_trial.o"
    {
      long counter;
      counter = pi_trial(seed, times);
      *count = counter;
    }
    
  3. プログラム本体部をNinf_callを用いて書き直す。
    main(int argc, char ** argv){
      double pi;
      long times, count;
      argc = Ninf_parse_arg(argc, argv);
      times = atol(argv[1]);
    
      if (Ninf_call("pi/pi_trial", 10, times, &count) == NINF_ERROR){
        Ninf_perror("pi_trial");
        exit(2);
      }
      pi = 4.0 * ( count / (double) times);
      printf("PI = %f\n", pi);
    }
    
以上の手続きでGridRPC化が完了する。

4.3 複数サーバを用いた並列計算

次に、このメインプログラムを書き換えて、複数のサーバにタスクを分配する コードに変更する。Ninfでは、呼び出し関数名にURI形式を指定することで プログラム中で明示的にサーバを指定することができる。

また、通常のNinf_callをもちいるとサーバ側の計算が終了するのを 待ってしまい並列実行ができないので、非同期呼び出しを行う Ninf_call_asyncを用いる。この関数は引数の送信が終了した時点で セッションIDを返り値としてリターンしてくる。 このセッションIDを用いて終了待ちや、同期処理を行う。

同期処理機構にはいくつかのバリエーションがあるが、 もっとも基本的なものは、Ninf_wait(int ID)である。 これはセッションIDを引数とし、そのセッションの終了を待つ関数である。 また、よく使うものとしては、先行する全てのセッションの終了を待つ 関数 Ninf_wait_all() がある。 ここでは、 Ninf_wait_all() を用いて 複数サーバを用いた並列実行のクライアントプログラムを書いてみる。

 1: #define NUM_HOSTS 16
 2: char * hosts[] =
 3: {"wiz00", "wiz01", "wiz02", "wiz03", "wiz04", "wiz05", "wiz06", "wiz07",
 4:  "wiz08", "wiz09", "wiz10", "wiz11", "wiz12", "wiz13", "wiz14", "wiz15"
 5: };
 6: int port = 4000;
 7: 
 8: main(int argc, char ** argv){
 9:   double pi;
10:   long times, count[NUM_HOSTS], sum;
11:   int i;
12:   times = atol(argv[1]) / NUM_HOSTS;
13: 
14:   for (i = 0; i < NUM_HOSTS; i++){
15:     char entry[100];
16:     sprintf(entry, "ninf://%s:%d/pi/pi_trial", hosts[i], port);
17:     if (Ninf_call_async(entry, i, times, &count[i]) == NINF_ERROR){
18:       Ninf_perror("pi_trial");
19:       exit(2);
20:     }
21:   }
22:   Ninf_wait_all();
23:   for (i = 0, sum = 0; i < NUM_HOSTS; i++)
24:     sum += count[i];
25:   pi = 4.0 * ( sum / ((double) times * NUM_HOSTS));
26:   printf("PI = %f\n", pi);
27: }
使用するホストの数と名前をそれぞれ1行目と2-5行目で指定している。 6行目のportは、使用するポート名を示している。 12行目で引数で指定される試行回数をホスト数で割り、各ホストで実行する 試行回数を決定している。

14行目から21行目のforループで、各ホストに試行を割り当てている。 16行目でNinfの呼び出しに用いるURIを生成し、 17行目でNinf_call_asyncを用いて各ホストに 非同期呼び出しを行っている。 22行目のNinf_wait_allで全てのホストでの実行が終了するのを 待つ。 23行目では、各ホストでの結果を集計している。

このように、GridRPCを用いるとパラメータサーベイを並列に行う プログラムは非常に容易に記述できる。

4.4 複数サーバを用いた並列計算 : 動的負荷分散

このサンプルプログラムは、負荷を静的に均等に分散することが容易である。 しかし、プログラムによっては負荷を静的に均等分割できないものもある。 また、サーバ群の性能に偏りがある場合、負荷を均等に分割すると、 実行時間にばらつきが出てしまい、もっとも遅い計算機に律速されることとなってしまう。 このような場合には、動的に負荷を分散する必要がある。

ここでは、負荷(試行回数)をある程度細分化し、個々のホストに対して 複数回実行を行う方法を用いる。早く計算が終了したホストに対しては 遅いホストよりも多くの計算が割り当てられ、結果として 負荷の分散が実現される。下にコード例を示す。

 1: #define NUM_HOSTS 16
 2: char * hosts[] =
 3: {"wiz00", "wiz01", "wiz02", "wiz03", "wiz04", "wiz05", "wiz06", "wiz07",
 4:  "wiz08", "wiz09", "wiz10", "wiz11", "wiz12", "wiz13", "wiz14", "wiz15"
 5: };
 6: int port = 4000;
 7: #define DIV 5
 8:
 9: main(int argc, char ** argv){
10:    double pi;
11:    long times, whole_times, count[NUM_HOSTS], sum = 0;
12:    int i, done = 0;
13:    char entry[NUM_HOSTS][100];
14:    int ids[NUM_HOSTS];
15:  
16:    whole_times = atol(argv[1]);
17:    times = (whole_times / NUM_HOSTS) / DIV ;
18:    for (i = 0; i < NUM_HOSTS; i++){
19:      sprintf(entry[i], "ninf://%s:%d/pi/pi_trial", hosts[i], port);
20:      if ((ids[i] =
21:           Ninf_call_async(entry[i], rand(), times, &count[i])) == NINF_ERROR){
22:        Ninf_perror("pi_trial");
23:        exit(2);
24:      }
25:    }
26:    while (1) {
27:      int id = Ninf_wait_any();        /* WAIT FOR ANY HOST */
28:      if (id == NINF_OK)
29:        break;
30:      for (i = 0; i < NUM_HOSTS; i++)  /* FIND HOST */
31:        if (ids[i] == id) break;
32:  
33:      sum += count[i];
34:      done += times;
35:      if (done >= whole_times)
36:        continue;
37:      if ((ids[i] =
38:           Ninf_call_async(entry[i], rand(), times, &count[i])) == NINF_ERROR){
39:        Ninf_perror("pi_trial");
40:        exit(2);
41:      }
42:    }
43:    pi = 4.0 * ( sum / (double)done);
44:    printf("PI = %f\n", pi);
45:  }
7行目のDIVは、各ホストあたり平均何回のNinf呼び出しを行うかを指定している。 各ホストに対して、そのセッションで実行しているセッションIDを収める ids、第一引数となるURIをおさめたentryを 定義している。

17行でargv[1]で指定される総実行回数を、ホストの数とDIVとで割って求めている。 8行目から24行目のループで各ホストに対して1度づつNinf_callを非同期で 実行している。

26行から42行目までのループでは、呼び出しが終了したホストに対して 再度呼び出しを行っている。 27行目のNinf_wait_any()は、実行が終了したセッションのIDを取得している。 IDがNINF_OKである場合には実行終了待ちのセッションがないことを意味するので、 ループを終了する。 30,31行のループで、IDに対応する、ホストを求めている。 33,34行でそのホストからの結果を集計し、38行でそのホストに対して 再度Ninf呼び出しを実行している。

このように、Ninfを用いると動的に負荷分散を行うコードも比較的容易に 記述できる。

4.5 Ninf_executable_tを用いた動的負荷分散

一般にマスターワーカ型のプログラミングスタイルでは、 計算粒度の設定が問題になる。 粒度を大きくすると、負荷分散がうまくいかない場合が生じやすく、 逆に粒度を小さくとると、計算の発行に掛かるコストが大きくなり、 オーバヘッドを生じたり、マスター役をつとめるクライアントの 負荷が大きくなり過ぎ、サーバを十分に活用できなくなる場合も ありうる。 特に、GridRPCでは一般にクライアントサーバ間が遠くなるため、 RPC発行のコストが大きいため、この問題は深刻である。

この問題に対する対処の一つとして、RPC発行のコストを低減する ことが考えられる。RPC発行コストは以下のように大きく分けることができる。

  1. サーバへの接続コスト
  2. サーバ側実行プロセスの起動コスト
  3. サーバ側実行プロセスとの通信コスト
3は本質的なため、削減することは出来ないが、1と2は、 サーバ側の実行プロセスを維持し、複数のセッションで共有することで 削減することができる。

このためにNinf_executable_tという構造を導入した。 これはサーバ側のプロセスへの参照を保持する構造体で、 最初にこの構造体を取得してから、この構造体を使用してNinf呼び出しを 行うことで、実行プロセスの共有を実現する。 下にプログラムのメイン部のみを示す。

main(int argc, char ** argv){
  double pi;
  long times, whole_times, count[NUM_HOSTS], sum = 0;
  int i, done = 0;
  char entry[NUM_HOSTS][100];
  int ids[NUM_HOSTS];
  Ninf_executable_t * exe[NUM_HOSTS];

  if (argc < 2){
    fprintf(stderr, "USAGE: pi TIMES \n");
    exit(2);
  }
  whole_times = atol(argv[1]);
  times = (whole_times / NUM_HOSTS) / DIV ;

  for (i = 0; i < NUM_HOSTS; i++){
    sprintf(entry[i], "ninf://%s:%d/pi/pi_trial", hosts[i], port);
    exe[i] = Ninf_get_executable(entry[i]);   /* initialize executable */
    if ((ids[i] =
         Ninf_call_executable_async(exe[i], rand(), times, &count[i])) 
             == NINF_ERROR){
      Ninf_perror("pi_trial");
      exit(2);
    }
  }
  while (1) {
    int id;
    if (Ninf_wait_any(&id) == NINF_ERROR){  /* WAIT FOR ANY HOST */
      Ninf_perror(wait_any);
      break;
    }
    if (id == 0)  /* all sessions are done */
      break;
    for (i = 0; i < NUM_HOSTS; i++)  /* FIND HOST */
      inf (ids[i] == id) break;

    sum += count[i];
    done += times;
    if (done >= whole_times){
      Ninf_executable_finalize(exe[i]);      /* finalize executable */
      continue;
    }
    if ((ids[i] =
         Ninf_call_executable_async(exe[i], rand(), times, &count[i]))
             == NINF_ERROR){
      Ninf_perror("pi_trial");
      exit(2);
    }
  }
  pi = 4.0 * ( sum / (double)done);
  printf("PI = %f\n", pi);
}
前出のプログラムとの相違点は下線を引いた場所のみである。 Ninf_get_executableNinf_executable_t を取得し、 その後Ninf_call_executable_async で、その Ninf_executable_tを用いた非同期実行を行っている。

このように書くことで、RPC発行のコストが減少し、動的負荷分散を 行うコードでも、十分な効率で実行することができる。

5. おわりに

本ドキュメントでは、Ninfの代表的な使用法を説明した。

本ドキュメントで示したとおり、GridRPCは使用が容易なことに特徴がある。 多くのユーザに膨大な計算パワーへのアクセスが可能になることで、 計算科学の分野での進展が期待される。