• 텐서플로우 분산 병렬 처리

    2019. 2. 21. 11:51

    by. 위지원

    텐서플로우의 분산 병렬성은 gRPC 통신 프레임워크를 기반으로 한다.


    텐서플로우의 분산,병렬 처리기능은 2016년 4월에 정식 릴리즈되었다고 한다.



    단일/분산 처리는 다음과 같은 차이를 가지고 있다. 단일에서는 한 프로세스 안에서 모든게 돌아가지만 분산에서는 클ㄹ라이언트,마스터,워커가 다른 process에서 작동한다.


    client process는 세션 인터페이스를 통해 master process와 통신한다.

    master process는 cpu/gpu에서 연산을 실행시키는 역할을 해서 worker process에게 연산을 할당 한다.

    그림 출처 : http://sungsoo.github.io/2016/06/20/tensorflow-introduction.html


    예제를 돌리면 결과는 잘 나온다.

    전체 코드는 https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/dist_test/python/mnist_replica.py 에 있다.


    #Master Server로 서버를 open

    2019-02-20 12:08:22.449468: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:381] Started server with target: grpc://localhost:2222


    #Worker1 Server 실행 결과

    1550664754.423616: Worker 0: training step 7514 done (global step: 14999)
    1550664754.435160: Worker 0: training step 7515 done (global step: 15001)
    Training ends @ 1550664754.435457
    Training elapsed time: 104.118199 s
    After 15000 training step(s), validation cross entropy = 1274.82


    #Worker2 Server 실행 결과

    1550664754.415218: Worker 1: training step 7487 done (global step: 14998)
    1550664754.427751: Worker 1: training step 7488 done (global step: 15000)
    Training ends @ 1550664754.428204
    Training elapsed time: 103.626396 s
    After 15000 training step(s), validation cross entropy = 1274.82


    일단은 https://github.com/tensorflow/examples/blob/master/community/en/g3doc/deploy/distributed.md 를 보고 하겠다.


    TensorFlow 분산병렬에서..

    1. Tensorflow에서의 Cluster는 TensorFlow graph 분산 처리에 참여하는 task의 집합을 이야기한다.

    2. 각 Task는 master(세션을 생성)와 worker(executes operations in graph)가 포함된 TensorFlow Server와 연결된다.

    3. cluster는 여러개의 job으로 나뉠 수 있다. job은 task로 나뉜다.


    클러스터를 시작하려면
    0. 클러스터의 task당 TensorFlow 서버를 시작한다.
    1. tf.train.ClusterSpec 을 이용해서 cluster에 있는 task에 대한 설명을 작성한다.
    2. tf.train.Server 를 생성해서 ClusterSpec을 전달하고 job name와 task index로 local task를 식별한다.


    사용 방법은 다음과 같다.  자세한 내용은 https://www.tensorflow.org/api_docs/python/tf/train/ClusterSpec 에 존재한다.

    cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
                                               
    "worker1.example.com:2222",
                                               
    "worker2.example.com:2222"],
                                   
    "ps": ["ps0.example.com:2222",
                                           
    "ps1.example.com:2222"]})


    예를들면 위와 같이 설정하면

    /job:worker/task:0

    /job:ps/task:0

    요롷게 task가 할당되는 것!



    tf.train.Server의 파라미터는 아래와 같다.


    __init__(
        server_or_cluster_def
    ,
        job_name
    =None,
        task_index
    =None,
        protocol
    =None,
        config
    =None,
        start
    =True
    )


    그래서 아래와 같이 cluster를 만들어서 server에 넘겨주면 된당.


    cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
    server = tf.train.Server(cluster, job_name="local", task_index=0)


    아래와 같이 해볼 수 있다.


    >>> cluster = tf.train.ClusterSpec({"local":["203.255.77.xxx:2222","203.255.77.xxx:2223","203.255.77.xxx:2224"]})
    >>> server = tf.train.Server(cluster,job_name="local",task_index=0)
    2019-02-21 02:22:36.150951: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2
    2019-02-21 02:22:36.183508: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:222] Initialize GrpcChannelCache for job local -> {0 -> localhost:2222, 1 -> 203.255.77.xxx:2223, 2 -> 203.255.77.xxx:2224}
    2019-02-21 02:22:36.185589: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:381] Started server with target: grpc://localhost:2222
    >>> server = tf.train.Server(cluster,job_name="local",task_index=1)
    2019-02-21 02:24:35.375274: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:222] Initialize GrpcChannelCache for job local -> {0 -> 203.255.77.xxx:2222, 1 -> localhost:2223, 2 -> 203.255.77.xxx:2224}
    2019-02-21 02:24:35.376792: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:381] Started server with target: grpc://localhost:2223


    아니면 요롷게도 아래와 같이 하면 job_name이 ps와 worker가 된다.


    >>> tf.train.ClusterSpec({"worker":["203.255.77.xxx:2223","203.255.77.xxx:2224"],"ps":["203.255.77.xxx:2222"]})
    <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f63586a5240>

    >>> server = tf.train.Server(cluster,job_name="ps",task_index=0)
    2019-02-21 02:44:17.602855: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:222] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222}
    2019-02-21 02:44:17.603931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:222] Initialize GrpcChannelCache for job worker -> {0 -> 203.255.77.xxx:2223, 1 -> 203.255.77.xxx:2224}
    2019-02-21 02:44:17.606056: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:381] Started server with target: grpc://localhost:2222
    >>> server = tf.train.Server(cluster,job_name="worker",task_index=1)
    2019-02-21 02:44:25.979505: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:222] Initialize GrpcChannelCache for job ps -> {0 -> 203.255.77.xxx:2222}
    2019-02-21 02:44:25.980369: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:222] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2223, 1 -> 203.255.77.xxx:2224}
    2019-02-21 02:44:25.982540: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:381] Started server with target: grpc://localhost:2223


    tf.device를 이용하면 특정 디바이스에 작업을 설정할 수가 있다. https://www.tensorflow.org/api_docs/python/tf/device


    tf.device(device_name_or_function)


    아래와 같이 지정해서 사용할 수 있다!


    >>> with tf.device("/job:ps/task:0"):
    ...     weights_1 = tf.Variable(1)
    ...     biases_1 = tf.Variable(0.1)


    다시 소스코드로 돌아와서 분석을 해보자.


    1. 동기식 병렬싱을 실행할지 비동기식 병렬성을 실행할지를 정한다.


    *일반적으로 동기식 데이터 병렬성이 비동기식 보다 빠르게 수렴되고 모델이 더 정확하다.

    *일반적으로 동기식 모델 병렬성은 loss의 감쇠속도가 더 빠르며 달성할 수 있는 최대 정밀도가 높다. 다만 동기식 병렬성은 bucket dffect가 있어 속도가 가장 느린 기기에 맞춰지므로 속도가 같으면 효율성이 좋아진다고 한다.



    1
    2
    3
    4
    5
    6
    7
    8
    9
    flags.DEFINE_boolean("sync_replicas", False,
        "Use the sync_replicas (synchronized replicas) mode, "
        "wherein the parameter updates from workers are aggregated "
        "before applied to avoid stale gradients")
     
    flags.DEFINE_integer("replicas_to_aggregate", None,
        "Number of replicas to aggregate before parameter update "
        "is applied (For sync_replicas mode only; default: "
        "num_workers)")
    cs


    1) sync_replicas를 True로 설정하면 동기식 병렬성을 시작할 수 있다.

    2) replicas_to_aggregate는 기울기 기본 값을 None으로 설정한다. :

    - 동기식 병렬성을 실행할 때 매개 변수를 1회 업데이트 하는데 몇개의 batch 기울기를 누적해야하는지를 설정한다.

    - None으로 설정하면 모든 worker가 1개의 batch 훈련을 마친 후 모델 매개변수를 업데이트 한다는 뜻



    2. 메인함수


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    def main(unused_argv):
      mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
      if FLAGS.download_only:
        sys.exit(0)
     
      if FLAGS.job_name is None or FLAGS.job_name == "":
        raise ValueError("Must specify an explicit `job_name`")
      if FLAGS.task_index is None or FLAGS.task_index == "":
        raise ValueError("Must specify an explicit `task_index`")
     
      print("job name = %s" % FLAGS.job_name)
      print("task index = %d" % FLAGS.task_index)
     
      ps_spec = FLAGS.ps_hosts.split(",")
      worker_spec = FLAGS.worker_hosts.split(",")
    cs


    1) 동일하게 input_data.read_data_sets()함수를 이용하여 mnist 데이터 세트를 다운로드하고 one_hot encoding을 진행한다.

    2) 필수 매개변수의 존재 여부를 확인한 후 주소를 해석하고 변수에 넣어준다.


    3. worker 수 구하기


    1
    2
    3
    4
    5
    6
    7
    8
    num_workers = len(worker_spec)
     
    cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})
     
    if not FLAGS.existing_servers:
        server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
        if FLAGS.job_name == "ps":
            server.join()  
    cs


    1)  if 부분에서 ps면 서버에 그냥 join()을 하는 부분을 눈여겨 보자



    4. 특정 device를 설정하는 부분 ; 위에서 설명한 함수를 사용한다.  gpu 개수에 따라 gpu를 설정할지도 선택하는 문구도 보인다.


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    is_chief = (FLAGS.task_index == 0)
     
    if FLAGS.num_gpus > 0:
        gpu = (FLAGS.task_index % FLAGS.num_gpus)
        worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
     
    elif FLAGS.num_gpus == 0:
        cpu = 0
        worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)
     
    with tf.device(
          tf.train.replica_device_setter(
              worker_device=worker_device,
              ps_device="/job:ps/cpu:0",
              cluster=cluster)):
        global_step = tf.Variable(0, name="global_step", trainable=False)  
    cs


    5. 신경망 모델의 정의


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    hid_w = tf.Variable(
            tf.truncated_normal(
                [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                stddev=1.0 / IMAGE_PIXELS),
            name="hid_w")
    hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
    sm_w = tf.Variable(
            tf.truncated_normal(
                [FLAGS.hidden_units, 10],
                stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
            name="sm_w")
    sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
     
    = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
    y_ = tf.placeholder(tf.float32, [None, 10])
     
    hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
    hid = tf.nn.relu(hid_lin)
     
    = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
    cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-101.0)))
     
    opt = tf.train.AdamOptimizer(FLAGS.learning_rate)           
    cs


    1) truncated_normal과 zeros 함수를 이용해서 weight와 bias를 초기화 한다.

    2) 17line에서 wm_plus_b를 이용해서 입력x에 대해 행렬 곱을 진행한다.

    3) 18line에서 ReLU 활성화 함수를 사용하여 첫번째 히든 레이어 출력을 얻는다.

    4) 20line에서 softmax까지 적용하고 최종 출력을 얻고

    5) 21line에서 cross entropy를 구한다.


    6. 동기식 프로그램에 대한 최적화


    1
    2
    3
    4
    5
    opt = tf.train.SyncReplicasOptimizer(
              opt,
              replicas_to_aggregate=replicas_to_aggregate,
              total_num_replicas=num_workers,
              name="mnist_sync_replicas")
    cs


    1) 동기식이라고 True로 설정된 경우에는 SyncReplicasOptimizer를 통해 최적화를 진행 할 수 있다.

    - 원래의 최적화 프로그램을 동기식 분산 훈련버전으로 변환할 수 있다.


    7.   분산 훈련 supervisor 생성


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    init_op = tf.global_variables_initializer()
    train_dir = tempfile.mkdtemp()
     
    if FLAGS.sync_replicas:
          sv = tf.train.Supervisor(
              is_chief=is_chief,
              logdir=train_dir,
              init_op=init_op,
              local_init_op=local_init_op,
              ready_for_local_init_op=ready_for_local_init_op,
              recovery_wait_secs=1,
              global_step=global_step)
    else:
          sv = tf.train.Supervisor(
              is_chief=is_chief,
              logdir=train_dir,
              init_op=init_op,
              recovery_wait_secs=1,
              global_step=global_step)       
    cs



    1) train.Supervisor를 이용해서 분산 훈련 superVisor를 만든다.

    -task를 관리하고 분산식 훈련에 참여하게 된다.


    8. allow_soft_placement는 Ture로 지정되면 지정된 기기에서 작업수행이 어려운경우 다른 기기로 전송되어 실행해라 라는 것이다.


    1
    2
    3
    4
    sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=False,
            device_filters=["/job:ps","/job:worker/task:%d" % FLAGS.task_index])
    cs


    9. session 작업


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    if is_chief:
          print("Worker %d: Initializing session..." % FLAGS.task_index)
    else:
          print("Worker %d: Waiting for session to be initialized..." %
                FLAGS.task_index)
     
    if FLAGS.existing_servers:
          server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
          print("Using existing server at: %s" % server_grpc_url)
          sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)
    else:
          sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)            
    cs


    1) main 노드인 경우 Session 초기화를 진행하고, 그렇지 않은 노드는 메인 노드의 초기화를 기다린다. prepare_or_wait_for_session()


    10. 동기모드와 큐


    1
    2
    3
    4
    5
    6
    if FLAGS.sync_replicas:
        if is_chief:
            chief_queue_runner = opt.get_chief_queue_runner()
     
    if FLAGS.sync_replicas and is_chief:
          sv.start_queue_runners(sess, [chief_queue_runner])
    cs


    1) 동기모드인 경우에는 대기열을 만들고 실행한다.




    '2019년' 카테고리의 다른 글

    pyspark 에러  (0) 2019.02.22
    텐서플로우 분산 병렬처리 동기/비동기 를 훌륭하게 설명한 블로그  (0) 2019.02.21
    텐서플로우 GPU병렬성  (0) 2019.02.20
    텐서보드  (0) 2019.02.20
    설치 메모  (0) 2019.02.19