Spark로 HDFS 데이터 활용하기 (+ pandas, time range filter)

2020. 7. 22. 15:12Computer Science/Backend

python 기반입니다

 

HDFS 데이터 spark로 불러오기

HDFS(Hadoop File System)으로 저장 돼 있는 데이터를 주피터 노트북에 불러와서 스파크 데이터 프레임으로 활용하는 일이 종종 있다. 첫 번 째로 Spark Session을 열어줘야 된다. SparkSession은 인스턴스 생성을 위한 build() 메서드를 제공하는데, 이 메서드를 통해 인스턴스를 재사용 하거나 새로 생성할 수 있다. 

from pyspark.sql.session import SparkSession
import pyspark.sql.functions as F

# SparkSession 인스턴스 생성
spark = SparkSession.builder.appName('deepjin').getOrCreate()

 

위 과정에서 'deepjin'이라는 스파크 세션 인스턴스를 생성했다. 

 

# hdfs:// 뒤에 자신의 HADOOP PATH를 넣으면 데이터 불러올 수 있다.
# 아래의 예시같은 경우는 parquet 형식의 데이터를 불러온다.
data = spark.read.parquet("hdfs://~~~")

# Check Data Type
type(data)
# 결과 값으로 pyspark.sql.dataframe.DataFrame 이라는 타입을 볼 수 있을 것이다.

# Check Schema
data.printSchema()

parquet 형식으로 된 하둡 파일을 불러오고 type과 스키마를 찍어보는 과정을 진행했다.

 

이제 sql문 중 select를 활용해서 데이터를 필터링 해보자.

data_select = data.select(F.col("원하는 컬럼값"))

이렇게 해주면 원하는 컬럼만 select해서 데이터를 가져올 수 있다. 아무래도 필요 없는 컬럼까지 모두 다루다 보면 메모리 효율성이 떨어지기 때문에 select를 잘 활용해주는 게 중요하다.

 

데이터를 테이블 형태로 보려면 아래와 같이 하면 된다.

data_select.show()

show() 함수에 파라미터로 숫자를 주면, 숫자 갯수만큼의 row를 출력해준다. 아무 값도 넣지 않은 디폴트로는 20개를 출력해준다.

 

Time Range filtering

우선 pandas로 datetime list를 만들어두면 편하다. 

import pandas as pd
import numpy as np

# 2020년 1월 1일 날짜로 데이터를 만든다.
# freq 옵션은 지정해주지 않았을 때 디폴트로 'D'가 적용된다. Day 단위라는 뜻이다.
# 아래 예시에서는 freq='H' 옵션을 주겠다. Hours 단위라는 뜻이다.
# .tolist()를 통해 list타입으로 지정해준다.
date_lst = pd.date_range(start="2020-01-01 00:00:00", end="2020-01-02 00:00:00", freq='H').tolist()

위 코드의 date_lst와 전에 만들어 둔 data_select (스파크 데이터 프레임)을 활용해서 time stamp 단위로 데이터를 묶어보자. 

 

result = []
for i in range(len(date_lst)):
	tmp = data_select.filter(F.col("타임스탬프 타입의 컬럼").between(date_lst[i], date_lst[i+1])).count()
    result.append(tmp)

이렇게 해주면 한시간 단위로 row들을 묶게 된다. 즉, 한 시간에 몇 개의 rows가 있는지 count 할 수 있다. result에 들어가는 값은 아마도 [3, 6, 2345, 437568, 243, ...] 처럼 생겼을 것이다.