- 개요
- Spark streaming의 마이크로 배치로 과하게 많이 쌓인 parquet file merge 하는 프로세스 개발
- 개념
- glue 는 spark context 를 래핑하는 glue context 라는 것이 있다.
- glue job 에서는 아래 dataframe 변환 과정을 거친다.
- glue dynamic frame 으로 데이터를 가져온다.
- spark dataframe 으로 변환 후 데이터 처리를 한다.
- glue dynamic frame 으로 변환하여 저장
- glue job 에서는 아래 dataframe 변환 과정을 거친다.
- glue 는 spark context 를 래핑하는 glue context 라는 것이 있다.
- 스크립트
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from awsglue.job import Job
from datetime import date, timedelta
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
yesterday = date.today()- timedelta(days=1)
S3_location = f's3://stream-source'
#datasource0 = glueContext.create_dynamic_frame_from_options(
# connection_type="s3",
# connection_options = {
# "paths": [S3_location]
# },
# format="parquet",
# transformation_ctx="datasource0")
p_year = yesterday.year
p_month = yesterday.month
p_day = yesterday.day
str_year = str(p_year)
str_month = str(p_month)
str_day = str(p_day)
print("start parquet merger")
print(str_year + " " + str_month + " " + str_day)
push_down_predicate_condition = "(year== '"+ str_year +"' and month== '"+ str_month +"' and day== '"+str_day+"')"
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "raw_behavior",
table_name = "gtm",
push_down_predicate = push_down_predicate_condition,
transformation_ctx="datasource0")
df1 = datasource0.toDF()
df1.printSchema()
df1.withColumn("")
df0 = df1.repartition(1)
df0.show(10)
new_dynamic_frame= DynamicFrame.fromDF(df0, glueContext, "new_dynamic_frame")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = new_dynamic_frame, connection_type = "s3",
connection_options = {
"path": "s3://stream-target",
"partitionKeys": ["year", "month", "day"]
}, format = "parquet", transformation_ctx = "datasink4")
print("merger complete successfully")
job.commit()
- push_down_predicate : 설정된 조건에 해당하는 데이터를 불러온다. 파일 전체를 가져온 후 메모리에서 필터링 하는 것이 아닌 처음 읽을 때부터 필요한 데이터만 읽어올 수 있어, 효율적인 프로세싱이 가능하다.
- toDF() : spark dataframe 으로 바꿔준다.
- parititonKeys : 데이터를 저장할 때 셜정된 칼럼으로 파티셔닝하여 저장한다.
- repartition : spark 실행할 때 병렬 갯수 만큼 파티션이 생기고 hdfs 상에 마지막 경로에 그만큼 파일들이 생기는데 repartition(1)을 해주어야 하나로 떨궈준다. (스파트 파티션 개념 : https://tech.kakao.com/2021/10/08/spark-shuffle-partition/ )
'일 > aws' 카테고리의 다른 글
AWS Glue 활용기 #4 :: Glue Crawler 트러블 슈팅 1/2 (0) | 2022.05.01 |
---|---|
AWS Glue 활용기 #3 :: Glue workflow (0) | 2022.03.27 |
AWS Glue 활용기 #1 :: Architect (0) | 2022.03.27 |
Amazon EMR 구축 #2 :: 프로비저닝 (0) | 2022.02.01 |
Amazon EMR 구축 #1 :: 개념 정리 (0) | 2022.02.01 |