본문 바로가기
일/aws

AWS Glue 활용기 #2 :: Glue job 개발

by blair2dev 2022. 3. 27.
  1. 개요
    1. Spark streaming의 마이크로 배치로 과하게 많이 쌓인 parquet file merge 하는 프로세스 개발 
  2. 개념 
    • glue 는 spark context 를 래핑하는 glue context 라는 것이 있다. 
      • glue job 에서는 아래 dataframe 변환 과정을 거친다. 
        • glue dynamic frame 으로 데이터를 가져온다. 
        • spark dataframe 으로 변환 후 데이터 처리를 한다. 
        • glue dynamic frame 으로 변환하여 저장
  3. 스크립트 
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from awsglue.job import Job
from datetime import date, timedelta


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


yesterday = date.today()- timedelta(days=1)


S3_location =  f's3://stream-source'
#datasource0 = glueContext.create_dynamic_frame_from_options(
#                connection_type="s3",
#                connection_options = {
#                    "paths": [S3_location]
#                },
#                format="parquet",
#                transformation_ctx="datasource0")


p_year = yesterday.year
p_month = yesterday.month
p_day = yesterday.day

str_year = str(p_year)
str_month = str(p_month)
str_day = str(p_day)

print("start parquet merger")
print(str_year + "    " + str_month + "     " + str_day) 

push_down_predicate_condition = "(year== '"+ str_year +"' and month== '"+ str_month +"' and day== '"+str_day+"')"

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "raw_behavior",
    table_name = "gtm",
    push_down_predicate = push_down_predicate_condition,
    transformation_ctx="datasource0")
                

df1 = datasource0.toDF()
df1.printSchema()
df1.withColumn("")

df0 = df1.repartition(1)
df0.show(10)

new_dynamic_frame= DynamicFrame.fromDF(df0, glueContext, "new_dynamic_frame")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = new_dynamic_frame, connection_type = "s3", 
                connection_options = {
                        "path": "s3://stream-target", 
                        "partitionKeys": ["year", "month", "day"]
                }, format = "parquet", transformation_ctx = "datasink4")

print("merger complete successfully")

job.commit()

- push_down_predicate : 설정된 조건에 해당하는 데이터를 불러온다. 파일 전체를 가져온 후 메모리에서 필터링 하는 것이 아닌 처음 읽을 때부터 필요한 데이터만 읽어올 수 있어, 효율적인 프로세싱이 가능하다. 

- toDF() : spark dataframe 으로 바꿔준다.

- parititonKeys : 데이터를 저장할 때 셜정된 칼럼으로 파티셔닝하여 저장한다. 

- repartition : spark 실행할 때 병렬 갯수 만큼 파티션이 생기고 hdfs 상에 마지막 경로에 그만큼 파일들이 생기는데 repartition(1)을 해주어야 하나로 떨궈준다. (스파트 파티션 개념 : https://tech.kakao.com/2021/10/08/spark-shuffle-partition/ )