본문 바로가기
일/aws

Amazon EMR 구축 #2 :: 프로비저닝

by blair2dev 2022. 2. 1.

아키텍처 구성 

EMR Architecture

  • Airflow , edge node
    • airflow 를 통해 spark job submit이 가능하도록 따로 인스턴스를 두었다. 
  • EMR Cluster 
    • master 와 core를 하나씩 두고 task 노드를 spot 으로 하여 처리량에 따라 최소 비용으로 scalble 이 가능하도록 auto scaling 을 설정
  • Bastion 
    • 외부에서 ssh 를 통해 작업이 가능하도록 해준다. 
  • NAT gateway 
    • private 망의 노드들이 패키지 설치 등을 위해 외부 망에 접근이 필요하므로 outbound 를 허용해준다. 
  • EFS 
    • 소스 등 파일시스템 공유를 위해 

 

 

 

EMR 클러스터 생성 

 

네트워크 인터페이스(eni) 생성 

마스터 노드의 IP를 고정하기 위해서 IP 를 홀딩 해두는 목적으로 사용한다. 

  1. EMR 이 뜰 때 eni 에서 세컨더리 프라이빗 IP 를 뻇어서 EMR 마스터 노드의 세컨더리 프라이빗 IP 로 할당
  2. EMR이 종료될 때 EMR 마스터 노드에서 세컨더리 프라이빗 IP 를 뻇어서 eni 세컨더리 프라이빗 IP 로 할당

위 동작은 EMR 클러스터가 생성될 때 bootstrap.sh 에서 수행한다. 

 

클러스터가 사용할 서브넷을 넣는다 

프라이빗 IPv4 주소 - 자동할당 

 

 

프라이빗 IPv4 주소를 같은 대역으로 아무거나 중복되지 않게 넣는다. (중복이면 얼럿 뜸)

 

 

단계 1: 소프트웨어 및 단계 

 

소프트웨어 구성 : emr 클러스터를 프로비저닝 하면서 기본적으로 깔아 줄 app 들이다. 

app 버전 들은 emr  버전에 dependant 하게 정해진다. 

 

여러 마스터 노드 : emr 클러스터의 마스터 노드를 이중화 할 것인지 선택한다. 

체크를 하면 multi-az 를 사용하여 ha 를 구현한다. 

 

AWS Glue 데이터 카탈로그 : 체크하면 glue 데이터 카탈로그를 외부 메타스토어(데이터를 먼저 저장하고 스키마를 입힌다.)를 사용한다. 

 

스파크 설정구성, 파이선 설정구성 등을 티폴트로 정해준다. 이 설정은 최하위 우선순위로 작동한다. 

s3 에서 JSON 로드를 통해 아래의 json 파일을 로드시킨다. 

 

EMR 에서 돌아가는 app 의 디폴드 세팅값을 정해준다.

emr-software_settings-devstg-64.json 

 

[
   {
      "configurations":[
         {
            "classification":"export",
            "properties":{
               "PYSPARK_PYTHON":"/usr/bin/python3"
            }
         }
      ],
      "classification":"spark-env",
      "properties":{
         
      }
   },
   {
      "classification":"spark-defaults",
      "properties":{
         "spark.num.executors":"2",
         "spark.driver.maxResultSize":"0",
         "spark.rdd.compress":"true",
         "spark.network.timeout":"3600s",
         "spark.shuffle.blockTransferService":"nio",
         "spark.sql.broadcastTimeout":"3600",
         "spark.shuffle.spill.compress":"true",
         "spark.shuffle.compress":"true",
         "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
         "spark.dynamicAllocation.minExecutors":"1",
         "spark.dynamicAllocation.initialExecutors":"1",
         "spark.executor.instances":"2",
         "spark.dynamicAllocation.executorIdleTimeout":"120s",
         "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout":"3s",
         "spark.executor.memory":"4g",
         "spark.driver.memory":"4g",
         "spark.kryoserializer.buffer.max":"512m",
         "spark.master":"yarn",
         "spark.driver.cores":"2",
         "spark.executor.heartbeatInterval":"1800s",
         "spark.dynamicAllocation.schedulerBacklogTimeout":"3s",
         "spark.executor.cores":"2",
         "spark.io.compression.codec":"lz4",
         "spark.submit.deployMode":"cluster",
         "spark.dynamicAllocation.maxExecutors":"10",
         "spark.yarn.am.waitTime":"120s",
         "spark.port.maxRetries":"200",
         "spark.shuffle.service.enabled":"true",
         "spark.yarn.historyServer.allowTracking":"true",
         "spark.dynamicAllocation.enabled":"true",
       "spark.history.fs.cleaner.enabled": "true",
         "spark.history.fs.cleaner.interval": "1d",
         "spark.history.fs.cleaner.maxAge": "14d"
      }
   },
   {
      "classification":"capacity-scheduler",
      "properties":{
         "yarn.scheduler.capacity.maximum-am-resource-percent":"0.9",
         "yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
      }
   },
   {
      "classification":"yarn-site",
      "properties":{
         "yarn.nodemanager.resource.memory-mb":"65536",
         "yarn.nodemanager.pmem-check-enabled":"false",
         "yarn.scheduler.maximum-allocation-mb":"65536",
         "yarn.nodemanager.vmem-check-enabled":"false",
       "yarn.nodemanager.localizer.cache.cleanup.interval-ms": "400000",
         "yarn.nodemanager.localizer.cache.target-size-mb": "65536",
       "yarn.resourcemanager.nodemanager-graceful-decommission-timeout-sec": "-1"
      }
   },
   {
      "classification":"mapred-site",
      "properties":{
         "yarn.app.mapreduce.am.resource.mb":"65536"
      }
   }
]

 

 

 

Concurrancy : step 이 어플리케이션을 구동하는 단위라고 생각할 수 있다. 

concurrancy 를 설정하면 이 step 을 동시에 얼마나 구동할 지 정해줄 수 있다. 

After last step completes  

clusters enters waiting state  : 클러스터가 종료 되지 않고 대기 해준다. 

마지막 단계가 완료된 후 클러스터 자동 종료 : 클러스터가 종료 된다. 

 

 

단계 2: 하드웨어 

 

균일한 인스턴스 그룹 : 노드 유형별로 균일한 스펙의 인스턴스를 사용한다. 

 

인스턴스 플릿 : 인스턴스 유형을 각각 지정할 수 있다. 

유닛 : 코어수 

 

 

Cluster scaling

클러스터 인스턴스의 스켈러블을 구현해준다. 

 

Use EMR-manages scaling

코어와 테스크 유닛 등을 워크로드에 따라 지정한 숫자 범위 내에서 자동으로 조정해준다. 

 

 

Create a custom automatic scailing policy

우측 연필 아이콘을 누르면 

 

규칙에 따라 스케일 아웃, 인의 조건을 생성할 수 있다. 

조건은 yarn의 리소스 점유율 등 여러가지가 트리거로 작동된다. 

연속 횟수 : 5분 동안 트리거를 몇번 할 지 

휴지 기간 : 트리거가 된 후 쿨다운을 어느정도 할 지 

 

 

자동 종료 

클러스터 유휴 된 이후 종료될 때 까지의 시간이다. 

 

로깅 : EMR 이 생성될 때 까지의 로깅이 저장될 s3 path 이다. 

 

태그 : emr 클러스터가 달게 되는 태그이다. 우리는 bootstrap.sh 에서 관련 처리를 해준다. 

 

 

 

필요한 pip 패키지

requirement.txt 

pandas==1.0.5
redis==2.10.6
redis-py-cluster==1.3.6
PyMySQL==0.9.3
pyarrow==1.0.0
#fastparquet==0.4.1
boto3==1.14.33
botocore==1.17.33
prettytable==0.7.2
requests==2.24.0

 

 

 

 

 

emr-bootstrap-practice.sh :: EMR 프로비저닝 할 때 수행하는 스크립트

#!/bin/bash

S3_PATH="s3://xxxxxxxxxxxxx/emr-config"
EFS_DOMAIN="fs-xxxxxxxx.efs.ap-northeast-2.amazonaws.com"
EMR_MASTER_IP="000.000.000.000"
#eni 세컨더리 IP 를 넣어 준다. 
ENI_ID="eni-xxxxxxxxxxx"
#위에서 만든 eni ID 를 넣어준다. 

#####################################################
# EMR Cluster Node
#####################################################
# install tools
sudo yum install -y python3-devel
sudo yum install -y jq

# install python package
aws s3 cp $S3_PATH/emr//practice-requirements/requirements.txt .
#패키지리스트와 버전들 정의 
aws s3 cp $S3_PATH/emr/get-pip.py .
#pip3 를 쓰기 위해 pip3를 인스톨하는 스크립트 (안되는 경우가 있었다 함.)
python3 get-pip.py
export PATH=/home/hadoop/.local/bin/:$PATH
sudo pip3 install -r requirements.txt -t /usr/lib/python3.7/site-packages
#정의한 패키지들 설치
#####################################################
# MECAB write by sjh
#####################################################
sudo pip3 install mecab-python3==1.0.1
wget https://bitbucket.org/eunjeon/mecab-ko/downloads/mecab-0.996-ko-0.9.2.tar.gz
sudo tar xvf mecab-0.996-ko-0.9.2.tar.gz


cd  mecab-0.996-ko-0.9.2
./configure
make -j 4
make check
sudo make install
sudo ldconfig
cd ..


sudo yum install -y automake

wget https://bitbucket.org/eunjeon/mecab-ko-dic/downloads/mecab-ko-dic-2.0.1-20150920.tar.gz
tar xvf mecab-ko-dic-2.0.1-20150920.tar.gz


cd mecab-ko-dic-2.0.1-20150920
./autogen.sh
./configure --with-mecab-config=../mecab-0.996-ko-0.9.2/mecab-config
make
sudo make install



# Set tag to EMR Cluster Node
aws s3 cp $S3_PATH/emr/emr-tag-practice.sh .
# 태그 일괄 적용 스크립트
perl -pi -e 's/\r\n/\n/g' emr-tag-practice.sh
sh emr-tag-practice.sh

# EFS Mount
sudo yum -y install nfs-utils
sudo mkdir /practice-efs
sudo chown hadoop:root /practice-efs
sudo mount -t nfs -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport $EFS_DOMAIN:/ /practice-efs

# KST 시간으로 설정
sudo ln -sf /usr/share/zoneinfo/Asia


#####################################################
# EMR Master Node
#####################################################
# Assign private ip to EMR Master Node
aws s3 cp $S3_PATH/emr/assign_private_ip.py .
python3 assign_private_ip.py $EMR_MASTER_IP $ENI_ID

 

 

assign_private_ip.py   ::  master node ip 할당  

#!/usr/bin/python
#
#Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file 
#except in compliance with the License. A copy of the License is located at
#
#    http://aws.amazon.com/apache2.0/
#
#or in the "license" file accompanying this file. This file is distributed on an "AS IS" 
#BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
#License for the specific language governing permissions and limitations under the License.

import sys, subprocess

is_master = subprocess.check_output(['cat /emr/instance-controller/lib/info/instance.json | jq .isMaster'], shell=True).decode('utf-8').strip()

if is_master == "true":
    private_ip = str(sys.argv[1])
    res_interface_id = str(sys.argv[2])
    instance_id = subprocess.check_output(['/usr/bin/curl -s http://169.254.169.254/latest/meta-data/instance-id'], shell=True).decode('utf-8')
    print("instance id: {}".format(instance_id))
    interface_id = subprocess.check_output(['aws ec2 describe-instances --instance-ids %s | jq .Reservations[].Instances[].NetworkInterfaces[].NetworkInterfaceId' % instance_id], shell=True).decode('utf-8').strip().strip('"')
    print("interface id: {}".format(interface_id))

    # UnAssign Private IP from Reserved ENI
    subprocess.check_call(['aws ec2 unassign-private-ip-addresses --network-interface-id %s --private-ip-addresses %s' % (res_interface_id, private_ip)], shell=True)

    # Assign private IP to the master instance:
    subprocess.check_call(['aws ec2 assign-private-ip-addresses --network-interface-id %s --private-ip-addresses %s' % (interface_id, private_ip)], shell=True)

    subnet_id = subprocess.check_output(['aws ec2 describe-instances --instance-ids %s | jq .Reservations[].Instances[].NetworkInterfaces[].SubnetId' % instance_id], shell=True).decode('utf-8').strip().strip('"').strip().strip('"')
        
    subnet_cidr = subprocess.check_output(['aws ec2 describe-subnets --subnet-ids %s | jq .Subnets[].CidrBlock' % subnet_id], shell=True).decode('utf-8').strip().strip('"')
    cidr_prefix = subnet_cidr.split("/")[1]

    # Add the private IP address to the default network interface:
    subprocess.check_call(['sudo ip addr add dev eth0 %s/%s' % (private_ip, cidr_prefix)], shell=True)

    # Configure iptablles rules such that traffic is redirected from the secondary to the primary IP address:
    primary_ip = subprocess.check_output(['/sbin/ifconfig eth0 | grep \'inet\' '], shell=True).decode('utf-8').strip().split()[1] # new
    # primary_ip = subprocess.check_output(['/sbin/ifconfig eth0 | grep \'inet addr:\' | cut -d: -f2 | awk \'{ print $1}\''], shell=True).strip() # old
    print("primary_ip: {}".format(primary_ip))
    subprocess.check_call(['sudo iptables -t nat -A PREROUTING -d %s -j DNAT --to-destination %s' % (private_ip, primary_ip)], shell=True)

    # make shutdown script
    subprocess.check_call(['mkdir -p /mnt/var/lib/instance-controller/public/shutdown-actions'], shell=True)
    f = open("/mnt/var/lib/instance-controller/public/shutdown-actions/return_ip.sh",'w')
    f.write('#!/bin/bash\n')
    f.write('aws ec2 unassign-private-ip-addresses --network-interface-id %s --private-ip-addresses %s\n' % (interface_id,private_ip))
    f.write('aws ec2 assign-private-ip-addresses --network-interface-id %s --private-ip-addresses %s' % (res_interface_id,private_ip))
    f.close()
    print("end of assign")
else:
    print("Not the master node")

 

 

emr-tag-practice.sh  :: 클러스터 인스턴스에 일괄 테그를 달아주는 쉘스크립트

#!/bin/bash

export IS_MASTER=$(cat /mnt/var/lib/info/instance.json | jq -r ".isMaster")
export INSTANCE_GROUP_ID=$(cat /mnt/var/lib/info/instance.json | jq -r ".instanceGroupId")
export CLUSTER_ID=$(cat /mnt/var/lib/info/job-flow.json | jq -r ".jobFlowId")
export INSTANCE_ID=$(wget -q -O - http://169.254.169.254/latest/meta-data/instance-id)
export INSTANCE_GROUP_TYPE=$(cat /mnt/var/lib/info/job-flow.json | jq -r ".instanceGroups | .[] | select( .instanceGroupId == \"${INSTANCE_GROUP_ID}\") | .instanceRole" | tr a-z A-Z)

export CURRENT_TAG_NAME=$(aws ec2 --region ap-northeast-2 describe-tags --filters Name=resource-id,Values=${INSTANCE_ID} | jq -r ".Tags | .[] | select( .Key == \"Name\") | .Value")
export NEW_TAG_NAME="${CURRENT_TAG_NAME}-${INSTANCE_GROUP_TYPE}"
export VALUE=“??????”

echo "IS_MASTER: $IS_MASTER"
echo "INSTANCE_GROUP_ID: $INSTANCE_GROUP_ID"
echo "CLUSTER_ID: $CLUSTER_ID"
echo "INSTANCE_ID: $INSTANCE_ID"
echo "INSTANCE_GROUP_TYPE: $INSTANCE_GROUP_TYPE"
echo "CURRENT_TAG_NAME: $CURRENT_TAG_NAME"
echo "NEW_TAG_NAME: $NEW_TAG_NAME"

aws ec2 create-tags --region ap-northeast-2 --resources ${INSTANCE_ID} --tags Key=Name,Value=${NEW_TAG_NAME}
aws ec2 create-tags --region ap-northeast-2 --resources ${INSTANCE_ID} --tags Key=Project,Value=EMR-PRACTICE
aws ec2 create-tags --region ap-northeast-2 --resources ${INSTANCE_ID} --tags Key=Customer,Value=${VALUE}
aws ec2 create-tags --region ap-northeast-2 --resources ${INSTANCE_ID} --tags Key=Biz Unit,Value=EMR-PRACTICE-CLUSTER

 

 

 

 

 

 

보안옵션 

일반적인 보안 스터프 를 생성 적용 후 클러스터 생성