Question-3: PACC#
import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import statistics
from sklearn.metrics import accuracy_score
Title#
Enhancing Pattern Accuracy Metric in Candlestick Pattern Mining for Stock Price Forecasting
Abstract#
Predicting future stock prices is notoriously challenging because many variables influence them. However, recent advances in pattern mining and machine learning have yielded promising results in forecasting stock market behavior. Candlestick charts, which are visual representations of stock price movements, can be used to identify patterns that may indicate future trends.
Current prediction methods rely on matching sequences of candlesticks to patterns with the highest “pattern accuracy” scores. However, this metric has limitations, as it can yield high scores even when patterns appear only once in historical data, reducing reliability. This project aims to improve predictive accuracy by developing a weighted pattern accuracy metric that considers the frequency and significance of patterns within historical data, ultimately enhancing the robustness of stock price forecasts.
Definition of PACC#
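Let \(PSet = \{(KTS_i, Trend_i)\}_{i=1}^n = \{(KTS_1, Trend_1), (KTS_2, Trend_2), \ldots, (KTS_n, Trend_n)\}\) be the Pattern Set, where each \(KTS_i\) is an encoded candlestick sequence and \(Trend_i\) is the price direction that follows it.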
Subsequence#
\(\begin{equation} whetherSub(X,Y)= \begin{cases} 1 & \text{if } X \text{ is a subsequence of } Y \\ 0 & \text{otherwise} \end{cases} \end{equation}\)
oNum#
\(\displaystyle oNum_i = \sum_{j=1}^nwhetherSub(KTS_i,KTS_j)\)
sameTrendNum#
\(\begin{equation} sameTrend((KTS_i, Trend_i),(KTS_j, Trend_j))= \begin{cases} 1 & \text{if } KTS_i \text{ is a subsequence of } KTS_j \text{ and } Trend_i = Trend_j \\ 0 & \text{otherwise} \end{cases} \end{equation}\)
\(\displaystyle sameTrendNum_i = \sum_{j=1}^n sameTrend((KTS_i, Trend_i),(KTS_j, Trend_j))\)
PACC#
\(\displaystyle PACC_i = \frac{sameTrendNum_i}{oNum_i}\)
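The three definitions above can be checked by hand on a tiny pattern set. The following is a minimal sketch in plain Python; the names `is_subsequence` and `pacc_scores` and the four-element toy set are illustrative assumptions and are not part of the notebook.

```python
def is_subsequence(x, y):
    """Return True if x can be obtained from y by deleting characters."""
    it = iter(y)
    # `ch in it` consumes the iterator, so characters must appear in order.
    return all(ch in it for ch in x)


def pacc_scores(pset):
    """Compute PACC_i = sameTrendNum_i / oNum_i for every (kts, trend) pair."""
    scores = []
    for kts_i, trend_i in pset:
        # oNum_i: how many patterns contain kts_i as a subsequence.
        o_num = sum(is_subsequence(kts_i, kts_j) for kts_j, _ in pset)
        # sameTrendNum_i: those that also share the same trend.
        same = sum(
            is_subsequence(kts_i, kts_j) and trend_i == trend_j
            for kts_j, trend_j in pset
        )
        scores.append(same / o_num)
    return scores


# Hypothetical toy pattern set, not taken from the AMZN data.
toy_pset = [("a", 1), ("ae", -1), ("aa", 1), ("e", -1)]
toy_scores = pacc_scores(toy_pset)
```

Here 'a' is a subsequence of 'a', 'ae' and 'aa', so its oNum is 3, and two of those three share its trend, giving a PACC of 2/3. Every pattern is a subsequence of itself, so oNum is never zero.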
Data#
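def get_training_test_data(stock='AMZN', start='2019-1-1', end='2021-1-31', training_ratio=0.96):
    # Download daily prices and drop the last three columns
    # (expected to be Volume, Dividends, Stock Splits), keeping Open, High, Low, Close.
    df = yf.Ticker(stock).history(start=start, end=end)
    df = df.iloc[:, :-3]
    df.reset_index(inplace=True)
    df['Date'] = [i.date() for i in df.Date]
    # fcc: sign of the next day's close minus today's close (NaN on the last row).
    df['fcc'] = [np.sign(df.Close.loc[i+1] - df.Close.loc[i]) for i in range(len(df)-1)] + [np.nan]
    # Chronological split: the first training_ratio of the rows form the training set.
    training_length = int(len(df) * training_ratio)
    # Copy the slices so later column assignments do not act on views of df.
    training_data = df.iloc[:training_length, :].copy()
    test_data = df.iloc[training_length:, :].copy()
    test_data.reset_index(inplace=True, drop=True)
    return (training_data, test_data)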
df_train, df_test = get_training_test_data()
df_train.shape, df_test.shape
((503, 6), (21, 6))
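The `fcc` column labels each day with the sign of the next day's change in closing price. As a minimal sketch, the same label can be computed without a Python loop by shifting the series. The four closing prices below are the rounded values shown in the notebook's own output, and the variable names are illustrative.

```python
import numpy as np
import pandas as pd

# Rounded closing prices for the first four training rows.
close = pd.Series([76.96, 75.01, 78.77, 81.48])

# shift(-1) aligns each day with the next day's close; the last row has no
# successor, so its label is NaN, matching the loop-based version.
fcc = np.sign(close.shift(-1) - close)
```

This assumes the rows are in chronological order, which `yfinance` history output is expected to be.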
Encoding#
def encoder(hp, op, cp, lp):
    # hp, op, cp, lp: high, open, close and low price of one candlestick.
    # a-d: close below open
    if hp > op > cp > lp: return 'a'
    if hp == op > cp > lp: return 'b'
    if hp == op > cp == lp: return 'c'
    if hp > op > cp == lp: return 'd'
    # e-h: close above open
    if hp > cp > op > lp: return 'e'
    if hp == cp > op > lp: return 'f'
    if hp == cp > op == lp: return 'g'
    if hp > cp > op == lp: return 'h'
    # i-l: close equal to open
    if hp > op == cp > lp: return 'i'
    if hp == op == cp > lp: return 'j'
    if hp == op == cp == lp: return 'k'
    if hp > op == cp == lp: return 'l'
def df_encoder(data):
    data_ = data.copy()
    encoder_list = []
    for i in data_.index:
        hp, op, cp, lp = data_[['High','Open', 'Close', 'Low']].loc[i]
        encoder_list.append(encoder(hp, op, cp, lp))
    data_['code'] = encoder_list
    return data_
df_train = df_encoder(df_train)
df_test = df_encoder(df_test)
df_train.head().round(2)
| | Date | Open | High | Low | Close | fcc | code |
---|---|---|---|---|---|---|---|
0 | 2019-01-02 | 73.26 | 77.67 | 73.05 | 76.96 | -1.0 | e |
1 | 2019-01-03 | 76.00 | 76.90 | 74.86 | 75.01 | 1.0 | a |
2 | 2019-01-04 | 76.50 | 79.70 | 75.92 | 78.77 | 1.0 | e |
3 | 2019-01-07 | 80.12 | 81.73 | 79.46 | 81.48 | 1.0 | e |
4 | 2019-01-08 | 83.23 | 83.83 | 80.83 | 82.83 | 1.0 | a |
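The twelve letters produced by `encoder` can be read as a combination of three properties: whether the close is above, below or equal to the open, and whether the candle has an upper and a lower shadow. The sketch below makes that reading explicit. The labels "bullish", "bearish" and "doji" and the names `CODES`, `describe_candle` and `encode_by_shape` are assumptions introduced for illustration; the notebook itself only defines the letters.

```python
# Hypothetical lookup: (body, has_upper_shadow, has_lower_shadow) -> letter.
CODES = {
    ("bearish", True, True): "a", ("bearish", False, True): "b",
    ("bearish", False, False): "c", ("bearish", True, False): "d",
    ("bullish", True, True): "e", ("bullish", False, True): "f",
    ("bullish", False, False): "g", ("bullish", True, False): "h",
    ("doji", True, True): "i", ("doji", False, True): "j",
    ("doji", False, False): "k", ("doji", True, False): "l",
}


def describe_candle(hp, op, cp, lp):
    """Split one candle into body direction and shadow flags."""
    if cp > op:
        body = "bullish"
    elif cp < op:
        body = "bearish"
    else:
        body = "doji"
    has_upper = hp > max(op, cp)   # high extends above the body
    has_lower = lp < min(op, cp)   # low extends below the body
    return body, has_upper, has_lower


def encode_by_shape(hp, op, cp, lp):
    """Return the same letter as the chained comparisons, via the lookup."""
    return CODES[describe_candle(hp, op, cp, lp)]


# First two training rows (high, open, close, low), rounded as in the table.
first = encode_by_shape(77.67, 73.26, 76.96, 73.05)
second = encode_by_shape(76.90, 76.00, 75.01, 74.86)
```

Both rows reproduce the codes in the table above ('e' and 'a'). The sketch assumes valid candles, where the high is at least as large as both open and close and the low is no larger than either.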
Change Points#
def change_points(data):
    data_ = data.copy()
    # The first row is always marked as a change point ('Start').
    change_pts, change_types = [True], ['Start']
    for i in range(1, len(data_)-1):
        # Direction of the move into day i and of the move out of day i.
        left_change = int(np.sign(data_['Close'][i]-data_['Close'][i-1]))
        right_change = int(np.sign(data_['Close'][i+1]-data_['Close'][i]))
        if left_change != right_change:
            change_pts.append(True)
        else:
            change_pts.append(False)
        change_types.append((left_change, right_change))
    # The last row has no following move, so it is never marked as a change point.
    change_pts.append(False)
    change_types.append('End')
    data_['change_points'] = change_pts
    data_['change_types'] = change_types
    return data_
df_train = change_points(df_train)
df_train.head().round(2)
| | Date | Open | High | Low | Close | fcc | code | change_points | change_types |
---|---|---|---|---|---|---|---|---|---|
0 | 2019-01-02 | 73.26 | 77.67 | 73.05 | 76.96 | -1.0 | e | True | Start |
1 | 2019-01-03 | 76.00 | 76.90 | 74.86 | 75.01 | 1.0 | a | True | (-1, 1) |
2 | 2019-01-04 | 76.50 | 79.70 | 75.92 | 78.77 | 1.0 | e | False | (1, 1) |
3 | 2019-01-07 | 80.12 | 81.73 | 79.46 | 81.48 | 1.0 | e | False | (1, 1) |
4 | 2019-01-08 | 83.23 | 83.83 | 80.83 | 82.83 | 1.0 | a | False | (1, 1) |
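A day is a change point when the direction of the move into it differs from the direction of the move out of it. The following minimal sketch reproduces only the boolean `change_points` column with NumPy on a made-up price list. The function name `change_point_flags` and the prices are illustrative assumptions, and the input is assumed to contain at least two prices.

```python
import numpy as np


def change_point_flags(closes):
    """Flag rows where the sign of the close-to-close move flips."""
    # signs[i] is the direction of the move from day i to day i + 1.
    signs = np.sign(np.diff(closes))
    # Interior day i is a change point when signs[i - 1] != signs[i].
    interior = signs[:-1] != signs[1:]
    # As in the notebook, the first row is always True and the last always False.
    return [True] + interior.tolist() + [False]


# Hypothetical closing prices: down, up, up, down.
flags = change_point_flags([10.0, 9.0, 11.0, 12.0, 11.0])
```

Day 1 (down then up) and day 3 (up then down) are flagged, while day 2 sits inside an upward run and is not.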
Segmentation and Labeling#
def segmentation(data):
    segment_list, trend_list = [], []
    pattern = ''
    for i in range(len(data)-1):
        pattern += data.code[i]
        if data.change_points.loc[i]:
            # Close the current segment at this change point.
            segment_list.append(pattern)
            j = i
            while True:
                if data.change_types[j+1][0] != 0: # first non-equal direction
                    trend_list.append(data.change_types[j+1][0])
                    break
                j += 1
            pattern = ''
    df_seg = pd.DataFrame()
    df_seg['kts'] = segment_list
    df_seg['trend'] = trend_list
    # Collapse duplicate (kts, trend) pairs and record how often each one occurs.
    df_seg_unique = df_seg.value_counts().reset_index(name='count')
    return df_seg_unique
PSet = segmentation(df_train)
PSet.head()
| | kts | trend | count |
---|---|---|---|
0 | a | 1 | 58 |
1 | e | -1 | 47 |
2 | aa | 1 | 23 |
3 | ee | -1 | 15 |
4 | e | 1 | 9 |
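Segmentation cuts the code string at every change point and labels each segment with the first non-zero price direction that follows it. The sketch below traces this on a five-day toy example without a DataFrame. The function name `segment`, the code string and the prices are illustrative assumptions. One deliberate difference is labeled in the code: if no non-zero move follows a segment, the sketch records a trend of 0.

```python
import numpy as np


def segment(codes, closes):
    """Cut `codes` at change points and attach the following trend."""
    # signs[i] is the direction of the move from day i to day i + 1.
    signs = np.sign(np.diff(closes)).astype(int)
    segments, trends, pattern = [], [], ""
    for i in range(len(codes) - 1):
        pattern += codes[i]
        # Day 0 is always a cut; later days cut when the direction flips.
        is_change = i == 0 or signs[i - 1] != signs[i]
        if is_change:
            # First non-zero direction after day i. Assumption: fall back to 0
            # when every remaining move is flat.
            trend = next((int(s) for s in signs[i:] if s != 0), 0)
            segments.append(pattern)
            trends.append(trend)
            pattern = ""
    return segments, trends


# Hypothetical codes and closes: down, up, up, down.
toy_segments, toy_trends = segment("eaeea", [10.0, 9.0, 11.0, 12.0, 11.0])
```

The cuts fall after days 0, 1 and 3, giving the segments 'e', 'a' and 'ee' with trends -1, 1 and -1. As in the notebook, codes after the last change point are not emitted as a segment.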
PACC#
def whethersub(X, Y):
    if len(X) > len(Y): return 0
    else:
        k = 0
        for i in range(len(X)):
            if X[i] in Y[k:]:
                k = Y.find(X[i], k)+1
            else:
                return 0
        return 1
def onum(pset):
    pset_ = pset.copy()
    onum_list = []
    for i in range(len(pset)):
        total = 0
        for j in range(len(pset)):
            # Rows of pset are distinct (kts, trend) pairs, so each match is weighted by its count.
            total += whethersub(pset_.loc[i, 'kts'], pset_.loc[j, 'kts'])* pset_.loc[j, 'count']
        onum_list.append(total)
    pset_['onum'] = onum_list
    return pset_
def sametrendnum(pset):
    pset_ = pset.copy()
    sametrendnum_list = []
    for i in range(len(pset_)):
        total = 0
        for j in range(len(pset_)):
            # Count row j only if it contains pattern i and has the same trend.
            if whethersub(pset_.loc[i,'kts'], pset_.loc[j,'kts']) and (pset_.loc[i,'trend'] == pset_.loc[j, 'trend']):
                total += pset_.loc[j, 'count']
        sametrendnum_list.append(total)
    pset_['sametrendnum'] = sametrendnum_list
    return pset_
def pacc(pset):
    pset_ = pset.copy()
    pset_['pacc'] = pset_['sametrendnum'] / pset_['onum']
    return pset_
PRSet = pacc(sametrendnum(onum(PSet)))
PRSet.head().round(2)
| | kts | trend | count | onum | sametrendnum | pacc |
---|---|---|---|---|---|---|
0 | a | 1 | 58 | 154 | 111 | 0.72 |
1 | e | -1 | 47 | 146 | 115 | 0.79 |
2 | aa | 1 | 23 | 58 | 47 | 0.81 |
3 | ee | -1 | 15 | 60 | 56 | 0.93 |
4 | e | 1 | 9 | 146 | 31 | 0.21 |
PRSet.tail().round(2)
| | kts | trend | count | onum | sametrendnum | pacc |
---|---|---|---|---|---|---|
53 | aaaea | 1 | 1 | 1 | 1 | 1.00 |
54 | aaaaa | 1 | 1 | 3 | 3 | 1.00 |
55 | eaa | 1 | 1 | 9 | 4 | 0.44 |
56 | aab | 1 | 1 | 3 | 3 | 1.00 |
57 | hee | -1 | 1 | 3 | 3 | 1.00 |
Possible Deficiencies#
Index 53: The pattern ‘aaaea’ appears only once, but its PACC score is perfect (PACC = 1).
Patterns that occur fewer times than a specified threshold can be removed from the PSet.
The sequences ‘cdab’ and ‘cafffffbd’ contribute equally to the PACC of ‘ab’, even though in ‘cafffffbd’ the letters ‘a’ and ‘b’ are separated by many other letters.
If ‘ab’ appears at the end of a sequence, the contribution of that sequence to the PACC of ‘ab’ should be greater.
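The quantities these remarks refer to can be measured directly. The sketch below shows one possible starting point and is not the weighted metric itself: `filter_rare` drops patterns below a count threshold, and `gap_and_suffix` reports how many extra letters lie inside a greedy leftmost match and whether the match ends at the end of the sequence. The function names, the default threshold of 2 and the greedy matching rule are all assumptions made for illustration.

```python
import pandas as pd


def filter_rare(pset, min_count=2):
    """Keep only patterns observed at least `min_count` times (hypothetical threshold)."""
    return pset[pset["count"] >= min_count].reset_index(drop=True)


def match_positions(x, y):
    """Greedy leftmost positions where x occurs as a subsequence of y, else None."""
    positions, k = [], 0
    for ch in x:
        k = y.find(ch, k)
        if k == -1:
            return None
        positions.append(k)
        k += 1
    return positions


def gap_and_suffix(x, y):
    """Return (letters inside the match that are not part of x, match ends at end of y)."""
    pos = match_positions(x, y)
    if not pos:
        return None
    gaps = (pos[-1] - pos[0] + 1) - len(x)
    return gaps, pos[-1] == len(y) - 1


# Three rows taken from the PRSet output above.
toy = pd.DataFrame({"kts": ["a", "e", "aaaea"], "trend": [1, -1, 1], "count": [58, 47, 1]})
kept = filter_rare(toy)

tight = gap_and_suffix("ab", "cdab")        # contiguous and at the end
loose = gap_and_suffix("ab", "cafffffbd")   # five letters in between, not at the end
```

A weighting scheme could then scale each sequence's contribution down as the gap count grows and up when the match sits at the end of the sequence. How to choose those weights is left open here. Note that the greedy leftmost match is not guaranteed to be the tightest possible match.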