Coding#

import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import statistics
from sklearn.metrics import accuracy_score

Data#

The following function returns a tuple containing the training and test datasets for a specified stock and time period.

  • The training_ratio parameter represents the percentage of data allocated to the training set.

def get_training_test_data(stock='AMZN', start='2019-1-1', end='2021-1-31', training_ratio=0.96):
    df = yf.Ticker(stock).history(start=start, end=end)
    df = df.iloc[:,:-3]
    df.reset_index(inplace=True)
    df['Date'] = [i.date() for i in df.Date]
    df['fcc'] = [np.sign(df.Close.loc[i+1]-df.Close.loc[i]) for i in range(len(df)-1)]+[np.nan]
    training_length = int(len(df)*training_ratio)
    training_data = df.iloc[:training_length,:] 
    test_data = df.iloc[training_length:,:]
    test_data.reset_index(inplace=True, drop=True)
    return (training_data, test_data)
df_train, df_test = get_training_test_data()
df_train.shape, df_test.shape
((503, 6), (21, 6))
df_train.head()
Date Open High Low Close fcc
0 2019-01-02 73.260002 77.667999 73.046501 76.956497 -1.0
1 2019-01-03 76.000504 76.900002 74.855499 75.014000 1.0
2 2019-01-04 76.500000 79.699997 75.915497 78.769501 1.0
3 2019-01-07 80.115501 81.727997 79.459503 81.475502 1.0
4 2019-01-08 83.234497 83.830498 80.830498 82.829002 1.0
df_test.head()
Date Open High Low Close fcc
0 2020-12-30 167.050003 167.104996 164.123505 164.292496 -1.0
1 2020-12-31 163.750000 164.145996 162.059998 162.846497 -1.0
2 2021-01-04 163.500000 163.600006 157.201004 159.331497 1.0
3 2021-01-05 158.300507 161.169006 158.253006 160.925507 -1.0
4 2021-01-06 157.324005 159.875504 156.557999 156.919006 1.0

Visualization#

def cs_visualize(hp, op, cp, lp, x=0, details=False, linewidth=20):
    if cp > op: color = 'green'
    elif cp < op:  color = 'red'
    else: color ='black'

    plt.plot([x,x], [lp, hp], c=color)
    
    if cp != op:
        plt.plot([x,x], [op, cp], c=color, linewidth=linewidth)
    else:
        plt.plot([x-0.1,x+0.1], [op, cp], c=color, linewidth=1)

    if details:
        plt.text(x+0.01, hp, 'high')
        plt.text(x+0.01, lp , 'low')
        plt.text(x+0.01, cp, 'close')
        plt.text(x+0.01, op, 'open')
def cs_visualize_df(data, M=0, N=None):
    plt.figure(figsize=(20,5))
    for i in data.index[M:N]:
        hp, op, cp, lp = data[['High','Open', 'Close', 'Low']].loc[i]
        cs_visualize(hp, op, cp, lp, x=i)
cs_visualize_df(df_train, M=0, N=20)
_images/063769fc7c329951e69fdf9b43810facc7b9b9c33c7a0754c597050f1b9624f6.png

Encoding#

def encoder(hp, op, cp, lp):
    if hp  > op > cp  > lp: return 'a'
    if hp == op > cp  > lp: return 'b'
    if hp == op > cp == lp: return 'c'
    if hp  > op > cp == lp: return 'd'
        
    if hp  > cp > op  > lp: return 'e'
    if hp == cp > op  > lp: return 'f'
    if hp == cp > op == lp: return 'g'
    if hp  > cp > op == lp: return 'h'

    if hp  > op == cp  > lp: return 'i'
    if hp == op == cp  > lp: return 'j'
    if hp == op == cp == lp: return 'k'
    if hp  > op == cp == lp: return 'l' 
def df_encoder(data):
    data_ = data.copy()
    encoder_list = []
    for i in data_.index:
        hp, op, cp, lp = data_[['High','Open', 'Close', 'Low']].loc[i]
        encoder_list.append(encoder(hp, op, cp, lp))
    data_['code'] = encoder_list
    return data_
df_train = df_encoder(df_train)
df_train.head().round(2)
Date Open High Low Close fcc code
0 2019-01-02 73.26 77.67 73.05 76.96 -1.0 e
1 2019-01-03 76.00 76.90 74.86 75.01 1.0 a
2 2019-01-04 76.50 79.70 75.92 78.77 1.0 e
3 2019-01-07 80.12 81.73 79.46 81.48 1.0 e
4 2019-01-08 83.23 83.83 80.83 82.83 1.0 a

The chart below illustrates the candlestick charts along with their corresponding codes.

cs_visualize_df(df_train, M=0, N=20)
plt.xticks(range(0,20), df_train.code[0:20])
plt.grid(axis='x');
_images/4cb58ffb6c16d5d0bd4062b332ee1c930dd4f8cfbf19dd1660d939c38eeece5b.png

Change Points#

def change_points(data):
    data_ = data.copy()
    change_pts, change_types = [True], ['Start']

    for i in range(1, len(data_)-1):
        
        left_change  = int(np.sign(data_['Close'][i]-data_['Close'][i-1]))
        right_change = int(np.sign(data_['Close'][i+1]-data_['Close'][i]))
        
        if left_change != right_change:
            change_pts.append(True)
        else:
            change_pts.append(False)
            
        change_types.append((left_change, right_change))

    change_pts.append(False)
    change_types.append('End')

    data_['change_points'] = change_pts
    data_['change_types'] = change_types
    
    return data_
df_train = change_points(df_train)
df_train.head().round(2)
Date Open High Low Close fcc code change_points change_types
0 2019-01-02 73.26 77.67 73.05 76.96 -1.0 e True Start
1 2019-01-03 76.00 76.90 74.86 75.01 1.0 a True (-1, 1)
2 2019-01-04 76.50 79.70 75.92 78.77 1.0 e False (1, 1)
3 2019-01-07 80.12 81.73 79.46 81.48 1.0 e False (1, 1)
4 2019-01-08 83.23 83.83 80.83 82.83 1.0 a False (1, 1)
df_train = change_points(df_train)
df_train.tail().round(2)
Date Open High Low Close fcc code change_points change_types
498 2020-12-22 160.14 161.10 159.00 160.33 -1.0 e True (1, -1)
499 2020-12-23 160.25 160.51 159.21 159.26 -1.0 a False (-1, -1)
500 2020-12-24 159.70 160.10 158.45 158.63 1.0 a True (-1, 1)
501 2020-12-28 159.70 165.20 158.63 164.20 1.0 e False (1, 1)
502 2020-12-29 165.50 167.53 164.06 166.10 -1.0 e False End
def cp_visualizer(data, M=0, N=None):
    plt.figure(figsize=(20,5))
    plt.plot(data.index[M:N], data.Close[M:N], alpha=0.2)
    color_list = ['black', 'red']
    plt.scatter(data.index[M:N], data.Close[M:N], c=[color_list[i] for i in data.change_points[M:N]])
    plt.xticks(data.index[M:N], data.code[M:N], fontsize=12)
    plt.grid();
  • In the graph of Close values below, the red points indicate the change points, while the x-values correspond to the candlestick code for each corresponding day.

cp_visualizer(df_train, M=0, N=20)
_images/b8191d33dbfa570f44c31362d09f526609c8561b42b7c753f635af25ec6e48b1.png

Segmentation and Labeling#

def segmentation(data):
    segment_list, trend_list = [], []
    pattern = ''
    
    for i in range(len(data)-1):  
        pattern += data.code[i]
    
        if data.change_points.loc[i]:
            segment_list.append(pattern) 
            j = i
            while True:
                if data.change_types[j+1][0] != 0:   # first non-equal direction
                    trend_list.append(data.change_types[j+1][0]) 
                    break
                j += 1
    
            pattern = ''

    df_seg = pd.DataFrame()
    df_seg['kts'] = segment_list
    df_seg['trend'] = trend_list

    df_seg_unique = pd.DataFrame(df_seg.value_counts()).reset_index()

    return df_seg_unique
PSet = segmentation(df_train)
PSet.head()
kts trend count
0 a 1 58
1 e -1 47
2 aa 1 23
3 ee -1 15
4 e 1 9

Subsequence#

def whethersub(X, Y):
    if len(X) > len(Y): return 0
    else:
        k = 0
        for i in range(len(X)):
            if X[i] in Y[k:]:
                k = Y.find(X[i], k)+1
            else:
                return 0          
    return 1 

oNum#

def onum(pset):
    pset_ = pset.copy()
    onum_list = []
    for i in range(len(pset)):
        total = 0
        for j in range(len(pset)):
            total += whethersub(pset_.loc[i, 'kts'], pset_.loc[j, 'kts'])* pset_.loc[j, 'count']
        onum_list.append(total)
    pset_['onum'] = onum_list
    return pset_
PRSet = onum(PSet)
PRSet.head()
kts trend count onum
0 a 1 58 154
1 e -1 47 146
2 aa 1 23 58
3 ee -1 15 60
4 e 1 9 146

sameTrendNum#

def sametrendnum(pset):
    pset_ = pset.copy()
    sametrendnum_list = []
    for i in range(len(pset_)):
        total = 0
        for j in range(len(pset_)):
            if whethersub(pset_.loc[i,'kts'], pset_.loc[j,'kts']) & (pset_.loc[i,'trend'] == pset_.loc[j, 'trend']):
                total += pset_.loc[j, 'count']    
        sametrendnum_list.append(total)
    pset_['sametrendnum'] = sametrendnum_list
    return pset_  
PRSet = sametrendnum(PRSet)
PRSet.head()
kts trend count onum sametrendnum
0 a 1 58 154 111
1 e -1 47 146 115
2 aa 1 23 58 47
3 ee -1 15 60 56
4 e 1 9 146 31

PACC#

def pacc(pset):
    pset_ = pset.copy()
    pset_['pacc'] = pset_['sametrendnum'] / pset_['onum'] 
    return pset_
PRSet = pacc(PRSet)
PRSet.head().round(2)
kts trend count onum sametrendnum pacc
0 a 1 58 154 111 0.72
1 e -1 47 146 115 0.79
2 aa 1 23 58 47 0.81
3 ee -1 15 60 56 0.93
4 e 1 9 146 31 0.21
  • The set described above is called the Pattern Record Set (PRSet).

    • It consists of KTS-Trend pairs (KtsP) and their corresponding Pattern Accuracy Score (PACC) values.

Subsequence Model#

def subsequence_model_predict(kts, prset):
    best_pattern = []
    best_pacc = [0]
    best_trend = []
    for i in range(len(prset)):
        if (whethersub(prset.loc[i, 'kts'], kts)):
            if prset.loc[i, 'pacc'] > best_pacc[-1]:
                best_pattern = [prset.loc[i, 'kts']] 
                best_pacc = [prset.loc[i, 'pacc']]
                best_trend = [ prset.loc[i, 'trend']]
            elif prset.loc[i, 'pacc'] == best_pacc[-1]:
                best_pattern.append(prset.loc[i, 'kts'])
                best_pacc.append(prset.loc[i, 'pacc'])
                best_trend.append(prset.loc[i, 'trend'])

    if len(best_trend) == 0:
        pred = statistics.mode(prset.trend)
    elif len(best_trend) == 1:
        pred = best_trend[0]
    else:
        pred = statistics.mode(best_trend)
        
    return {'Best Pattern':best_pattern, 'Best PACC': best_pacc, 'Best Trend': best_trend, 'Prediction':pred}
subsequence_model_predict('aaee', PRSet)
{'Best Pattern': ['ee'],
 'Best PACC': [0.9333333333333333],
 'Best Trend': [-1],
 'Prediction': -1}
subsequence_model_predict('acd', PRSet)
{'Best Pattern': ['a'],
 'Best PACC': [0.7207792207792207],
 'Best Trend': [1],
 'Prediction': 1}

Test Set#

df_test.head().round(2)
Date Open High Low Close fcc
0 2020-12-30 167.05 167.10 164.12 164.29 -1.0
1 2020-12-31 163.75 164.15 162.06 162.85 -1.0
2 2021-01-04 163.50 163.60 157.20 159.33 1.0
3 2021-01-05 158.30 161.17 158.25 160.93 -1.0
4 2021-01-06 157.32 159.88 156.56 156.92 1.0
df_test = df_encoder(df_test)
df_test.head().round(2)
Date Open High Low Close fcc code
0 2020-12-30 167.05 167.10 164.12 164.29 -1.0 a
1 2020-12-31 163.75 164.15 162.06 162.85 -1.0 a
2 2021-01-04 163.50 163.60 157.20 159.33 1.0 a
3 2021-01-05 158.30 161.17 158.25 160.93 -1.0 e
4 2021-01-06 157.32 159.88 156.56 156.92 1.0 a
def test_data_prep(data, N=5):
    test_data = []
    for i in range(len(data)-N):
        kts = ''
        for j in range(N):
            kts += data.code.loc[i+j]
        test_data.append((kts, data.fcc.loc[i+j]))
        
    if type(test_data[-1][-1]) != float:
        test_data.pop()
    
    return test_data
test_data = test_data_prep(df_test)
test_data
[('aaaea', 1.0),
 ('aaeae', 1.0),
 ('aeaee', -1.0),
 ('eaeea', 1.0),
 ('aeeae', 1.0),
 ('eeaee', -1.0),
 ('eaeea', -1.0),
 ('aeeaa', 1.0),
 ('eeaae', 1.0),
 ('eaaee', 1.0),
 ('aaeee', -1.0),
 ('aeeea', 1.0),
 ('eeeaa', 1.0),
 ('eeaae', -1.0),
 ('eaaea', 1.0)]

Prediction#

def subsequence_model_score(data, prset):
    pred_trend_test = [subsequence_model_predict(kts, prset)['Prediction'] for (kts,trend) in data]
    actual_trend_test = [trend for (kts,trend) in data]
    return accuracy_score(pred_trend_test,  actual_trend_test)
subsequence_model_score(test_data, PRSet)
0.6