본문 바로가기
블록체인/블록체인이란?

Stochastic RSI(스토캐스틱 RSI) 시그널로 시스템 트레이딩 구현하기

by 제이제이_은재 2022. 6. 16.
반응형

💡 Stochastic RSI(스토캐스틱 RSI) 시그널로 시스템 트레이딩 구현하기

 

import pyupbit
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

 

먼저 stochastic rsi를 구하기 위해서는 rsi 값을 먼저 구해야한다.

 

def get_rsi(ohlcv, period):
    ohlcv["close"] = ohlcv["close"]
    delta = ohlcv["close"].diff()
    up, down = delta.copy(), delta.copy()
    up[up < 0] = 0
    down[down > 0] = 0
    _gain = up.ewm(com=(period - 1), min_periods=period).mean()
    _loss = down.abs().ewm(com=(period - 1), min_periods=period).mean()
    RS = _gain / _loss
    return pd.Series(100 - (100 / (1 + RS)), name="RSI")

df['RSI'] = get_rsi(df, 14)

 

stochastic rsi를 구하는 함수는 아래와 같다. 

 

def get_stochastic_rsi(data, k_window, d_window, window):
    
    min_val  = data.rolling(window=window, center=False).min()
    max_val = data.rolling(window=window, center=False).max()
    
    stoch = ( (data - min_val) / (max_val - min_val) ) * 100
    
    K = stoch.rolling(window=k_window, center=False).mean() 
    
    D = K.rolling(window=d_window, center=False).mean() 
    return K, D

df['K'], df['D'] = get_stochastic_rsi(df['RSI'], 3, 3, 14)

 

stochastic rsi의 그래프를 보면 2개의 데이터로 이루어진다. get_stochastic_rsi 함수에서도 2개의 값을 반환한다. 실제 매매를 할 때 K 값을 사용하고 있으므로 K값을 활용하기로 결정했다. K값으로 그래프를 그리면 아래와 같다.

 

최종적으로 구하고자 했던 데이터는 고점이 내려가고 있는지 저점이 올라가고 있는지를 확인하는 것이다. 주어진 그래프에서는 고점을 내리고 있는 것을 확인할 수 있다.

 

고점과 저점을 구하기 위해서 변곡점을 구해야했다. 변곡점을 구하기 위해서 diff함수를 사용했다. diff 함수는 값들의 차이를 구하는 함수이다.

 

x = np.array([1, 2, 4, 7, 0])
np.diff(x) # array([ 1,  2,  3, -7])

 

diff의 결과를 보면 양수에서 음수로 전환되는 점이 고점이 되는 것을 확인할 수 있다.

 

# diff하여 나온 결과는 input array의 크기보다 1이 작다. 따라서 NaN 한 개를 추가하여 길이를 맞춘다.
df["diff"] = np.append([np.NaN], np.diff(df['K']))
def unit_cal(x):
    if x != 0:
        return x / abs(x)
    else: 
        return 0
        
# 양수인지 음수인지만 활용하여 변곡점을 찾으므로 데이터의 크기를 상관없다.
# 따라서 -1, 0, 1로 변경하여 단순화한다.
df["unit"] = df["diff"].apply(unit_cal)

# 음수, 양수, 0을 구분하여 그래프에 표시한다.
fig, ax1 = plt.subplots(figsize=(15, 7))
ax1.plot(df['K'])
for idx, row in df.iterrows():
    if row["unit"] < 0:
        ax1.axvline(x=idx, color='red', linewidth=4.4, alpha =0.2)
    elif row["unit"] > 0:
        ax1.axvline(x=idx, color='blue', linewidth=4.4, alpha =0.2)
    else:
        ax1.axvline(x=idx, color='yellow', linewidth=4.4, alpha =0.2)

 

변곡점을 4가지(MAX, MIN, HIGH, LOW)로 분류하여 저장했다. MAX, MIN은 

 

def inflection_classification(df):
    df = df.copy()
    df["shift_unit"] = df["unit"].shift(-1)
    df["inflection"] = None
    pre_inflection = None
    for idx, row in df.iterrows():
    	# 이전 변곡점과 K값이 같은 경우 처리
        if pre_inflection != None and row["unit"] == 0:
            df.at[idx,"inflection"] = pre_inflection
        else:
            if row["K"] > 97:
                df.at[idx,"inflection"] = "MAX"
            elif row["K"] < 3:
                df.at[idx,"inflection"] = "MIN"
            elif row["shift_unit"] < row["unit"]:
                df.at[idx,"inflection"] = "HIGH"
            elif row["shift_unit"] > row["unit"]:
                df.at[idx,"inflection"] = "LOW"
            else:
                df.at[idx,"inflection"] = None
            pre_inflection = df.at[idx,"inflection"]
    return df

classification_df = inflection_classification(df)
fig, ax1 = plt.subplots(figsize=(15, 7))
ax1.plot(classification_df['K'])
for idx, row in classification_df.iterrows():
    if row["inflection"] == "MAX":
        ax1.scatter(idx, row["K"], c="red")
    elif row["inflection"] == "MIN":
        ax1.scatter(idx, row["K"], c="orange")
    elif row["inflection"] == "HIGH":
        ax1.scatter(idx, row["K"], c="pink")
    elif row["inflection"] == "LOW":
        ax1.scatter(idx, row["K"], c="green")
    elif row["inflection"] == "ZERO":
        ax1.scatter(idx, row["K"], c="blue")

 

MIN과 MAX의 경우 얼마나 지속되었는지를 확인하기 위해 점의 개수를 counting한다.

 

def inflection_count_nesting(df, init_count = None):
    df = df.copy()
    if init_count != None:
        df["count"] = init_count
    pre_idx = None
    for idx, row in df[~df["inflection"].isin([None])].iterrows():
        if pre_idx != None:
            current_inflection = row["inflection"]
            pre_inflection = df.loc[pre_idx, "inflection"]
            # diff값이 0인 경우 변곡이 생긴다. 아래의 code로 제거한다.
            if (pre_inflection == "HIGH" and current_inflection in ["HIGH", "MAX"]) or (pre_inflection == "LOW" and current_inflection in ["MIN", "LOW"]):
                df.at[pre_idx,"inflection"] = None
                df.at[idx,"count"] = 1
            # MIN, MAX가 연속적으로 발생하는 경우 count를 계산한다.
            elif pre_inflection == current_inflection:
                df.at[pre_idx,"inflection"] = None
                df.at[idx,"count"] = df.loc[pre_idx, "count"] + df.at[idx,"count"]
        pre_idx = idx
    return df

count_nesting_df = inflection_count_nesting(classification_df, 1)
fig, ax1 = plt.subplots(figsize=(15, 7))
ax1.plot(count_nesting_df['K'])
for idx, row in count_nesting_df.iterrows():
    if row["inflection"] == "MAX":
        ax1.scatter(idx, row["K"], c="red")
        ax1.text(idx, row["K"], row["count"])
    elif row["inflection"] == "MIN":
        ax1.scatter(idx, row["K"], c="orange")
        ax1.text(idx, row["K"], row["count"])
    elif row["inflection"] == "HIGH":
        ax1.scatter(idx, row["K"], c="pink")
        ax1.text(idx, row["K"], row["count"])
    elif row["inflection"] == "LOW":
        ax1.scatter(idx, row["K"], c="green")
        ax1.text(idx, row["K"], row["count"])

 

변곡점의 변동폭이 너무 작은 경우 무시해야하는 경우가 많다. 

 

왼쪽은 변곡점이 짝수개인 경우, 오른쪽은 변곡점이 홀수개인 경우

 

변곡점이 홀수개인 경우는 계속 상승하고 있는 경우이므로 제거한다. 변곡점이 홀수개인 경우는 전부 제거할 수는 없다. 가장 높거나 가장 낮은 변곡점만 남기고 제거한다.

 

def remove_inflection(df, inflection_list):
    selected_inflection = None
    last_inflection = inflection_list[-1]
    inflections = list(map(lambda x: x[2], inflection_list))
    # 변동폭이 작으면서 MAX, MIN이 포함되는 경우는 inflection을 MAX, MIN를 제외한 변곡점을 제거한다.
    if "MAX" in inflections or "MIN" in inflections:
        for inf in inflection_list:
            if not inf[2] in ["MAX", "MIN"]:
                df.at[inf[0],"inflection"] = None
                df.at[inf[0],"count"] = 1
    else:
    	# 변동폭이 작으면서 변곡점이 짝수인경우는 전부 제거한다.
        if len(inflection_list) % 2 == 0:
            for inf in inflection_list:
                df.at[inf[0],"inflection"] = None
                df.at[inf[0],"count"] = 1
        # 변동폭이 작으면서 변곡점이 홀수인 경우는 가장 높거나 낮은 경우를 제외하고 전부 제거한다.
        else:
            if last_inflection[2] in ["MIN", "LOW"]:
                min_inflection = reduce(lambda acc, cur: cur if acc[1] > cur[1] else acc, inflection_list, (0,101,0))
                inflection_list.remove(min_inflection)
                selected_inflection = min_inflection
            else:
                max_inflection = reduce(lambda acc, cur: cur if acc[1] <= cur[1] else acc, inflection_list, (0,-1,0))
                inflection_list.remove(max_inflection)
                selected_inflection = max_inflection
            for inf in inflection_list:
                df.at[inf[0],"inflection"] = None
                df.at[inf[0],"count"] = 1
    inflection_list.clear()
    
def flatten(df, k_diff):
    df = df.copy()
    pre_idx = None
    inflection_list = []
    for idx, row in df[~df["inflection"].isin([None])].iterrows():
        if pre_idx != None:
            current_inflection = (idx, df.loc[idx, "K"], df.loc[idx, "inflection"])
            pre_inflection = (pre_idx, df.loc[pre_idx, "K"], df.loc[pre_idx, "inflection"])
            # 변곡점의 변동폭이 k_diff보다 작은 경우 inflection_list에 추가한다.
            if abs(current_inflection[1] - pre_inflection[1]) < k_diff:
                if len(inflection_list) == 0 :
                    inflection_list.append(pre_inflection)
                inflection_list.append(current_inflection)
            # 변곡점의 변동폭이 k_diff보다 큰 경우가 발생하면 inflection_list에 추가되어 있는 변곡점을 제거한다.
            else:
                if len(inflection_list) > 1:
                    remove_inflection(df, inflection_list)
        pre_idx = idx
    if len(inflection_list) > 1:
        remove_inflection(df, inflection_list)
    return df

flatten_df = flatten(count_nesting_df, 7)
fig, ax1 = plt.subplots(figsize=(15, 7))
ax1.plot(flatten_df['K'])
for idx, row in flatten_df.iterrows():
    if row["inflection"] == "MAX":
        ax1.scatter(idx, row["K"], c="red")
        ax1.text(idx, row["K"], row["count"])
    elif row["inflection"] == "MIN":
        ax1.scatter(idx, row["K"], c="orange")
        ax1.text(idx, row["K"], row["count"])
    elif row["inflection"] == "HIGH":
        ax1.scatter(idx, row["K"], c="pink")
        ax1.text(idx, row["K"], row["count"])
    elif row["inflection"] == "LOW":
        ax1.scatter(idx, row["K"], c="green")
        ax1.text(idx, row["K"], row["count"])

 

위의 코드를 정리하여 하나의 함수로 구현했다. flatten_diff는 변동폭이 작은 경우 제거하기 위핸 parameter이고 inflection range는 변곡점의 선을 그릴 때 변동폭이 너무 작거나 큰 경우를 제외시키기 위한 parameter이다.

 

def stockastic_rsi_analysis(df, flatten_diff, inflection_range):
    df['RSI'] = GetRSI(df, 14)
    df['K'], df['D'] = stochastic(df['RSI'], 3, 3, 14)
    
    # diff하여 나온 결과는 input array의 크기보다 1이 작다. 따라서 NaN 한 개를 추가하여 길이를 맞춘다.
    df["diff"] = np.append([np.NaN], np.diff(df['K']))
    def unit_cal(x):
        if x != 0:
            return x / abs(x)
        else: 
            return 0
    df["unit"] = df["diff"].apply(unit_cal)
    
    classification_df = inflection_classification(df)
    count_nesting_df = inflection_count_nesting(classification_df, 1)
    flatten_df = flatten(count_nesting_df, 7)
    re_count_nesting_df =  inflection_count_nesting(flatten_df)
    fig, ax1 = plt.subplots(figsize=(15, 7))
    ax1.plot(re_count_nesting_df['K'])
    for idx, row in re_count_nesting_df.iterrows():
        if row["inflection"] == "MAX":
            ax1.scatter(idx, row["K"], c="red")
            ax1.text(idx, row["K"], row["count"])
        elif row["inflection"] == "MIN":
            ax1.scatter(idx, row["K"], c="orange")
            ax1.text(idx, row["K"], row["count"])
        elif row["inflection"] == "HIGH":
            ax1.scatter(idx, row["K"], c="pink")
            ax1.text(idx, row["K"], row["count"])
        elif row["inflection"] == "LOW":
            ax1.scatter(idx, row["K"], c="green")
            ax1.text(idx, row["K"], row["count"])
    last_inflection_row = re_count_nesting_df[~re_count_nesting_df["inflection"].isin([None])].iloc[-1]
    if last_inflection_row["inflection"] is "MIN":
        second = re_count_nesting_df[re_count_nesting_df["inflection"].isin(["MIN", "LOW"])].iloc[-2]
        if second["inflection"] is "MIN":
            if last_inflection_row["count"] > second["count"]:
                ax1.plot([second.name, last_inflection_row.name], [second["K"], last_inflection_row["K"]])
                return "짝궁"
            if last_inflection_row["count"] == second["count"]:
                ax1.plot([second.name, last_inflection_row.name], [second["K"], last_inflection_row["K"]])
                return "쌍궁"
        else:
            diff = last_inflection_row["K"] - second["K"]
            if inflection_range[0] <= diff and diff <= inflection_range[1]:
                return "짝궁"

    if last_inflection_row["inflection"] is "LOW":
        second = re_count_nesting_df[re_count_nesting_df["inflection"].isin(["MIN", "LOW"])].iloc[-2]
        diff = last_inflection_row["K"] - second["K"]
        if inflection_range[0] <= diff and diff <= inflection_range[1]:
            ax1.plot([second.name, last_inflection_row.name], [second["K"], last_inflection_row["K"]])
            return "짝궁"

    if last_inflection_row["inflection"] is "MAX":
        second = re_count_nesting_df[re_count_nesting_df["inflection"].isin(["MAX", "HIGH"])].iloc[-2]
        if second["inflection"] is "MAX":
            if last_inflection_row["count"] < second["count"]:
                ax1.plot([second.name, last_inflection_row.name], [second["K"], last_inflection_row["K"]])
                return("짝두")
            if last_inflection_row["count"] == second["count"]:
                ax1.plot([second.name, last_inflection_row.name], [second["K"], last_inflection_row["K"]])
                return "쌍두"
        else:
            diff = second["K"] - last_inflection_row["K"]
            if inflection_range[0] <= diff and diff <= inflection_range[1]:
                ax1.plot([second.name, last_inflection_row.name], [second["K"], last_inflection_row["K"]])
                return "짝두"

    if last_inflection_row["inflection"] is "HIGH":
        second = re_count_nesting_df[re_count_nesting_df["inflection"].isin(["MAX", "HIGH"])].iloc[-2]
        diff = second["K"] - last_inflection_row["K"]
        if inflection_range[0] <= diff and diff <= inflection_range[1]:
            ax1.plot([second.name, last_inflection_row.name], [second["K"], last_inflection_row["K"]])
            return "짝두"
df = pyupbit.get_ohlcv("KRW-KNC", interval="day")
print(stockastic_rsi_analysis(df,7,(5, 60)))

 

짝궁 신호가 발생한 것을 알 수 있다.

 

stochstic rsi 활용해 시그널을 구해보았다. 실제로 매매를 진행할 때는 stochstic rsi와 추세선을 함께 활용하고 있다. 다음에는 추세선을 그리는 방법에 대해서 알아보자.

반응형

댓글