宽客秀

宽客秀

Quant.Show的Web3站点,Archives from quant.show

基于协整的配对交易策略

本文主要基于 Auquan 的 Pairs Trading Strategy 转载和整理(非原创),主要修改了两个地方:

  • 用 A 股(主要是银行股)替换了美股。(虽然 A 股事实上无法做空,但是用于研究是没有问题的。)
  • 增加了协整的 ADF 检验,旨在帮助读者更深刻理解协整的定义。

如此,构成比较完整的配对交易的入门资料,供大家阅读。本文的所有代码已在 JoinQuant 上运行过。


配对交易是基于数学分析的策略的一个很好的例子。

原理如下:
假设你有一对证券 X 和 Y,它们之间有某种潜在的经济联系。例如,可能是两家生产相同产品的公司,比如百事可乐和可口可乐。你期望这两者之间的价差(价格的比率或差异)随着时间保持不变。然而,偶尔,这对证券之间的价差可能会出现偏离。偏离可能是由于临时的供需变化、某一证券的大额买入 / 卖出订单、对某一公司的重要新闻的反应等。当两种证券之间出现临时偏离时,即一只股票上涨而另一只股票下跌,配对交易的策略就是做空表现良好的股票,做多表现不佳的股票,押注于这两者之间的 “价差” 最终会收敛。

配对交易是一种市场中立的交易策略,使交易者能够在几乎任何市场条件下获利:上升趋势、下降趋势或横盘整理。

我们将从构建一个人工示例开始。

1. 解释概念 —— 首先生成两个虚拟证券#

我们通过从正态分布中抽样来建模 X 的每日收益。然后我们进行累积求和,以获得 X 在每一天的值。

import numpy as np
import pandas as pd

import statsmodels
from statsmodels.tsa.stattools import coint
import matplotlib.pyplot as plt

仅为随机数生成器设置种子#

np.random.seed(107)

生成每日收益#

Xreturns = np.random.normal(0, 1, 100)

求和并将所有价格上移#

X = pd.Series(np.cumsum(Xreturns), name='X') + 50
X.plot(figsize=(15,7))
plt.show()

4a47a0db6e60853dedfcfdf08a5ca249

现在我们生成 Y。Y 应该与 X 有深厚的经济联系,因此 Y 的价格应该变化得非常相似。我们通过将 X 上移并添加一些从正态分布中抽取的随机噪声来建模这一点。

noise = np.random.normal(0, 1, 100)
Y = X + 5 + noise
Y.name = 'Y'
pd.concat([X, Y], axis=1).plot(figsize=(15,7))
plt.show()

fb5c81ed3a220004b71069645f112867

2. 协整#

协整,简单来说,是一种 “不同” 的相关性形式。如果两个序列是协整的,它们之间的比率将围绕一个均值波动。为了使配对交易在两个时间序列之间有效,随着时间的推移,期望的比率值必须收敛到均值,即它们应该是协整的。我们上面构建的时间序列是协整的。

协整性,区别于相关性,需要满足两个条件:
1)股票 X 和股票 Y 的价格对数序列是一阶单整序列。
如果股票 X 的对数价格是非平稳时间序列(毫无疑问,这是常识)& 股票 X 的简单单期收益率(对数价格相减结果可看做简单单期收益率)序列是平稳时间序列,则股票 X 的对数价格是一阶单整序列。
2)股票 X 和股票 Y 构造的回归方程,其残差序列是平稳的。
满足以上两个条件,股票 X 和股票 Y 可以说具有协整性。

高相关性并不一定意味着协整性。即使两个股票的相关性是差不多的,但协整关系的概率差别比较大。

(Y/X).plot(figsize=(15,7))
plt.axhline((Y/X).mean(), color='red', linestyle='--')
plt.xlabel (' 时间 ')
plt.legend ([' 价格比率 ', ' 均值 '])
plt.show()

10fb15c77258a991b0028080a64fb42d

3. 测试协整性#

方法 1:按照定义去判断是否具有协整性。#

首先,验证股票 X 和股票 Y 的对数价格是否非平稳序列。

平稳性检验,我们用 ADF 检验:

  • 原假设(Null Hypothesis):有单位根(unit root),即非平稳
  • 备择假设(Alternative Hypothesis):无单位根,即平稳

ADF(y,lags,trend,max_lags,method)函数说明:

(参考《量化投资以 Python 为工具》P338)

  • 参数 lags 的值确定检验模型。lags 的值表示一阶差分的滞后阶数。
  • 检验结果的 Test Statistic 值和 CriticalValues 值进行比较,落在哪个区间。小于显著性水平(1% 或 5%)的话,则拒绝原假设。(或者只看 p-value 即可,p-value 足够小,就拒绝原假设。)

stock_list = ['600015.XSHG', '601169.XSHG']
sh = get_price(stock_list, start_date="2019-01-01", end_date="2020-04-30", frequency="daily", fields=['close'])['close']
PAf=sh['600015.XSHG']
PBf=sh['601169.XSHG']
PAflog=np.log(PAf)
PBflog=np.log(PBf)

from arch.unitroot import ADF
adfA=ADF(PAflog)
print(adfA.summary().as_text())

Augmented Dickey-Fuller Results#

Test Statistic -0.972
P-value 0.763
Lags 0#

Trend: Constant
Critical Values: -3.45 (1%), -2.87 (5%), -2.57 (10%)
Null Hypothesis: The process contains a unit root.
Alternative Hypothesis: The process is weakly stationary.

从上述结果来看,Test Statistic=-0.972,大于原假设分布下的 1、5、10 分位数,从而不能拒绝原假设,进而说明 600015(华夏银行)的对数价格序列是非平稳的。同理,601169 (北京银行) 的对数价格序列也是非平稳的(验证略)。

第二步,验证股票 X 和股票 Y 的简单单期收益率序列是否平稳。

retA=PAflog.diff()[1:]
adfretA=ADF(retA)
print(adfretA.summary().as_text())

Augmented Dickey-Fuller Results#

Test Statistic -17.403
P-value 0.000
Lags 0#

Trend: Constant
Critical Values: -3.45 (1%), -2.87 (5%), -2.57 (10%)
Null Hypothesis: The process contains a unit root.
Alternative Hypothesis: The process is weakly stationary.

从上述结果来看,600015(华夏银行)的对数价格的差分(即简单单期收益率)序列是平稳的。同理,601169 (北京银行) 的简单单期收益率序列也是平稳的(验证略)。

最后,验证股票 X 和股票 Y 构造的回归方程,其残差序列是平稳的。

import statsmodels.api as sm

model = sm.OLS(PBflog, sm.add_constant(PAflog))
results = model.fit()
print(results.summary())

d2b5ca33bd970f64a6301fa75ae2eb22

#回归截距项
alpha=results.params[0]

#提取回归系数
beta=results.params[1]

#残差
spread=PBflog-beta*PAflog-alpha

#绘制残差序列时序图
spread.plot()
plt.title (' 价差序列 ')

09dd8c2662b96ce14928333f055c5580

#对残差做平稳性检验
adfSpread = ADF (spread,trend='nc') #残差均值为 0,所以 trend 设为 nc
print(adfSpread.summary().as_text())

Augmented Dickey-Fuller Results#

Test Statistic -3.555
P-value 0.000
Lags 3#

Trend: No Trend
Critical Values: -2.57 (1%), -1.94 (5%), -1.62 (10%)
Null Hypothesis: The process contains a unit root.
Alternative Hypothesis: The process is weakly stationary.

从上述结果可知,残差是平稳的。

至此为止,我们可以说股票 X 和股票 Y 具有协整关系。

方法 2:有一个方便的测试在 statsmodels.tsa.stattools 中。我们应该看到一个非常低的 p 值,因为我们人为地创建了两个尽可能协整的序列。#

# 计算协整检验的 p 值

将告知我们两个时间序列之间的比率是否围绕其均值平稳#

score, pvalue, _ = coint(X,Y)

看 p-value 即可,p-value 足够小,即可认为股票 X 和股票 Y 具有协整关系。(是不是觉得方法 2 比方法 1 简单多了~~不过,方法 1 更帮助理解协整的概念)

4. 如何进行配对交易?#

因为两个协整的时间序列(如上面的 600015.XSHG 和 601169.XSHG)会相互漂移,因此会有价差高和价差低的时刻。我们通过买入一种证券并卖出另一种证券来进行配对交易。这样,如果两种证券一起下跌或一起上涨,我们既不赚钱也不亏钱 —— 我们是市场中立的。

回到上面的 X 和 Y,遵循 Y = ⍺ X + e,使得比率 (Y/X) 围绕其均值⍺波动,我们通过两者的比率回归均值来赚钱。为了做到这一点,我们将观察 X 和 Y 相距较远的时刻,即⍺过高或过低:

  • 做多比率 当比率⍺低于通常水平时,我们期望它会增加。在上面的例子中,我们通过买入 Y 并卖出 X 来押注。
  • 做空比率 当比率⍺较大时,我们期望它会变小。在上面的例子中,我们通过卖出 Y 并买入 X 来押注。

注意,我们始终保持 “对冲头寸”:做空头寸在所卖证券贬值时获利,做多头寸在证券增值时获利,因此我们对整体市场的波动免疫。只有当证券 X 和 Y 相对彼此移动时,我们才会赚或亏钱。

5. 使用数据寻找表现相似的证券#

最好的方法是从你怀疑可能协整的证券开始,并进行统计检验。如果你只是对所有配对进行统计检验,你将会陷入多重比较偏差。

多重比较偏差是指在进行许多检验时,错误生成显著 p 值的机会增加,因为我们进行了大量的检验。如果在随机数据上进行了 100 个检验,我们应该期望看到 5 个 p 值低于 0.05。如果你在 n 个证券中进行协整比较,你将进行 n (n-1)/2 次比较,并且你应该期望看到许多错误显著的 p 值,随着你增加比较次数而增加。为了避免这种情况,选择少量你有理由怀疑可能协整的配对,并逐个进行检验。这将减少多重比较偏差的暴露。

所以让我们尝试找到一些显示协整的证券。我们将使用一篮子中国的银行股票,如 “600000.XSHG”、“600015.XSHG”、“600016.XSHG”、“600036.XSHG”、“601009.XSHG” 等。这些股票在相似的领域运营,可能具有协整的价格。我们扫描证券列表并测试所有配对之间的协整。它返回一个协整检验分数矩阵、一个 p 值矩阵,以及任何 p 值小于 0.05 的配对。此方法容易受到多重比较偏差的影响,实际上证券应经过第二次验证步骤。为了这个例子,我们忽略这一点。

def find_cointegrated_pairs(data):
# data 的长度
n = data.shape[1]
# 初始化
score_matrix = np.zeros((n, n))
pvalue_matrix = np.ones((n, n))
# 抽取列的名称
keys = data.keys()
# 初始化强协整组
pairs = []
for i in range(n):
for j in range(i+1, n):
S1 = data[keys[i]]
S2 = data[keys[j]]
result = coint(S1, S2)
score = result[0]
pvalue = result[1]
score_matrix[i, j] = score
pvalue_matrix[i, j] = pvalue
if pvalue < 0.05:
pairs.append((keys[i], keys[j]))
return score_matrix, pvalue_matrix, pairs

stock_list = ["600000.XSHG", "600015.XSHG", "600016.XSHG", "600036.XSHG", "601009.XSHG","601166.XSHG", "601169.XSHG", "601328.XSHG", "601398.XSHG", "601988.XSHG", "601998.XSHG"]
data = get_price(stock_list, start_date="2019-01-01", end_date="2020-04-30", frequency="daily", fields=["close"])["close"]

热图显示每对股票之间的协整检验 p 值#

scores, pvalues, pairs = find_cointegrated_pairs(data)
import seaborn
m = [0,0.2,0.4,0.6,0.8,1]
seaborn.heatmap(pvalues,
xticklabels=stock_list,
yticklabels=stock_list,
cmap='RdYlGn_r' ,
mask = (pvalues >= 0.98))
plt.show()
print(pairs)

8266e4bfeda1bd42d8f9794eb4ea0a13

[('600015.XSHG', '601169.XSHG'), ('600036.XSHG', '601328.XSHG'), ('601166.XSHG', '601398.XSHG')]

看起来 '600015.XSHG' 和 '601169.XSHG' 是协整的。让我们看看价格,以确保没有异常情况发生。

S1 = data['600015.XSHG']
S2 = data['601169.XSHG']
score, pvalue, _ = coint(S1, S2)
print(pvalue)
ratios = S1 / S2
ratios.plot(figsize=(15,7))
plt.axhline(ratios.mean())
plt.legend ([' 价格比率 '])
plt.show()

f19c9085129709ee14d013be869df69b

这个比率确实看起来围绕一个稳定的均值波动。绝对比率在统计学上并不是很有用。通过将其视为 z-score 来规范化我们的信号更有帮助。Z 分数定义为:

Z Score (Value) = (Value — Mean) / Standard Deviation

警告
在实践中,通常这样做是为了给数据提供某种规模,但这假设了一个潜在的分布。通常是正态分布。然而,许多金融数据并不是正态分布的,我们必须非常小心,不要简单地假设正态性或任何特定分布在生成统计时。比率的真实分布可能是非常厚尾的,容易受到极端值的影响,从而破坏我们的模型并导致巨大的损失。

def zscore(series):
return (series - series.mean()) / np.std(series)

zscore(ratios).plot(figsize=(15,7))
plt.axhline(zscore(ratios).mean(), color='black')
plt.axhline(1.0, color='red', linestyle='--')
plt.axhline(-1.0, color='green', linestyle='--')
plt.legend ([' 比率 z-score', ' 均值 ', '+1', '-1'])
plt.show()

9eb9cd58b9ea5e04c890326b5c1f471f

6. 简单策略:#

  • 当 z-score 低于 - 1.0 时,做 “多” 比率
  • 当 z-score 高于 1.0 时,做 “空” 比率
  • 当 z-score 接近零时,退出头寸

这只是冰山一角,仅仅是一个非常简单的例子来说明这些概念。

  • 在实践中,你会想要计算一个更优的权重,以确定持有 S1 和 S2 的股份数量
  • 你还会想要使用不断更新的统计数据进行交易。

首先将数据分为 70% 的训练集和 30% 的测试集。

ratios = data['600015.XSHG'] / data['601169.XSHG']
print(len(ratios))
le = int(len(ratios)*7/10)
train = ratios[]
test = ratios[le:]

通常,对整个样本大小进行统计可能是有害的。例如,如果市场在上涨,并且两种证券也随之上涨,那么你过去 3 年的平均价格可能并不能代表今天的情况。因此,交易者通常使用依赖于最近数据的滚动窗口的统计数据。

而不是使用比率值,让我们使用 5 日移动平均来计算 z 分数,以及 60 日移动平均和 60 日标准差作为均值和标准差。

ratios_mavg5 = train.rolling(window=5,center=False).mean()
ratios_mavg60 = train.rolling(window=60,center=False).mean()
std_60 = train.rolling(window=60,center=False).std()
zscore_60_5 = (ratios_mavg5 - ratios_mavg60)/std_60

plt.figure(figsize=(15,7))
plt.plot(train.index, train.values)
plt.plot(ratios_mavg5.index, ratios_mavg5.values)
plt.plot(ratios_mavg60.index, ratios_mavg60.values)
plt.legend ([' 比率 ','5 日比率移动平均 ', '60 日比率移动平均 '])
plt.ylabel (' 比率 ')
plt.show()

602e8f042f463dc47ebfdf6a94ed5a6d

我们可以使用移动平均来计算每个给定时间的比率的 z 分数。这将告诉我们比率的极端程度,以及此时进入头寸是否是个好主意。让我们现在看看 z 分数。

# 取滚动 60 天标准差
std_60 = train.rolling(window=60,center=False).std()
std_60.name = '60 天标准差'

计算每一天的 z 分数#

zscore_60_5 = (ratios_mavg5 - ratios_mavg60)/std_60
zscore_60_5.name = 'z-score'

plt.figure(figsize=(15,7))
zscore_60_5.plot()
plt.axhline(0, color='black')
plt.axhline(1.0, color='red', linestyle='--')
plt.axhline(-1.0, color='green', linestyle='--')
plt.legend ([' 滚动比率 z - 分数 ', ' 均值 ', '+1', '-1'])
plt.show()

d2b5ca33bd970f64a6301fa75ae2eb22 1

z 分数在没有上下文的情况下并没有太大意义,让我们将其与价格一起绘制,以了解它的样子。

# 绘制比率和来自 z 分数的买入和卖出信号
plt.figure(figsize=(15,7))

train[60:].plot()
buy = train.copy()
sell = train.copy()
buy[zscore_60_5>-1] = 0
sell[zscore_60_5<1] = 0
buy[60:].plot(color='g', linestyle='None', marker='^')
sell[60:].plot(color='r', linestyle='None', marker='^')
x1,x2,y1,y2 = plt.axis()
plt.axis((x1,x2,ratios.min(),ratios.max()))
plt.legend ([' 比率 ', ' 买入信号 ', ' 卖出信号 '])
plt.show()

d2b5ca33bd970f64a6301fa75ae2eb22 2

这对我们正在交易的实际股票意味着什么?让我们看看。

# 绘制价格和来自 z 分数的买入和卖出信号
plt.figure(figsize=(18,9))
S1 = data['600015.XSHG'].iloc[]
S2 = data['601169.XSHG'].iloc[]

S1[60:].plot(color='b')
S2[60:].plot(color='c')
buyR = 0*S1.copy()
sellR = 0*S1.copy()

当买入比率时,买入 S1 并卖出 S2#

buyR[buy!=0] = S1[buy!=0]
sellR[buy!=0] = S2[buy!=0]

当卖出比率时,卖出 S1 并买入 S2#

buyR[sell!=0] = S2[sell!=0]
sellR[sell!=0] = S1[sell!=0]

buyR[60:].plot(color='g', linestyle='None', marker='^')
sellR[60:].plot(color='r', linestyle='None', marker='^')
x1,x2,y1,y2 = plt.axis()
plt.axis((x1,x2,min(S1.min(),S2.min())*0.95,max(S1.max(),S2.max())*1.05))

plt.legend ([' 华夏银行 ',' 北京银行 ', ' 买入信号 ', ' 卖出信号 '])
plt.show()

d2b5ca33bd970f64a6301fa75ae2eb22 3

注意,我们有时在短腿上获利,有时在长腿上获利,有时两者都有。

让我们看看这个信号可以产生什么样的利润。我们编写一个简单的回测器,当比率较低时买入 1 个比率(买入 1 个 600015.XSHG 股票并卖出比率 x 601169.XSHG 股票),当比率较高时卖出 1 个比率(卖出 1 个 600015.XSHG 股票并买入比率 x 601169.XSHG 股票),并计算这些交易的盈亏。

def trade(S1, S2, window1, window2):
# 如果窗口长度为 0,算法没有意义,因此退出
if (window1 == 0) or (window2 == 0):
return 0
# 计算滚动均值和滚动标准差
ratios = S1/S2
ma1 = ratios.rolling(window=window1,center=False).mean()
ma2 = ratios.rolling(window=window2,center=False).mean()
std = ratios.rolling(window=window2,center=False).std()
zscore = (ma1 - ma2)/std
# 模拟交易
# 从没有资金和没有头寸开始
money = 0
countS1 = 0
countS2 = 0
for i in range(len(ratios)):
# 如果 z-score > 1,则做空
if zscore[i] > 1:
money += S1[i] - S2[i] * ratios[i]
countS1 -= 1
countS2 += ratios[i]
# 如果 z-score < -1,则做多
elif zscore[i] < -1:
money -= S1[i] - S2[i] * ratios[i]
countS1 += 1
countS2 -= ratios[i]
# 如果 z-score 在 -.5 和.5 之间,则清算头寸
elif abs(zscore[i]) < 0.5:
money += countS1*S1[i] + S2[i] * countS2
countS1 = 0
countS2 = 0

print('Z-score: '+ str(zscore[i]), countS1, countS2, S1[i] , S2[i])#

return money

trade(data['600015.XSHG'].iloc[], data['601169.XSHG'].iloc[], 5, 60)

这个策略似乎是有利可图的!(result=12.716) 现在我们可以通过改变我们的移动平均窗口、改变买入 / 卖出和退出头寸的阈值等进一步优化,并检查验证数据上的性能改进。我们还可以尝试更复杂的模型,如逻辑回归、SVM 等来进行 1/-1 预测。

让我们看看它在测试数据上的表现(result=1.487)

trade(data['600015.XSHG'].iloc[le:], data['601169.XSHG'].iloc[le:], 5, 60)

7. 避免过拟合#

过拟合是交易策略最危险的陷阱。在我们的模型中,我们使用了滚动参数估计,并可能希望优化窗口长度。我们可以简单地遍历所有可能的合理窗口长度,并根据我们的模型表现最佳的长度进行选择。下面我们编写一个简单的循环,根据训练数据的盈亏来评分窗口长度,并找到最佳的结果是 58。

# 找到窗口长度 0-254

该长度使用此策略可获得最高回报#

length_scores = [trade(data['600015.XSHG'].iloc[],
data['601169.XSHG'].iloc[], 5, l)
for l in range(255)]
best_length = np.argmax(length_scores)
print (' 最佳窗口长度:', best_length)

现在我们检查模型在测试数据上的表现,发现这个窗口长度与最佳的不同!这是因为我们最初的选择显然是过拟合到样本数据。

# 找到测试数据的回报

使用我们认为是最佳窗口长度#

length_scores2 = [trade(data['600015.XSHG'].iloc[le:],
data['601169.XSHG'].iloc[le:],5, l)
for l in range(255)]

用上述计算得出的 best_length 来计算 length_scores2#

print (best_length, ' 天窗口:', length_scores2 [best_length])

基于此数据集找到最佳窗口长度,#

并使用此窗口长度的回报#

best_length2 = np.argmax(length_scores2)
print (best_length2, ' 天窗口:', length_scores2 [best_length2])

我们可以看到上述的第一个结果是基于之前计算的 58,得出 length_scores2=1.487, 而实际最佳的结果是 best_length=52,对应的 length_scores2=2.09。

我们可以通过分别绘制训练和测试数据的窗口长度的盈亏来看到这一点。

plt.figure(figsize=(15,7))
plt.plot(length_scores)
plt.plot(length_scores2)
plt.xlabel (' 窗口长度 ')
plt.ylabel (' 得分 ')
plt.legend ([' 训练 ', ' 测试 '])
plt.show()

d2b5ca33bd970f64a6301fa75ae2eb22 4

参考文献:

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。