Apache Spark 3.0에서 바뀐 기능 정리
기능, ANSI SQL Compliance, Store Assignment 정책, query semantics 업데이트 등
Apache Spark 3.0이 배포된지는 몇 달이 되었는데, 최근에 ETL 작업을 하지 않아서 따로 챙겨보진 못했다.
이번 업데이트에서는 Spark SQL에서 눈여겨 볼만한 기능들이 많이 추가된 것 같다. 특히 Spark SQL 개발자들에게 유용한 기능들이 많이 추가되었다.
사용할 일이 생겼으니, 주요한 변화를 정리를 해보자.
ANSI SQL Compliant Feature
일부 예약된 키워드를 식별자로 사용할 수 없게 된다.
select * from some_table create where create.some_column = 1
조건문에 예약 키워드인 create가 사용되었는데도 문제없이 실행이 된다.
하지만 이제 스파크는 ANSI SQL 표준을 준수할 수 있다. 이를 사용하기 위해선
spark.sql.ansi.enabled 속성을 true로 설정하면 된다.
설정 후 위의 쿼리를 실행하면 이렇게 에러가 발생한다.
Error in SQL statement: ParseException: no viable alternative at input 'create'(line 1, pos 41)
== SQL ==
select * from some_table create where create.some_column = 1
----------------------------^^^
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.catalyst.parser.ParseException:
상세 내용은 여기 Spark JIRA에서 확인할 수 있고,
예약 키워드들은 여기에서 확인할 수 있다.
Store Assignment policy
이 기능은 SQL에서 데이터 품질을 엄격하게 확인하기 위해서 도입되었다.
스파크 2.4.4에서 아래와 같이 SQL을 시켜보자.
크게 문제가 없어 보이는데 정수형 컬럼에 문자열을 넣는 예제이다.
CREATE TABLE assignment_policy_check(col INT);
INSERT INTO assignment_policy_check VALUES("123")
SELECT * FROM assignment_policy_check
>>> 1, 123
2.4.4, 3.0에서 모두 이상 없이 실행된다.
실수로 이런 값을 넣는 상황을 막고 싶다면 다음과 같이 값을 설정해주면 된다.
spark.sql.storeAssignmentPolicy를 ANSI로 설정
from_json의 옵션 모드 (FAILFAST/PERMISSIVE)
스파크 3.0에서는 PERMISSIVE나 FAILFAST 모드가 추가되었다. 만약 JSON이 잘못되었거나 파싱 할 수 없을 때는 에러가 발생한다.
df.withColumn("colname", from_json(df['colname'], jsonSchema, {'mode':FAILFAST'})).show()
>> 파싱할 수 없을경우 에러 발생
지수 표기법 / Exponential notation
스파크 3.0에서는 1E10이라고 쓰인 숫자는 이중으로 해석된다. 스파크 2.4 버전 이하에서는 십진법으로 해석된다.
다음 예시를 보자.
# 스파크 2.4
df = spark.sql("SELECT 1E10")
df.printSchema()
>> root
>> |-- 1.0E10 : double (nullable = false)
# 스파크 3.0 이면서 spark.sql.legacy.exponentLiteralAsDecimal.enabled’ 이 true로 설정.
df = spark.sql("SELECT 1E10")
df.printSchema()
>> root
>> |-- 10000000000 : decimal(11,0) (nullable = false)
예전 버전처럼 쓰고 싶다면 아래와 같이 설정하면 된다.
spark.sql.legacy.expontLiteralAsDecimal.enabled를 false로 설정
음수형 0 / Negative decimal zeroes
스파크 2.4 버전 이하에서는 float/double형 -0.0은 의미론적으로 0.0과 동일하지만 groupby, partition, join에서 키값으로 사용될 때는 다른 값으로 간주되었다.(즉 버그나 마찬가지) 스파크 3.0에서 고쳐졌다.
다음과 같은 데이터가 있다고 가정할 때 2.4와 3.0에서 groupby 결과를 보자.
df.show()
>>
+----+------+
| num| val|
+----+------+
| 0.0| one|
|-0.0| two|
+----+------+
# 스파크 2.4 이하
df.groupBy('num').agg(count('*')).show()
>>
+----+--------+
| num|count(1)|
+----+--------+
| 0.0| 1|
|-0.0| 1|
+----+--------+
#스파크 3.0
df.groupBy('num').agg(count('*')).show()
>>
+----+--------+
| num|count(1)|
+----+--------+
| 0.0| 2|
+----+--------+
Keyword String을 Date나 Timestamp로 변경
Spark 3.0에서는 문자열을 날짜나 시간으로 변환할 수 있다.
다음 예시를 확인해보자.
SELECT timestamp 'today'
>> 2020-09-10T00:00:00.000+0000
다음과 같은 키워드들이 가능하다.
select timestamp 'x'
x는 다음과 같은 값들이 올 수 있다.
> epoch -> 1970-01-01 00:00:00+00 (Unix system time의 0)
> today -> 오늘 자정
> yesterday -> 어제 자정
> tomorrow -> 내일 자정
> now -> 쿼리 하는 순간의 시간
max_by / min_by
가장 주목해서 본 기능이다. 한 컬럼이 최소/최대 일 때의 다른 컬럼 값을 얻어 낼 수 있다.
사용법은 다음과 같다.
max_by(x,y)
min_by(x,y)
max_by(x,y,n)
min_by(x,y,n)
max_by(x,y) / min_by(x,y) = y가 최대/최소일 때의 x값을 반환
max_by(x,y,n) / min_by(x,y,n) = y가 최대/최소에서 n번째까지의 때의 x값을 n개 반환
생각해보면 이런 것들을 값들을 얻어내기 위해 많은 과정을 거치는 삽질들을 했던 것 같은데 이 함수로 많이 편해질 것 같다. 사용법은 여기 참고
서브 쿼리 안의 WITH 절
CTE(common tabel expression)을 이제 서브 쿼리 내에서 사용할 수 있다. 쿼리 가독성을 향상할 것이며, 복수의 CTE를 사용할 때 중요하다. 다음 예를 보자.
select * from
(with inner_cte as
(select * from some_table where num = 0)
select num from inner_cte)
;
Filter(Where ...)
이제 통계를 낸 이후 한 번 더 필터링을 할 필요가 없어졌다.
다음과 같이 실행한다.
SELECT
count(*) filter (WHERE num > 4) as filtered
count(*) as no_filtered
FROM
some_table
// 필더링된 카운트와, 안된 카운트가 각각 출력된다.
>> filtered, no_filtered
>> 3, 6
Every / Any / Some
표현식을 만족하는지에 대한 boolean값을 리턴한다.
사용법은 다음과 같다.
Any(expr)
Every(expr)
Some(expr)
예를 들어보자.
// some_table에 1, 2 두개의 값이 있다고 가정
SELECT Every(num % 2 = 0) FROM some_table
>> false
SELECT Any(num % 2 = 0) FROM some_table
>> true
SELECT Some(num % 2 = 0) FROM some_table
>> true
Every는 모든 값이 표현식을 만족해야 true를 반환하며 Any와 Some은 하나라도 만족하면 true를 반환한다.
count_if
filter(where ...)과 비슷하며 만족하는 레코드의 수를 나타낸다.
다음과 같이 사용할 수 있다.
SELECT count_if(num % 2 == 0) FROME some_table
>> num % == 0를 만족하는 레코드 수를 반환
bool_and / bool_or
불린 값에 대한 연산 결과를 제공한다.
한 컬럼에 다음과 같은 불린 값이 들어 있다고 가정해보자.
boolean_column
1, true
2, false
3, true
그리고 다음과 같은 sql문을 실행해보자.
SELECT bool_and(boolean_column) FROM some_table
>> false // (= true and false and true)
SELECT bool_or(boolean_column) FROM some_tabel
>> true // (= true or false or true)
기타
- Java 11 지원
- Hadoop 3 지원
- Pandas UDF를 위한 힌트나 기능 강화
- Skew join 최적화 기능 추가
등 눈에 띄는 것만 적어보았다.
정리하자면
이번 버전에서는 Spark SQL 개발자를 위한 기능이 많이 추가가 되었다. 버그도 많이 수정되었고, 검증, 품질점검은 이전보다 쉬워졌다. 특히 max_by, min_by, filter+count 조합은 서브 쿼리 실행을 줄일 수 있게 되어 나 같은 게으른 개발자를 위한 기능이 많아졌다. 나름대로 만족하는 변화이다.
더 많은 기능들이 궁금하다면
아래의 링크에서 찾아보자. SQL관련은 SQL Compatibility Enhancements을 검색하면 아래에 리스트로 정리되어있다.
Reference
spark.apache.org/releases/spark-release-3-0-0.html
끝
'ML | DL | Big data > Data Engineering' 카테고리의 다른 글
지식 베이스(Knowledge base)에 대해 알아보자 (0) | 2021.10.08 |
---|---|
지식 그래프(Knowledge Graph)가 무엇인지 알아보자 (0) | 2021.10.07 |
ETL, ELT의 4가지 주요 차이점 (0) | 2021.04.26 |
Apache Spark - Master, Slave 구성 별 메모리, 코어 수 설정하기 (0) | 2021.03.09 |
AWS EMR에서 매번 같은 버전의 아나콘다 사용하기 (0) | 2021.03.09 |
댓글