전체 학습을 한 파일로 돌리면 많은 경우 5일 이상의 시간이 걸립니다.
이런 부분을 Joblib lib를 이용하여 오래 걸리는 부분을 작은 단위로 분리하여 학습을 작은 단위별로 진행할 수 있게 분리했습니다.
다시 해야할 부분만 진행하여 학습을 강화하다면 효율적일 것 같습니다.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F
# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset
# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다.
import torchvision.transforms as transforms
# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np
import matplotlib.pyplot as pltx
import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
import common.constrant as Const
import math
ticker = 'ETH'
class Market():
def __init__(self) -> None:
self.df = pd.DataFrame()
def add_bbands(self):
try:
print("볼리져 밴드 구하기:%s" % time.ctime(time.time()))
self.df['ma20'] = talib.SMA(np.asarray(self.df['close']), 20)
self.df['stddev'] = self.df['close'].rolling(window=20).std() # 20일 이동표준편차
upper, middle, lower = talib.BBANDS(np.asarray(self.df['close']), timeperiod=40, nbdevup=2.3, nbdevdn=2.3, matype=0)
self.df['lower']=lower
self.df['middle']=middle
self.df['upper']=upper
# self.df['bbands_sub']=self.df.upper - self.df.lower
# self.df['bbands_submax'] = self.df.bbands_sub.rolling(window=840, min_periods=10).max()
# self.df['bbands_submin'] = self.df.bbands_sub.rolling(window=840, min_periods=10).min()
# self.df['bbs_deg'] = (self.df.mavg - self.df.mavg.shift()).apply(lambda x: math.degrees(math.atan2(x, 40)))
# self.df['pct8']=(self.df.close - self.df.lower)/(self.df.upper - self.df.lower)
# self.df['pct8vsclose']=self.df.close - self.df.pct8
except Exception as ex:
print("`add_bbands -> exception! %s `" % str(ex))
def add_relative_strength(self):
try:
print("상대 강도 지수 구하기:%s" % time.ctime(time.time()))
rsi14 = talib.RSI(np.asarray(self.df['close']), 14)
self.df['rsi14'] = rsi14
except Exception as ex:
print("`add_relative_strength -> exception! %s `" % str(ex))
def add_iip(self):
try:
#역추세전략을 위한 IIP계산
self.df['II'] = (2*self.df['close']-self.df['high']-self.df['low'])/(self.df['high']-self.df['low'])*self.df['volume']
self.df['IIP21'] = self.df['II'].rolling(window=21).sum()/self.df['volume'].rolling(window=21).sum()*100
except Exception as ex:
print("`add_iip -> exception! %s `" % str(ex))
def add_stock_cast(self):
try:
# 스토캐스틱 구하기
self.df['ndays_high'] = self.df['high'].rolling(window=14, min_periods=1).max() # 14일 중 최고가
self.df['ndays_low'] = self.df['low'].rolling(window=14, min_periods=1).min() # 14일 중 최저가
self.df['fast_k'] = (self.df['close'] - self.df['ndays_low']) / (self.df['ndays_high'] - self.df['ndays_low']) * 100 # Fast %K 구하기
self.df['slow_d'] = self.df['fast_k'].rolling(window=3).mean() # Slow %D 구하기 except Exception as ex:
except Exception as ex:
print("`add_stock_cast -> exception! %s `" % str(ex))
def add_mfi(self):
try:
# MFI 구하기
self.df['PB'] = (self.df['close'] - self.df['lower']) / (self.df['upper'] - self.df['lower'])
self.df['TP'] = (self.df['high'] + self.df['low'] + self.df['close']) / 3
self.df['PMF'] = 0
self.df['NMF'] = 0
for i in range(len(self.df.close)-1):
if self.df.TP.values[i] < self.df.TP.values[i+1]:
self.df.PMF.values[i+1] = self.df.TP.values[i+1] * self.df.volume.values[i+1]
self.df.NMF.values[i+1] = 0
else:
self.df.NMF.values[i+1] = self.df.TP.values[i+1] * self.df.volume.values[i+1]
self.df.PMF.values[i+1] = 0
self.df['MFR'] = (self.df.PMF.rolling(window=10).sum() / self.df.NMF.rolling(window=10).sum())
self.df['MFI10'] = 100 - 100 / (1 + self.df['MFR'])
except Exception as ex:
print("`add_mfi -> exception! %s `" % str(ex))
def add_macd(self, sort, long, sig):
try:
print("MACD 구하기:%s, sort:%d, long:%d, sig:%d" % (time.ctime(time.time()), sort, long, sig))
macd, macdsignal, macdhist = talib.MACD(np.asarray(self.df['close']), sort, long, sig)
self.df['macd'] = macd
# self.df['macdmax'] = self.df.macd.rolling(window=840, min_periods=100).max()
# self.df['macdmin'] = self.df.macd.rolling(window=840, min_periods=100).min()
# self.df['macd_max_rate'] = self.df.apply(lambda x: (x['macdmax'] - x['macd']) * 100 / (x['macdmax'] - x['macdmin']), axis=1)
# self.df['macd_min_rate'] = self.df.apply(lambda x: (x['macd'] - x['macdmin']) * 100 / (x['macdmax'] - x['macdmin']), axis=1)
# self.df['prv_macd_degrees'] = self.df.macd_degrees.shift()
self.df['signal'] = macdsignal
self.df['flag'] = macdhist
# self.df['prv_osc_degrees'] = self.df.osc_degrees.shift()
except Exception as ex:
print("`add_macd -> exception! %s `" % str(ex))
def get_data(self):
table_name="TB_ETH_TRADE"
sqlcon = pymysql.connect(host='192.168.255.59', user='yuxxxx', password='------', db='eth')
cursor = sqlcon.cursor(pymysql.cursors.DictCursor)
str_query = """select
'time'
,`start` as open
, high
, low
, close
, volume
, macd
, macdmax
, macdmin
, `signal`
, `flag`
, osc_degrees
, sma1200
, sma1200_degrees
, wma1200
, wma1200_degrees
from %s""" % table_name
cursor.execute(str_query)
data = cursor.fetchall()
self.df = pd.DataFrame(data)
sqlcon.close()
self.df['sma3'] = talib.SMA(np.asarray(self.df['close']), 3)
self.df['sma20'] = talib.SMA(np.asarray(self.df['close']), 20)
self.df['sma20deg'] = (self.df.sma20 - self.df.sma20.shift()).apply(lambda x: math.degrees(math.atan2(x, Const.DEGREES_X)))
self.df['closemin'] = self.df.apply(lambda x: x['close'] if x['sma20deg'] >= 0 else x['high'], axis = 1).rolling(window=100, center=True).min()
self.df['closemax'] = self.df.apply(lambda x: x['close'] if x['sma20deg'] <= 0 else 0, axis = 1).rolling(window=100, center=True).max()
self.add_macd(12,26,9)
self.add_bbands()
self.add_relative_strength()
self.add_stock_cast()
self.add_iip()
self.add_mfi()
# self.df.dropna(inplace=True)
self.df.reset_index(drop=True,inplace=True)
return self.df
batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 5
screen_height = 50
screen_width = 70
qsize = 2048
mk = Market()
df = mk.get_data()
df = df.tail((df.index.max()//batch_size)*batch_size)
df.to_csv("data_all.dat")
이 부분은 데이터를 읽어 드리는 부분인데 이미 DB에 저장된 데이터를 읽어 오므로 1~2분 정도면 끝납니다. 그래서 데이터를 data_all.dat파일에 저장합니다.
사실 이 파일만 있으면 모든게 해결되는 거죠.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F
# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset
# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다.
import torchvision.transforms as transforms
# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np
import matplotlib.pyplot as pltx
import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
# import common.constrant as Const
batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 4
screen_height = 50
screen_width = 70
qsize = 2048
df = pd.read_csv("data_all.dat")
df = df.tail(100000)
start_idx = df.index[0]
print("start_idx[{}]".format(start_idx))
def get_chart(df, idx, max_data:int=300, i_w:int=140, i_h:int=100):
ndf = df.head(idx - start_idx).tail(max_data)
ndf.reset_index(drop=True, inplace=True)
print(ndf)
candle = go.Candlestick(x=ndf.index,open=ndf['open'],high=ndf['high'],low=ndf['low'],close=ndf['close'], increasing_line_color = 'red',decreasing_line_color = 'blue', showlegend=False)
upper = go.Scatter(x=ndf.index, y=ndf['upper'], line=dict(color='red', width=2), name='upper', showlegend=False)
ma20 = go.Scatter(x=ndf.index, y=ndf['ma20'], line=dict(color='black', width=2), name='ma20', showlegend=False)
lower = go.Scatter(x=ndf.index, y=ndf['lower'], line=dict(color='blue', width=2), name='lower', showlegend=False)
volume = go.Bar(x=ndf.index, y=ndf['volume'], marker_color='red', name='volume', showlegend=False)
MACD = go.Scatter(x=ndf.index, y=ndf['macd'], line=dict(color='blue', width=2), name='MACD', legendgroup='group2', legendgrouptitle_text='MACD')
MACD_Signal = go.Scatter(x=ndf.index, y=ndf['signal'], line=dict(dash='dashdot', color='green', width=2), name='MACD_Signal')
MACD_Oscil = go.Bar(x=ndf.index, y=ndf['flag'], marker_color='purple', name='MACD_Oscil')
fast_k = go.Scatter(x=ndf.index, y=ndf['fast_k'], line=dict(color='skyblue', width=2), name='fast_k', legendgroup='group3', legendgrouptitle_text='%K %D')
slow_d = go.Scatter(x=ndf.index, y=ndf['slow_d'], line=dict(dash='dashdot', color='black', width=2), name='slow_d')
PB = go.Scatter(x=ndf.index, y=ndf['PB']*100, line=dict(color='blue', width=2), name='PB', legendgroup='group4', legendgrouptitle_text='PB, MFI')
MFI10 = go.Scatter(x=ndf.index, y=ndf['MFI10'], line=dict(dash='dashdot', color='green', width=2), name='MFI10')
RSI = go.Scatter(x=ndf.index, y=ndf['rsi14'], line=dict(color='red', width=2), name='RSI', legendgroup='group5', legendgrouptitle_text='RSI')
# 스타일
fig = ms.make_subplots(rows=5, cols=2, specs=[[{'rowspan':4},{}],[None,{}],[None,{}],[None,{}],[{},{}]], shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01)
fig.add_trace(candle,row=1,col=1)
fig.add_trace(upper,row=1,col=1)
fig.add_trace(ma20,row=1,col=1)
fig.add_trace(lower,row=1,col=1)
fig.add_trace(volume,row=5,col=1)
fig.add_trace(candle,row=1,col=2)
fig.add_trace(upper,row=1,col=2)
fig.add_trace(ma20,row=1,col=2)
fig.add_trace(lower,row=1,col=2)
fig.add_trace(MACD,row=2,col=2)
fig.add_trace(MACD_Signal,row=2,col=2)
fig.add_trace(MACD_Oscil,row=2,col=2)
fig.add_trace(fast_k,row=3,col=2)
fig.add_trace(slow_d,row=3,col=2)
fig.add_trace(PB,row=4,col=2)
fig.add_trace(MFI10,row=4,col=2)
fig.add_trace(RSI,row=5,col=2)
# 추세추종
# trend_fol = 0
# trend_refol = 0
# for i in ndf.index:
# if ndf['PB'][i] > 0.8 and ndf['MFI10'][i] > 80:
# trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='orange', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False)
# fig.add_trace(trend_fol,row=1,col=1)
# elif ndf['PB'][i] < 0.2 and ndf['MFI10'][i] < 20:
# trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='darkblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False)
# fig.add_trace(trend_fol,row=1,col=1)
# 역추세추종
# for i in ndf.index:
# if ndf['PB'][i] < 0.05 and ndf['IIP21'][i] > 0:
# trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='purple', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False) #보라
# fig.add_trace(trend_refol,row=1,col=1)
# elif df['PB'][i] > 0.95 and ndf['IIP21'][i] < 0:
# trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='skyblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False) #하늘
# fig.add_trace(trend_refol,row=1,col=1)
# fig.add_trace(trend_fol,row=1,col=1)
# 추세추총전략을 통해 캔들차트에 표시합니다.
# fig.add_trace(trend_refol,row=1,col=1)
# 역추세 전략을 통해 캔들차트에 표시합니다.
# fig.update_layout(autosize=True, xaxis1_rangeslider_visible=False, xaxis2_rangeslider_visible=False, margin=dict(l=50,r=50,t=50,b=50), template='seaborn', title=f'({ticker})의 날짜: ETH [추세추종전략:오↑파↓] [역추세전략:보↑하↓]')
# fig.update_xaxes(tickformat='%y년%m월%d일', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray', showline=True,linewidth=2, linecolor='black', mirror=True)
# fig.update_yaxes(tickformat=',d', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray',showline=True,linewidth=2, linecolor='black', mirror=True)
# fig.update_traces(xhoverformat='%y년%m월%d일')
# size = len(img)
# img = plt.io.to_image(fig, format='png')
# canvas = FigureCanvasBase(fig)
# img = Image.frombytes(mode='RGB', size=(700, 500), data=fig.to_image().to, decoder_name='raw')
# img = Image.frombytes('RGBA', (700, 500), plt.io.to_image(fig, format='png'), 'raw')
# img = Image.fromarray('RGBA', (700, 500), np.array(plt.io.to_image(fig, format='png')), 'raw')
# img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'RGB')
# img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'L')
# img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=140, height=100)))
# img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=700, height=500)))
img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png')))
# print(img)
img.convert("RGB")
img.show()
# print(img)
img.thumbnail((i_h, i_w), Image.LANCZOS)
# img.show()
# print(img)
# img = Image.Image(img, 'RGB')
return img
def select_action(df, idx):
print("idx{0:d} close:{1} closemin:{2} closemax:{3}".format(idx, df.loc[idx, "close"], df.loc[idx, "closemin"], df.loc[idx, "closemax"]))
if df.loc[idx, "sma20deg"] >= 0 :
action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 1)
else:
action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 2)
return action
converter = torchvision.transforms.ToTensor()
charts = []
for idx in df.index:
if idx < (start_idx + data_size):
continue
else:
if idx % 50 == 0 :
print("progress {0:.02f}".format(idx*100/df.index.max()))
print("idx[{0:06d}]".format(idx))
img = get_chart(df, idx, data_size, i_w = screen_width, i_h = screen_height)
act = select_action(df, idx)
print("act:{}".format(act))
# act = 0 if act < 0 else act
# print("act:{}".format(act))
img = converter(img).unsqueeze(0)
# act = torch.tensor(act, dtype=torch.int32)
act = torch.tensor(act, dtype=torch.float)
charts.append([img,act])
joblib.dump(charts, "charts_{0:02d}_1.dmp".format(action_kind))
이 부분은 실제로 시간이 제일 오래 걸리는 부분입니다.
Jupyter Notebook이나 Vs code에 Notebook이 설치되어 있다면 Notebook으로 실행할 것을 권합니다.
실제 차트가 정상적으로 만들어 지고 있는지 확인할 필요가 있습니다.
그러나 Vs code의 Notebook의 경우 이미지 버퍼의 버그가 있어서 차트 50장 정도를 출력하고 나면 메모리 오류로 프로그램이 죽습니다.
실제 파일을 만들때는 아래 img.show()함수를 주석처리 하고 output출력도 최소화 하여 파일을 만들 필요가 있습니다.
joblib.dump는 torch.save와 같은 lib를 사용하여 시리얼라이즈를 하고 있습니다.
다만 joblib의 경우 python의 모든 변수를 저장하기는 하지만 한가지 값을 저장하는 용도라면 torch.save하는 것이 좋습니다.
torch.save의 경우는 deep learning model을 저장하는데 최적화 되어 있습니다.
상태를 dictionaly화 해서 저장하는 것이 틀립니다.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F
# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset
# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다.
import torchvision.transforms as transforms
# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np
import matplotlib.pyplot as pltx
import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
import common.constrant as Const
batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 4
screen_height = 50
screen_width = 70
qsize = 2048
df = pd.read_csv("data_all.dat")
class DQN(nn.Module):
def __init__(self, h, w, outputs, qsize):
super(DQN, self).__init__()
self.conv1 = nn.Conv2d(4, h*w, kernel_size=5, stride=2)
self.bn1 = nn.BatchNorm2d(h*w)
self.conv2 = nn.Conv2d(h*w, qsize, kernel_size=5, stride=2)
self.bn2 = nn.BatchNorm2d(qsize)
self.conv3 = nn.Conv2d(qsize, qsize, kernel_size=5, stride=2)
# self.bn3 = nn.BatchNorm2d(qsize)
linear_input_size = 3 * qsize
# self.head = nn.Linear(linear_input_size, outputs)
self.head = nn.Sequential(
nn.Linear(linear_input_size, qsize),
nn.ReLU(),
nn.Linear(qsize, qsize),
nn.ReLU(),
nn.Linear(qsize, outputs)
)
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = F.relu(self.bn1(self.conv1(x)))
# print("x1------->", x.size())
x = F.relu(self.bn2(self.conv2(x)))
# print("x2------->", x.size())
x = F.relu(self.bn3(self.conv3(x)))
# print("x3------->", x.size())
x = x.view(x.size(0), -1)
# print("x4------->", x.size())
x = self.head(x)
# print("x5------->", x.size())
return x
def select_action(df, idx):
print("idx{0:d} close:{1} closemin:{2} closemax:{3}".format(idx, df.loc[idx, "close"], df.loc[idx, "closemin"], df.loc[idx, "closemax"]))
if df.loc[idx, "sma20deg"] >= 0 :
action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 1)
else:
action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 2)
return action
# gpu가 사용 가능한 경우에는 device를 gpu로 설정하고 불가능하면 cpu로 설정합니다.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
charts = []
charts = joblib.load("charts_{0:02d}_1.dmp".format(action_kind))
train_loader = DataLoader(charts,batch_size=batch_size, shuffle=True,num_workers=2,drop_last=True)
test_loader = DataLoader(charts,batch_size=batch_size, shuffle=False,num_workers=2,drop_last=True)
# 모델을 지정한 장치로 올립니다.
model = DQN(screen_height, screen_width, action_kind, qsize=qsize).to(device)
if path.exists("train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize)):
model.load_state_dict(torch.load("train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind,device,qsize)))
model.eval()
loss_func = nn.CrossEntropyLoss()
# 최적화함수로는 Adam을 사용합니다.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
optimizer.zero_grad()
loss_arr =[]
model.train()
for i in range(num_epoch):
for j,[image,label] in enumerate(train_loader):
x = image.to(device).squeeze(1)
y = label.type(torch.LongTensor).to(device)
output = model.forward(x)
loss = loss_func(output,y)
loss.backward()
optimizer.step()
if j % 1000 == 0:
print(loss.cpu().detach().numpy())
loss_arr.append(loss.cpu().detach().numpy())
torch.save(model.state_dict(),"train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize))
torch.save(model.state_dict(),"train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize))
# pltx.plot(loss_arr)
# pltx.show()
model.eval()
# 맞은 개수, 전체 개수를 저장할 변수를 지정합니다.
correct = 0
total = 0.000000000001
# 인퍼런스 모드를 위해 no_grad 해줍니다.
with torch.no_grad():
# 테스트로더에서 이미지와 정답을 불러옵니다.
for image,label in test_loader:
# 두 데이터 모두 장치에 올립니다.
x = image.to(device).squeeze(1)
y_= label.to(device)
# 모델에 데이터를 넣고 결과값을 얻습니다.
output = model.forward(x)
# https://pytorch.org/docs/stable/torch.html?highlight=max#torch.max
# torch.max를 이용해 최대 값 및 최대값 인덱스를 뽑아냅니다.
# 여기서는 최대값은 필요없기 때문에 인덱스만 사용합니다.
_,output_index = torch.max(output,1)
# 전체 개수는 라벨의 개수로 더해줍니다.
# 전체 개수를 알고 있음에도 이렇게 하는 이유는 batch_size, drop_last의 영향으로 몇몇 데이터가 잘릴수도 있기 때문입니다.
total += label.size(0)
# 모델의 결과의 최대값 인덱스와 라벨이 일치하는 개수를 correct에 더해줍니다.
correct += (output_index == y_).sum().float()
# 테스트 데이터 전체에 대해 위의 작업을 시행한 후 정확도를 구해줍니다.
print("Accuracy of Test Data: {}%".format(100*correct/total))
이렇게 charts.dmp 파일을 여러개의 이름으로 만들고
학습함수에 넣어서 학습을 진행합니다.
자신이 가지고 있는 보드수 만큼만 프로세스를 뛰울 수 있으니 보드가 한장이면 위의 소스를 사용하시면 되고
혹시 여러장의 RTX보드를 가지고 있다면 model = nn.DataParallel(model, device_ids=[0,1]).to(device) 를 추가하는 것이 좋습니다.
(보드가 2장일 때, 보드가 3장일 경우는 [0,1]을 [0,1,2]로 바꾸면 됩니다.
단 보드는 같은 것을 사용하는 것이 좋습니다.
보드0과 보드1의 gpu core 용량이 75%이상 차이가 나면 실행할 수 없습니다.
위치는 현재 소스의 모델 바로 아래 입니다.
단 이 경우 model.load_state_dict 함수를 model.module.load_state_dict함수로 변경을 해야 합니다.
지금 저장되어 있는 pt파일의 경우 단순 모델을 기준으로 저장되어 있으므로 다른 PC나 혹시 싱글로 predict를 할 경우는 싱글보드에 맞게 저장이 되어야 합니다.
마찬가지로 저장 또한
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import torchvision
import torch.nn.functional as F
# https://pytorch.org/docs/stable/torchvision/datasets.html
# 파이토치에서는 torchvision.datasets에 MNIST 등의 다양한 데이터를 사용하기 용이하게 정리해놨습니다.
# 이를 사용하면 데이터를 따로 학습에 맞게 정리하거나 하지 않아도 바로 사용이 가능합니다.
import torchvision.datasets as dset
# https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms
# torchvision.transforms에는 이미지 데이터를 자르거나 확대 및 다양하게 변형시키는 함수들이 구현되어 있습니다.
import torchvision.transforms as transforms
# https://pytorch.org/docs/stable/data.html?highlight=dataloader#torch.utils.data.DataLoader
# DataLoader는 전처리가 끝난 데이터들을 지정한 배치 크기에 맞게 모아서 전달해주는 역할을 합니다.
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np
import matplotlib.pyplot as pltx
import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px
import plotly as plt
import pymysql
import pandas as pd
import numpy as np
import time
import talib
from PIL import Image
import io
import os.path as path
import joblib
import common.constrant as Const
batch_size = 200
learning_rate = 0.001
num_epoch = 100
data_size = 100
action_kind = 5
screen_height = 50
screen_width = 70
qsize = 2048
df = pd.read_csv("data_all.dat")
class Account():
def __init__(self, df, origin) -> None:
self.BASIC_FEES = 0.0005
self.hold_score = 0
self.byu_score = 0
self.sell_score = 0
self.orgin_money = origin
self.money = origin
self.balance = origin
self.unit = 0
self.buy_index = 0
self.max_rate = 0
self.df = df
self.rate = 0
self.old_rate = 0
self.bak_unit = 0
self.bak_balance = 0
def reset(self):
self.balance = self.orgin_money
self.money = self.orgin_money
self.old_rate = 0
self.unit = 0
self.rate = 0
self.buy_index = 0
def select(self, action:int):
ret_action = 0
if self.unit > 0 :
ret_action = 0 if action == 1 else action
else:
ret_action = 0 if action == 2 else action
return ret_action
def exec_action(self, action, idx):
real_action = self.select(action)
if real_action == 1:
return self.unit_buy(idx), real_action
elif real_action == 2:
return self.unit_sell(idx), real_action
else:
return self.unit_hold(idx), real_action
def unit_buy(self, index):
self.back_up()
buy_balance = self.balance
self.buy_index = index
self.old_rate = self.rate
while True:
buy_unit = buy_balance / self.df.loc[index, 'close']
amount = (buy_unit * self.df.loc[index, 'close']) * self.BASIC_FEES + buy_unit * self.df.loc[index, 'close']
if self.balance < amount :
buy_balance -= self.balance * 0.0001
else:
self.balance -= amount
self.unit = buy_unit
self.rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
# if index%50 == 0 : print("buy index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
print("buy index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
break
return self.rate
def unit_sell(self, index):
self.back_up()
self.old_rate = self.rate
sell_balance = self.unit * self.df.loc[index, 'close'] - (self.unit * self.df.loc[index, 'close']) * self.BASIC_FEES
self.balance += sell_balance
self.unit = 0
self.rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
# if index%50 == 0 : print("sell index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
print("sell index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
self.money = self.balance
self.max_rate = 0
return self.rate
def unit_hold(self, index):
self.old_rate = self.rate
self.rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
# print("index[{}]hold unit[{:,.6f}] remind money[{:,.6f}] rate[{:,.8f}] expected[{:,.8f}]".format(index, self.unit, self.balance, self.rate, (self.unit * self.df.loc[index, 'close'] + self.balance)))
return self.rate
def get_newaction(self, index):
if self.unit > 0:
hold_rate = ((self.unit * self.df.loc[index, "sma3"] + self.balance) - self.orgin_money) * 100 / self.orgin_money
sell_rate = ((self.unit * self.df.loc[index, "close"] + self.balance) - self.orgin_money) * 100 / self.orgin_money
return 0 if hold_rate > sell_rate else 2
else:
buy_balance = self.balance
self.buy_index = index
self.old_rate = self.rate
while True:
buy_unit = buy_balance / self.df.loc[index, 'close']
amount = (buy_unit * self.df.loc[index, 'close']) * self.BASIC_FEES + buy_unit * self.df.loc[index, 'close']
if self.balance < amount :
buy_balance -= self.balance * 0.0001
else:
self.balance -= amount
self.unit = buy_unit
buy_rate = ((self.unit * self.df.loc[index, 'close'] + self.balance) - self.orgin_money) * 100 / self.orgin_money
break
hold_rate = (self.balance - self.orgin_money) * 100 / self.orgin_money
return 0 if hold_rate > buy_rate else 1
def back_up(self):
self.bak_balance = self.balance
self.bak_unit = self.unit
def back_ward(self):
self.balance = self.bak_balance
self.unit = self.bak_unit
def is_bankrupt(self):
return (self.money < 10000)
start_idx = df.index[0]
print("start_idx[{}]".format(start_idx))
def get_chart(df, idx, max_data:int=300, i_w:int=140, i_h:int=100):
ndf = df.head(idx - start_idx).tail(max_data)
ndf.reset_index(drop=True, inplace=True)
candle = go.Candlestick(x=ndf.index,open=ndf['open'],high=ndf['high'],low=ndf['low'],close=ndf['close'], increasing_line_color = 'red',decreasing_line_color = 'blue', showlegend=False)
upper = go.Scatter(x=ndf.index, y=ndf['upper'], line=dict(color='red', width=2), name='upper', showlegend=False)
ma20 = go.Scatter(x=ndf.index, y=ndf['ma20'], line=dict(color='black', width=2), name='ma20', showlegend=False)
lower = go.Scatter(x=ndf.index, y=ndf['lower'], line=dict(color='blue', width=2), name='lower', showlegend=False)
volume = go.Bar(x=ndf.index, y=ndf['volume'], marker_color='red', name='volume', showlegend=False)
MACD = go.Scatter(x=ndf.index, y=ndf['macd'], line=dict(color='blue', width=2), name='MACD', legendgroup='group2', legendgrouptitle_text='MACD')
MACD_Signal = go.Scatter(x=ndf.index, y=ndf['signal'], line=dict(dash='dashdot', color='green', width=2), name='MACD_Signal')
MACD_Oscil = go.Bar(x=ndf.index, y=ndf['flag'], marker_color='purple', name='MACD_Oscil')
fast_k = go.Scatter(x=ndf.index, y=ndf['fast_k'], line=dict(color='skyblue', width=2), name='fast_k', legendgroup='group3', legendgrouptitle_text='%K %D')
slow_d = go.Scatter(x=ndf.index, y=ndf['slow_d'], line=dict(dash='dashdot', color='black', width=2), name='slow_d')
PB = go.Scatter(x=ndf.index, y=ndf['PB']*100, line=dict(color='blue', width=2), name='PB', legendgroup='group4', legendgrouptitle_text='PB, MFI')
MFI10 = go.Scatter(x=ndf.index, y=ndf['MFI10'], line=dict(dash='dashdot', color='green', width=2), name='MFI10')
RSI = go.Scatter(x=ndf.index, y=ndf['rsi14'], line=dict(color='red', width=2), name='RSI', legendgroup='group5', legendgrouptitle_text='RSI')
# 스타일
fig = ms.make_subplots(rows=5, cols=2, specs=[[{'rowspan':4},{}],[None,{}],[None,{}],[None,{}],[{},{}]], shared_xaxes=True, horizontal_spacing=0.03, vertical_spacing=0.01)
fig.add_trace(candle,row=1,col=1)
fig.add_trace(upper,row=1,col=1)
fig.add_trace(ma20,row=1,col=1)
fig.add_trace(lower,row=1,col=1)
fig.add_trace(volume,row=5,col=1)
fig.add_trace(candle,row=1,col=2)
fig.add_trace(upper,row=1,col=2)
fig.add_trace(ma20,row=1,col=2)
fig.add_trace(lower,row=1,col=2)
fig.add_trace(MACD,row=2,col=2)
fig.add_trace(MACD_Signal,row=2,col=2)
fig.add_trace(MACD_Oscil,row=2,col=2)
fig.add_trace(fast_k,row=3,col=2)
fig.add_trace(slow_d,row=3,col=2)
fig.add_trace(PB,row=4,col=2)
fig.add_trace(MFI10,row=4,col=2)
fig.add_trace(RSI,row=5,col=2)
# 추세추종
# trend_fol = 0
# trend_refol = 0
# for i in ndf.index:
# if ndf['PB'][i] > 0.8 and ndf['MFI10'][i] > 80:
# trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='orange', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False)
# fig.add_trace(trend_fol,row=1,col=1)
# elif ndf['PB'][i] < 0.2 and ndf['MFI10'][i] < 20:
# trend_fol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='darkblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False)
# fig.add_trace(trend_fol,row=1,col=1)
# 역추세추종
# for i in ndf.index:
# if ndf['PB'][i] < 0.05 and ndf['IIP21'][i] > 0:
# trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='purple', marker_size=20, marker_symbol='triangle-up', opacity=0.7, showlegend=False) #보라
# fig.add_trace(trend_refol,row=1,col=1)
# elif df['PB'][i] > 0.95 and ndf['IIP21'][i] < 0:
# trend_refol = go.Scatter(x=[ndf.index[i]], y=[ndf['close'][i]], marker_color='skyblue', marker_size=20, marker_symbol='triangle-down', opacity=0.7, showlegend=False) #하늘
# fig.add_trace(trend_refol,row=1,col=1)
# fig.add_trace(trend_fol,row=1,col=1)
# 추세추총전략을 통해 캔들차트에 표시합니다.
# fig.add_trace(trend_refol,row=1,col=1)
# 역추세 전략을 통해 캔들차트에 표시합니다.
# fig.update_layout(autosize=True, xaxis1_rangeslider_visible=False, xaxis2_rangeslider_visible=False, margin=dict(l=50,r=50,t=50,b=50), template='seaborn', title=f'({ticker})의 날짜: ETH [추세추종전략:오↑파↓] [역추세전략:보↑하↓]')
# fig.update_xaxes(tickformat='%y년%m월%d일', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray', showline=True,linewidth=2, linecolor='black', mirror=True)
# fig.update_yaxes(tickformat=',d', zeroline=True, zerolinewidth=1, zerolinecolor='black', showgrid=True, gridwidth=2, gridcolor='lightgray',showline=True,linewidth=2, linecolor='black', mirror=True)
# fig.update_traces(xhoverformat='%y년%m월%d일')
# size = len(img)
# img = plt.io.to_image(fig, format='png')
# canvas = FigureCanvasBase(fig)
# img = Image.frombytes(mode='RGB', size=(700, 500), data=fig.to_image().to, decoder_name='raw')
# img = Image.frombytes('RGBA', (700, 500), plt.io.to_image(fig, format='png'), 'raw')
# img = Image.fromarray('RGBA', (700, 500), np.array(plt.io.to_image(fig, format='png')), 'raw')
# img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'RGB')
# img = Image.fromarray(np.array(plt.io.to_image(fig, format='png')), 'L')
# img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=140, height=100)))
# img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png', width=700, height=500)))
img = Image.open(io.BytesIO(plt.io.to_image(fig, format='png')))
# print(img)
img.convert("RGB")
# print(img)
img.thumbnail((i_h, i_w), Image.LANCZOS)
# print(img)
# img = Image.Image(img, 'RGB')
# img.show()
return img
class DQN(nn.Module):
def __init__(self, h, w, outputs, qsize):
super(DQN, self).__init__()
self.conv1 = nn.Sequential(
nn.ReLU(),
nn.Conv2d(4, h*w, kernel_size=5, stride=2),
nn.BatchNorm2d(h*w)
)
self.conv2 = nn.Sequential(
nn.ReLU(),
nn.Conv2d(h*w, qsize, kernel_size=5, stride=2),
nn.BatchNorm2d(qsize),
# nn.MaxPool2d(kernel_size=2,stride=2)
)
self.conv3 = nn.Sequential(
nn.ReLU(),
nn.Conv2d(qsize, qsize, kernel_size=5, stride=2),
nn.BatchNorm2d(qsize),
nn.Softmax()
# nn.MaxPool2d(kernel_size=2,stride=2)
)
# self.mp3 = nn.MaxPool2d(kernel_size=2,stride=2)
# # [batch_size,64,6,6] -> [batch_size,64,3,3]
linear_input_size = 3 * qsize
self.head = nn.Linear(linear_input_size, outputs)
# self.head = nn.Sequential(
# nn.Linear(linear_input_size, qsize),
# nn.ReLU(),
# nn.Linear(qsize, qsize),
# nn.ReLU(),
# nn.Linear(qsize, outputs)
# )
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = self.conv1(x)
print("x1------->", x.size())
x = self.conv2(x)
print("x2------->", x.size())
x = self.conv3(x)
print("x3------->", x.size())
x = x.view(x.size(0), -1)
print("x4------->", x.size())
x = self.head(x)
print("x5------->", x.size())
print(x)
x = torch.argmax(x, dim= -1)
print(x)
print("x6------->", x.size())
return x
def select_action(df, idx):
print("idx{0:d} close:{1} closemin:{2} closemax:{3}".format(idx, df.loc[idx, "close"], df.loc[idx, "closemin"], df.loc[idx, "closemax"]))
if df.loc[idx, "sma20deg"] >= 0 :
action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 1)
else:
action = 0 if df.loc[idx, "close"] <= df.loc[idx, "closemin"] else ((action_kind-1) if df.loc[idx, "close"] >= df.loc[idx, "closemax"] else 2)
return action
# gpu가 사용 가능한 경우에는 device를 gpu로 설정하고 불가능하면 cpu로 설정합니다.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
account = Account(df, 50000000)
account.reset()
converter = torchvision.transforms.ToTensor()
# 모델을 지정한 장치로 올립니다.
model = DQN(screen_height, screen_width, action_kind, qsize=qsize).to(device)
if path.exist("train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind, device, qsize)):
torch.load(model.state_dict(), "train_dqn_{0:02d}_{1}_{2:04d}.pt".format(action_kind,device,qsize))
model.eval()
with torch.no_grad():
tot = 0.0000000000001
corr = 0.0
predict_arr = []
for idy in df.index:
x = get_chart(df, idy, data_size, i_w = screen_width, i_h = screen_height)
x = converter(x).unsqueeze(0).to(device).squeeze(1)
output = model.forward(x)
_,action = torch.max(output,1)
# print("action:", action)
action = action.cpu().numpy()[0]
# print("action:", action)
predict_arr[action] += 1
sel_act = select_action(df, idy)
tot += 1 if sel_act in [0, action_kind-1] or action in [0, action_kind-1] else 0
corr += 1 if (sel_act in [0, action_kind-1] or action in [0, action_kind-1]) and sel_act == action else 0
reward, real_action = account.exec_action(2 if action == (action_kind - 1) else (1 if action == 0 else 0) , idy)
if idy % 100 == 0:
print("progress {0:.2f}".format(idy * 100 / df.index.max()))
print("idy:%d action[%d:%d] price [%.4f] unit[%.4f] agent rate:%.05f remind money:%.02f accuracy:%.02f"
% (idy, sel_act, action, df.loc[idy, 'close'], account.unit, account.rate, account.balance + account.unit * df.loc[idy, 'close'], 100*corr/tot))
print("prediction count")
for act in predict_arr:
print("action{0:02d} ==> {1:d}".format(act, predict_arr[act]))
실제 데이터를 이용하여 정확도를 재측정합니다.
실제 시스템에 적용할지 말지는 본인이 판단해야 할 것 같습니다.
모델이 잘 작동하지 않거나 차수가 다르게 나오는 등의 문제가 발생할 수 있습니다.
댓글로 문의 주시면 상세히 답변 드리겠습니다.
Good Luck!!!
'python > 자동매매 프로그램' 카테고리의 다른 글
DQN network 수정 (0) | 2024.03.12 |
---|---|
Colab에서 살아남기 (3) | 2023.03.07 |
강화학습을 이용한 비트코인 매매프로그램(13) - 강화학습 최적화 (1) | 2023.02.14 |
강화학습을 이용한 비트코인 매매프로그램(12) - 실거래 적용 (5) | 2023.01.30 |
강화학습을 이용한 비트코인 매매프로그램(11) - ResNet + RNN 적용 모델 (1) | 2022.12.25 |