맨위로버튼이미지

전체 학습을 한 파일로 돌리면 많은 경우 5일 이상의 시간이 걸립니다.

 

이런 부분을 Joblib lib를 이용하여 오래 걸리는 부분을 작은 단위로 분리하여 학습을 작은 단위별로 진행할 수 있게 분리했습니다.

 

다시 해야할 부분만 진행하여 학습을 강화하다면 효율적일 것 같습니다.

 

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F

# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset

# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다. 
import torchvision.transforms as transforms

# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd

import numpy as np
import matplotlib.pyplot as pltx

import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
import common.constrant as Const
import math

ticker = 'ETH'
class Market():
    def __init__(self) -> None:
        self.df = pd.DataFrame()
    
    def add_bbands(self):
        try:
            print("볼리져 밴드 구하기:%s"  % time.ctime(time.time()))
            self.df['ma20'] = talib.SMA(np.asarray(self.df['close']), 20)
            self.df['stddev'] = self.df['close'].rolling(window=20).std() # 20일 이동표준편차
            upper, middle, lower = talib.BBANDS(np.asarray(self.df['close']), timeperiod=40, nbdevup=2.3, nbdevdn=2.3, matype=0)
            self.df['lower']=lower
            self.df['middle']=middle
            self.df['upper']=upper
            # self.df['bbands_sub']=self.df.upper - self.df.lower
            # self.df['bbands_submax'] =  self.df.bbands_sub.rolling(window=840, min_periods=10).max()
            # self.df['bbands_submin'] =  self.df.bbands_sub.rolling(window=840, min_periods=10).min()
            # self.df['bbs_deg'] = (self.df.mavg - self.df.mavg.shift()).apply(lambda x: math.degrees(math.atan2(x, 40))) 

            # self.df['pct8']=(self.df.close - self.df.lower)/(self.df.upper - self.df.lower)
            # self.df['pct8vsclose']=self.df.close - self.df.pct8
        except Exception as ex:
            print("`add_bbands -> exception! %s `" % str(ex))

    def add_relative_strength(self):
        try:
            print("상대 강도 지수 구하기:%s"  % time.ctime(time.time()))
            rsi14 = talib.RSI(np.asarray(self.df['close']), 14)
            self.df['rsi14'] = rsi14
        except Exception as ex:
            print("`add_relative_strength -> exception! %s `" % str(ex))
            
    def add_iip(self):
        try:
            #역추세전략을 위한 IIP계산
            self.df['II'] = (2*self.df['close']-self.df['high']-self.df['low'])/(self.df['high']-self.df['low'])*self.df['volume']
            self.df['IIP21'] = self.df['II'].rolling(window=21).sum()/self.df['volume'].rolling(window=21).sum()*100            
        except Exception as ex:
            print("`add_iip -> exception! %s `" % str(ex))
            
    def add_stock_cast(self):
        try:
            # 스토캐스틱 구하기
            self.df['ndays_high'] = self.df['high'].rolling(window=14, min_periods=1).max()    # 14일 중 최고가
            self.df['ndays_low'] = self.df['low'].rolling(window=14, min_periods=1).min()      # 14일 중 최저가
            self.df['fast_k'] = (self.df['close'] - self.df['ndays_low']) / (self.df['ndays_high'] - self.df['ndays_low']) * 100  # Fast %K 구하기
            self.df['slow_d'] = self.df['fast_k'].rolling(window=3).mean()    # Slow %D 구하기        except Exception as ex:
        except Exception as ex:
            print("`add_stock_cast -> exception! %s `" % str(ex))
    
    def add_mfi(self):
        try:
            # MFI 구하기
            self.df['PB'] = (self.df['close'] - self.df['lower']) / (self.df['upper'] - self.df['lower'])
            self.df['TP'] = (self.df['high'] + self.df['low'] + self.df['close']) / 3
            self.df['PMF'] = 0
            self.df['NMF'] = 0
            for i in range(len(self.df.close)-1):
                if self.df.TP.values[i] < self.df.TP.values[i+1]:
                    self.df.PMF.values[i+1] = self.df.TP.values[i+1] * self.df.volume.values[i+1]
                    self.df.NMF.values[i+1] = 0
                else:
                    self.df.NMF.values[i+1] = self.df.TP.values[i+1] * self.df.volume.values[i+1]
                    self.df.PMF.values[i+1] = 0
            self.df['MFR'] = (self.df.PMF.rolling(window=10).sum() / self.df.NMF.rolling(window=10).sum())
            self.df['MFI10'] = 100 - 100 / (1 + self.df['MFR'])    
        except Exception as ex:
            print("`add_mfi -> exception! %s `" % str(ex))

    def add_macd(self, sort, long, sig):
        try:
            print("MACD 구하기:%s, sort:%d, long:%d, sig:%d"  % (time.ctime(time.time()), sort, long, sig))
            macd, macdsignal, macdhist = talib.MACD(np.asarray(self.df['close']), sort, long, sig) 
            self.df['macd'] = macd
            # self.df['macdmax'] = self.df.macd.rolling(window=840, min_periods=100).max()
            # self.df['macdmin'] = self.df.macd.rolling(window=840, min_periods=100).min()
            # self.df['macd_max_rate'] = self.df.apply(lambda x: (x['macdmax'] - x['macd']) * 100 / (x['macdmax'] - x['macdmin']), axis=1)
            # self.df['macd_min_rate'] = self.df.apply(lambda x: (x['macd'] - x['macdmin']) * 100 / (x['macdmax'] - x['macdmin']), axis=1)
            # self.df['prv_macd_degrees'] = self.df.macd_degrees.shift()
            self.df['signal'] = macdsignal
            self.df['flag'] = macdhist
            # self.df['prv_osc_degrees'] = self.df.osc_degrees.shift()
        except Exception as ex:
            print("`add_macd -> exception! %s `" % str(ex))

    def get_data(self):
        table_name="TB_ETH_TRADE"
        sqlcon = pymysql.connect(host='192.168.255.59', user='yuxxxx', password='------', db='eth')
        cursor = sqlcon.cursor(pymysql.cursors.DictCursor)
        str_query = """select 
            'time'
            ,`start` as open
            , high
            , low
            , close
            , volume
            , macd
            , macdmax
            , macdmin
            , `signal`
            , `flag`
            , osc_degrees
            , sma1200
            , sma1200_degrees
            , wma1200
            , wma1200_degrees
            from %s""" % table_name
        cursor.execute(str_query)
        data = cursor.fetchall()
        self.df = pd.DataFrame(data)
        sqlcon.close()
        self.df['sma3'] = talib.SMA(np.asarray(self.df['close']), 3)
        self.df['sma20'] = talib.SMA(np.asarray(self.df['close']), 20)
        self.df['sma20deg'] = (self.df.sma20 - self.df.sma20.shift()).apply(lambda x: math.degrees(math.atan2(x, Const.DEGREES_X))) 
        self.df['closemin'] = self.df.apply(lambda x: x['close'] if x['sma20deg'] >= 0 else x['high'], axis = 1).rolling(window=100, center=True).min() 
        self.df['closemax'] = self.df.apply(lambda x: x['close'] if x['sma20deg'] <= 0 else 0, axis = 1).rolling(window=100, center=True).max()
        self.add_macd(12,26,9)
        self.add_bbands()
        self.add_relative_strength()
        self.add_stock_cast()
        self.add_iip()
        self.add_mfi()
        # self.df.dropna(inplace=True)
        self.df.reset_index(drop=True,inplace=True)
        return self.df


batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 5
screen_height = 50
screen_width  = 70
qsize = 2048

mk = Market()
df = mk.get_data()
df = df.tail((df.index.max()//batch_size)*batch_size)

df.to_csv("data_all.dat")

이 부분은 데이터를 읽어 드리는 부분인데 이미 DB에 저장된 데이터를 읽어 오므로 1~2분 정도면 끝납니다. 그래서 데이터를 data_all.dat파일에 저장합니다.

 

사실 이 파일만 있으면 모든게 해결되는 거죠.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F

# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset

# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다. 
import torchvision.transforms as transforms

# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd

import numpy as np
import matplotlib.pyplot as pltx

import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
# import common.constrant as Const

batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 4
screen_height = 50
screen_width  = 70
qsize = 2048

df = pd.read_csv("data_all.dat")
df = df.tail(100000)
start_idx = df.index[0]
print("start_idx[{}]".format(start_idx))

def get_chart(df, idx, max_data:int=300, i_w:int=140, i_h:int=100):
    ndf = df.head(idx - start_idx).tail(max_data)
    ndf.reset_index(drop=True, inplace=True)
    print(ndf)
    
    candle = go.Candlestick(x=ndf.index,open=ndf['open'],high=ndf['high'],low=ndf['low'],close=ndf['close'], increasing_line_color = 'red',decreasing_line_color = 'blue', showlegend=False)
    upper = go.Scatter(x=ndf.index, y=ndf['upper'], line=dict(color='red', width=2), name='upper', showlegend=False)
    ma20 = go.Scatter(x=ndf.index, y=ndf['ma20'], line=dict(color='black', width=2), name='ma20', showlegend=False)
    lower = go.Scatter(x=ndf.index, y=ndf['lower'], line=dict(color='blue', width=2), name='lower', showlegend=False)

    volume = go.Bar(x=ndf.index, y=ndf['volume'], marker_color='red', name='volume', showlegend=False)

    MACD = go.Scatter(x=ndf.index, y=ndf['macd'], line=dict(color='blue', width=2), name='MACD', legendgroup='group2', legendgrouptitle_text='MACD')
    MACD_Signal = go.Scatter(x=ndf.index, y=ndf['signal'], line=dict(dash='dashdot', color='green', width=2), name='MACD_Signal')
    MACD_Oscil = go.Bar(x=ndf.index, y=ndf['flag'], marker_color='purple', name='MACD_Oscil')

    fast_k = go.Scatter(x=ndf.index, y=ndf['fast_k'], line=dict(color='skyblue', width=2), name='fast_k', legendgroup='group3', legendgrouptitle_text='%K %D')
    slow_d = go.Scatter(x=ndf.index, y=ndf['slow_d'], line=dict(dash='dashdot', color='black', width=2), name='slow_d')

    PB = go.Scatter(x=ndf.index, y=ndf['PB']*100, line=dict(color='blue', width=2), name='PB', legendgroup='group4', legendgrouptitle_text='PB, MFI')
    MFI10 = go.Scatter(x=ndf.index, y=ndf['MFI10'], line=dict(dash='dashdot', color='green', width=2), name='MFI10')

    RSI = go.Scatter(x=ndf.index, y=ndf['rsi14'], line=dict(color='red', width=2), name='RSI', legendgroup='group5', legendgrouptitle_text='RSI')
    
    # 스타일
    fig = ms.make_subplots(rows=5, cols=2, specs=[[{'rowspan':4},{}],[None,{}],[None,{}],[None,{}],[{},{}]], shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01)

    fig.add_trace(candle,row=1,col=1)
    fig.add_trace(upper,row=1,col=1)
    fig.add_trace(ma20,row=1,col=1)
    fig.add_trace(lower,row=1,col=1)

    fig.add_trace(volume,row=5,col=1)

    fig.add_trace(candle,row=1,col=2)
    fig.add_trace(upper,row=1,col=2)
    fig.add_trace(ma20,row=1,col=2)
    fig.add_trace(lower,row=1,col=2)

    fig.add_trace(MACD,row=2,col=2)
    fig.add_trace(MACD_Signal,row=2,col=2)
    fig.add_trace(MACD_Oscil,row=2,col=2)

    fig.add_trace(fast_k,row=3,col=2)
    fig.add_trace(slow_d,row=3,col=2)

    fig.add_trace(PB,row=4,col=2)
    fig.add_trace(MFI10,row=4,col=2)

    fig.add_trace(RSI,row=5,col=2)

    # 추세추종
    # trend_fol = 0
    # trend_refol = 0
    # for i in ndf.index:
    #     if ndf['PB'][i] > 0.8 and ndf['MFI10'][i] > 80:
    #         trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='orange', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False)
    #         fig.add_trace(trend_fol,row=1,col=1)
    #     elif ndf['PB'][i] < 0.2 and ndf['MFI10'][i] < 20:
    #         trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='darkblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False)
    #         fig.add_trace(trend_fol,row=1,col=1)

    # 역추세추종
    # for i in ndf.index:
    #     if ndf['PB'][i] < 0.05 and ndf['IIP21'][i] > 0:
    #         trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='purple', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False)  #보라
    #         fig.add_trace(trend_refol,row=1,col=1)
    #     elif df['PB'][i] > 0.95 and ndf['IIP21'][i] < 0:
    #         trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='skyblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False)  #하늘
    #         fig.add_trace(trend_refol,row=1,col=1)    

    # fig.add_trace(trend_fol,row=1,col=1)
    # 추세추총전략을 통해 캔들차트에 표시합니다.

    # fig.add_trace(trend_refol,row=1,col=1)
    # 역추세 전략을 통해 캔들차트에 표시합니다.
    
    # fig.update_layout(autosize=True, xaxis1_rangeslider_visible=False, xaxis2_rangeslider_visible=False, margin=dict(l=50,r=50,t=50,b=50), template='seaborn', title=f'({ticker})의 날짜: ETH [추세추종전략:오↑파↓] [역추세전략:보↑하↓]')
    # fig.update_xaxes(tickformat='%y년%m월%d일', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray', showline=True,linewidth=2, linecolor='black', mirror=True)
    # fig.update_yaxes(tickformat=',d', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray',showline=True,linewidth=2, linecolor='black', mirror=True)
    # fig.update_traces(xhoverformat='%y년%m월%d일')
    # size = len(img)
    # img = plt.io.to_image(fig, format='png')
    # canvas = FigureCanvasBase(fig)
    # img = Image.frombytes(mode='RGB', size=(700, 500), data=fig.to_image().to, decoder_name='raw')
    # img = Image.frombytes('RGBA', (700, 500), plt.io.to_image(fig, format='png'), 'raw')
    # img = Image.fromarray('RGBA', (700, 500), np.array(plt.io.to_image(fig, format='png')), 'raw')
    # img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'RGB')
    # img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'L')
    # img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=140, height=100)))
    # img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=700, height=500)))
    img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png')))
    # print(img)
    img.convert("RGB")
    img.show()
    # print(img)
    img.thumbnail((i_h, i_w), Image.LANCZOS)
    # img.show()
    # print(img)
    # img = Image.Image(img, 'RGB')
    return img

def select_action(df, idx):
    print("idx{0:d} close:{1} closemin:{2} closemax:{3}".format(idx, df.loc[idx, "close"], df.loc[idx, "closemin"], df.loc[idx, "closemax"]))
    if df.loc[idx, "sma20deg"] >= 0 :
        action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 1)
    else:
        action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 2)
    return action

converter = torchvision.transforms.ToTensor()
charts = []
for idx in df.index:
    if idx < (start_idx + data_size):
        continue
    else:
        if idx % 50 == 0 :
            print("progress {0:.02f}".format(idx*100/df.index.max()))
        print("idx[{0:06d}]".format(idx))
        img = get_chart(df, idx, data_size, i_w = screen_width, i_h = screen_height)
        act = select_action(df, idx)
        print("act:{}".format(act))
        # act = 0 if  act < 0 else act
        # print("act:{}".format(act))
        img = converter(img).unsqueeze(0)
        # act = torch.tensor(act, dtype=torch.int32)
        act = torch.tensor(act, dtype=torch.float)
        charts.append([img,act])

joblib.dump(charts, "charts_{0:02d}_1.dmp".format(action_kind))

이 부분은 실제로 시간이 제일 오래 걸리는 부분입니다.

 

Jupyter Notebook이나 Vs code에 Notebook이 설치되어 있다면 Notebook으로 실행할 것을 권합니다.

 

실제 차트가 정상적으로 만들어 지고 있는지 확인할 필요가 있습니다.

 

그러나 Vs code의 Notebook의 경우 이미지 버퍼의 버그가 있어서 차트 50장 정도를 출력하고 나면 메모리 오류로 프로그램이 죽습니다.

 

실제 파일을 만들때는 아래 img.show()함수를 주석처리 하고 output출력도 최소화 하여 파일을 만들 필요가 있습니다.

joblib.dump는 torch.save와 같은 lib를 사용하여 시리얼라이즈를 하고 있습니다.

 

다만 joblib의 경우 python의 모든 변수를 저장하기는 하지만 한가지 값을 저장하는 용도라면 torch.save하는 것이 좋습니다.

torch.save의 경우는 deep learning model을 저장하는데 최적화 되어 있습니다.

상태를 dictionaly화 해서 저장하는 것이 틀립니다. 

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F

# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset

# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다. 
import torchvision.transforms as transforms

# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd

import numpy as np
import matplotlib.pyplot as pltx

import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
import common.constrant as Const

batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 4
screen_height = 50
screen_width  = 70
qsize = 2048

df = pd.read_csv("data_all.dat")

class DQN(nn.Module):
    def __init__(self, h, w, outputs, qsize):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(4, h*w, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(h*w)
        self.conv2 = nn.Conv2d(h*w, qsize, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(qsize)
        self.conv3 = nn.Conv2d(qsize, qsize, kernel_size=5, stride=2)
        # self.bn3 = nn.BatchNorm2d(qsize)

        linear_input_size = 3 * qsize
        # self.head = nn.Linear(linear_input_size, outputs)
        self.head = nn.Sequential(
            nn.Linear(linear_input_size, qsize),
            nn.ReLU(),
            nn.Linear(qsize, qsize),
            nn.ReLU(),
            nn.Linear(qsize, outputs)
        )

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        # print("x1------->", x.size())
        x = F.relu(self.bn2(self.conv2(x)))
        # print("x2------->", x.size())
        x = F.relu(self.bn3(self.conv3(x)))
        # print("x3------->", x.size())
        x = x.view(x.size(0), -1)
        # print("x4------->", x.size())
        x = self.head(x)
        # print("x5------->", x.size())
        return  x

def select_action(df, idx):
    print("idx{0:d} close:{1} closemin:{2} closemax:{3}".format(idx, df.loc[idx, "close"], df.loc[idx, "closemin"], df.loc[idx, "closemax"]))
    if df.loc[idx, "sma20deg"] >= 0 :
        action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 1)
    else:
        action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 2)
    return action

# gpu가 사용 가능한 경우에는 device를 gpu로 설정하고 불가능하면 cpu로 설정합니다.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

charts = []
charts = joblib.load("charts_{0:02d}_1.dmp".format(action_kind))
train_loader = DataLoader(charts,batch_size=batch_size, shuffle=True,num_workers=2,drop_last=True)
test_loader = DataLoader(charts,batch_size=batch_size, shuffle=False,num_workers=2,drop_last=True)

# 모델을 지정한 장치로 올립니다.
model = DQN(screen_height, screen_width, action_kind, qsize=qsize).to(device)
if path.exists("train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize)):
    model.load_state_dict(torch.load("train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind,device,qsize)))
    model.eval()
loss_func = nn.CrossEntropyLoss()

# 최적화함수로는 Adam을 사용합니다.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
optimizer.zero_grad()

loss_arr =[]
model.train()
for i in range(num_epoch):
    for j,[image,label] in enumerate(train_loader):
        x = image.to(device).squeeze(1)
        y = label.type(torch.LongTensor).to(device)
        
        output = model.forward(x)
        loss = loss_func(output,y)
        loss.backward()
        optimizer.step()
        
        if j % 1000 == 0:
            print(loss.cpu().detach().numpy())
            loss_arr.append(loss.cpu().detach().numpy())
            torch.save(model.state_dict(),"train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize))

torch.save(model.state_dict(),"train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize))
# pltx.plot(loss_arr)
# pltx.show()

model.eval()
# 맞은 개수, 전체 개수를 저장할 변수를 지정합니다.
correct = 0
total = 0.000000000001

# 인퍼런스 모드를 위해 no_grad 해줍니다.
with torch.no_grad():
# 테스트로더에서 이미지와 정답을 불러옵니다.
    for image,label in test_loader:
        # 두 데이터 모두 장치에 올립니다.
        x = image.to(device).squeeze(1)
        y_= label.to(device)

        # 모델에 데이터를 넣고 결과값을 얻습니다.
        output = model.forward(x)
        
        # https://pytorch.org/docs/stable/torch.html?highlight=max#torch.max
        # torch.max를 이용해 최대 값 및 최대값 인덱스를 뽑아냅니다.
        # 여기서는 최대값은 필요없기 때문에 인덱스만 사용합니다.
        _,output_index = torch.max(output,1)
        
        # 전체 개수는 라벨의 개수로 더해줍니다.
        # 전체 개수를 알고 있음에도 이렇게 하는 이유는 batch_size, drop_last의 영향으로 몇몇 데이터가 잘릴수도 있기 때문입니다.
        total += label.size(0)
        
        # 모델의 결과의 최대값 인덱스와 라벨이 일치하는 개수를 correct에 더해줍니다.
        correct += (output_index == y_).sum().float()

    # 테스트 데이터 전체에 대해 위의 작업을 시행한 후 정확도를 구해줍니다.
    print("Accuracy of Test Data: {}%".format(100*correct/total))

 

이렇게 charts.dmp 파일을 여러개의 이름으로 만들고

학습함수에 넣어서 학습을 진행합니다.

 

자신이 가지고 있는 보드수 만큼만 프로세스를 뛰울 수 있으니 보드가 한장이면 위의 소스를 사용하시면 되고

혹시 여러장의 RTX보드를 가지고 있다면 model = nn.DataParallel(model, device_ids=[0,1]).to(device를 추가하는 것이 좋습니다.

(보드가 2장일 때, 보드가 3장일 경우는 [0,1]을 [0,1,2]로 바꾸면 됩니다.

단 보드는 같은 것을 사용하는 것이 좋습니다. 

보드0과 보드1의 gpu core 용량이 75%이상 차이가 나면 실행할 수 없습니다.

 

위치는 현재 소스의 모델 바로 아래 입니다.

단 이 경우 model.load_state_dict 함수를 model.module.load_state_dict함수로 변경을 해야 합니다.

지금 저장되어 있는 pt파일의 경우 단순 모델을 기준으로 저장되어 있으므로 다른 PC나 혹시 싱글로 predict를 할 경우는 싱글보드에 맞게 저장이 되어야 합니다.

 

마찬가지로 저장 또한

torch.save(model.state_dict(),"train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize))
로 저장할 경우 model.module.state_dict()의 값이 저장이 되어야 합니다.
 
그렇지 않으면 어렵게 저장된 charts.dmp파일을 활용할 수가 없습니다.
 
학습은 0.00001정도 까지 진행을 하면 predict 확률은 거의 100% 정도 나올 것입니다. 
 
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F

# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset

# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다. 
import torchvision.transforms as transforms

# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd

import numpy as np
import matplotlib.pyplot as pltx

import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
import common.constrant as Const

batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 5
screen_height = 50
screen_width  = 70
qsize = 2048

df = pd.read_csv("data_all.dat")

class Account():
    def __init__(self, df, origin) -> None:
        self.BASIC_FEES = 0.0005
        self.hold_score = 0
        self.byu_score = 0
        self.sell_score = 0        
        self.orgin_money = origin
        self.money = origin
        self.balance = origin
        self.unit  = 0
        self.buy_index = 0
        self.max_rate = 0
        self.df = df
        self.rate = 0
        self.old_rate = 0
        self.bak_unit = 0
        self.bak_balance = 0
    
    def reset(self):
        self.balance = self.orgin_money
        self.money = self.orgin_money
        self.old_rate = 0
        self.unit = 0
        self.rate = 0
        self.buy_index = 0
        
    def select(self, action:int): 
        ret_action = 0
        if self.unit > 0 :
            ret_action = 0 if action == 1 else action
        else:
            ret_action = 0 if action == 2 else action
        return ret_action
    
    def exec_action(self, action, idx):
        real_action = self.select(action)
        if real_action == 1:
            return self.unit_buy(idx), real_action
        elif real_action == 2:
            return self.unit_sell(idx), real_action
        else:
            return self.unit_hold(idx), real_action
    
    def unit_buy(self, index):
        self.back_up()
        buy_balance = self.balance
        self.buy_index = index
        self.old_rate = self.rate
        while True:
            buy_unit = buy_balance / self.df.loc[index, 'close']
            amount = (buy_unit * self.df.loc[index, 'close']) * self.BASIC_FEES + buy_unit * self.df.loc[index, 'close']
            if self.balance < amount :
                buy_balance -= self.balance * 0.0001
            else:
                self.balance -= amount
                self.unit = buy_unit
                self.rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
                # if index%50 == 0 : print("buy index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
                print("buy index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
                break
        return self.rate
        
    def unit_sell(self, index):
        self.back_up()
        self.old_rate = self.rate
        sell_balance = self.unit * self.df.loc[index, 'close'] - (self.unit * self.df.loc[index, 'close']) * self.BASIC_FEES
        self.balance += sell_balance
        self.unit = 0
        self.rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
        # if index%50 == 0 : print("sell index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
        print("sell index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
        self.money = self.balance
        self.max_rate = 0
        return self.rate
    
    def unit_hold(self, index):
        self.old_rate = self.rate
        self.rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
        # print("index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
        return self.rate
    
    def get_newaction(self, index):
        if self.unit > 0:
            hold_rate = ((self.unit * self.df.loc[index, "sma3"] + self.balance) - self.orgin_money) * 100 / self.orgin_money
            sell_rate = ((self.unit * self.df.loc[index, "close"] + self.balance) - self.orgin_money) * 100 / self.orgin_money
            return 0 if hold_rate > sell_rate else 2
        else:
            buy_balance = self.balance
            self.buy_index = index
            self.old_rate = self.rate
            while True:
                buy_unit = buy_balance / self.df.loc[index, 'close']
                amount = (buy_unit * self.df.loc[index, 'close']) * self.BASIC_FEES + buy_unit * self.df.loc[index, 'close']
                if self.balance < amount :
                    buy_balance -= self.balance * 0.0001
                else:
                    self.balance -= amount
                    self.unit = buy_unit
                    buy_rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
                    break
            hold_rate = (self.balance - self.orgin_money) * 100 / self.orgin_money
            return 0 if hold_rate > buy_rate else 1
        
    def back_up(self):
        self.bak_balance = self.balance
        self.bak_unit = self.unit

    def  back_ward(self):
        self.balance = self.bak_balance 
        self.unit = self.bak_unit
                
    def is_bankrupt(self):
        return (self.money < 10000)

start_idx = df.index[0]
print("start_idx[{}]".format(start_idx))
def get_chart(df, idx, max_data:int=300, i_w:int=140, i_h:int=100):
    ndf = df.head(idx - start_idx).tail(max_data)
    ndf.reset_index(drop=True, inplace=True)
    
    candle = go.Candlestick(x=ndf.index,open=ndf['open'],high=ndf['high'],low=ndf['low'],close=ndf['close'], increasing_line_color = 'red',decreasing_line_color = 'blue', showlegend=False)
    upper = go.Scatter(x=ndf.index, y=ndf['upper'], line=dict(color='red', width=2), name='upper', showlegend=False)
    ma20 = go.Scatter(x=ndf.index, y=ndf['ma20'], line=dict(color='black', width=2), name='ma20', showlegend=False)
    lower = go.Scatter(x=ndf.index, y=ndf['lower'], line=dict(color='blue', width=2), name='lower', showlegend=False)

    volume = go.Bar(x=ndf.index, y=ndf['volume'], marker_color='red', name='volume', showlegend=False)

    MACD = go.Scatter(x=ndf.index, y=ndf['macd'], line=dict(color='blue', width=2), name='MACD', legendgroup='group2', legendgrouptitle_text='MACD')
    MACD_Signal = go.Scatter(x=ndf.index, y=ndf['signal'], line=dict(dash='dashdot', color='green', width=2), name='MACD_Signal')
    MACD_Oscil = go.Bar(x=ndf.index, y=ndf['flag'], marker_color='purple', name='MACD_Oscil')

    fast_k = go.Scatter(x=ndf.index, y=ndf['fast_k'], line=dict(color='skyblue', width=2), name='fast_k', legendgroup='group3', legendgrouptitle_text='%K %D')
    slow_d = go.Scatter(x=ndf.index, y=ndf['slow_d'], line=dict(dash='dashdot', color='black', width=2), name='slow_d')

    PB = go.Scatter(x=ndf.index, y=ndf['PB']*100, line=dict(color='blue', width=2), name='PB', legendgroup='group4', legendgrouptitle_text='PB, MFI')
    MFI10 = go.Scatter(x=ndf.index, y=ndf['MFI10'], line=dict(dash='dashdot', color='green', width=2), name='MFI10')

    RSI = go.Scatter(x=ndf.index, y=ndf['rsi14'], line=dict(color='red', width=2), name='RSI', legendgroup='group5', legendgrouptitle_text='RSI')
    
    # 스타일
    fig = ms.make_subplots(rows=5, cols=2, specs=[[{'rowspan':4},{}],[None,{}],[None,{}],[None,{}],[{},{}]], shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01)

    fig.add_trace(candle,row=1,col=1)
    fig.add_trace(upper,row=1,col=1)
    fig.add_trace(ma20,row=1,col=1)
    fig.add_trace(lower,row=1,col=1)

    fig.add_trace(volume,row=5,col=1)

    fig.add_trace(candle,row=1,col=2)
    fig.add_trace(upper,row=1,col=2)
    fig.add_trace(ma20,row=1,col=2)
    fig.add_trace(lower,row=1,col=2)

    fig.add_trace(MACD,row=2,col=2)
    fig.add_trace(MACD_Signal,row=2,col=2)
    fig.add_trace(MACD_Oscil,row=2,col=2)

    fig.add_trace(fast_k,row=3,col=2)
    fig.add_trace(slow_d,row=3,col=2)

    fig.add_trace(PB,row=4,col=2)
    fig.add_trace(MFI10,row=4,col=2)

    fig.add_trace(RSI,row=5,col=2)

    # 추세추종
    # trend_fol = 0
    # trend_refol = 0
    # for i in ndf.index:
    #     if ndf['PB'][i] > 0.8 and ndf['MFI10'][i] > 80:
    #         trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='orange', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False)
    #         fig.add_trace(trend_fol,row=1,col=1)
    #     elif ndf['PB'][i] < 0.2 and ndf['MFI10'][i] < 20:
    #         trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='darkblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False)
    #         fig.add_trace(trend_fol,row=1,col=1)

    # 역추세추종
    # for i in ndf.index:
    #     if ndf['PB'][i] < 0.05 and ndf['IIP21'][i] > 0:
    #         trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='purple', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False)  #보라
    #         fig.add_trace(trend_refol,row=1,col=1)
    #     elif df['PB'][i] > 0.95 and ndf['IIP21'][i] < 0:
    #         trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='skyblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False)  #하늘
    #         fig.add_trace(trend_refol,row=1,col=1)    

    # fig.add_trace(trend_fol,row=1,col=1)
    # 추세추총전략을 통해 캔들차트에 표시합니다.

    # fig.add_trace(trend_refol,row=1,col=1)
    # 역추세 전략을 통해 캔들차트에 표시합니다.
    
    # fig.update_layout(autosize=True, xaxis1_rangeslider_visible=False, xaxis2_rangeslider_visible=False, margin=dict(l=50,r=50,t=50,b=50), template='seaborn', title=f'({ticker})의 날짜: ETH [추세추종전략:오↑파↓] [역추세전략:보↑하↓]')
    # fig.update_xaxes(tickformat='%y년%m월%d일', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray', showline=True,linewidth=2, linecolor='black', mirror=True)
    # fig.update_yaxes(tickformat=',d', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray',showline=True,linewidth=2, linecolor='black', mirror=True)
    # fig.update_traces(xhoverformat='%y년%m월%d일')
    # size = len(img)
    # img = plt.io.to_image(fig, format='png')
    # canvas = FigureCanvasBase(fig)
    # img = Image.frombytes(mode='RGB', size=(700, 500), data=fig.to_image().to, decoder_name='raw')
    # img = Image.frombytes('RGBA', (700, 500), plt.io.to_image(fig, format='png'), 'raw')
    # img = Image.fromarray('RGBA', (700, 500), np.array(plt.io.to_image(fig, format='png')), 'raw')
    # img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'RGB')
    # img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'L')
    # img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=140, height=100)))
    # img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=700, height=500)))
    img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png')))
    # print(img)
    img.convert("RGB")
    # print(img)
    img.thumbnail((i_h, i_w), Image.LANCZOS)
    # print(img)
    # img = Image.Image(img, 'RGB')
    # img.show()
    return img

class DQN(nn.Module):
    def __init__(self, h, w, outputs, qsize):
        super(DQN, self).__init__()
        self.conv1 = nn.Sequential(
            nn.ReLU(),
            nn.Conv2d(4, h*w, kernel_size=5, stride=2),
            nn.BatchNorm2d(h*w)
        )
        self.conv2 = nn.Sequential(
            nn.ReLU(),
            nn.Conv2d(h*w, qsize, kernel_size=5, stride=2),
            nn.BatchNorm2d(qsize),
            # nn.MaxPool2d(kernel_size=2,stride=2)
        )
        self.conv3 = nn.Sequential(
            nn.ReLU(),
            nn.Conv2d(qsize, qsize, kernel_size=5, stride=2),
            nn.BatchNorm2d(qsize),
            nn.Softmax()
            # nn.MaxPool2d(kernel_size=2,stride=2)
        )
        # self.mp3 = nn.MaxPool2d(kernel_size=2,stride=2)                                
        # # [batch_size,64,6,6] -> [batch_size,64,3,3]

        linear_input_size = 3 * qsize
        self.head = nn.Linear(linear_input_size, outputs)
        # self.head = nn.Sequential(
        #     nn.Linear(linear_input_size, qsize),
        #     nn.ReLU(),
        #     nn.Linear(qsize, qsize),
        #     nn.ReLU(),
        #     nn.Linear(qsize, outputs)
        # )

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = self.conv1(x)
        print("x1------->", x.size())
        x = self.conv2(x)
        print("x2------->", x.size())
        x = self.conv3(x)
        print("x3------->", x.size())
        x = x.view(x.size(0), -1)
        print("x4------->", x.size())
        x = self.head(x)
        print("x5------->", x.size())
        print(x)
        x = torch.argmax(x, dim= -1)
        print(x)
        print("x6------->", x.size())
        return  x

def select_action(df, idx):
    print("idx{0:d} close:{1} closemin:{2} closemax:{3}".format(idx, df.loc[idx, "close"], df.loc[idx, "closemin"], df.loc[idx, "closemax"]))
    if df.loc[idx, "sma20deg"] >= 0 :
        action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 1)
    else:
        action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 2)
    return action

# gpu가 사용 가능한 경우에는 device를 gpu로 설정하고 불가능하면 cpu로 설정합니다.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
account = Account(df, 50000000)
account.reset()

converter = torchvision.transforms.ToTensor()

# 모델을 지정한 장치로 올립니다.
model = DQN(screen_height, screen_width, action_kind, qsize=qsize).to(device)
if path.exist("train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize)):
    torch.load(model.state_dict(), "train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind,device,qsize))
model.eval()

with torch.no_grad():
    tot = 0.0000000000001
    corr = 0.0
    predict_arr = []
    for idy in df.index:
        x = get_chart(df, idy, data_size, i_w = screen_width, i_h = screen_height)
        x = converter(x).unsqueeze(0).to(device).squeeze(1)

        output = model.forward(x)

        _,action = torch.max(output,1)
        # print("action:", action)
        action = action.cpu().numpy()[0]
        # print("action:", action)
        predict_arr[action] += 1
        sel_act = select_action(df, idy)
        tot += 1 if sel_act in [0, action_kind-1] or action in [0, action_kind-1] else 0
        corr += 1 if (sel_act in [0, action_kind-1] or action in [0, action_kind-1]) and sel_act == action else 0
        reward, real_action = account.exec_action(2 if action == (action_kind - 1) else (1 if action == 0 else 0) , idy)
        if idy % 100 == 0:
            print("progress {0:.2f}".format(idy * 100 / df.index.max()))
            print("idy:%d action[%d:%d] price [%.4f] unit[%.4f] agent rate:%.05f remind money:%.02f accuracy:%.02f" 
                % (idy, sel_act, action, df.loc[idy, 'close'], account.unit, account.rate, account.balance + account.unit * df.loc[idy, 'close'], 100*corr/tot))
        print("prediction count")
        for act in predict_arr:
            print("action{0:02d}  ==>  {1:d}".format(act, predict_arr[act]))

실제 데이터를 이용하여 정확도를 재측정합니다.

 

실제 시스템에 적용할지 말지는 본인이 판단해야 할 것 같습니다. 

 

모델이 잘 작동하지 않거나 차수가 다르게 나오는 등의 문제가 발생할 수 있습니다. 

 

댓글로 문의 주시면 상세히 답변 드리겠습니다. 

 

Good Luck!!!

 
반응형
LIST

+ Recent posts