본문 바로가기
ML | DL | Big data/Data Engineering

Apache Spark 3.0 변경점/변화 총 정리

by 썽하 2020. 9. 10.

 

Apache Spark 3.0에서 바뀐 기능 정리

기능, ANSI SQL Compliance, Store Assignment 정책, query semantics 업데이트 등

 

 

Apache Spark 3.0이 배포된지는 몇 달이 되었는데, 최근에 ETL 작업을 하지 않아서 따로 챙겨보진 못했다.

이번 업데이트에서는 Spark SQL에서 눈여겨 볼만한 기능들이 많이 추가된 것 같다. 특히 Spark SQL 개발자들에게 유용한 기능들이 많이 추가되었다.

사용할 일이 생겼으니, 주요한 변화를 정리를 해보자.

 

ANSI SQL Compliant Feature

일부 예약된 키워드를 식별자로 사용할 수 없게 된다.

 

select * from some_table create where create.some_column = 1

 

조건문에 예약 키워드인 create가 사용되었는데도 문제없이 실행이 된다.

 

하지만 이제 스파크는 ANSI SQL 표준을 준수할 수 있다. 이를 사용하기 위해선

spark.sql.ansi.enabled 속성을 true로 설정하면 된다.

 

설정 후 위의 쿼리를 실행하면 이렇게 에러가 발생한다.

Error in SQL statement: ParseException: no viable alternative at input 'create'(line 1, pos 41)

== SQL ==
select * from some_table create where create.some_column = 1
----------------------------^^^

com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.catalyst.parser.ParseException:

 

상세 내용은 여기 Spark JIRA에서 확인할 수 있고,

예약 키워드들은 여기에서 확인할 수 있다.

 

Store Assignment policy

이 기능은 SQL에서 데이터 품질을 엄격하게 확인하기 위해서 도입되었다.

스파크 2.4.4에서 아래와 같이 SQL을 시켜보자.

크게 문제가 없어 보이는데 정수형 컬럼에 문자열을 넣는 예제이다.

CREATE TABLE assignment_policy_check(col INT);
INSERT INTO assignment_policy_check VALUES("123")
SELECT * FROM assignment_policy_check

>>> 1, 123

2.4.4, 3.0에서 모두 이상 없이 실행된다.

실수로 이런 값을 넣는 상황을 막고 싶다면 다음과 같이 값을 설정해주면 된다.

spark.sql.storeAssignmentPolicyANSI로 설정

 

from_json의 옵션 모드 (FAILFAST/PERMISSIVE)

스파크 3.0에서는 PERMISSIVE나 FAILFAST 모드가 추가되었다. 만약 JSON이 잘못되었거나 파싱 할 수 없을 때는 에러가 발생한다.

df.withColumn("colname", from_json(df['colname'], jsonSchema, {'mode':FAILFAST'})).show()
>> 파싱할 수 없을경우 에러 발생

 

지수 표기법 / Exponential notation

스파크 3.0에서는 1E10이라고 쓰인 숫자는 이중으로 해석된다. 스파크 2.4 버전 이하에서는 십진법으로 해석된다.

다음 예시를 보자.

# 스파크 2.4
df = spark.sql("SELECT 1E10")
df.printSchema()

>> root
>> |-- 1.0E10 : double (nullable = false)


# 스파크 3.0 이면서 spark.sql.legacy.exponentLiteralAsDecimal.enabled’ 이 true로 설정.
df = spark.sql("SELECT 1E10")
df.printSchema()

>> root
>> |-- 10000000000 : decimal(11,0) (nullable = false)

예전 버전처럼 쓰고 싶다면 아래와 같이 설정하면 된다.

spark.sql.legacy.expontLiteralAsDecimal.enabled를 false로 설정

 

 

음수형 0 / Negative decimal zeroes

스파크 2.4 버전 이하에서는 float/double형 -0.0은 의미론적으로 0.0과 동일하지만 groupby, partition, join에서 키값으로 사용될 때는 다른 값으로 간주되었다.(즉 버그나 마찬가지) 스파크 3.0에서 고쳐졌다. 

다음과 같은 데이터가 있다고 가정할 때 2.4와 3.0에서 groupby 결과를 보자.

df.show()
>>
+----+------+
| num|   val|
+----+------+
| 0.0|   one|
|-0.0|   two|
+----+------+

# 스파크 2.4 이하
df.groupBy('num').agg(count('*')).show()
>>
+----+--------+
| num|count(1)|
+----+--------+
| 0.0|       1|
|-0.0|       1|
+----+--------+

#스파크 3.0
df.groupBy('num').agg(count('*')).show()
>>
+----+--------+
| num|count(1)|
+----+--------+
| 0.0|       2|
+----+--------+

 

Keyword String을 Date나 Timestamp로 변경

Spark 3.0에서는 문자열을 날짜나 시간으로 변환할 수 있다.

다음 예시를 확인해보자.

SELECT timestamp 'today'
>> 2020-09-10T00:00:00.000+0000

다음과 같은 키워드들이 가능하다.

select timestamp 'x'
x는 다음과 같은 값들이 올 수 있다.

> epoch -> 1970-01-01 00:00:00+00 (Unix system time의 0)
> today -> 오늘 자정
> yesterday -> 어제 자정
> tomorrow -> 내일 자정
> now -> 쿼리 하는 순간의 시간

 

max_by / min_by

가장 주목해서 본 기능이다. 한 컬럼이 최소/최대 일 때의 다른 컬럼 값을 얻어 낼 수 있다.

사용법은 다음과 같다.

max_by(x,y)
min_by(x,y)
max_by(x,y,n)
min_by(x,y,n)

max_by(x,y) / min_by(x,y) = y가 최대/최소일 때의 x값을 반환
max_by(x,y,n) / min_by(x,y,n) = y가 최대/최소에서 n번째까지의 때의 x값을 n개 반환

 

생각해보면 이런 것들을 값들을 얻어내기 위해 많은 과정을 거치는 삽질들을 했던 것 같은데 이 함수로 많이 편해질 것 같다. 사용법은 여기 참고

 

서브 쿼리 안의 WITH 절

CTE(common tabel expression)을 이제 서브 쿼리 내에서 사용할 수 있다. 쿼리 가독성을 향상할 것이며, 복수의 CTE를 사용할 때 중요하다. 다음 예를 보자.

select * from 
	(with inner_cte as
    		(select * from some_table where num = 0)
	select num from inner_cte)
;

 

Filter(Where ...)

이제 통계를 낸 이후 한 번 더 필터링을 할 필요가 없어졌다.

다음과 같이 실행한다.

SELECT
	count(*) filter (WHERE num > 4) as filtered
    count(*) as no_filtered
FROM 
	some_table
 
 // 필더링된 카운트와, 안된 카운트가 각각 출력된다.
 >>	filtered, no_filtered
 >> 3, 6

 

Every / Any / Some

표현식을 만족하는지에 대한 boolean값을 리턴한다.

사용법은 다음과 같다.

Any(expr)
Every(expr)
Some(expr)

예를 들어보자.

// some_table에 1, 2 두개의 값이 있다고 가정
SELECT Every(num % 2 = 0) FROM some_table
>> false

SELECT Any(num % 2 = 0) FROM some_table
>> true

SELECT Some(num % 2 = 0) FROM some_table
>> true

Every는 모든 값이 표현식을 만족해야 true를 반환하며 AnySome은 하나라도 만족하면 true를 반환한다.

 

 

count_if

filter(where ...)과 비슷하며 만족하는 레코드의 수를 나타낸다.

다음과 같이 사용할 수 있다.

SELECT count_if(num % 2 == 0) FROME some_table
>> num % == 0를 만족하는 레코드 수를 반환

 

bool_and / bool_or

불린 값에 대한 연산 결과를 제공한다.

한 컬럼에 다음과 같은 불린 값이 들어 있다고 가정해보자.

boolean_column
1, true
2, false
3, true

그리고 다음과 같은 sql문을 실행해보자.

SELECT bool_and(boolean_column) FROM some_table
>> false // (= true and false and true)

SELECT bool_or(boolean_column) FROM some_tabel
>> true // (= true or false or true)

 

기타

  • Java 11 지원
  • Hadoop 3 지원
  • Pandas UDF를 위한 힌트나 기능 강화
  • Skew join 최적화 기능 추가

등 눈에 띄는 것만 적어보았다.

 

정리하자면

이번 버전에서는 Spark SQL 개발자를 위한 기능이 많이 추가가 되었다. 버그도 많이 수정되었고, 검증, 품질점검은 이전보다 쉬워졌다. 특히 max_by, min_by, filter+count 조합은 서브 쿼리 실행을 줄일 수 있게 되어 나 같은 게으른 개발자를 위한 기능이 많아졌다. 나름대로 만족하는 변화이다.

더 많은 기능들이 궁금하다면

아래의 링크에서 찾아보자. SQL관련은 SQL Compatibility Enhancements을 검색하면 아래에 리스트로 정리되어있다.

 

Reference

spark.apache.org/releases/spark-release-3-0-0.html

 


댓글