Chan Theory (缠论) Tools: Strokes (笔) and Segments (线段)


2023-06-24 01:09 | Source: compiled from the web

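Preface:

In this pivotal year of 2020, we are fortunate to witness firsthand the moment the Chan master (禅师) described years ago: a 90-year cycle in which a new cycle replaces the old one. I am no expert, but I would still like to share (and earn a few community points along the way) the utility program I wrote based on the original posts. It covers the stroke (笔) and segment (线段) parts.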

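Research content

This part of the theory is the subject of much disagreement, and the original posts are indeed written in a way that easily confuses people. I have done my best to understand them and turn them into a program. The program itself has few comments but a lot of content, so here is a rough outline of its structure.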

Part one provides a number of utility functions and constant definitions.
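As a rough illustration of the utility functions in part one, the sketch below mirrors the four tolerance-based comparisons that appear in the code (`float_less`, `float_more`, `float_less_equal`, `float_more_equal`). It is an assumption-laden stand-in: the original uses `np.isclose`, while this version uses the standard library's `math.isclose`, whose default tolerances differ.

```python
import math


def float_less(a: float, b: float) -> bool:
    """True only if a is below b by more than floating-point noise."""
    return a < b and not math.isclose(a, b)


def float_more(a: float, b: float) -> bool:
    """True only if a is above b by more than floating-point noise."""
    return a > b and not math.isclose(a, b)


def float_less_equal(a: float, b: float) -> bool:
    """True if a is below b, or the two are equal within tolerance."""
    return a < b or math.isclose(a, b)


def float_more_equal(a: float, b: float) -> bool:
    """True if a is above b, or the two are equal within tolerance."""
    return a > b or math.isclose(a, b)


if __name__ == "__main__":
    price = 0.1 + 0.2  # slightly above 0.3 because of binary rounding
    print(price > 0.3)                   # plain comparison sees a "new high"
    print(float_more(price, 0.3))        # tolerant comparison does not
    print(float_more_equal(price, 0.3))  # treated as equal
```

Comparing prices this way keeps rounding noise from being mistaken for a new high or a new low, which matters when deciding whether two fractals are at the same level.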

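Part two is the class definition for strokes and segments.

Standardize the K-lines (merge bars that have an inclusion relationship)

Mark the tops and bottoms (fractals)

Remove the useless tops and bottoms according to the old stroke definition

Throughout the steps above, always watch for the case where a gap forms a stroke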
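The first two stroke steps above can be sketched on plain `(high, low)` tuples. This is a simplified illustration, not the author's `KBarChan` implementation: the names `merge_inclusions` and `mark_fractals` are hypothetical, plain comparisons are used instead of the tolerance-based ones, and gaps are ignored.

```python
from typing import List, Tuple

Bar = Tuple[float, float]  # (high, low)


def merge_inclusions(bars: List[Bar]) -> List[Bar]:
    """Merge bars where one bar's range contains the other's."""
    merged: List[Bar] = []
    for high, low in bars:
        if not merged:
            merged.append((high, low))
            continue
        p_high, p_low = merged[-1]
        contains = (p_high >= high and p_low <= low) or (high >= p_high and low <= p_low)
        if not contains:
            merged.append((high, low))
            continue
        # Direction comes from the two bars before the inclusion;
        # assume "up" when there is no earlier bar to compare with.
        up = len(merged) < 2 or merged[-1][0] > merged[-2][0]
        pick = max if up else min  # up: keep higher high/low; down: lower
        merged[-1] = (pick(p_high, high), pick(p_low, low))
    return merged


def mark_fractals(bars: List[Bar]) -> List[str]:
    """Label each standardized bar as 'top', 'bot' or '' (no fractal)."""
    marks = [""] * len(bars)
    for i in range(1, len(bars) - 1):
        (h0, l0), (h1, l1), (h2, l2) = bars[i - 1], bars[i], bars[i + 1]
        if h1 > h0 and h1 > h2:
            marks[i] = "top"
        elif l1 < l0 and l1 < l2:
            marks[i] = "bot"
    return marks


if __name__ == "__main__":
    raw = [(10, 8), (11, 9), (12, 10), (11.5, 10.5), (11, 9.5), (10, 8.5), (10.5, 9)]
    std = merge_inclusions(raw)
    print(std)
    print(mark_fractals(std))
```

In the sample data the fourth bar lies inside the third, so the two are merged in the upward direction before the top and bottom are marked. The third step, dropping fractals that are too close together to form a stroke, is where most of the complexity of the real code lives and is not covered here.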

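Segments

First, determine whether the previous segment has a gap

If it has a gap, look for a fractal in the characteristic-sequence elements (特征元素). If a new high or new low is found, delete the previous fractal; if a new fractal is found, check whether it has a gap and analyze each case separately

If it has no gap, look for a fractal in the characteristic-sequence elements and check whether it has a gap

Throughout the steps above, watch for the case where a gap forms a segment.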
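The core of the segment steps, finding a fractal in the characteristic sequence and checking it for a gap, can be sketched as follows. This is a minimal sketch under stated assumptions: the segment is upward, stroke endpoints are given as alternating prices starting from a bottom, inclusion handling inside the characteristic sequence is skipped, and the names `feature_elements` and `find_top_fractal` are hypothetical.

```python
from typing import List, Optional, Tuple

Elem = Tuple[float, float]  # (high, low) of one down stroke


def feature_elements(points: List[float]) -> List[Elem]:
    """Down strokes of an upward segment; points[0] must be a bottom."""
    return [(points[i], points[i + 1]) for i in range(1, len(points) - 1, 2)]


def find_top_fractal(elems: List[Elem]) -> Optional[Tuple[int, bool]]:
    """Return (index of the middle element, has_gap) for the first top fractal."""
    for i in range(1, len(elems) - 1):
        (h0, l0), (h1, l1), (h2, l2) = elems[i - 1], elems[i], elems[i + 1]
        if h1 > h0 and h1 > h2 and l1 > l0 and l1 > l2:
            # A gap exists when the middle element sits entirely above
            # the first one, i.e. the two ranges do not overlap.
            has_gap = l1 > h0
            return i, has_gap
    return None


if __name__ == "__main__":
    # bottom, top, bottom, top, ... stroke endpoints of an upward segment
    points = [10, 12, 11, 14, 13, 16, 13.5, 15.5, 12, 13]
    elems = feature_elements(points)
    print(elems)
    print(find_top_fractal(elems))
```

When a fractal without a gap is found, the high of its middle element (16 in the sample) is the candidate end of the upward segment. With a gap, the outline above calls for further checks before the segment is considered finished, which this sketch does not attempt.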

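Part three plots the result. Plotting uses pyecharts; the generated HTML file can be found in the research environment (研究).

There is a lot of code and it is not easy to understand, but may it find its way to those who are meant to have it.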

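Closing

I would also like to share some thoughts of my own. Many people studying Chan theory online say that strokes and segments are useless. In my humble opinion, strokes and segments are a fairly standardized tool that Chan Shi (缠师) offered, after working out his theory, to help people understand what he was saying. In his blog posts he himself acknowledged that the definitions of strokes and segments he gave are not the only possible ones; any approach that respects the essence of Chan theory will do. That essence, as far as I can clearly make out from the blog posts, is: the pivot zone (中枢: every trend must complete itself), divergence (背驰: the trend runs out of force), and nested intervals (区间套: recursion in which quantitative change becomes qualitative change). However, Chan Shi did not have time to explain in detail topics such as how to use cycles and trading volume, nor to elaborate on the other two layers of Chan analysis besides the technical side: fundamentals and relative pricing (比价). That is a pity, but if everything had been spelled out, perhaps that would not necessarily have been a good thing.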

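Changes:

Line 1496 of the code: i = next_valid_elems[1] changed to i = next_elems[1]

2020-04-30: The research notebook update includes the change above, and also fixes a bug where a stroke gap formed a segment.

2020-05-09: Fixed a gap-detection bug by using float_more/float_less. Continued fixing the bug where a K-line gap directly formed a segment; it is now more or less correct.

2020-05-11: Fixed a segment gap-detection bug: within the same segment, if an earlier stroke has already closed the gap, it should be treated as no gap.

2020-05-14: Fixed a segment bug where a bottom was higher than a top, or the other way round.

2020-05-15: Fixed a stroke gap bug. Test case: stock = '300436.XSHE', end_time = '2017-12-27 13:30:00'

2020-05-18: Fixed the segment judgment for the still-forming, current state; under certain conditions no prediction is made.

2020-05-25: Changed the gap-detection logic so that gaps forming segments is handled more reasonably.

2020-06-01: Changed the segment's current-state judgment, adding more possibilities to the earlier conditions.

2020-06-29: Changed the segment's current-state judgment; it now makes a tentative segment prediction for the latest price action. Also fixed a bug where, with a gap present, a new high or new low was found but not acted on.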
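Several of the change-log entries concern gap detection. The sketch below shows one way the two ideas could look on plain `(high, low)` bars: detecting a gap between consecutive K-lines using the `MIN_PRICE_UNIT` threshold from the code, and treating a gap as closed once later price action has covered it. The helper names `gap_range` and `gap_is_closed` and the exact closing rule are assumptions made for illustration; the original `gap_range_func` returns `[0, 0]` rather than `None` when there is no gap.

```python
from typing import List, Optional, Tuple

Bar = Tuple[float, float]   # (high, low)
Gap = Tuple[float, float]   # (lower edge, upper edge)

MIN_PRICE_UNIT = 0.01  # same constant as in the original code


def gap_range(prev: Bar, cur: Bar) -> Optional[Gap]:
    """Return the price range left empty between two consecutive bars."""
    prev_high, prev_low = prev
    cur_high, cur_low = cur
    if cur_low - prev_high > MIN_PRICE_UNIT:      # gap up
        return (prev_high, cur_low)
    if cur_high - prev_low < -MIN_PRICE_UNIT:     # gap down
        return (cur_high, prev_low)
    return None


def gap_is_closed(gap: Gap, later_bars: List[Bar]) -> bool:
    """Assumed rule: closed once later bars together span the whole gap."""
    if not later_bars:
        return False
    lower, upper = gap
    lowest = min(low for _, low in later_bars)
    highest = max(high for high, _ in later_bars)
    return lowest <= lower and highest >= upper


if __name__ == "__main__":
    gap = gap_range((10.0, 9.5), (10.8, 10.3))
    print(gap)
    print(gap_is_closed(gap, [(10.6, 10.1)]))
    print(gap_is_closed(gap, [(10.6, 10.1), (10.2, 9.9)]))
```

Under this rule a gap that has been fully retraced inside the same segment no longer counts, which matches the intent of the 2020-05-11 entry, though the real code may decide closure differently.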

import numpy as np
import copy
import talib
from enum import Enum
from numpy.lib.recfunctions import append_fields
from scipy.ndimage.interpolation import shift


class InclusionType(Enum):
    # output: 0 = no inclusion, 1 = first contains second, 2 second contains first
    noInclusion = 0
    firstCsecond = 2
    secondCfirst = 3


class TopBotType(Enum):
    noTopBot = 0
    bot2top = 0.5
    top = 1
    top2bot = -0.5
    bot = -1

    @classmethod
    def reverse(cls, tp):
        if tp == cls.top:
            return cls.bot
        elif tp == cls.bot:
            return cls.top
        elif tp == cls.top2bot:
            return cls.bot2top
        elif tp == cls.bot2top:
            return cls.top2bot
        else:
            return cls.noTopBot

    @classmethod
    def value2type(cls, val):
        if val == 0:
            return cls.noTopBot
        elif val == 0.5:
            return cls.bot2top
        elif val == 1:
            return cls.top
        elif val == -0.5:
            return cls.top2bot
        elif val == -1:
            return cls.bot
        else:
            return cls.noTopBot


######################## common method ###############################
def float_less(a, b):
    return a < b and not np.isclose(a, b)

def float_more(a, b):
    return a > b and not np.isclose(a, b)

def float_less_equal(a, b):
    return a < b or np.isclose(a, b)

def float_more_equal(a, b):
    return a > b or np.isclose(a, b)


######################## kBarprocessor #############################
GOLDEN_RATIO = 0.618
MIN_PRICE_UNIT=0.01
FEN_BI_COLUMNS = ['date', 'close', 'high', 'low', 'tb', 'real_loc']
FEN_DUAN_COLUMNS = ['date', 'close', 'high', 'low', 'chan_price', 'tb', 'xd_tb', 'real_loc']

def gap_range_func(a):
    if float_more(a['low'] - a['high_s1'], MIN_PRICE_UNIT):
        return [a['high_s1'], a['low']]
    elif float_less(a['high'] - a['low_s1'], -MIN_PRICE_UNIT):
        return [a['high'], a['low_s1']]
    else:
        return [0, 0]

def get_previous_loc(loc, working_df):
    i = loc - 1
    while i >= 0:
        if working_df[i]['tb'] == TopBotType.top.value or working_df[i]['tb'] == TopBotType.bot.value:
            return i
        else:
            i = i - 1
    return None

def get_next_loc(loc, working_df):
    i = loc + 1
    while i < len(working_df):
        if working_df[i]['tb'] == TopBotType.top.value or working_df[i]['tb'] == TopBotType.bot.value:
            return i
        else:
            i = i + 1
    return None


class KBarChan(object):
    '''
    This is a rewrite of KBarProcessor, that one is too slow!! we used pandas dataframe, we should use numpy array!
    df=False flag is used here
    '''
    def __init__(self, kDf, isdebug=False, clean_standardzed=False):
        self.isdebug = isdebug
        self.clean_standardzed = clean_standardzed
        self.kDataFrame_origin = kDf
        self.kDataFrame_standardized = copy.deepcopy(kDf)
        self.kDataFrame_standardized = append_fields(self.kDataFrame_standardized,
                                                     ['new_high', 'new_low', 'trend_type', 'real_loc'],
                                                     [
                                                         [0]*len(self.kDataFrame_standardized),
                                                         [0]*len(self.kDataFrame_standardized),
                                                         [TopBotType.noTopBot.value]*len(self.kDataFrame_standardized),
                                                         [i for i in range(len(self.kDataFrame_standardized))]
                                                     ],
                                                     [float, float, int, int],
                                                     usemask=False)
        self.kDataFrame_marked = None
        self.kDataFrame_xd = None
        self.gap_XD = []
        self.previous_skipped_idx = []
        self.previous_with_xd_gap = False # help to check current gap as XD
        if self.isdebug:
            print("self.kDataFrame_origin head:{0}".format(self.kDataFrame_origin[:10]))
            print("self.kDataFrame_origin tail:{0}".format(self.kDataFrame_origin[-10:]))

    def checkInclusive(self, first, second):
        # output: 0 = no inclusion, 1 = first contains second, 2 second contains first
        isInclusion = InclusionType.noInclusion
        first_high = first['high'] if first['new_high']==0 else first['new_high']
        second_high = second['high'] if second['new_high']==0 else second['new_high']
        first_low = first['low'] if first['new_low']==0 else first['new_low']
        second_low = second['low'] if second['new_low']==0 else second['new_low']

        if float_less_equal(first_high, second_high) and float_more_equal(first_low, second_low):
            isInclusion = InclusionType.firstCsecond
        elif float_more_equal(first_high, second_high) and float_less_equal(first_low, second_low):
            isInclusion = InclusionType.secondCfirst
        return isInclusion

    def isBullType(self, first, second):
        # this is assuming first second aren't inclusive
        f_high = first['high'] if first['new_high']==0 else first['new_high']
        s_high = second['high'] if second['new_high']==0 else second['new_high']
        return TopBotType.bot2top if float_less(f_high, s_high) else TopBotType.top2bot

    def standardize(self, initial_state=TopBotType.noTopBot):
        # 1. We need to make sure we start with first two K-bars without inclusive relationship
        # drop the first if there is inclusion, and check again
        if initial_state == TopBotType.top or initial_state == TopBotType.bot:
            # given the initial state, make the first two bars non-inclusive,
            # the first bar is confirmed as pivot, anything followed with inclusive relation
            # will be merged into the first bar
            while len(self.kDataFrame_standardized) > 2:
                first_Elem = self.kDataFrame_standardized[0]
                second_Elem = self.kDataFrame_standardized[1]
                if self.checkInclusive(first_Elem, second_Elem) != InclusionType.noInclusion:
                    if initial_state == TopBotType.bot:
                        self.kDataFrame_standardized[0]['new_high'] = second_Elem['high']
                        self.kDataFrame_standardized[0]['new_low'] = first_Elem['low']
                    elif initial_state == TopBotType.top:
                        self.kDataFrame_standardized[0]['new_high'] = first_Elem['high']
                        self.kDataFrame_standardized[0]['new_low'] = second_Elem['low']
                    self.kDataFrame_standardized=np.delete(self.kDataFrame_standardized, 1, axis=0)
                else:
                    self.kDataFrame_standardized[0]['new_high'] = first_Elem['high']
                    self.kDataFrame_standardized[0]['new_low'] = first_Elem['low']
                    break
        else:
            while len(self.kDataFrame_standardized) > 2:
                first_Elem = self.kDataFrame_standardized[0]
                second_Elem = self.kDataFrame_standardized[1]
                if self.checkInclusive(first_Elem, second_Elem) != InclusionType.noInclusion:
                    self.kDataFrame_standardized=np.delete(self.kDataFrame_standardized, 0, axis=0)
                else:
                    self.kDataFrame_standardized[0]['new_high'] = first_Elem['high']
                    self.kDataFrame_standardized[0]['new_low'] = first_Elem['low']
                    break

        # 2. loop through the whole data set and process inclusive relationship
        pastElemIdx = 0
        firstElemIdx = pastElemIdx+1
        secondElemIdx = firstElemIdx+1
        high='high'
        low='low'
        new_high = 'new_high'
        new_low = 'new_low'
        trend_type = 'trend_type'
        while secondElemIdx < len(self.kDataFrame_standardized): # xrange
            pastElem = self.kDataFrame_standardized[pastElemIdx]
            firstElem = self.kDataFrame_standardized[firstElemIdx]
            secondElem = self.kDataFrame_standardized[secondElemIdx]
            inclusion_type = self.checkInclusive(firstElem, secondElem)
            if inclusion_type != InclusionType.noInclusion:
                trend = firstElem[trend_type] if firstElem[trend_type]!=TopBotType.noTopBot.value else self.isBullType(pastElem, firstElem).value
                compare_func = max if np.isclose(trend, TopBotType.bot2top.value) else min
                if inclusion_type == InclusionType.firstCsecond:
                    self.kDataFrame_standardized[secondElemIdx][new_high]=compare_func(firstElem[high] if firstElem[new_high]==0 else firstElem[new_high], secondElem[high] if secondElem[new_high]==0 else secondElem[new_high])
                    self.kDataFrame_standardized[secondElemIdx][new_low]=compare_func(firstElem[low] if firstElem[new_low]==0 else firstElem[new_low], secondElem[low] if secondElem[new_low]==0 else secondElem[new_low])
                    self.kDataFrame_standardized[secondElemIdx][trend_type] = trend
                    self.kDataFrame_standardized[firstElemIdx][new_high]=0
                    self.kDataFrame_standardized[firstElemIdx][new_low]=0
                    ############ manage index for next round ###########
                    firstElemIdx = secondElemIdx
                    secondElemIdx += 1
                else:
                    self.kDataFrame_standardized[firstElemIdx][new_high]=compare_func(firstElem[high] if firstElem[new_high]==0 else firstElem[new_high], secondElem[high] if secondElem[new_high]==0 else secondElem[new_high])
                    self.kDataFrame_standardized[firstElemIdx][new_low]=compare_func(firstElem[low] if firstElem[new_low]==0 else firstElem[new_low], secondElem[low] if secondElem[new_low]==0 else secondElem[new_low])
                    self.kDataFrame_standardized[firstElemIdx][trend_type]=trend
                    self.kDataFrame_standardized[secondElemIdx][new_high]=0
                    self.kDataFrame_standardized[secondElemIdx][new_low]=0
                    ############ manage index for next round ###########
                    secondElemIdx += 1
            else:
                if firstElem[new_high] == 0:
                    self.kDataFrame_standardized[firstElemIdx][new_high] = firstElem[high]
                if firstElem[new_low] == 0:
                    self.kDataFrame_standardized[firstElemIdx][new_low] = firstElem[low]
                if secondElem[new_high] == 0:
                    self.kDataFrame_standardized[secondElemIdx][new_high] = secondElem[high]
                if secondElem[new_low] == 0:
                    self.kDataFrame_standardized[secondElemIdx][new_low] = secondElem[low]
                ############ manage index for next round ###########
                pastElemIdx = firstElemIdx
                firstElemIdx = secondElemIdx
                secondElemIdx += 1

        # clean up
        self.kDataFrame_standardized[high] = self.kDataFrame_standardized[new_high]
        self.kDataFrame_standardized[low] = self.kDataFrame_standardized[new_low]
        self.kDataFrame_standardized=self.kDataFrame_standardized[['date', 'close', 'high', 'low', 'real_loc']]
        # remove standardized kbars
        self.kDataFrame_standardized = self.kDataFrame_standardized[self.kDataFrame_standardized['high']!=0]
        # new index add for later distance calculation => straight after standardization
        self.kDataFrame_standardized = append_fields(self.kDataFrame_standardized,
                                                     'new_index',
                                                     [i for i in range(len(self.kDataFrame_standardized))],
                                                     usemask=False)
        return self.kDataFrame_standardized

    def checkTopBot(self, current, first, second):
        if float_more(first['high'], current['high']) and float_more(first['high'], second['high']):
            return TopBotType.top
        elif float_less(first['low'], current['low']) and float_less(first['low'], second['low']):
            return TopBotType.bot
        else:
            return TopBotType.noTopBot

    def markTopBot(self, initial_state=TopBotType.noTopBot):
        self.kDataFrame_standardized = append_fields(self.kDataFrame_standardized,
                                                     'tb',
                                                     [TopBotType.noTopBot.value]*self.kDataFrame_standardized.size,
                                                     usemask=False
                                                     )
        if self.kDataFrame_standardized.size < 7:
            return
        tb = 'tb'
        if initial_state == TopBotType.top or initial_state == TopBotType.bot:
            felem = self.kDataFrame_standardized[0]
            selem = self.kDataFrame_standardized[1]
            if (initial_state == TopBotType.top and float_more_equal(felem['high'], selem['high'])) or \
                (initial_state == TopBotType.bot and float_less_equal(felem['low'], selem['low'])):
                self.kDataFrame_standardized[0][tb] = initial_state.value
            else:
                if self.isdebug:
                    print("Incorrect initial state given!!!")

        # This function assume we have done the standardization process (no inclusion)
        last_idx = 0
        for idx in range(self.kDataFrame_standardized.size-2): #xrange
            currentElem = self.kDataFrame_standardized[idx]
            firstElem = self.kDataFrame_standardized[idx+1]
            secondElem = self.kDataFrame_standardized[idx+2]
            topBotType = self.checkTopBot(currentElem, firstElem, secondElem)
            if topBotType != TopBotType.noTopBot:
                self.kDataFrame_standardized[idx+1][tb] = topBotType.value
                last_idx = idx+1

        # mark the first kbar
        if (self.kDataFrame_standardized[0][tb] != TopBotType.top.value and\
            self.kDataFrame_standardized[0][tb] != TopBotType.bot.value):
            first_loc = get_next_loc(0, self.kDataFrame_standardized)
            if first_loc is not None:
                first_tb = TopBotType.value2type(self.kDataFrame_standardized[first_loc][tb])
                self.kDataFrame_standardized[0][tb] = TopBotType.reverse(first_tb).value

        # mark the last kbar
        last_tb = TopBotType.value2type(self.kDataFrame_standardized[last_idx][tb])
        self.kDataFrame_standardized[-1][tb] = TopBotType.reverse(last_tb).value
        if self.isdebug:
            print("mark topbot on self.kDataFrame_standardized[20]:{0}".format(self.kDataFrame_standardized[:20]))

    def trace_back_index(self, working_df, previous_index):
        # find the closest FenXing with top/bot backwards from previous_index
        idx = previous_index-1
        while idx >= 0:
            fx = working_df[idx]
            if fx['tb'] == TopBotType.noTopBot.value:
                idx -= 1
                continue
            else:
                return idx
        if self.isdebug:
            print("We don't have previous valid FenXing")
        return None

    def prepare_original_kdf(self):
        if 'macd' not in
self.kDataFrame_origin.dtype.names: _, _, macd = talib.MACD(self.kDataFrame_origin['close']) macd[np.isnan(macd)] = 0 self.kDataFrame_origin = append_fields(self.kDataFrame_origin, 'macd', macd, float, usemask=False) def gap_exists_in_range(self, start_idx, end_idx): # end_idx included # no need to drop first row, simple don't use = gap_working_df = self.kDataFrame_origin[(start_idx < self.kDataFrame_origin['date']) & (self.kDataFrame_origin['date'] = 4 new_index distance, we make decision on which one to remove, and go backwards if we hit all good three kbars, we check marked record and start from there finally, if we hit end but still have marked index, we make decision to remove one kbar and resume till finishes ''' self.gap_exists() # work out gap in the original kline working_df = self.kDataFrame_standardized[self.kDataFrame_standardized['tb']!=TopBotType.noTopBot.value] tb = 'tb' high = 'high' low = 'low' new_index = 'new_index' working_df = self.clean_first_two_tb(working_df) ################################# previous_index = 0 current_index = previous_index + 1 next_index = current_index + 1 ################################# while next_index < working_df.size and previous_index is not None and next_index is not None: previousFenXing = working_df[previous_index] currentFenXing = working_df[current_index] nextFenXing = working_df[next_index] if currentFenXing[tb] == previousFenXing[tb]: # used to track back the skipped previous_index if currentFenXing[tb] == TopBotType.top.value: if float_less(currentFenXing[high], previousFenXing[high]): previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) elif float_more(currentFenXing[high], previousFenXing[high]): previous_index, current_index, next_index = self.same_tb_remove_previous(working_df, previous_index, current_index, next_index) else: # equal case gap_qualify = self.check_gap_qualify(working_df, previous_index, current_index, next_index) 
# only remove current if it's not valid with next in case of equality if working_df[next_index][new_index] - working_df[current_index][new_index] < 4 and not gap_qualify: previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) else: previous_index, current_index, next_index = self.same_tb_remove_previous(working_df, previous_index, current_index, next_index) continue elif currentFenXing[tb] == TopBotType.bot.value: if float_more(currentFenXing[low], previousFenXing[low]): previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) elif float_less(currentFenXing[low], previousFenXing[low]): previous_index, current_index, next_index = self.same_tb_remove_previous(working_df, previous_index, current_index, next_index) else:# equal case gap_qualify = self.check_gap_qualify(working_df, previous_index, current_index, next_index) # only remove current if it's not valid with next in case of equality if working_df[next_index][new_index] - working_df[current_index][new_index] < 4 and not gap_qualify: previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) else: previous_index, current_index, next_index = self.same_tb_remove_previous(working_df, previous_index, current_index, next_index) continue elif currentFenXing[tb] == nextFenXing[tb]: if currentFenXing[tb] == TopBotType.top.value: if float_less(currentFenXing[high], nextFenXing[high]): previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) elif float_more(currentFenXing[high], nextFenXing[high]): previous_index, current_index, next_index = self.same_tb_remove_next(working_df, previous_index, current_index, next_index) else: #equality case pre_pre_index = self.trace_back_index(working_df, previous_index) if pre_pre_index is None: previous_index, 
current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) continue gap_qualify = self.check_gap_qualify(working_df, pre_pre_index, previous_index, current_index) if working_df[current_index][new_index] - working_df[previous_index][new_index] >= 4 or gap_qualify: previous_index, current_index, next_index = self.same_tb_remove_next(working_df, previous_index, current_index, next_index) else: previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) continue elif currentFenXing[tb] == TopBotType.bot.value: if float_more(currentFenXing[low], nextFenXing[low]): previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) elif float_less(currentFenXing[low], nextFenXing[low]): previous_index, current_index, next_index = self.same_tb_remove_next(working_df, previous_index, current_index, next_index) else: pre_pre_index = self.trace_back_index(working_df, previous_index) if pre_pre_index is None: previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) continue gap_qualify = self.check_gap_qualify(working_df, pre_pre_index, previous_index, current_index) if working_df[current_index][new_index] - working_df[previous_index][new_index] >= 4 or gap_qualify: previous_index, current_index, next_index = self.same_tb_remove_next(working_df, previous_index, current_index, next_index) else: previous_index, current_index, next_index = self.same_tb_remove_current(working_df, previous_index, current_index, next_index) continue gap_qualify = self.check_gap_qualify(working_df, previous_index, current_index, next_index) if currentFenXing[new_index] - previousFenXing[new_index] < 4: # comming from current next less than 4 new_index gap, we need to determine which ones to kill # once done we trace back if (nextFenXing[new_index] - 
currentFenXing[new_index]) >= 4 or gap_qualify: pre_pre_index = self.trace_back_index(working_df, previous_index) # if previous current are good, we go next if self.check_gap_qualify(working_df, pre_pre_index, previous_index, current_index): pass elif currentFenXing[tb] == TopBotType.bot.value and\ previousFenXing[tb] == TopBotType.top.value and\ nextFenXing[tb] == TopBotType.top.value: if float_more_equal(previousFenXing[high], nextFenXing[high]): # still can't make decision, but we can have idea about prepre case pre_pre_index = self.trace_back_index(working_df, previous_index) if pre_pre_index is None: working_df[current_index][tb] = TopBotType.noTopBot.value current_index = next_index next_index = self.get_next_tb(next_index, working_df) continue if pre_pre_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(pre_pre_index) prepreFenXing = working_df[pre_pre_index] if float_more_equal(prepreFenXing[low], currentFenXing[low]): working_df[pre_pre_index][tb] = TopBotType.noTopBot.value temp_index = self.trace_back_index(working_df, pre_pre_index) if temp_index is None: working_df[current_index][tb] = TopBotType.noTopBot.value current_index = next_index next_index = self.get_next_tb(next_index, working_df) else: next_index = current_index current_index = previous_index previous_index = temp_index if previous_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(previous_index) continue else: working_df[current_index][tb] = TopBotType.noTopBot.value current_index = previous_index previous_index = self.trace_back_index(working_df, previous_index) if previous_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(previous_index) continue else: #previousFenXing[high] < nextFenXing[high] working_df[previous_index][tb] = TopBotType.noTopBot.value previous_index = self.trace_back_index(working_df, previous_index) if previous_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(previous_index) continue elif 
currentFenXing[tb] == TopBotType.top.value and\ previousFenXing[tb] == TopBotType.bot.value and\ nextFenXing[tb] == TopBotType.bot.value: if float_less(previousFenXing[low], nextFenXing[low]): # still can't make decision, but we can have idea about prepre case pre_pre_index = self.trace_back_index(working_df, previous_index) if pre_pre_index is None: working_df[current_index][tb] = TopBotType.noTopBot.value current_index = next_index next_index = self.get_next_tb(next_index, working_df) continue if pre_pre_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(pre_pre_index) prepreFenXing = working_df[pre_pre_index] if float_less_equal(prepreFenXing[high], currentFenXing[high]): working_df[pre_pre_index][tb] = TopBotType.noTopBot.value temp_index = self.trace_back_index(working_df, pre_pre_index) if temp_index is None: working_df[current_index][tb] = TopBotType.noTopBot.value current_index = next_index next_index = self.get_next_tb(next_index, working_df) else: next_index = current_index current_index = previous_index previous_index = temp_index if previous_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(previous_index) continue else: working_df[current_index][tb] = TopBotType.noTopBot.value current_index = previous_index previous_index = self.trace_back_index(working_df, previous_index) if previous_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(previous_index) continue else: #previousFenXing[low] >= nextFenXing[low] working_df[previous_index][tb] = TopBotType.noTopBot.value previous_index = self.trace_back_index(working_df, previous_index) if previous_index in self.previous_skipped_idx: self.previous_skipped_idx.remove(previous_index) continue else: #(nextFenXing[new_index] - currentFenXing[new_index]) < 4 and not gap_qualify: temp_index = self.get_next_tb(next_index, working_df) if temp_index == working_df.size: # we reached the end, need to go back previous_index, current_index, next_index = 
self.work_on_end(previous_index, current_index, next_index, working_df) else: # leave it for next round again! self.previous_skipped_idx.append(previous_index) # mark index so we come back previous_index = current_index current_index = next_index next_index = temp_index continue elif (nextFenXing[new_index] - currentFenXing[new_index]) < 4 and not gap_qualify: temp_index = self.get_next_tb(next_index, working_df) if temp_index == working_df.size: # we reached the end, need to go back previous_index, current_index, next_index = self.work_on_end(previous_index, current_index, next_index, working_df) else: # leave it for next round again! self.previous_skipped_idx.append(previous_index) # mark index so we come back previous_index = current_index current_index = next_index next_index = temp_index continue elif (nextFenXing[new_index] - currentFenXing[new_index]) >= 4 or gap_qualify: if currentFenXing[tb] == TopBotType.top.value and nextFenXing[tb] == TopBotType.bot.value and float_more(currentFenXing[high], nextFenXing[high]): pass elif currentFenXing[tb] == TopBotType.top.value and nextFenXing[tb] == TopBotType.bot.value and float_less_equal(currentFenXing[high], nextFenXing[high]): working_df[current_index][tb] = TopBotType.noTopBot.value current_index = next_index next_index = self.get_next_tb(next_index, working_df) continue elif currentFenXing[tb] == TopBotType.top.value and nextFenXing[tb] == TopBotType.bot.value and float_less_equal(currentFenXing[low],nextFenXing[low]): working_df[next_index][tb] = TopBotType.noTopBot.value next_index = self.get_next_tb(next_index, working_df) continue elif currentFenXing[tb] == TopBotType.top.value and nextFenXing[tb] == TopBotType.bot.value and float_more(currentFenXing[low], nextFenXing[low]): pass elif currentFenXing[tb] == TopBotType.bot.value and nextFenXing[tb] == TopBotType.top.value and float_less(currentFenXing[low], nextFenXing[low]): pass elif currentFenXing[tb] == TopBotType.bot.value and nextFenXing[tb] == 
TopBotType.top.value and float_more_equal(currentFenXing[low], nextFenXing[low]): working_df[current_index][tb] = TopBotType.noTopBot.value current_index = next_index next_index = self.get_next_tb(next_index, working_df) continue elif currentFenXing[tb] == TopBotType.bot.value and nextFenXing[tb] == TopBotType.top.value and float_less(currentFenXing[high], nextFenXing[high]): pass elif currentFenXing[tb] == TopBotType.bot.value and nextFenXing[tb] == TopBotType.top.value and float_more_equal(currentFenXing[high], nextFenXing[high]): working_df[next_index][tb] = TopBotType.noTopBot.value next_index = self.get_next_tb(next_index, working_df) continue if self.previous_skipped_idx: # if we still have some left to do previous_index = self.previous_skipped_idx.pop() if working_df[previous_index][tb] == TopBotType.noTopBot.value: previous_index = self.get_next_tb(previous_index, working_df) current_index = self.get_next_tb(previous_index, working_df) next_index = self.get_next_tb(current_index, working_df) continue # only confirmed tb comes here previous_index = current_index current_index=next_index next_index = self.get_next_tb(next_index, working_df) # if nextIndex is the last one, final clean up if next_index == working_df.size: if ((working_df[current_index][tb]==TopBotType.top.value and float_more(self.kDataFrame_origin[-1][high], working_df[current_index][high])) \ or (working_df[current_index][tb]==TopBotType.bot.value and float_less(self.kDataFrame_origin[-1][low], working_df[current_index][low]) )): working_df[-1][tb] = working_df[current_index][tb] working_df[current_index][tb] = TopBotType.noTopBot.value if working_df[current_index][tb] == TopBotType.noTopBot.value and\ ((working_df[previous_index][tb]==TopBotType.top.value and float_more(self.kDataFrame_origin[-1][high], working_df[previous_index][high])) or\ (working_df[previous_index][tb]==TopBotType.bot.value and float_less(self.kDataFrame_origin[-1][low], working_df[previous_index][low]))): 
working_df[-1][tb] = working_df[previous_index][tb] working_df[previous_index][tb] = TopBotType.noTopBot.value if working_df[previous_index][tb] == working_df[current_index][tb]: if working_df[current_index][tb] == TopBotType.top.value: if float_more(working_df[current_index][high],working_df[previous_index][high]): working_df[previous_index][tb] = TopBotType.noTopBot.value else: working_df[current_index][tb] = TopBotType.noTopBot.value elif working_df[current_index][tb] == TopBotType.bot.value: if float_less(working_df[current_index][low], working_df[previous_index][low]): working_df[previous_index][tb] = TopBotType.noTopBot.value else: working_df[current_index][tb] = TopBotType.noTopBot.value ################################### self.kDataFrame_marked = working_df[working_df[tb]!=TopBotType.noTopBot.value][FEN_BI_COLUMNS] if self.isdebug: print("self.kDataFrame_marked head 20:{0}".format(self.kDataFrame_marked[:20])) print("self.kDataFrame_marked tail 20:{0}".format(self.kDataFrame_marked[-20:])) def work_on_end(self, pre_idx, cur_idx, nex_idx, working_df): ''' only triggered at the end of fenbi loop ''' previousFenXing = working_df[pre_idx] currentFenXing = working_df[cur_idx] nextFenXing = working_df[nex_idx] if currentFenXing['tb'] == TopBotType.top.value: if float_more(previousFenXing['low'], nextFenXing['low']): working_df[pre_idx]['tb'] = TopBotType.noTopBot.value pre_idx = self.trace_back_index(working_df, pre_idx) else: working_df[nex_idx]['tb'] = TopBotType.noTopBot.value nex_idx = cur_idx cur_idx = pre_idx pre_idx = self.trace_back_index(working_df, pre_idx) else: # TopBotType.bot if float_less(previousFenXing['high'], nextFenXing['high']): working_df[pre_idx]['tb'] = TopBotType.noTopBot.value pre_idx = self.trace_back_index(working_df, pre_idx) else: working_df[nex_idx]['tb'] = TopBotType.noTopBot.value nex_idx = cur_idx cur_idx = pre_idx pre_idx = self.trace_back_index(working_df, pre_idx) if pre_idx in self.previous_skipped_idx: 
self.previous_skipped_idx.remove(pre_idx) return pre_idx, cur_idx, nex_idx def getMarkedBL(self): self.standardize() self.markTopBot() self.defineBi() # self.defineBi_new() self.getPureBi() return self.kDataFrame_marked def getPureBi(self): # only use the price relavent self.kDataFrame_marked = append_fields(self.kDataFrame_marked, 'chan_price', [0]*len(self.kDataFrame_marked), float, usemask=False) i = 0 while i < self.kDataFrame_marked.size: item = self.kDataFrame_marked[i] if item['tb'] == TopBotType.top.value: self.kDataFrame_marked[i]['chan_price'] = item['high'] elif item['tb'] == TopBotType.bot.value: self.kDataFrame_marked[i]['chan_price'] = item['low'] else: print("Invalid tb for chan_price") i = i + 1 if self.isdebug: print("getPureBi:{0}".format(self.kDataFrame_marked[['date', 'chan_price', 'tb', 'real_loc']][-20:])) def getFenBi(self, initial_state=TopBotType.noTopBot): self.standardize(initial_state) self.markTopBot(initial_state) self.defineBi() self.getPureBi() return self.kDataFrame_marked def getFenDuan(self, initial_state=TopBotType.noTopBot): temp_df = self.getFenBi(initial_state) if temp_df.size==0: return temp_df self.defineXD(initial_state) return self.kDataFrame_xd def getOriginal_df(self): return self.kDataFrame_origin def getFenBI_df(self): return self.kDataFrame_marked def getFenDuan_df(self): return self.kDataFrame_xd ################################################## XD defintion ################################################## def find_initial_direction(self, working_df, initial_status=TopBotType.noTopBot): chan_price = 'chan_price' if initial_status != TopBotType.noTopBot: # first six elem, this can only be used when we are sure about the direction of the xd if initial_status == TopBotType.top: initial_loc = working_df[:6]['chan_price'].argmax(axis=0) elif initial_status == TopBotType.bot: initial_loc = working_df[:6]['chan_price'].argmin(axis=0) else: initial_loc = None print("Invalid Initial TopBot type") 
working_df[initial_loc]['xd_tb'] = initial_status.value if self.isdebug: print("initial xd_tb:{0} located at {1}".format(initial_status, working_df[initial_loc]['date'])) initial_direction = TopBotType.top2bot if initial_status == TopBotType.top else TopBotType.bot2top else: initial_loc = current_loc = 0 initial_direction = TopBotType.noTopBot while current_loc + 3 < working_df.size: first = working_df[current_loc] second = working_df[current_loc+1] third = working_df[current_loc+2] forth = working_df[current_loc+3] if float_less(first[chan_price], second[chan_price]): found_direction = (float_less_equal(first[chan_price],third[chan_price]) and float_less(second[chan_price],forth[chan_price])) or\ (float_more_equal(first[chan_price],third[chan_price]) and float_more(second[chan_price],forth[chan_price])) else: found_direction = (float_less(first[chan_price],third[chan_price]) and float_less_equal(second[chan_price],forth[chan_price])) or\ (float_more(first[chan_price],third[chan_price]) and float_more_equal(second[chan_price],forth[chan_price])) if found_direction: initial_direction = TopBotType.bot2top if (float_less(first[chan_price],third[chan_price]) or float_less(second[chan_price],forth[chan_price])) else TopBotType.top2bot initial_loc = current_loc break else: current_loc = current_loc + 1 return initial_loc, initial_direction def combine_gaps(self, gap_regions): ''' gap regions come in as ordered ''' i = 0 if len(gap_regions)

