对照通达信一些指标的Python实现

行情软件里习惯了通达信的简介,虽然很多时候还是要依赖大智慧,但平时看图形基本都是用通达信。因此在有时候做数据分析的时候,不可避免的需要再次的实现一些指标功能,所以在Python里整理了一下,写了部分的指标工具。

# 威廉指标
def williams(df, n, column='williams'):
# 100*(10日内最高价的最高值-收盘价)/(10日内最高价的最高值-10日内最低价的最低值)
for i in range(len(df)):
if i < n-1: continue
df.ix[i, column] = 100 * (df.high.values[i-n+1:i+1].max()-df.close.values[i])/(
df.high.values[i-n+1:i+1].max()-df.low.values[i-n+1:i+1].min())
return df

# 布林指标
def bollinger(df,n):
for i in range(len(df)):
if i < n-1: continue
df.ix[i, 'BOLL'] = df.close.values[i-n+1:i+1].mean()
df.ix[i, 'UB'] = df.ix[i, 'BOLL'] + 2 * numpy.std(df.close.values[i-n+1:i+1], ddof=1)
df.ix[i, 'LB'] = df.ix[i, 'BOLL'] - 2 * numpy.std(df.close.values[i-n+1:i+1], ddof=1)
return df

# 轨道线
def ene(df,n,m1,m2):
for i in range(len(df)):
if i < n-1: continue
df.ix[i, 'UPPER'] = (1+m1/100)*df.close.values[i-n+1:i+1].mean()
df.ix[i, 'LOWER'] = (1-m2/100)*df.close.values[i-n+1:i+1].mean()
df.ix[i, 'ENE'] = (df.ix[i, 'UPPER'] + df.ix[i, 'LOWER'])/2
return df

def kdj(df,n,m1,m2):
for i in range(len(df)):
if i < n-1: continue
df.ix[i, 'rsv'] = (df.close.values[i]-df.low.values[i-n+1:i+1].min()) / (df.high.values[i-n+1:i+1].max()-df.low.values[i-n+1:i+1].min())*100
df = getSMA(df,m1,1,'rsv','K')
df = getSMA(df,m2,1,'K','D')
for i in range(len(df)):
df.ix[i, 'J'] = 3*df.K.values[i] - 2*df.D.values[i]
return df

所有数据都是Dataframe类型,以时间为Index顺序排列。关于getSMA这个函数可以在本博另一帖子《

行情软件里的平均函数以及Python的实现》中可以找到。

利用动态IP绑定一级域名做Web服务器

其实用动态IP绑定一级域名倒不是难题,问问度娘或者谷哥都可以找到答案。真正的问题是营运商封锁了80端口,这才是巧妇难为无米之炊的关键。前段时间想了一条思路,需要一个独立的服务器,这个想法实验了半年,证实是不行的,但是折腾的结果是做站还是可以的。

好了,不故弄玄虚了,其实我也没办法解决掉80端口被封的问题,只是换了个思路利用443端口做https的网站。本文所需记录的关键也就是这里了,因为https需要证书的,付费的证书咱没考虑,而选择的Let's Encrypt免费证书有时间限制的,期限已到就需要重新申请。所以有两样需要计划任务,一个是重拨号之后IP地址的更新,一个是证书到期后证书的重新申请。

关于IP解析,目前国内好用的有Dnspod的和阿里的,并且这两家都提供有解析更新API,所以很是方便,我这次用的是Dnspod的解析,他的开发文档地址是:https://www.dnspod.cn/docs/index.html,按照他的手册来并不复杂,就不多说了。重点来说说Let's Encrypt免费证书的申请,Let's Encrypt对证书的时效是90天,就是说90天后证书就会时效,但这个可以用计划任务来处理,脚本写好后90天运行一次就可以了。但它还有另外一个问题,就是Let's Encrypt需要你向它证明这个域名是属于你的,这有两种方式,第一是在你服务器里建立一个文件,Let's Encrypt会通过访问传统http来访问到这个文件就可以了,对于这个方式那么问题就来了,80端口已经被封了如何能访问得到呢?所以此路是不通的。第二种方式就是在dns服务器发送一个记录类型为txt的自定义解析,记录值就是Let's Encrypt计算得出的字符串,由Let's Encrypt去访问,能访问得到并且一样就OK通过了,这个没有问题,并且动态IP解析本来就要做一样的工作。

申请的代码如下:

import os
import sys
import json
import copy
import dns.resolver
from subprocess import Popen, PIPE
try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2

import platform
import re
import binascii
import base64
import hashlib
import shutil
import time
import textwrap
import logging

import dnspod  # 域名解析模块

CA = "https://acme-v01.api.letsencrypt.org"
system = platform.system()
sourcePath = './source/'
certsPath = './certs/'

log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)

class GetCerts(object):
    """docstring for GetCerts"""
    def __init__(self):
        super(GetCerts, self).__init__()
        self.config = self.__loadConfig()
        self.domain = self.__loadDomain()
        self.__cearteDir()

        self.accountkey = 'account.key'
        self.ops = self.config['ops'] if system == 'Windows' else 'openssl'

        self.cearteAccountKey()
        self.cearteDomainSource()
        self.loadAccountKey()
        self.regAccount()

        for i in self.domain:
            self.__loadPrivateKey(i)

    @staticmethod
    def __b64(b):
        return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")

    @staticmethod
    def __waitChallengeDeployed(domain):
        count = 0
        is_deployed = GetCerts.__checkDomain(domain)
        while count < 240 and not is_deployed:
            time.sleep(10)
            count += 1
            is_deployed = GetCerts.__checkDomain(domain)
        return is_deployed

    @staticmethod
    def __checkDomain(domain):
        try:
            answers = dns.resolver.query('_acme-challenge.{0}'.format(domain), 'TXT')
            if len(answers) > 0:
                return True
        except dns.exception.DNSException as e:
            log.debug("TXT not found: %s", e)
        except:
            log.error("Unexpected error: %s", sys.exc_info()[0])
            raise
        return False

    # 建立账号证书文件夹
    def __cearteDir(self):
        if not os.path.exists(certsPath):
            os.makedirs(certsPath)
        if not os.path.exists(sourcePath):
            os.makedirs(sourcePath)

    # 载入程序设置文件
    def __loadConfig(self):
        try:
            file = open('config.json', 'r')
        except:
            log.debug("缺少配置文件,请配置基本参数")
            return False
        else:
            return json.loads(file.read())

    # 载入域名列表文件
    def __loadDomain(self):
        try:
            file = open('domain.json', 'r')
        except:
            log.debug("缺少域名文件,请配置域名参数")
            return False
        else:
            return json.loads(file.read())

    # 建立账号证书文件
    def cearteAccountKey(self):
        self.regKey = False
        if os.path.isfile(self.accountkey):
            command = [self.ops,'rsa','-in',self.accountkey,'-check']
            proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
            proc.communicate()
            if proc.returncode != 0:
                command = [self.ops,'genrsa', '-out',self.accountkey,'4096']
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()
                self.regKey = True
        else:
            command = [self.ops,'genrsa', '-out',self.accountkey,'4096']
            proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
            proc.communicate()
            self.regKey = True
      
    # 建立域名私钥和请求证书  
    def cearteDomainSource(self):

        for i in self.domain:
            csrFile = '%s%s.csr' % (sourcePath, i['domain'])
            keyFile = '%s%s.key' % (sourcePath, i['domain'])

            if os.path.isfile(keyFile):
                command = [self.ops, 'rsa', '-in', keyFile, '-check']
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()
                if proc.returncode != 0:
                    command = [self.ops,'genrsa', '-out', keyFile, '4096']
                    proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                    proc.communicate()
            else:
                command = [self.ops, 'genrsa', '-out', keyFile, '4096']
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()

            domains = str()
            if len(i['record']) > 1:
                for r in i['record']:
                    domains += 'DNS:%s,' % i['domain'] if r == '@' else 'DNS:%s.%s,' % (r, i['domain'])
            else:
                domains = i['domain'] if  i['record'][0] == '@' else '%s.%s' % (i['record'][0], i['domain'])
            if not os.path.isfile(csrFile):
                if system == 'Windows':
                    command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile, '-config', self.config['ops_cnf']]
                else:
                    command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile]
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()

    def loadAccountKey(self):
        command = [self.ops, "rsa", "-in", self.accountkey, "-noout", "-text"]
        proc = Popen(command,stdin=PIPE, stdout=PIPE, stderr=PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise IOError("OpenSSL Error: {0}".format(err))
        pub_hex, pub_exp = re.search(
            r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
            out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
        pub_exp = "{0:x}".format(int(pub_exp))
        pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
        self.header = {
            "alg": "RS256",
            "jwk": {
                "e": GetCerts.__b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
                "kty": "RSA",
                "n": GetCerts.__b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
            },
        }
        account_key_json = json.dumps(self.header['jwk'], sort_keys=True, separators=(',', ':'))
        self.thumbprint = GetCerts.__b64(hashlib.sha256(account_key_json.encode('utf8')).digest())

    def __loadPrivateKey(self, domain):

        self.dns = dnspod if domain['dns'] == 'dnspod' else alidns

        self.dns.ID = domain['id']
        self.dns.TOKEN = domain['token']

        csrFile = '%s%s.csr' % (sourcePath, domain['domain'])

        command = [self.ops, "req", "-in", csrFile, "-noout", "-text"]
        proc = Popen(command, stdout = PIPE, stderr = PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise IOError("Error loading {0}: {1}".format(csrFile, err))
        domains = set([])
        common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8'))
        if common_name is not None:
            domains.add(common_name.group(1))
        subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
        if subject_alt_names is not None:
            for san in subject_alt_names.group(1).split(", "):
                if san.startswith("DNS:"):
                    domains.add(san[4:])
        print('domains',domains)

        for i in domains:
            print(i)
            code, result = self.__sendSignedRequest(CA + "/acme/new-authz", {
                "resource": "new-authz",
                "identifier": {"type": "dns", "value": i},
            })
            log.debug("Requesting challenges: {0} {1}".format(code, result))
            if code != 201:
                raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
            challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "dns-01"][0]
            token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
            keyauthorization = "{0}.{1}".format(token, self.thumbprint)
            dnstoken = GetCerts.__b64(hashlib.sha256(keyauthorization.encode('utf8')).digest())

            ndd = i.split(".")
            if len(ndd) == 2:
                subdomain = "_acme-challenge"
                basedomain = ndd[0] + "." + ndd[1]
            else:
                subdomain = "_acme-challenge." + ndd[0]
                basedomain = ndd[1] + "." + ndd[2]

            self.dns.updateRecord(basedomain, subdomain, dnstoken, 'TXT')

            try:
                is_deployed = GetCerts.__waitChallengeDeployed(i)
                if is_deployed:
                    code, result = self.__sendSignedRequest(challenge['uri'], {
                        "resource": "challenge",
                        "keyAuthorization": keyauthorization,
                    })
                    # print('code:',code,'result:',result)
                    if code != 202:
                        raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
                    while True:
                        try:
                            resp = urlopen(challenge['uri'])
                            challenge_status = json.loads(resp.read().decode('utf8'))
                            # print('challenge_status:',challenge_status)
                            log.debug(challenge_status)
                        except IOError as e:
                            raise ValueError("Error checking challenge: {0} {1}".format(
                                e.code, json.loads(e.read().decode('utf8'))))
                        if challenge_status['status'] == "pending":
                            log.debug("Pending")
                            time.sleep(1)
                        elif challenge_status['status'] == "valid":
                            log.debug("{0} verified!".format(i))
                            break
                        else:
                            raise ValueError("{0} challenge did not pass: {1}".format(
                                i, challenge_status))
            finally:
                pass
        log.info("Signing certificate...")
        command = [self.ops, "req", "-in", csrFile, "-outform", "DER"]
        proc = Popen(command, stdout=PIPE, stderr=PIPE)
        csr_der, err = proc.communicate()
        code, result = self.__sendSignedRequest(CA + "/acme/new-cert", {
            "resource": "new-cert",
            "csr": self.__b64(csr_der),
        })
        if code != 201:
            raise ValueError("Error signing certificate: {0} {1}".format(code, result))

        sign_cert = """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
            "\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))

        sign_cert_file = open(certsPath+domain['domain']+'.crt', 'w')
        sign_cert_file.write(sign_cert)
        sign_cert_file.close()
        log.info("Certificate signed %s", domain['domain']+'.crt')

        shutil.copy(certsPath+domain['domain']+'.crt', certsPath+domain['domain']+'.pem')

        chain_cert = urlopen("https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem")
        with open(certsPath+domain['domain']+'.pem', 'ab') as output:
            output.write(chain_cert.read())

        log.info("Certificate chain signed %s", domain['domain']+'.chained.pem')


    def __sendSignedRequest(self, url, payload):
        payload64 = GetCerts.__b64(json.dumps(payload).encode('utf8'))
        protected = copy.deepcopy(self.header)
        protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']
        protected64 = GetCerts.__b64(json.dumps(protected).encode('utf8'))
        command = [self.ops, "dgst", "-sha256", "-sign", self.accountkey]
        proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
        out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
        if proc.returncode != 0:
            raise IOError("OpenSSL Error: {0}".format(err))
        data = json.dumps({
            "header": self.header, "protected": protected64,
            "payload": payload64, "signature": GetCerts.__b64(out),
        })
        try:
            resp = urlopen(url, data.encode('utf8'))
            return resp.getcode(), resp.read()
        except IOError as e:
            return getattr(e, "code", None), getattr(e, "read", e.__str__)()

    def regAccount(self):
        log.debug("Registering account...")
        code, result = self.__sendSignedRequest(CA + "/acme/new-reg", {
            "resource": "new-reg",
            "agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"
        })
        if code == 201:
            log.info("Account registered!")
        elif code == 409:
            log.debug("Already registered!")
        else:
            raise ValueError("Error registering: {0} {1}".format(code, result))

    def sign(self):
        pass

if __name__ == '__main__':
    gc=GetCerts()

还有参数文件的格式如下:

{
"ops":"d:/Wamp/Apache24/bin/openssl.exe",
"ops_cnf":"d:/Wamp/Apache24/conf/openssl.cnf",
"CA":"https://acme-v01.api.letsencrypt.org/directory",
"IP_VERSION":"",
"CONTACT_EMAIL":"",
"RENEW_DAYS":"30",
"KEY_ALGO":"rsa",
"KEYSIZE":"4096",
"LICENSE":"https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"
}

域名的配置文件:

[{"dns": "dnspod", "id": "你的id", "token": "你的token", "domain": "你的域名(不要W)", "record": ["www"]}]

代码保存下来,剩下的工作就是制定执行运行任务了,当然可以直接把输出目录直接定义到Apache的证书目录去,每次自动更新后Apache再次载入就可以了,这时候的网站就出现了一把绿色的钥匙而不是大大红色的叉了。

关于Kodi字幕插件的开发

最近把家里的HDPC终于改成Debian了,起初一直在Windows 10下,但经过长时间的使用,装着E5200芯的老机器终于拉不动了,启动都要卡半天。思索了半天,干脆换到了Debian下。没去试现在的安卓X86版本和凤凰系统,是因为不想过于折腾了,虽说Debian并不是个好的娱乐平台,但好在安装Kodi倒是很方便对这个系统也比较熟悉,而我也仅仅用它就可以了。

安装的是Debian 9.4版本, Kodi是17.6的。设置什么的轻车熟路,电影、电视、直播啥的都基本没问题,但找遍了网上现有的中文字幕插件,就没一个可以用的,有的说是给15的版本可以用,但我没试而且也不想换版本。没办法了,只有自己改吧。

在这里真要吐槽一下Kodi的官方资料,英文的不说,而且是积极的简单,什么函数基本上就是一句话解释,连示例都没有一个,不过还好,综合了很多现成的源码,基本上了解了它字幕插件的思路和一些函数方法。代码是基于网络现有的插件基础上改的,字幕来源于字幕库。

首先是搜索函数,一番查验这个到是简单,原先的代码找不到内容是因为网站页面div的class改了,所以把bs的查找规则调整就ok了。

def Search( item ):
    subtitles_list = []

    log( __name__ ,"Search for [%s] by name" % (os.path.basename( item['file_original_path'] ),))
    if item['mansearch']:
        url = ZIMUKU_API % (item['mansearchstr'])
    else:
        url = ZIMUKU_API % (item['title'])
    try:
        socket = urllib.urlopen(url)
        data = socket.read()
        socket.close()
        soup = BeautifulSoup(data,'html.parser')
    except:
        return
    results = soup.find_all("div", class_="item prel clearfix")
    for it in results:
        moviename = it.find("div", class_="title").a.text.encode('utf-8')
        iurl = it.find("div", class_="title").a.get('href').encode('utf-8')
        movieurl = '%s%s' % (ZIMUKU_BASE, iurl)
        try:
            socket = urllib.urlopen(movieurl)
            data = socket.read()
            socket.close()
            soup = BeautifulSoup(data,'html.parser').find("table", class_="table")
            soup = soup.find("tbody")
        except:
            return
        subs = soup.find_all("tr")
        for sub in subs:
            name = sub.a.text.encode('utf-8')
            if name.split('.')[-1] not in ['zip','Zip','ZIP']: continue
            flag = sub.img.get('src').split('/')[-1].split('.')[0].encode('utf-8')
            lang = FLAG_DICT.get(flag,'unkonw')
            link = '%s%s' % (ZIMUKU_BASE, sub.a.get('href').encode('utf-8'))

            if lang == '英':
                subtitles_list.append({"language_name":"English", "filename":name, "link":link, "language_flag":'en', "rating":"0", "lang":lang})
            else:
                subtitles_list.append({"language_name":"Chinese", "filename":name, "link":link, "language_flag":'zh', "rating":"0", "lang":lang})

而下载函数就麻烦了一点,看原来代码是获取下载地址后直接下载,而实际上我在网站下载时发现网站使用了301重定向,并且在访问下载地址时,网站做了防爬虫处理,需要在头里带上Referer,否则系统是不会给你正确的定向地址的,经过改动的代码如下:

def Download(url,lang):
    try: shutil.rmtree(__temp__)
    except: pass
    try: os.makedirs(__temp__)
    except: pass

    exts = [".srt", ".sub", ".smi", ".ssa", ".ass" ]
    try:
        socket = urllib.urlopen( url )
        data = socket.read()
        soup = BeautifulSoup(data,'html.parser')
        url = soup.find("li", class_="li dlsub").a.get('href').encode('utf-8')
        socket = urllib.urlopen(url)

        socket = urllib.urlopen(url)
        data = socket.read()
        soup = BeautifulSoup(data,'html.parser')
        div = soup.find('div',class_='down clearfix')
        li = div.find('li')
        headers['Referer'] = url

        req = urllib2.Request(li.a.get('href'),headers=headers)
        resp = urllib2.urlopen(req)
        socket = urllib.urlopen(resp.geturl())
        data = socket.read()
        socket.close()
    except:
        return []
    if len(data) < 1024: return []
    tempfile = os.path.join(__temp__, "subtitles.zip")
    xbmc.log(tempfile)
    with open(tempfile, "wb") as subFile: subFile.write(data)
    subFile.close()
    xbmc.sleep(500)
    lists = unZip(tempfile)
    lists = [i for i in lists if os.path.splitext(i)[1] in exts]

    if len(lists) == 1:
        return lists[0]
    else:
        index = [i.split('/')[-1] for i in lists]
        sel = xbmcgui.Dialog().select('请选择压缩包中的字幕', index)
        if sel == -1: sel = 0
        return lists[sel]

经过调整,大的问题没有了,不过些小的地方,原代码里貌似是只能在Windows下用的,因为好像它是用Winrar来解压的,而我在Debian下使用,所以只能考虑Zip文件了,至于能不能用rar包还有单独的srt等文件,以后折腾吧。

def getFileList(path):
    fileslist = []
    for d in os.listdir(path):
        if os.path.isdir(path+d):
            fileslist.extend(getFileList(path+d+'/'))
        if os.path.isfile(path+d):
            fileslist.append(path+d)
    return fileslist

def unZip(filepath):
    zip_file = zipfile.ZipFile(filepath,'r')
    for names in zip_file.namelist():
        if type(names) == str and names[-1] != '/':
            utf8name = names.decode('gbk')
            data = zip_file.read(names)
            fo = open(os.path.join(__temp__, utf8name), "w")
            fo.write(data)
            fo.close()
        else:
            zip_file.extract(names,__temp__)
    return getFileList(__temp__+'/')

总结一下啊,用惯了Python3后用2真的不习惯,并且在Kodi里测试也是非常的不方便,改动一下需要重新进入才行,经过测试在Linux和Windows下都没问题,版本从17到最新的18,至于更低的版本则没测试,接下来要把各种字幕扩展名的文件下载功能补全,等到某一天有心情折腾了再说吧。附地址:https://pan.baidu.com/s/1GxAod4Xdll-3pvM_OOru8A

更新:由于上次改的有太多的局限性,所以继续抽时间改了改,现在这个版本继续支持Linux和Windows,并且文件格式由单纯的zip扩展到rar的压缩,还包括了各种未压缩的字幕格式,由于网上并没有发现7z的字幕压缩包,所以就没有考虑这个格式了。但现在由于格式的增多,就带来了一些问题,为了这个插件的正常使用,在Windows下必须要安装WinRAR并且目录还得是C:\Program Files\WinRAR,在Linux下需要安装unrar,否则的话会下载失败,其他没有要求,注意这点即可。

贴上代码:

def getFileList(path):
    fileslist = []
    for d in os.listdir(path):
        if os.path.isdir(path+d):
            fileslist.extend(getFileList(path+d+'/'))
        if os.path.isfile(path+d):
            fileslist.append(path+d)
    return fileslist

def extractCompress(file):
    path  = __temp__ + '/subtitles/'
    if os.path.isdir(path): shutil.rmtree(path)
    if not os.path.isdir(path): os.mkdir(path)

    if file.lower().endswith('zip'):
        zipFile = zipfile.ZipFile(file,'r')
        for names in zipFile.namelist():
            if type(names) == str and names[-1] != '/':
                utf8name = names.decode('gbk')
                data = zipFile.read(names)
                with open(path+utf8name, 'wb') as f: f.write(data)
            else:
                zipFile.extract(names,path)
        return getFileList(path)

    if file.lower().endswith('rar'):
        if platform.system() == 'Windows':
            rarPath = 'C:\Program Files\WinRAR'
            sysPath = os.getenv('Path')
            if 'winrar' not in sysPath.lower(): os.environ["Path"] = sysPath+';'+rarPath
            command = "winrar x -ibck %s %s" % (file, path)
        if platform.system() == 'Linux':
            command = 'unrar x %s %s' % (file, path)
        res = os.system(command)
        if res == 0: return getFileList(path)

def Search( item ):
    subtitles_list = []

    log(__name__ ,"Search for [%s] by name" % os.path.basename(item['file_original_path']))
    if item['mansearch']:
        url = ZIMUKU_API % (item['mansearchstr'])
    else:
        url = ZIMUKU_API % (item['title'])
    try:
        socket = urllib.urlopen(url)
        data = socket.read()
        socket.close()
        soup = BeautifulSoup(data,'html.parser')
    except:
        return
    results = soup.find_all("div", class_="item prel clearfix")
    for it in results:
        moviename = it.find("div", class_="title").a.text.encode('utf-8')
        iurl = it.find("div", class_="title").a.get('href').encode('utf-8')
        movieurl = '%s%s' % (ZIMUKU_BASE, iurl)
        try:
            socket = urllib.urlopen(movieurl)
            data = socket.read()
            socket.close()
            soup = BeautifulSoup(data,'html.parser').find("table", class_="table")
            soup = soup.find("tbody")
        except:
            return
        subs = soup.find_all("tr")
        for sub in subs:
            name = sub.a.text.encode('utf-8')
            flag = sub.img.get('src').split('/')[-1].split('.')[0].encode('utf-8')
           .get(flag,'unkonw')
            link = '%s%s' % (ZIMUKU_BASE, sub.a.get('href').encode('utf-8'))

            if lang == '英':
                subtitles_list.append({"language_name":"English", "filename":name, "link":link, "language_flag":'en', "rating":"0", "lang":lang})
            else:
                subtitles_list.append({"language_name":"Chinese", "filename":name, "link":link, "language_flag":'zh', "rating":"0", "lang":lang})

    if subtitles_list:
        for it in subtitles_list:
            listitem = xbmcgui.ListItem(label=it["language_name"],
                                  label2=it["filename"],
                                  iconImage=it["rating"],
                                  thumbnailImage=it["language_flag"]
                                  )

            listitem.setProperty( "sync", "false" )
            listitem.setProperty( "hearing_imp", "false" )

            url = "plugin://%s/?action=download&link=%s&lang=%s" % (__scriptid__,
                                                                        it["link"],
                                                                        it["lang"]
                                                                        )
            xbmcplugin.addDirectoryItem(handle=int(sys.argv[1]),url=url,listitem=listitem,isFolder=False)

def Download(url,lang):
    try: shutil.rmtree(__temp__)
    except: pass
    try: os.makedirs(__temp__)
    except: pass

    exts = [".srt", ".sub", ".smi", ".ssa", ".ass" ]
    try:
        socket = urllib.urlopen( url )
        data = socket.read()
        soup = BeautifulSoup(data,'html.parser')
        url = soup.find("li", class_="li dlsub").a.get('href').encode('utf-8')
        socket = urllib.urlopen(url)

        socket = urllib.urlopen(url)
        data = socket.read()
        soup = BeautifulSoup(data,'html.parser')
        div = soup.find('div',class_='down clearfix')
        li = div.find('li')
        headers['Referer'] = url

        req = urllib2.Request(li.a.get('href'),headers=headers)
        resp = urllib2.urlopen(req)
        fileName = resp.headers['Content-Disposition'].replace('"','').split('=')[1]

        socket = urllib.urlopen(resp.geturl())
        data = socket.read()
        socket.close()
    except:
        return []
    if len(data) < 1024: return []
    tempfile = os.path.join(__temp__, "subtitles.%s" % fileName.split('.')[-1])
    xbmc.log(tempfile)
    with open(tempfile, "wb") as subFile: subFile.write(data)
    xbmc.sleep(100)
    if fileName.split('.')[-1].lower() in ('zip','rar'):
        lists = extractCompress(tempfile)
    else:
        lists = [tempfile]

    lists = [i for i in lists if os.path.splitext(i)[1] in exts]

    if len(lists) == 1:
        return lists[0]
    else:
        index = [i.split('/')[-1] for i in lists]
        sel = xbmcgui.Dialog().select('请选择压缩包中的字幕', index)
        if sel == -1: sel = 0
        return lists[sel]

贴上下载地址:点击下载

行情软件里的平均函数以及Python的实现

在行情软件里,有很多的平均公式,一直没有深入的去了解,用得也是糊里糊涂的,现结合百度谷歌,对各种公式做个总结记录。

MA(x,n)–移动平均,是最简单的n日内的平均值;

计算公式:(X1+X2+X3+X4+…+Xn)/n

Python代码:

def getMA(df,n):
    for i in range(len(df)):
        if i >= n:
            df.ix[i,'ma'] = df.close.values[i-n:i].mean()
    return df

EMA(x,n)–指数移动平均,这个函数以相关周期为权重进行计算;

计算公式:[2*X+(n-1)*Y]/(n+1) 其中'Y'表示上一周期的Y值

例如:每天X值不同,由远到近标示分别为X1、X2、X3、X4……Xn

如果n=1,那么EMA(X,1) 则等于 [2*X1+(1-1)*Y]/(1+1)=X1

如果n=2,那么EMA(X,2) 则等于 [2*X2+(2-1)*Y]/(2+1)=(2/3)*X2+(1/3)*X1

如果n=3,那么EMA(X,3) 则等于 [2*X3+(3-1)*Y]/(3+1)

                                                  =[2*X3+2*((2/3)*X2+(1/3)*X1)]/4

                                                  =(1/2)*X3+(1/3)*X2+(1/6)*X1

以此类推,往下循环……

Python代码:

def getEMA(df,n):  
    for i in range(len(df)):  
        if i==0:  
            df.ix[i,'ema']=df.ix[i,'close']  
        if i>0:  
            df.ix[i,'ema']=(1-n)*df.ix[i-1,'close']+n*df.ix[i,'close']  
    return df

SMA(x,n,m)–简单移动平均,m为当日的权重,是个0~1之间的值;

计算公式:(X*M+Y'*(N-M))/N 其中Y表示上一周期值

SMA 就是把EMA(X,N) 中的权重2,变成了一个可自己定义的变数。要求M<N,M/N, (N-M)/N 就是一个加起来等于1的小数,于是定义动态平均值。

Python代码:

def getSMA(df,n,m):
    for i in range(len(df)):
        if i==0:
            df.ix[i,'sma'] = df.ix[i,'close']*m/n
        else:
            df.ix[i,'sma'] = [df.ix[i,'close']*m + (n-m)*df.ix[i-1,'sma']]/n
    return df

DMA(x,m)–动态移动平均,这个函数以动态设定的权重m进行计算;

计算公式:m*X+(1-m)*Y  其中Y表示上一周期值,A必须小于1

Python代码:

def getDMA(df,m):
    for i in range(len(df)):
        if i == 0:
            df.ix[i,'dma'] = df.ix[i,'close']/m
        else:
            df.ix[i,'dma'] = df.ix[i,'close']/m + (1-m)*df.ix[i-1,'dma']
    return df

TMA(x,n,m)–递归移动平均,这个函数可以完全控制当前周期的权重和上一次值的权重;

计算公示:m*X+n*Y 其中Y表示上一周期值,初值为m*X。

Python代码:

def getTMA(df,n,m):
    for i in range(len(df)):
        if i==0:
            df.ix[i,'sma'] = df.ix[i,'close']*m
        else:
            df.ix[i,'sma'] = df.ix[i,'close']*m + df.ix[i-1,'sma']*n
    return df

WMA(x,n)–加权移动平均,这个函数对于近日的权重会比其它函数敏感。

计算公式:n*X0+(n-1)*X1+(n- 2)*X2)+…+1*Xn)/(n+(n-1)+(n-2)+…+1)

X0表示本周期值,X1表示上一周期值。

Python代码:

def getWMA(df,n):
    weight = 0
    for i in range(n):
        weight += i
    for i in range(len(df)):
        if i >= n:
            sum = 0
            for j in range(n):
                 sum += (j+1)*df.ix[i-n+j,'close']
            df.ix[i,'wma'] = sum/weight

ps: 关于加权移动平均,有多个加权的计算方式,代码里的仅仅只是其中一种,如果需求不同,也可以换做其他计算方式。

1、末日加权移动平均线: 

计算公式=(C1+C2+……+Cn×2)/(n+1)

我们用C代表收盘价。末日指的是最后一日哦,可不是世界末日。我们看到只有最后一天的收盘价(Cn)乘了一个2。这样,原来的n个价格就变成了(n+1)个,所以在求的平均的时候要除以(n+1)。

2、线性加权移动平均线:

计算公式=(C1×1+C2×2+……+Cn×n)/(1+2+…+n)

这一种均线计算起来有一点点麻烦,就是计算时每个价格都乘以一个权值,这个权值刚好是它的编号。对于线性这个词,如果大家不理解,那么请继续关注慧济,以后我会为大家详细解释。

3、梯型加权移动平均线:计算方法如下(5日为例):

计算公式=((C1+C2)×1+(C2+C3)×2+(C3+C4)×3+(C4+C5)×4)/(2×1+2×2+2×3+2×4)

是不是有点像梯形的面积公式啊?梯形面积=(上底+下底)×高/2

4、平方系数加权移动平均线:计算方法如下(5日为例):

MA=((C1×1×1)+( C2×2×2)+(C3×3×3)+(C4×4×4)+( C5×5×5))/(1×1+2×2+3×3+4×4+5×5)

部分的国内PyPi源,存以备用

对于Python开发用户来讲,PIP安装软件包是家常便饭。但国外的源下载速度实在太慢,浪费时间。而且经常出现下载后安装出错问题。所以把PIP安装源替换成国内镜像,可以大幅提升下载速度,还可以提高安装成功率。

国内源(新版ubuntu要求使用https源,要注意):

清华:https://pypi.tuna.tsinghua.edu.cn/simple/

豆瓣:http://pypi.douban.com/simple/

阿里云:http://mirrors.aliyun.com/pypi/simple/

中国科技大学 https://pypi.mirrors.ustc.edu.cn/simple/

华中理工大学:http://pypi.hustunique.com/

山东理工大学:http://pypi.sdutlinux.org/

临时使用:

可以在使用pip的时候加参数-i https://pypi.tuna.tsinghua.edu.cn/simple。例如:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspider,这样就会从清华这边的镜像去安装pyspider库。

永久修改,一劳永逸:

Linux下,修改 ~/.pip/pip.conf (没有就创建一个文件夹及文件。文件夹必须要加“.”,表示是隐藏文件夹)

内容如下:

[global]
trusted-host =  pypi.douban.com
index-url = http://pypi.douban.com/simple

windows下,直接在user目录中创建一个pip目录,如:C:\Users\xx\pip,新建文件pip.ini。内容同上。

用Pandas计算前复权数据

得到大智慧的除权数据琢磨了好些日子,最后发现还是用Pandas来解决是最便捷的,后复权还没做,先在这里做个笔记,备忘。

以股票600725为例,除权数据赋予变量ea,数据为dataframe格式,结构如下:

            present  bonus  price  rationed
date                                       
2016-12-28     10.0   0.00    0.0       0.0
2012-06-04      0.0   0.50    0.0       0.0
2011-05-20      0.0   1.00    0.0       0.0
2010-05-13      8.0   0.23    0.0       0.0
2009-05-15      0.0   2.00    0.0       0.0
2008-04-09      0.0   3.00    0.0       0.0
2007-04-12      0.0   1.00    0.0       0.0
2006-05-22      0.0   1.00    0.0       0.0
2006-02-10      3.5   0.00    0.0       0.0
2005-03-28      5.0   0.50    0.0       0.0
2003-03-26      0.0   2.00    0.0       0.0
2002-04-15      0.0   2.00    0.0       0.0
1997-06-05      0.0   2.00    0.0       0.0

如果做2016年11月1日到12月31日的复权,则对数据做切片:

ea = ea.ix[datetime.datetime(2016,12,31,0,0,0):datetime.datetime(2016,11,1,0,0,0)]

这样除权数据就只剩一条了:

            present  bonus  price  rationed
date                                       
2016-12-28     10.0    0.0    0.0       0.0

接着取出2016年11月1日到12月31日期间600725的日K数据赋予变量df,同样是dataframe格式,虽然我这里现在只需要一条除权数据,但做成循环更好一些,拿出每条除权数据去计算。里面再嵌套K线数据的字段循环,完成所有字段的前复权处理:

for key,val in ea.iterrows():
    date = key - datetime.timedelta(days=1)
    for field in df.columns.values:
        if field != 'volume' and field != 'amount':
            df.ix[date:,field] -= val.bonus/10
            df.ix[date:,field] += val.price*(val.rationed/10)
            df.ix[date:,field] /= 1 + val.present/10 + val.rationed/10

PS:交易量和交易金额不需要做复权,所以加了个判断排除掉。后复权公式有点不同,但方法是类似的,方法上需要注意的就是时间切片这个问题。另外我喜欢用数据倒序使用,所以在前复权时key需要减一天,如果是正序是不用的。另在动态的情况下,对于除权数据的切片可以使用K线数据的头尾日期去切片,切完之后再做处理节约计算机资源。

用Python抓取新浪的股票数据

最近做数据分析,先是找到了Tushare这个免费开源的第三方财经包,但后来用了几天之后发现,它的日交易历史数据有时候有不准确的情况,查看源代码发现,这个包的数据源是凤凰财经,而对比凤凰网站其站点的数据本身就是有出入的,所以到也不是Tushare的问题。于是百度了一圈,发现很多网友都是获取新浪的股票数据,包括其历史数据和实时数据。于是乎试了一下,发现速度还挺快,没有具体去测时间但从感官上要比Tushare获取的凤凰数据要快得多。并且数据也很丰富,囊括了每只票自上市以来的所有数据,对此Tushare貌似只有三年数据。当然,新浪数据也有不足的地方,细节上没凤凰数据那么丰富,没有价MA5、MA10以及量MA5、MA10等等,最重要的还是缺少每天的交易额。所幸我目前计算所需的数据里还不包括每天交易额。

新浪财经的数据接口地址是:http://money.finance.sina.com.cn/quotes_service/api/jsonp_v2.php/var=/CN_MarketData.getKLineData?symbol=sz000001&scale=240&ma=no&datalen=60。在地址里symbol指的是股票代码,这里需要注意的是不能只填数字代码,还需要把交易市场的前缀加上去,比如sz000001指的是平安银行,而sh000001则是上证指数;scale表示的是时间长度,以分钟为基本单位,输入240就表示下载日K线数据,60就是小时K线数据,貌似最短时间是5分钟,并没有提供分钟数据;datalen则是获取数据的条数,在日K线的时间长度了,datalen就是获取60天日K数据,当然也可以获取60小时K数据。

人生苦短,我用Python,所以代码就用它了,其实以前一直是用世界上最好的语言PHP 的,这是为了做数据分析才开始学着用Python,代码粗糙了些,返回的是个列表,每笔数据则是字典,将就着看吧。

import urllib.request
links = 'http://money.finance.sina.com.cn/quotes_service/api/jsonp_v2.php/var=/CN_MarketData.getKLineData?symbol=' + code + '&scale=' + str(scale) + '&ma=no&datalen='+str(datalen)
histData = urllib.request.urlopen(links).read()
histData = str(histData).split('[')[1]
histData = histData[1:len(histData) - 4].split('},{')
datas = []
for i in range(0, len(histData)):
    column = {}
    dayData = histData[i].split(',')
    for j in range(0, len(dayData)):
       field = dayData[j].split(':"')
       if field[0] == 'day':
          column['date'] = field[1].replace('"', '')
       else:
          column[field[0]] = field[1].replace('"', '')
    datas.append(column)
return datas

PS:这里要说明的是新浪默认字段里,日期字段名称是day,个人觉得很是膈应,如果是小时数据或者5分钟数据还叫day岂不难受?所以改成了 date,其实没什么大的意义,个人喜好而已。

实时数据获取方式和历史数据差别不大,需要的也是完整代码,地址是:http://hq.sinajs.cn/list=sz000001,不同的是实时数据可以多支同时获取的,代码之间用逗号隔开就可以了,经过实验,貌似最多一次可以获取100只票的实时数据。

links = 'http://hq.sinajs.cn/list=' + codes
realTimeData = urllib.request.urlopen(links).read()
realTimeData = realTimeData.decode('gbk').replace('"','').split('\n')
data = {}
for i in range(len(realTimeData)-1):
    if len(realTimeData[i]) > 0:
       data[realTimeData[i].split('=')[0].split('_')[2][2:]] = realTimeData[i].split('=')[1].split(',')[:-1]
return data

PS:大家获取可别太多线程,我试过,会被封。

用Python抓取大智慧除权数据

继续做的数据分析,由于新浪获取的是未复权数据,所以在分析的时候出了些小问题,结果变得扑所迷离。于是又用了几天Tushare的获取复权数据功能,本来是写了个循环,每天自动获取,可是几乎每次下载都卡死了,这真是坑爹的网络。翻了翻Tushare的这段代码,Tushare的这个数据倒也是新浪的,但是是从网页上扒下来的,过程好像获取了两个地址的数据,貌似一个复权因子一个后复权数据,没细看了。反正我也不认为这是合理的获取方式,理想的当然是JSON或者XML之类的格式最好了。百度了一圈,没有发现哪里有免费的前复权数据,更找不到除权数据,心想找到除权数据也好啊,自己来复权。这过程中,得到了几个新浪地址,好像和复权有关,但是也没琢磨出怎么用。在这里先把地址贴下来,有时间再研究。

http://vip.stock.finance.sina.com.cn/api/json.php/BasicStockSrv.getStockFuQuanData?symbol=sz000001&type=hfq,和惯例一样,symbol指得是股票代码,而type按拼音来说应该是后复权,但我输入qfq之后得到并非前复权。

http://biz.finance.sina.com.cn/stock/flash_hq/kline_data.php?symbol=sz000001&begin_date=20100101&end_date=20161206,同上,只是有开始和结束时间。

新浪没办法了,看到很多网友都是从大智慧获取,毕竟资料还多,于是下载安装了个大智慧365。装好后在安装目录下有个Download文件夹,而除权数据就在PWR文件夹里,FIN文件夹里放的是财务数据,而ABK里则是大智慧的板块数据。

这里之所以写个日志是因为,365里的数据结构并非网络上写的四字节方式,四字节转赠股,四字节红利,四字节的配股,四字节配股价。我所看到的是120字节分段的形式,任一只股票先120字节的4-12字节为代码,接来下的120字节里前四个字节为除权日期,接着4-20字节为空,剩下的就是除权数据了,而这个数据并非分段的数值形式,而是一句话,比如10股送5股之类的,就是说得到这句话后还要进行分割。一只票除权了多少次就有多少个这样的120字节了,整理之后把代码贴上:

path = '/home/jeff/Share/DZH/Download/PWR/' 
name = ['full_sh.PWR','full_sz.PWR']
data = []
for i in name:
    exFile = open(path+i,'rb')
    exFile.seek(8)
    while True:
        exContent = exFile.read(120)
        if not exContent:
            break
        if exContent[:4] == b'\xff\xff\xff\xff':
            code = exContent[4:12].decode('gbk')
        elif len(exContent) > 0:
            date= struct.unpack("I", exContent[:4])[0]
            date= time.localtime(date)
            exlist = exContent[20:].decode('gbk').split('\x00')[0][2:].split()
            present = 0
            bonus = 0
            rationed = 0
            price = 0
            for i in exlist:
                if '送' in i:
                    present += float(re.findall(r"\d+\.?\d*",i)[0])
                if '增' in i:
                    present += float(re.findall(r"\d+\.?\d*",i)[0])
                if '派' in i:
                    bonus += float(re.findall(r"\d+\.?\d*",i)[0])
                if '价' in i:
                    price += float(re.findall(r"\d+\.?\d*",i)[0])
                if '配' in i:
                    rationed += float(re.findall(r"\d+\.?\d*",i)[0]) - price
            data.append({'code':code[2:],'date':date,'present':present,'bonus':bonus,'price':price,'rationed':rationed})
return data