其实用动态IP绑定一级域名倒不是难题,问问度娘或者谷哥都可以找到答案。真正的问题是营运商封锁了80端口,这才是巧妇难为无米之炊的关键。前段时间想了一条思路,需要一个独立的服务器,这个想法实验了半年,证实是不行的,但是折腾的结果是做站还是可以的。

好了,不故弄玄虚了,其实我也没办法解决掉80端口被封的问题,只是换了个思路利用443端口做https的网站。本文所需记录的关键也就是这里了,因为https需要证书的,付费的证书咱没考虑,而选择的Let's Encrypt免费证书有时间限制的,期限已到就需要重新申请。所以有两样需要计划任务,一个是重拨号之后IP地址的更新,一个是证书到期后证书的重新申请。

关于IP解析,目前国内好用的有Dnspod的和阿里的,并且这两家都提供有解析更新API,所以很是方便,我这次用的是Dnspod的解析,他的开发文档地址是:https://www.dnspod.cn/docs/index.html,按照他的手册来并不复杂,就不多说了。重点来说说Let's Encrypt免费证书的申请,Let's Encrypt对证书的时效是90天,就是说90天后证书就会时效,但这个可以用计划任务来处理,脚本写好后90天运行一次就可以了。但它还有另外一个问题,就是Let's Encrypt需要你向它证明这个域名是属于你的,这有两种方式,第一是在你服务器里建立一个文件,Let's Encrypt会通过访问传统http来访问到这个文件就可以了,对于这个方式那么问题就来了,80端口已经被封了如何能访问得到呢?所以此路是不通的。第二种方式就是在dns服务器发送一个记录类型为txt的自定义解析,记录值就是Let's Encrypt计算得出的字符串,由Let's Encrypt去访问,能访问得到并且一样就OK通过了,这个没有问题,并且动态IP解析本来就要做一样的工作。

申请的代码如下:

import os
import sys
import json
import copy
import dns.resolver
from subprocess import Popen, PIPE
try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2
import platform
import re
import binascii
import base64
import hashlib
import shutil
import time
import textwrap
import logging
import dnspod  # 域名解析模块
CA = "https://acme-v01.api.letsencrypt.org"
system = platform.system()
sourcePath = './source/'
certsPath = './certs/'
log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
class GetCerts(object):
    """docstring for GetCerts"""
    def __init__(self):
        super(GetCerts, self).__init__()
        self.config = self.__loadConfig()
        self.domain = self.__loadDomain()
        self.__cearteDir()
        self.accountkey = 'account.key'
        self.ops = self.config['ops'] if system == 'Windows' else 'openssl'
        self.cearteAccountKey()
        self.cearteDomainSource()
        self.loadAccountKey()
        self.regAccount()
        for i in self.domain:
            self.__loadPrivateKey(i)
    @staticmethod
    def __b64(b):
        return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
    @staticmethod
    def __waitChallengeDeployed(domain):
        count = 0
        is_deployed = GetCerts.__checkDomain(domain)
        while count < 240 and not is_deployed:
            time.sleep(10)
            count += 1
            is_deployed = GetCerts.__checkDomain(domain)
        return is_deployed
    @staticmethod
    def __checkDomain(domain):
        try:
            answers = dns.resolver.query('_acme-challenge.{0}'.format(domain), 'TXT')
            if len(answers) > 0:
                return True
        except dns.exception.DNSException as e:
            log.debug("TXT not found: %s", e)
        except:
            log.error("Unexpected error: %s", sys.exc_info()[0])
            raise
        return False
    # 建立账号证书文件夹
    def __cearteDir(self):
        if not os.path.exists(certsPath):
            os.makedirs(certsPath)
        if not os.path.exists(sourcePath):
            os.makedirs(sourcePath)
    # 载入程序设置文件
    def __loadConfig(self):
        try:
            file = open('config.json', 'r')
        except:
            log.debug("缺少配置文件,请配置基本参数")
            return False
        else:
            return json.loads(file.read())
    # 载入域名列表文件
    def __loadDomain(self):
        try:
            file = open('domain.json', 'r')
        except:
            log.debug("缺少域名文件,请配置域名参数")
            return False
        else:
            return json.loads(file.read())
    # 建立账号证书文件
    def cearteAccountKey(self):
        self.regKey = False
        if os.path.isfile(self.accountkey):
            command = [self.ops,'rsa','-in',self.accountkey,'-check']
            proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
            proc.communicate()
            if proc.returncode != 0:
                command = [self.ops,'genrsa', '-out',self.accountkey,'4096']
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()
                self.regKey = True
        else:
            command = [self.ops,'genrsa', '-out',self.accountkey,'4096']
            proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
            proc.communicate()
            self.regKey = True
      
    # 建立域名私钥和请求证书  
    def cearteDomainSource(self):
        for i in self.domain:
            csrFile = '%s%s.csr' % (sourcePath, i['domain'])
            keyFile = '%s%s.key' % (sourcePath, i['domain'])
            if os.path.isfile(keyFile):
                command = [self.ops, 'rsa', '-in', keyFile, '-check']
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()
                if proc.returncode != 0:
                    command = [self.ops,'genrsa', '-out', keyFile, '4096']
                    proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                    proc.communicate()
            else:
                command = [self.ops, 'genrsa', '-out', keyFile, '4096']
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()
            domains = str()
            if len(i['record']) > 1:
                for r in i['record']:
                    domains += 'DNS:%s,' % i['domain'] if r == '@' else 'DNS:%s.%s,' % (r, i['domain'])
            else:
                domains = i['domain'] if  i['record'][0] == '@' else '%s.%s' % (i['record'][0], i['domain'])
            if not os.path.isfile(csrFile):
                if system == 'Windows':
                    command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile, '-config', self.config['ops_cnf']]
                else:
                    command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile]
                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
                proc.communicate()
    def loadAccountKey(self):
        command = [self.ops, "rsa", "-in", self.accountkey, "-noout", "-text"]
        proc = Popen(command,stdin=PIPE, stdout=PIPE, stderr=PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise IOError("OpenSSL Error: {0}".format(err))
        pub_hex, pub_exp = re.search(
            r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
            out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
        pub_exp = "{0:x}".format(int(pub_exp))
        pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
        self.header = {
            "alg": "RS256",
            "jwk": {
                "e": GetCerts.__b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
                "kty": "RSA",
                "n": GetCerts.__b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
            },
        }
        account_key_json = json.dumps(self.header['jwk'], sort_keys=True, separators=(',', ':'))
        self.thumbprint = GetCerts.__b64(hashlib.sha256(account_key_json.encode('utf8')).digest())
    def __loadPrivateKey(self, domain):
        self.dns = dnspod if domain['dns'] == 'dnspod' else alidns
        self.dns.ID = domain['id']
        self.dns.TOKEN = domain['token']
        csrFile = '%s%s.csr' % (sourcePath, domain['domain'])
        command = [self.ops, "req", "-in", csrFile, "-noout", "-text"]
        proc = Popen(command, stdout = PIPE, stderr = PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise IOError("Error loading {0}: {1}".format(csrFile, err))
        domains = set([])
        common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8'))
        if common_name is not None:
            domains.add(common_name.group(1))
        subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
        if subject_alt_names is not None:
            for san in subject_alt_names.group(1).split(", "):
                if san.startswith("DNS:"):
                    domains.add(san[4:])
        print('domains',domains)
        for i in domains:
            print(i)
            code, result = self.__sendSignedRequest(CA + "/acme/new-authz", {
                "resource": "new-authz",
                "identifier": {"type": "dns", "value": i},
            })
            log.debug("Requesting challenges: {0} {1}".format(code, result))
            if code != 201:
                raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
            challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "dns-01"][0]
            token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
            keyauthorization = "{0}.{1}".format(token, self.thumbprint)
            dnstoken = GetCerts.__b64(hashlib.sha256(keyauthorization.encode('utf8')).digest())
            ndd = i.split(".")
            if len(ndd) == 2:
                subdomain = "_acme-challenge"
                basedomain = ndd[0] + "." + ndd[1]
            else:
                subdomain = "_acme-challenge." + ndd[0]
                basedomain = ndd[1] + "." + ndd[2]
            self.dns.updateRecord(basedomain, subdomain, dnstoken, 'TXT')
            try:
                is_deployed = GetCerts.__waitChallengeDeployed(i)
                if is_deployed:
                    code, result = self.__sendSignedRequest(challenge['uri'], {
                        "resource": "challenge",
                        "keyAuthorization": keyauthorization,
                    })
                    # print('code:',code,'result:',result)
                    if code != 202:
                        raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
                    while True:
                        try:
                            resp = urlopen(challenge['uri'])
                            challenge_status = json.loads(resp.read().decode('utf8'))
                            # print('challenge_status:',challenge_status)
                            log.debug(challenge_status)
                        except IOError as e:
                            raise ValueError("Error checking challenge: {0} {1}".format(
                                e.code, json.loads(e.read().decode('utf8'))))
                        if challenge_status['status'] == "pending":
                            log.debug("Pending")
                            time.sleep(1)
                        elif challenge_status['status'] == "valid":
                            log.debug("{0} verified!".format(i))
                            break
                        else:
                            raise ValueError("{0} challenge did not pass: {1}".format(
                                i, challenge_status))
            finally:
                pass
        log.info("Signing certificate...")
        command = [self.ops, "req", "-in", csrFile, "-outform", "DER"]
        proc = Popen(command, stdout=PIPE, stderr=PIPE)
        csr_der, err = proc.communicate()
        code, result = self.__sendSignedRequest(CA + "/acme/new-cert", {
            "resource": "new-cert",
            "csr": self.__b64(csr_der),
        })
        if code != 201:
            raise ValueError("Error signing certificate: {0} {1}".format(code, result))
        sign_cert = """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
            "\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
        sign_cert_file = open(certsPath+domain['domain']+'.crt', 'w')
        sign_cert_file.write(sign_cert)
        sign_cert_file.close()
        log.info("Certificate signed %s", domain['domain']+'.crt')
        shutil.copy(certsPath+domain['domain']+'.crt', certsPath+domain['domain']+'.pem')
        chain_cert = urlopen("https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem")
        with open(certsPath+domain['domain']+'.pem', 'ab') as output:
            output.write(chain_cert.read())
        log.info("Certificate chain signed %s", domain['domain']+'.chained.pem')
    def __sendSignedRequest(self, url, payload):
        payload64 = GetCerts.__b64(json.dumps(payload).encode('utf8'))
        protected = copy.deepcopy(self.header)
        protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']
        protected64 = GetCerts.__b64(json.dumps(protected).encode('utf8'))
        command = [self.ops, "dgst", "-sha256", "-sign", self.accountkey]
        proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
        out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
        if proc.returncode != 0:
            raise IOError("OpenSSL Error: {0}".format(err))
        data = json.dumps({
            "header": self.header, "protected": protected64,
            "payload": payload64, "signature": GetCerts.__b64(out),
        })
        try:
            resp = urlopen(url, data.encode('utf8'))
            return resp.getcode(), resp.read()
        except IOError as e:
            return getattr(e, "code", None), getattr(e, "read", e.__str__)()
    def regAccount(self):
        log.debug("Registering account...")
        code, result = self.__sendSignedRequest(CA + "/acme/new-reg", {
            "resource": "new-reg",
            "agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"
        })
        if code == 201:
            log.info("Account registered!")
        elif code == 409:
            log.debug("Already registered!")
        else:
            raise ValueError("Error registering: {0} {1}".format(code, result))
    def sign(self):
        pass
if __name__ == '__main__':
    gc=GetCerts()

还有参数文件的格式如下:

{
"ops":"d:/Wamp/Apache24/bin/openssl.exe",
"ops_cnf":"d:/Wamp/Apache24/conf/openssl.cnf",
"CA":"https://acme-v01.api.letsencrypt.org/directory",
"IP_VERSION":"",
"CONTACT_EMAIL":"",
"RENEW_DAYS":"30",
"KEY_ALGO":"rsa",
"KEYSIZE":"4096",
"LICENSE":"https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"
}

域名的配置文件:

[{"dns": "dnspod", "id": "你的id", "token": "你的token", "domain": "你的域名(不要W)", "record": ["www"]}]

代码保存下来,剩下的工作就是制定执行运行任务了,当然可以直接把输出目录直接定义到Apache的证书目录去,每次自动更新后Apache再次载入就可以了,这时候的网站就出现了一把绿色的钥匙而不是大大红色的叉了。