Question-3: PACC#

import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import statistics
from sklearn.metrics import accuracy_score

Title#

Enhancing Pattern Accuracy Metric in Candlestick Pattern Mining for Stock Price Forecasting

Abstract#

Predicting future stock prices is notoriously challenging due to the influence of numerous variables. However, recent advances in pattern mining and machine learning have yielded promising results in forecasting stock market behavior. Candlestick charts, which are visual representations of stock price movements, can be used to identify patterns that may indicate future trends.

Current prediction methods rely on matching sequences of candlesticks to patterns with the highest “pattern accuracy” scores. However, this metric has limitations, as it can yield high scores even when patterns appear only once in historical data, reducing reliability. This project aims to improve predictive accuracy by developing a weighted pattern accuracy metric that considers the frequency and significance of patterns within historical data, ultimately enhancing the robustness of stock price forecasts.

Definition of PACC#

Let \(PSet = \{(KTS_i, Trend_i)\}_{i=1}^n =\{(KTS_1, Trend_1), (KTS_2, Trend_2),....,(KTS_n, Trend_n)\}\) be the Pattern Set.

Subsequence#

\(\begin{equation} whetherSub(X,Y)= \begin{cases} 1 & \text{if X is a subsequence of Y} \\ 0 & \text{otherwise} \end{cases} \end{equation}\)

oNum#

\(\displaystyle oNum_i = \sum_{j=1}^nwhetherSub(KTS_i,KTS_j)\)

sameTrendNum#

\(\begin{equation} sameTrend((KTS_i, Trend_i),(KTS_j, Trend_j))= \begin{cases} 1 & \text{if } KTS_i \text{ is a subsequence of } KTS_j \text{ and } Trend_i = Trend_j \\ 0 & \text{otherwise} \end{cases} \end{equation}\)

\(\displaystyle sameTrendNum_i = \sum_{j=1}^n sameTrend((KTS_i, Trend_i),(KTS_j, Trend_j))\)

PACC#

\(\displaystyle PACC_i = \frac{sameTrendNum_i}{oNum_i}\)

Data#

def get_training_test_data(stock='AMZN', start='2019-1-1', end='2021-1-31', training_ratio=0.96):
    """Download price history for one ticker and split it chronologically.

    Parameters
    ----------
    stock : str
        Ticker symbol passed to ``yf.Ticker``.
    start, end : str
        Date bounds passed to ``Ticker.history``.
    training_ratio : float
        Fraction of the rows, counted from the first row, that goes into the
        training frame; the remaining rows form the test frame.

    Returns
    -------
    tuple
        ``(training_data, test_data)``. Both frames carry a ``Date`` column of
        ``datetime.date`` objects and an ``fcc`` column holding the sign of
        the next row's close minus the current close (NaN on the last
        downloaded row). The test frame is re-indexed from 0.
    """
    df = yf.Ticker(stock).history(start=start, end=end)
    # Drop the last three downloaded columns.
    # NOTE(review): presumably Volume / Dividends / Stock Splits, leaving
    # Open, High, Low, Close -- confirm against the yfinance version in use.
    df = df.iloc[:, :-3]
    # Non-in-place reset: returns a fresh frame instead of mutating an
    # object that was derived by slicing.
    df = df.reset_index()
    df['Date'] = [stamp.date() for stamp in df['Date']]
    # Direction of the next close relative to the current one; the last row
    # has no successor, so shift(-1) leaves it as NaN.
    df['fcc'] = np.sign(df['Close'].shift(-1) - df['Close'])

    training_length = int(len(df) * training_ratio)
    # Explicit copy so later column assignments on the training frame do not
    # act on a slice of ``df``.
    training_data = df.iloc[:training_length, :].copy()
    test_data = df.iloc[training_length:, :].reset_index(drop=True)
    return (training_data, test_data)
# Build the training and test frames with the function's default arguments.
df_train, df_test = get_training_test_data()
# Bare expression: a notebook echoes the two (rows, columns) shapes.
df_train.shape, df_test.shape
((503, 6), (21, 6))

Encoding#

def encoder(hp, op, cp, lp):
    """Map one candlestick to a single-letter shape code.

    Parameters
    ----------
    hp, op, cp, lp : numbers
        High, open, close and low price of the candle.

    Returns
    -------
    str or None
        ``'a'``-``'d'`` when open > close, ``'e'``-``'h'`` when close > open,
        ``'i'``-``'l'`` when open == close. Within each group the letter
        depends on whether the high exceeds the top of the body and whether
        the bottom of the body exceeds the low. ``None`` is returned when the
        four prices fit none of the twelve shapes (for example when the high
        is below the body, or a value is NaN).
    """
    # Classify the body and remember which price is its top / bottom edge.
    if op > cp:
        body_top, body_bottom, letters = op, cp, 'abcd'
    elif cp > op:
        body_top, body_bottom, letters = cp, op, 'efgh'
    elif op == cp:
        body_top, body_bottom, letters = op, cp, 'ijkl'
    else:
        return None

    # Upper shadow: high strictly above the body, or exactly on it.
    if hp > body_top:
        has_upper = True
    elif hp == body_top:
        has_upper = False
    else:
        return None

    # Lower shadow: low strictly below the body, or exactly on it.
    if body_bottom > lp:
        has_lower = True
    elif body_bottom == lp:
        has_lower = False
    else:
        return None

    # Letter order inside each group:
    # both shadows, lower only, no shadow, upper only.
    if has_lower:
        return letters[0] if has_upper else letters[1]
    return letters[3] if has_upper else letters[2]

def df_encoder(data):
    """Return a copy of ``data`` with a ``code`` column of candlestick letters.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the columns ``High``, ``Open``, ``Close`` and ``Low``.

    Returns
    -------
    pandas.DataFrame
        A copy of ``data`` (the input is not modified) with one extra column,
        ``code``, holding the result of ``encoder(high, open, close, low)``
        for each row.
    """
    data_ = data.copy()
    # Walk the four price columns in lockstep. This is positional, so it does
    # not depend on the index labels and avoids building a sub-frame per row.
    data_['code'] = [
        encoder(hp, op, cp, lp)
        for hp, op, cp, lp in zip(
            data_['High'], data_['Open'], data_['Close'], data_['Low']
        )
    ]
    return data_
# Attach the one-letter candlestick code to both frames.
df_train = df_encoder(df_train)
df_test = df_encoder(df_test)
# Bare expression: a notebook echoes the first rows, rounded to 2 decimals.
df_train.head().round(2)
Date Open High Low Close fcc code
0 2019-01-02 73.26 77.67 73.05 76.96 -1.0 e
1 2019-01-03 76.00 76.90 74.86 75.01 1.0 a
2 2019-01-04 76.50 79.70 75.92 78.77 1.0 e
3 2019-01-07 80.12 81.73 79.46 81.48 1.0 e
4 2019-01-08 83.23 83.83 80.83 82.83 1.0 a

Change Points#

def change_points(data):
    """Mark rows where the direction of the closing price changes.

    For every interior row ``i`` the sign of ``Close[i] - Close[i-1]`` (left)
    is compared with the sign of ``Close[i+1] - Close[i]`` (right).

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain a ``Close`` column. Rows are processed in positional
        order, so any index is accepted.

    Returns
    -------
    pandas.DataFrame
        A copy of ``data`` (the input is not modified) with two new columns:

        * ``change_points`` -- ``True`` on the first row, ``False`` on the
          last row, and ``left != right`` on interior rows.
        * ``change_types`` -- ``'Start'`` on the first row, ``'End'`` on the
          last row, and the tuple ``(left, right)`` of ints in {-1, 0, 1} on
          interior rows.

        An empty frame gets two empty columns; a single-row frame is treated
        as a lone ``'Start'`` row.

    Raises
    ------
    ValueError
        If a close-to-close difference is NaN (it cannot be converted to int).
    """
    data_ = data.copy()
    n_rows = len(data_)

    # moves[k] is the sign of Close[k+1] - Close[k], taken by position.
    moves = np.sign(np.diff(data_['Close'].to_numpy()))

    if n_rows == 0:
        change_pts, change_types = [], []
    else:
        change_pts, change_types = [True], ['Start']

        for i in range(1, n_rows - 1):
            left_change = int(moves[i - 1])
            right_change = int(moves[i])
            change_pts.append(left_change != right_change)
            change_types.append((left_change, right_change))

        # A closing 'End' row only exists when there are at least two rows.
        if n_rows > 1:
            change_pts.append(False)
            change_types.append('End')

    data_['change_points'] = change_pts
    data_['change_types'] = change_types

    return data_
# Add the change_points / change_types columns to the training frame.
df_train = change_points(df_train)
# Bare expression: a notebook echoes the first rows, rounded to 2 decimals.
df_train.head().round(2)
Date Open High Low Close fcc code change_points change_types
0 2019-01-02 73.26 77.67 73.05 76.96 -1.0 e True Start
1 2019-01-03 76.00 76.90 74.86 75.01 1.0 a True (-1, 1)
2 2019-01-04 76.50 79.70 75.92 78.77 1.0 e False (1, 1)
3 2019-01-07 80.12 81.73 79.46 81.48 1.0 e False (1, 1)
4 2019-01-08 83.23 83.83 80.83 82.83 1.0 a False (1, 1)

Segmentation and Labeling#

def segmentation(data):
    """Cut the code sequence at change points and label each piece with a trend.

    Codes are accumulated row by row (the last row is never consumed). At
    every row flagged in ``change_points`` the accumulated string becomes one
    segment, labelled with the first non-zero close-to-close direction that
    follows that row.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain ``code`` (one letter per row), ``change_points`` (bool)
        and ``change_types`` (``'Start'``, ``'End'`` or a ``(left, right)``
        pair of signs). Rows are read by position.

    Returns
    -------
    pandas.DataFrame
        One row per distinct ``(kts, trend)`` pair with columns ``kts``,
        ``trend`` and ``count`` (number of occurrences), as produced by
        ``DataFrame.value_counts``.

    Notes
    -----
    The direction of the move into the last row is not stored on the last row
    (its ``change_types`` is ``'End'``), so it is read from the right-hand
    sign of the row before it. A segment for which no non-zero direction can
    be found before the data ends has no defined trend and is dropped.
    """
    codes = data['code'].tolist()
    is_change_point = data['change_points'].tolist()
    kinds = data['change_types'].tolist()
    last = len(codes) - 1

    def direction_after(j):
        # Sign of the move from row j to row j+1, or 0 when it is not recorded.
        following = kinds[j + 1]
        if not isinstance(following, str):
            return following[0]
        # Row j+1 is the 'End' marker: use the right-hand sign kept on row j.
        current = kinds[j]
        if not isinstance(current, str):
            return current[1]
        return 0

    segment_list, trend_list = [], []
    pattern = ''

    for i in range(last):
        pattern += codes[i]

        if not is_change_point[i]:
            continue

        # First non-flat direction at or after row i, if there is one.
        trend = next(
            (step for step in map(direction_after, range(i, last)) if step != 0),
            None,
        )
        if trend is not None:
            segment_list.append(pattern)
            trend_list.append(trend)

        pattern = ''

    df_seg = pd.DataFrame({'kts': segment_list, 'trend': trend_list})

    # Name the counts column explicitly so it is 'count' on every pandas version.
    df_seg_unique = df_seg.value_counts().rename('count').reset_index()

    return df_seg_unique
# Build the pattern set (distinct kts/trend pairs with counts) from the training frame.
PSet = segmentation(df_train)
# Bare expression: a notebook echoes the first rows.
PSet.head()
kts trend count
0 a 1 58
1 e -1 47
2 aa 1 23
3 ee -1 15
4 e 1 9

PACC#

def whethersub(X, Y):
    """Return 1 if ``X`` is a (not necessarily contiguous) subsequence of ``Y``, else 0.

    Parameters
    ----------
    X, Y : str
        ``Y`` is searched with ``str.find``. The symbols of ``X`` must occur
        in ``Y`` in the same order, with any number of other symbols between
        them. An empty ``X`` is a subsequence of every ``Y``.
    """
    if len(X) > len(Y):
        return 0

    cursor = 0  # position in Y from which the next symbol must be found
    for symbol in X:
        hit = Y.find(symbol, cursor)
        if hit < 0:
            return 0
        cursor = hit + 1
    return 1

def onum(pset):
    """Add the occurrence number ``onum`` of every pattern.

    For each row, ``onum`` is the sum of ``count`` over all rows whose ``kts``
    contains this row's ``kts`` as a subsequence (as decided by
    ``whethersub``). A pattern always contains itself.

    Parameters
    ----------
    pset : pandas.DataFrame
        Must contain the columns ``kts`` and ``count``. Rows are read by
        position, so a filtered frame with gaps in its index is accepted.

    Returns
    -------
    pandas.DataFrame
        A copy of ``pset`` (the input is not modified) with the extra column
        ``onum``.
    """
    pset_ = pset.copy()
    patterns = pset_['kts'].tolist()
    counts = pset_['count'].tolist()

    pset_['onum'] = [
        sum(
            count
            for candidate, count in zip(patterns, counts)
            if whethersub(pattern, candidate)
        )
        for pattern in patterns
    ]
    return pset_

def sametrendnum(pset):
    """Add the same-trend occurrence number ``sametrendnum`` of every pattern.

    For each row, ``sametrendnum`` is the sum of ``count`` over all rows that
    have the same ``trend`` and whose ``kts`` contains this row's ``kts`` as a
    subsequence (as decided by ``whethersub``).

    Parameters
    ----------
    pset : pandas.DataFrame
        Must contain the columns ``kts``, ``trend`` and ``count``. Rows are
        read by position, so a filtered frame with gaps in its index is
        accepted.

    Returns
    -------
    pandas.DataFrame
        A copy of ``pset`` (the input is not modified) with the extra column
        ``sametrendnum``.
    """
    pset_ = pset.copy()
    rows = list(zip(pset_['kts'], pset_['trend'], pset_['count']))

    pset_['sametrendnum'] = [
        sum(
            count
            for candidate, candidate_trend, count in rows
            # Cheap trend comparison first; the subsequence test only runs
            # for rows that already agree on the trend.
            if candidate_trend == trend and whethersub(pattern, candidate)
        )
        for pattern, trend, _ in rows
    ]
    return pset_

def pacc(pset):
    """Add the pattern accuracy ``pacc = sametrendnum / onum`` to every row.

    Parameters
    ----------
    pset : pandas.DataFrame
        Must contain the columns ``sametrendnum`` and ``onum``.

    Returns
    -------
    pandas.DataFrame
        A new frame (the input is not modified) with the extra column ``pacc``.
    """
    accuracy = pset['sametrendnum'].div(pset['onum'])
    return pset.assign(pacc=accuracy)
# Pipeline: add onum, then sametrendnum, then their ratio pacc.
PRSet = pacc(sametrendnum(onum(PSet)))
# Bare expression: a notebook echoes the first rows, rounded to 2 decimals.
PRSet.head().round(2)
kts trend count onum sametrendnum pacc
0 a 1 58 154 111 0.72
1 e -1 47 146 115 0.79
2 aa 1 23 58 47 0.81
3 ee -1 15 60 56 0.93
4 e 1 9 146 31 0.21
# Bare expression: a notebook echoes the last rows, rounded to 2 decimals.
PRSet.tail().round(2)
kts trend count onum sametrendnum pacc
53 aaaea 1 1 1 1 1.00
54 aaaaa 1 1 3 3 1.00
55 eaa 1 1 9 4 0.44
56 aab 1 1 3 3 1.00
57 hee -1 1 3 3 1.00

Possible Deficiencies#

  1. Index-53: The pattern ‘aaaea’ appears only once, but its PACC score is perfect (PACC = 1).

    • Patterns that appear fewer times than a specified threshold can be removed from the PSet.

  2. The contribution of ‘ab’ to the PACC of ‘cdab’ and ‘cafffffbd’ is equal, even though ‘ab’ appears in ‘cafffffbd’ with many letters in between.

  3. If ‘ab’ appears at the end of a sequence, the contribution of that sequence to the PACC of ‘ab’ should be greater.