基于期权定价的兴全合润基金交易策略

1. 兴全合润期权结构分析

``````from matplotlib import pyplot
import numpy as np
import pandas as pd
import seaborn as sns
def AReturn(base):
    """Return the A-series value for a given value of ``base``.

    The result is a flat 1.0 while ``base`` is below 1.21, and
    ``base - 0.21`` from 1.21 upward.
    """
    return 1.0 if base < 1.21 else base - 0.21

def BReturn(base):
    """Return the B-series value for a given value of ``base``.

    Three regimes are distinguished:

    * ``base < 0.5``: the constant ``0.1 / 0.6``;
    * ``0.5 <= base < 1.21``: ``(base - 0.4) / 0.6``;
    * otherwise: ``base + 0.14``.
    """
    # Guard clauses, checked from the lowest regime upward.
    if base < 0.5:
        return 0.1 / 0.6
    if base < 1.21:
        return (base - 0.4) / 0.6
    return base + 0.14

# Grid of 40 evenly spaced base values between 0.2 and 1.5.
xspace = np.linspace(0.2, 1.5, 40)

# Evaluate both payoff functions on every grid point.
aSeries = list(map(AReturn, xspace))
bSeries = list(map(BReturn, xspace))

# Draw the identity line and the two payoff curves, each with its own
# black line style, in the same order as the legend entries below.
pyplot.figure(figsize=(12, 8))
for curve, lineStyle in ((xspace, '-k'), (aSeries, '-.k'), (bSeries, '--k')):
    pyplot.plot(xspace, curve, lineStyle)
pyplot.xlim((0.2, 1.5))
pyplot.legend(['NAV', 'A', 'B'], loc='best', fontsize=16)

<matplotlib.legend.Legend at 0x64f2d10>
``````

• A份额买入一份行权价为1.21元的看涨期权

• B份额买入5/3份行权价为0的看涨期权，同时卖出2/3份行权价为1.21元的看涨期权

``````# 导入需要的模块
from CAL.PyCAL import *

# Read in external market data.
# NOTE(review): `data` is not defined anywhere in the visible code; it is
# presumably loaded at this point in the original notebook -- confirm.

# 'Shibor 3M' column divided by 100 (presumably a percent quote turned into a decimal rate).
riskFree = data['Shibor 3M'] / 100.0
# 'Maturity' column, passed on unchanged as the maturity argument of the pricers below.
maturity = data['Maturity']
# Column used as the spot input of the pricers.
spot = data['163406.OFCN']
# Columns the theoretical A and B prices are compared against.
ATarget = data['150016.XSHE']
BTarget = data['150017.XSHE']

def AOptionPrice(vol, riskFree, maturity, spot):
    """Return the model price of the A share.

    The price is the sum of 1.0 discounted with ``exp(-riskFree * maturity)``
    and the first element of the ``BSMPrice`` result for strike 1.21.

    vol      -- sequence; only ``vol[0]`` is forwarded to ``BSMPrice``
    riskFree -- rate used both for discounting and as a ``BSMPrice`` input
    maturity -- maturity used both for discounting and as a ``BSMPrice`` input
    spot     -- spot value forwarded to ``BSMPrice``
    """
    # NOTE(review): the leading 1 is presumably the call flag of BSMPrice -- confirm.
    discountedPar = 1.0 * np.exp(-riskFree * maturity)
    optionLeg = BSMPrice(1, 1.21, spot, riskFree, 0.0, vol[0], maturity, rawOutput = True)
    return discountedPar + optionLeg[0]

def BOptionPrice(vol, riskFree, maturity, spot):
    """Return the model price of the B share.

    The price combines three legs: ``-2/3`` discounted with
    ``exp(-riskFree * maturity)``, plus ``5/3`` of ``spot``, minus ``2/3`` of
    the first element of the ``BSMPrice`` result for strike 1.21.

    vol      -- sequence; only ``vol[0]`` is forwarded to ``BSMPrice``
    riskFree -- rate used both for discounting and as a ``BSMPrice`` input
    maturity -- maturity used both for discounting and as a ``BSMPrice`` input
    spot     -- spot value used directly and forwarded to ``BSMPrice``
    """
    optionLeg = BSMPrice(1, 1.21, spot, riskFree, 0.0, vol[0], maturity, rawOutput = True)
    bondLeg = -2.0 / 3.0 * np.exp(-riskFree * maturity)
    spotLeg = 5.0 / 3.0 * spot
    return bondLeg + spotLeg - 2.0 / 3.0 * optionLeg[0]

# Price both shares over the whole data set with a single constant
# volatility input of 0.09.
aTheoreticalPrice = AOptionPrice([0.09], riskFree, maturity, spot)
bTheoreticalPrice = BOptionPrice([0.09], riskFree, maturity, spot)
``````

``````data['A (Theoretical)'] = aTheoreticalPrice
data['B (Theoretical)'] = bTheoreticalPrice
# Quoted vs. theoretical prices: A share in the upper panel, B share in the
# lower panel of one figure.
pyplot.figure(figsize = (16,10))
# Integer position code; the string form '211' is rejected by current matplotlib.
ax1 = pyplot.subplot(211)
# Pass the target axes explicitly: without `ax`, DataFrame.plot opens a new
# figure of its own and the subplot prepared above stays empty.
data.plot('endDate', ['150016.XSHE','A (Theoretical)'], style = ['-.k', '-k'], ax = ax1)
ax1.legend(['A', 'A (Theoretical)'], loc = 'best')
ax2 = pyplot.subplot(212)
data.plot('endDate', ['150017.XSHE','B (Theoretical)'], style = ['-.k', '-k'], ax = ax2)
ax2.legend(['B', 'B (Theoretical)'], loc = 'best')

<matplotlib.legend.Legend at 0x7134610>
``````

2. 兴全合润的期权投资策略

• A端由于收益算法的原因，属于类固定收益产品，并且它在标的价格高企时，凸性为负；
• B端属于杠杆类型，在标的股价高企时，凸性为正；
• 市场可能会对凸性的不同，对于AB端分别进行折溢价调整；
• 15%是一个魔术数字（Magic Number），真实市场的波动率水平显然不应该是一个常数。

• 起始日期： 2010年5月31日
• 结束日期： 2015年3月27日
• 起始资金： 100000元
• 调仓周期： 每个交易日
``````prices1 = []
# Module-level accumulators filled by processDate(): prices1/prices2 receive
# the fitted A/B model prices, aTarget/bTarget the corresponding quotes.
prices2 = []
aTarget = []
bTarget = []

def processDate(record):
    """Fit one volatility to a record's A and B quotes and return the gaps.

    A single volatility in [0.01, 0.25] (starting guess 0.15) is chosen with
    ``optimize.fmin_slsqp`` to minimise the summed squared difference between
    the model prices (``AOptionPrice`` / ``BOptionPrice``) and the quotes in
    ``record``.

    Side effects: the fitted model prices are appended to the module-level
    lists ``prices1`` / ``prices2`` and the quotes to ``aTarget`` / ``bTarget``.

    Returns the tuple ``(model A - quoted A, model B - quoted B)``.
    """
    riskFree = record['Shibor 3M'] / 100.0
    maturity = record['Maturity']
    spot = record['163406.OFCN']
    quoteA = record['150016.XSHE']
    quoteB = record['150017.XSHE']

    def errorFunction(vol):
        # Sum of squared pricing errors of both shares for a trial volatility.
        gapA = AOptionPrice(vol, riskFree, maturity, spot) - quoteA
        gapB = BOptionPrice(vol, riskFree, maturity, spot) - quoteB
        return gapA**2 + gapB**2

    # full_output = True yields (x, fx, iterations, exit mode, exit message).
    out, fx, its, imode, smode = optimize.fmin_slsqp(
        errorFunction,
        [0.15],
        bounds = [(0.01, 0.25)],
        epsilon = 1e-6,
        iter = 10000,
        disp = False,
        full_output = True,
        acc = 1e-16,
    )

    modelA = AOptionPrice(out, riskFree, maturity, spot)
    modelB = BOptionPrice(out, riskFree, maturity, spot)

    prices1.append(modelA)
    prices2.append(modelB)
    aTarget.append(quoteA)
    bTarget.append(quoteB)

    return modelA - quoteA, modelB - quoteB
``````
``````import datetime as dt
from scipy import optimize

# Dates on which the back-tests sell their whole A and B positions; the
# benchmark loops also treat the observation after such a date specially.
callDate = [dt.datetime(2013,4,19)]

class deque:
    """Bounded FIFO window that keeps only the newest ``maxlen`` samples.

    Note that this class is unrelated to ``collections.deque``; it only
    offers ``append``, index access and ``average``.
    """

    def __init__(self, maxlen):
        """Create an empty window holding at most ``maxlen`` items."""
        self.maxlen = maxlen
        self.cont = []

    def append(self, vec):
        """Add ``vec`` as the newest item, dropping the oldest on overflow."""
        self.cont.append(vec)
        overflow = len(self.cont) - self.maxlen
        if overflow > 0:
            self.cont = self.cont[overflow:]

    def __getitem__(self, i):
        """Return the item at position ``i`` (0 is the oldest kept item)."""
        return self.cont[i]

    # ``__item__`` is not a Python special method, so ``window[i]`` never
    # worked; the old name is kept as an alias for existing explicit calls.
    __item__ = __getitem__

    def average(self):
        """Return the arithmetic mean of the stored items.

        Raises ZeroDivisionError when the window is empty.
        """
        # Built-in sum with a 0.0 start reproduces the former manual loop and
        # avoids the Python-2-only ``xrange``.
        return sum(self.cont, 0.0) / float(len(self.cont))

class Account:
    """Cash account holding positions in two funds, 'A' and 'B'."""

    def __init__(self, cash, commission = 0.0005):
        """Start with ``cash`` and no holdings; ``commission`` is a rate."""
        self.cash = cash
        self.commission = commission
        self.aAmount = 0
        self.bAmount = 0

    def order(self, amount, fundType, price):
        """Trade ``amount`` units of fund ``fundType`` at ``price``.

        ``fundType`` is matched case-insensitively against 'A' and 'B'; any
        other value leaves the account untouched. A positive ``amount`` costs
        ``amount * price * (1 + commission)``; otherwise the cash change is
        ``amount * price * (1 - commission)``.
        """
        fund = fundType.upper()
        if fund == 'A':
            self.aAmount += amount
        elif fund == 'B':
            self.bAmount += amount
        else:
            # Unknown fund types are ignored.
            return

        # Commission is added to the cost of a buy and deducted from the
        # proceeds otherwise.
        if amount > 0:
            feeFactor = 1.0 + self.commission
        else:
            feeFactor = 1.0 - self.commission
        self.cash -= amount * price * feeFactor

    def currentValue(self, aQuote, bQuote):
        """Return holdings valued at the given quotes plus cash."""
        return self.aAmount * aQuote + self.bAmount * bQuote + self.cash

def BackTesting(data, window = 20, startAmount = 100000, tradeVol = 2000):
# Back-test driven by processDate(): for every row i >= 1 the previous row is
# priced, the A/B pricing gaps are pushed into rolling windows of length
# `window`, and the account value at today's quotes is recorded.
# Returns a DataFrame indexed by data.endDate with the strategy value scaled
# by its first entry, the A/B holdings, cash, the '163406.OFCN' column and
# the three series built in the benchmark loop below.
# NOTE(review): the statements under the two `if account.xAmount >0:` lines
# are missing in this copy, so no order-placing code (and no use of
# `tradeVol`) is visible here.

account = Account(startAmount)

aWindow = deque(maxlen = window)
bWindow = deque(maxlen = window)
# Per-day histories; the first entry describes the untouched starting account.
performance = [startAmount]
aVol = [0]
bVol = [0]
cash = [startAmount]
for i in xrange(1, len(data)):
# Signals come from the previous row, prices from the current row.
previousDay = data.loc[i-1]
aUnderEstimated, bUnderEstimated  = processDate(previousDay)
aWindow.append(aUnderEstimated)
bWindow.append(bUnderEstimated)

# Rolling means of the gaps, including the value just appended.
aAverage = aWindow.average()
bAverage = bWindow.average()

today = data.loc[i]
aPrice = today['150016.XSHE']
bPrice = today['150017.XSHE']
if i >= 5:
# If tranche A is cheaper relative to tranche B
if aUnderEstimated - aAverage > bUnderEstimated - bAverage:

if account.bAmount >0:

# If tranche B is cheaper relative to tranche A
elif aUnderEstimated - aAverage < bUnderEstimated - bAverage:
if account.aAmount >0:

# On a date listed in callDate, sell the whole A and B positions at today's prices.
for calDate in callDate:
if today['endDate'] == calDate:
account.order(-account.aAmount, 'A', aPrice)
account.order(-account.bAmount, 'B', bPrice)

performance.append(account.currentValue(aPrice, bPrice))
aVol.append(account.aAmount)
bVol.append(account.bAmount)
cash.append(account.cash)

# Build cumulative series for A, B and the base fund, starting at 1.0 and
# carrying the previous value forward on the row after a call date.
originalReturn = data[['150016.XSHE', '150017.XSHE', '163406.OFCN']].values
# NOTE(review): originalReturn is a NumPy array, so `start = originalReturn[...]`
# is presumably a row view that changes when the same row is overwritten right
# afterwards; BackTesting2/BackTesting3 use a plain list here instead --
# confirm whether a copy was intended.
start = originalReturn[0]
originalReturn[0] = 1.0
dates = data['endDate']
scalar = 1.0
count = 0
for i in xrange(1, len(originalReturn)):
if count < len(callDate) and dates[i-1] == callDate[count]:
start = originalReturn[i]
originalReturn[i] =  originalReturn[i-1]
count += 1
else:
scalar = originalReturn[i] / start
start = originalReturn[i]
originalReturn[i] = originalReturn[i-1] * scalar
# Scale the strategy value by its first entry.
scalar = float(performance[0])
performance = [p / scalar for p in performance]
return pd.DataFrame({'Performance':performance, '150016.XSHE': aVol, '150017.XSHE': bVol, 'Cash': cash, '163406.OFCN': data['163406.OFCN'].values, 'A Performance': originalReturn[:,0], 'B Performance': originalReturn[:,1],'Benchmark Return':originalReturn[:,2]} ,index = data.endDate)
``````
``````bt = BackTesting(data, tradeVol = 20000)
# Plot the benchmark, the strategy and the A/B series from the back-test
# result; the legend labels follow the order of the `y` columns.
bt.plot(y = ['Benchmark Return', 'Performance', 'A Performance', 'B Performance'], figsize = (16,8), style = ['-k', '-.k'])
pyplot.legend( ['Benchmark', 'Strategy', 'A', 'B'], loc = 'best')

<matplotlib.legend.Legend at 0x7285510>
``````

``````res = pd.DataFrame({'A (Implied)': prices1, 'B (Implied)':prices2, 'A': aTarget, 'B': bTarget}, index = data.endDate[1:])
# Implied-volatility model prices vs. quotes: A share in the upper panel,
# B share in the lower panel of one figure.
pyplot.figure(figsize = (16,10))
# Integer position code; the string form '211' is rejected by current matplotlib.
ax1 = pyplot.subplot(211)
# Pass the target axes explicitly: without `ax`, DataFrame.plot opens a new
# figure of its own and the subplot prepared above stays empty.
res.plot(y = ['A (Implied)', 'A'], style = ['-k', '-.k'], ax = ax1)
pyplot.legend(['A (Implied)', 'A'])
ax1 = pyplot.subplot(212)
res.plot(y = ['B (Implied)', 'B'], style = ['-k', '-.k'], ax = ax1)
pyplot.legend(['B (Implied)', 'B'])

<matplotlib.legend.Legend at 0x7804290>
``````

3. 我们是否能够比“猴子”做得更好？

``````def BackTesting2(data, window = 20, startAmount = 100000, tradeVol = 2000):
# Random ("monkey") variant of the back-test: the branch taken each day is
# chosen by draws from a generator seeded with 1234 instead of by pricing gaps.
# Returns a DataFrame indexed by data.endDate with the strategy value scaled
# by its first entry, the A/B holdings, cash, the '163406.OFCN' column and
# the cumulative benchmark series.
# NOTE(review): the statements under the two `if account.xAmount >0:` lines
# are missing in this copy; `window` and `tradeVol` are not used in the
# visible code.

account = Account(startAmount)

# Per-day histories; the first entry describes the untouched starting account.
performance = [startAmount]
aVol = [0]
bVol = [0]
cash = [startAmount]
s = MersenneTwister19937UniformRsg(seed = 1234)
for i in xrange(1, len(data)):
previousDay = data.loc[i-1]
# The returned gaps are not used below; the call still appends to the
# module-level lists that processDate() maintains.
aUnderEstimated, bUnderEstimated  = processDate(previousDay)

today = data.loc[i]
aPrice = today['150016.XSHE']
bPrice = today['150017.XSHE']
if i >= 5:
# If the random number is > 0.5
if s.nextSequence()[0] > 0.5:

if account.bAmount >0:

# If the random number is < 0.5
# NOTE(review): this calls nextSequence() again, so the test presumably uses
# a second draw rather than the one compared above -- confirm intent.
elif s.nextSequence()[0] < 0.5:
if account.aAmount >0:

# On a date listed in callDate, sell the whole A and B positions at today's prices.
for calDate in callDate:
if today['endDate'] == calDate:
account.order(-account.aAmount, 'A', aPrice)
account.order(-account.bAmount, 'B', bPrice)

performance.append(account.currentValue(aPrice, bPrice))
aVol.append(account.aAmount)
bVol.append(account.bAmount)
cash.append(account.cash)

# Cumulative benchmark series from '163406.OFCN': starts at 1.0, is chained
# by day-over-day ratios, and carries the previous value forward on the row
# after a call date.
originalReturn = list(data['163406.OFCN'].values)
start = originalReturn[0]
originalReturn[0] = 1.0
dates = data['endDate']
scalar = 1.0
count = 0
for i in xrange(1, len(originalReturn)):
if count < len(callDate) and dates[i-1] == callDate[count]:
start = originalReturn[i]
originalReturn[i] =  originalReturn[i-1]
count += 1
else:
scalar = originalReturn[i] / start
start = originalReturn[i]
originalReturn[i] = originalReturn[i-1] * scalar
# Scale the strategy value by its first entry.
scalar = float(performance[0])
performance = [p / scalar for p in performance]
return pd.DataFrame({'Performance':performance, '150016.XSHE': aVol, '150017.XSHE': bVol, 'Cash': cash, '163406.OFCN': data['163406.OFCN'].values, 'Benchmark Return':originalReturn } ,index = data.endDate)
``````
``````bt1 = BackTesting(data, tradeVol = 20000)
# Run the random variant on the same data and store its strategy value next
# to the first back-test under the column name 'Monky' (sic).
bt2 = BackTesting2(data, tradeVol = 20000)
bt1['Monky'] = bt2['Performance']
# Plot the benchmark, the random variant and the strategy; the legend labels
# follow the order of the `y` columns.
bt1.plot(y = ['Benchmark Return', 'Monky', 'Performance'], figsize = (16,8), style = ['-k', '--k', '-.k'])
pyplot.legend( ['Benchmark', 'Monkey', 'Strategy'], loc = 'best')

<matplotlib.legend.Legend at 0x7804150>
``````

4. 使用历史波动率

``````def processDate2(record):
# Price the A and B shares for one record using the record's own
# 'volatility' value (no optimisation) and return the tuple
# (model A - quoted A, model B - quoted B).

riskFree = record['Shibor 3M'] / 100.0
maturity = record['Maturity']
spot = record['163406.OFCN']
ATarget = record['150016.XSHE']
BTarget = record['150017.XSHE']
volatility = record['volatility']

# The pricers read vol[0], so wrap the scalar in a one-element list.
vol = [volatility]

price1 = AOptionPrice(vol, riskFree, maturity, spot)
price2 = BOptionPrice(vol, riskFree, maturity, spot)

return price1 - ATarget, price2 - BTarget

def BackTesting3(data, window = 20, startAmount = 100000, tradeVol = 2000):
# Back-test driven by processDate2(): for every row i >= 1 the previous row
# is priced with its 'volatility' value, the A/B pricing gaps are pushed into
# rolling windows of length `window`, and the account value at today's quotes
# is recorded.
# Returns a DataFrame indexed by data.endDate with the strategy value scaled
# by its first entry, the A/B holdings, cash, the '163406.OFCN' column and
# the cumulative benchmark series.
# NOTE(review): the statements under the two `if account.xAmount >0:` lines
# are missing in this copy, so no order-placing code (and no use of
# `tradeVol`) is visible here.

account = Account(startAmount)

aWindow = deque(maxlen = window)
bWindow = deque(maxlen = window)
# Per-day histories; the first entry describes the untouched starting account.
performance = [startAmount]
aVol = [0]
bVol = [0]
cash = [startAmount]
for i in xrange(1, len(data)):
# Signals come from the previous row, prices from the current row.
previousDay = data.loc[i-1]
aUnderEstimated, bUnderEstimated  = processDate2(previousDay)
aWindow.append(aUnderEstimated)
bWindow.append(bUnderEstimated)

# Rolling means of the gaps, including the value just appended.
aAverage = aWindow.average()
bAverage = bWindow.average()

today = data.loc[i]
aPrice = today['150016.XSHE']
bPrice = today['150017.XSHE']
if i >= 5:
# If tranche A is cheaper relative to tranche B
if aUnderEstimated - aAverage > bUnderEstimated - bAverage:

if account.bAmount >0:

# If tranche B is cheaper relative to tranche A
elif aUnderEstimated - aAverage < bUnderEstimated - bAverage:
if account.aAmount >0:

# On a date listed in callDate, sell the whole A and B positions at today's prices.
for calDate in callDate:
if today['endDate'] == calDate:
account.order(-account.aAmount, 'A', aPrice)
account.order(-account.bAmount, 'B', bPrice)

performance.append(account.currentValue(aPrice, bPrice))
aVol.append(account.aAmount)
bVol.append(account.bAmount)
cash.append(account.cash)

# Cumulative benchmark series from '163406.OFCN': starts at 1.0, is chained
# by day-over-day ratios, and carries the previous value forward on the row
# after a call date.
originalReturn = list(data['163406.OFCN'].values)
start = originalReturn[0]
originalReturn[0] = 1.0
dates = data['endDate']
scalar = 1.0
count = 0
for i in xrange(1, len(originalReturn)):
if count < len(callDate) and dates[i-1] == callDate[count]:
start = originalReturn[i]
originalReturn[i] =  originalReturn[i-1]
count += 1
else:
scalar = originalReturn[i] / start
start = originalReturn[i]
originalReturn[i] = originalReturn[i-1] * scalar
# Scale the strategy value by its first entry.
scalar = float(performance[0])
performance = [p / scalar for p in performance]
return pd.DataFrame({'Performance':performance, '150016.XSHE': aVol, '150017.XSHE': bVol, 'Cash': cash, '163406.OFCN': data['163406.OFCN'].values, 'Benchmark Return':originalReturn } ,index = data.endDate)
``````
``````bt3 = BackTesting3(data, tradeVol = 20000)
# Store the BackTesting3 strategy value next to the first back-test and plot
# both against the benchmark; the legend labels follow the order of the `y`
# columns.
bt1['Historical (Vol)'] = bt3['Performance']
bt1.plot(y = ['Benchmark Return', 'Historical (Vol)', 'Performance'], figsize = (16,8), style = ['-k', '--k', '-.k'])
pyplot.legend( ['Benchmark', 'Historical Vol', 'Implied Vol'], loc = 'best')

<matplotlib.legend.Legend at 0x7841410>
``````

5. 风险收益分析

``````value = bt1[['Performance', 'Benchmark Return']]
# Keep the index values as an ordinary column so rows can be filtered by date.
value['endDate'] = value.index.values
# Log returns of the strategy; the first row has no predecessor and gets 0.
returnRes = [0]
tmp = np.log(value['Performance'][1:].values/ value['Performance'][:-1].values)
returnRes.extend(tmp)
value['Per. Return'] = returnRes
# Same for the benchmark; this overwrites the 'Benchmark Return' level column
# with its log returns.
returnRes = [0]
tmp = np.log(value['Benchmark Return'][1:].values/ value['Benchmark Return'][:-1].values)
returnRes.extend(tmp)
value['Benchmark Return'] = returnRes
# One slice per calendar year: strictly after 1 January, up to and including
# 31 December.
year2010 = value[(value['endDate'] > Date(2010,1,1).toTimestamp()) & (value['endDate'] <= Date(2010,12,31).toTimestamp())]
year2011 = value[(value['endDate'] > Date(2011,1,1).toTimestamp()) & (value['endDate'] <= Date(2011,12,31).toTimestamp())]
year2012 = value[(value['endDate'] > Date(2012,1,1).toTimestamp()) & (value['endDate'] <= Date(2012,12,31).toTimestamp())]
year2013 = value[(value['endDate'] > Date(2013,1,1).toTimestamp()) & (value['endDate'] <= Date(2013,12,31).toTimestamp())]
year2014 = value[(value['endDate'] > Date(2014,1,1).toTimestamp()) & (value['endDate'] <= Date(2014,12,31).toTimestamp())]
year2015 = value[(value['endDate'] > Date(2015,1,1).toTimestamp()) & (value['endDate'] <= Date(2015,12,31).toTimestamp())]

# Number of periods used to annualise the per-row figures below
# (presumably trading days per year).
days = 252


def perRes(yearRes):
    """Return annualised performance figures, in percent, for one slice.

    yearRes -- DataFrame with 'Per. Return' and 'Benchmark Return' columns.

    Returns the tuple ``(mean strategy return, mean excess return, strategy
    volatility)``: means are scaled by ``days * 100`` and the standard
    deviation by ``sqrt(days) * 100``.

    Side effect: an 'Excess Return' column (strategy minus benchmark) is
    stored on ``yearRes``, as before.
    """
    strategy = yearRes['Per. Return']
    excess = strategy - yearRes['Benchmark Return']
    yearRes['Excess Return'] = excess

    # Aggregate only the columns that are needed: frame-wide mean()/std()
    # also touches non-numeric columns such as 'endDate', which fails on
    # current pandas versions.
    meanReturn = strategy.mean() * days * 100
    meanExcess = excess.mean() * days * 100
    volatility = strategy.std() * np.sqrt(days) * 100

    return meanReturn, meanExcess, volatility

# Annualised figures per calendar year.
res2010 = perRes(year2010)
res2011 = perRes(year2011)
res2012 = perRes(year2012)
res2013 = perRes(year2013)
res2014 = perRes(year2014)
res2015 = perRes(year2015)

yearLabels = ['2010', '2011', '2012', '2013', '2014', '2015']
yearlyStats = [res2010, res2011, res2012, res2013, res2014, res2015]

# Split the (return, excess return, volatility) tuples into one list per figure.
perRet = [res[0] for res in yearlyStats]
exceRet = [res[1] for res in yearlyStats]
perStd = [res[2] for res in yearlyStats]

# One row per year, shown as a grouped bar chart.
resTable = pd.DataFrame(
    {
        'Strategy (Return %)': perRet,
        'Excess (Return %)': exceRet,
        'Strategy (Volatility %)': perStd,
    },
    index = yearLabels,
)
resTable.index.name = 'Year'
resTable.plot(kind = 'bar', figsize = (14,8), legend = True)

<matplotlib.axes.AxesSubplot at 0x82f4b50>
``````