其实用动态IP绑定一级域名倒不是难题,问问度娘或者谷哥都可以找到答案。真正的问题是营运商封锁了80端口,这才是巧妇难为无米之炊的关键。前段时间想了一条思路,需要一个独立的服务器,这个想法实验了半年,证实是不行的,但是折腾的结果是做站还是可以的。

好了,不故弄玄虚了,其实我也没办法解决掉80端口被封的问题,只是换了个思路利用443端口做https的网站。本文所需记录的关键也就是这里了,因为https需要证书的,付费的证书咱没考虑,而选择的Let's Encrypt免费证书有时间限制的,期限已到就需要重新申请。所以有两样需要计划任务,一个是重拨号之后IP地址的更新,一个是证书到期后证书的重新申请。

关于IP解析,目前国内好用的有Dnspod的和阿里的,并且这两家都提供有解析更新API,所以很是方便,我这次用的是Dnspod的解析,他的开发文档地址是:https://www.dnspod.cn/docs/index.html,按照他的手册来并不复杂,就不多说了。重点来说说Let's Encrypt免费证书的申请,Let's Encrypt对证书的时效是90天,就是说90天后证书就会时效,但这个可以用计划任务来处理,脚本写好后90天运行一次就可以了。但它还有另外一个问题,就是Let's Encrypt需要你向它证明这个域名是属于你的,这有两种方式,第一是在你服务器里建立一个文件,Let's Encrypt会通过访问传统http来访问到这个文件就可以了,对于这个方式那么问题就来了,80端口已经被封了如何能访问得到呢?所以此路是不通的。第二种方式就是在dns服务器发送一个记录类型为txt的自定义解析,记录值就是Let's Encrypt计算得出的字符串,由Let's Encrypt去访问,能访问得到并且一样就OK通过了,这个没有问题,并且动态IP解析本来就要做一样的工作。

申请的代码如下:

import osimport sysimport jsonimport copyimport dns.resolverfrom subprocess import Popen, PIPEtry:    from urllib.request import urlopen  # Python 3except ImportError:    from urllib2 import urlopen  # Python 2import platformimport reimport binasciiimport base64import hashlibimport shutilimport timeimport textwrapimport loggingimport dnspod  # 域名解析模块CA = "https://acme-v01.api.letsencrypt.org"system = platform.system()sourcePath = './source/'certsPath = './certs/'log = logging.getLogger(__name__)log.addHandler(logging.StreamHandler())log.setLevel(logging.INFO)class GetCerts(object):    """docstring for GetCerts"""    def __init__(self):        super(GetCerts, self).__init__()        self.config = self.__loadConfig()        self.domain = self.__loadDomain()        self.__cearteDir()        self.accountkey = 'account.key'        self.ops = self.config['ops'] if system == 'Windows' else 'openssl'        self.cearteAccountKey()        self.cearteDomainSource()        self.loadAccountKey()        self.regAccount()        for i in self.domain:            self.__loadPrivateKey(i)    @staticmethod    def __b64(b):        return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")    @staticmethod    def __waitChallengeDeployed(domain):        count = 0        is_deployed = GetCerts.__checkDomain(domain)        while count < 240 and not is_deployed:            time.sleep(10)            count += 1            is_deployed = GetCerts.__checkDomain(domain)        return is_deployed    @staticmethod    def __checkDomain(domain):        try:            answers = dns.resolver.query('_acme-challenge.{0}'.format(domain), 'TXT')            if len(answers) > 0:                return True        except dns.exception.DNSException as e:            log.debug("TXT not found: %s", e)        except:            log.error("Unexpected error: %s", sys.exc_info()[0])            raise        return False    # 建立账号证书文件夹    def __cearteDir(self):        if not os.path.exists(certsPath):            os.makedirs(certsPath)        if not os.path.exists(sourcePath):            os.makedirs(sourcePath)    # 载入程序设置文件    def __loadConfig(self):        try:            file = open('config.json', 'r')        except:            log.debug("缺少配置文件,请配置基本参数")            return False        else:            return json.loads(file.read())    # 载入域名列表文件    def __loadDomain(self):        try:            file = open('domain.json', 'r')        except:            log.debug("缺少域名文件,请配置域名参数")            return False        else:            return json.loads(file.read())    # 建立账号证书文件    def cearteAccountKey(self):        self.regKey = False        if os.path.isfile(self.accountkey):            command = [self.ops,'rsa','-in',self.accountkey,'-check']            proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)            proc.communicate()            if proc.returncode != 0:                command = [self.ops,'genrsa', '-out',self.accountkey,'4096']                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)                proc.communicate()                self.regKey = True        else:            command = [self.ops,'genrsa', '-out',self.accountkey,'4096']            proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)            proc.communicate()            self.regKey = True          # 建立域名私钥和请求证书      def cearteDomainSource(self):        for i in self.domain:            csrFile = '%s%s.csr' % (sourcePath, i['domain'])            keyFile = '%s%s.key' % (sourcePath, i['domain'])            if os.path.isfile(keyFile):                command = [self.ops, 'rsa', '-in', keyFile, '-check']                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)                proc.communicate()                if proc.returncode != 0:                    command = [self.ops,'genrsa', '-out', keyFile, '4096']                    proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)                    proc.communicate()            else:                command = [self.ops, 'genrsa', '-out', keyFile, '4096']                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)                proc.communicate()            domains = str()            if len(i['record']) > 1:                for r in i['record']:                    domains += 'DNS:%s,' % i['domain'] if r == '@' else 'DNS:%s.%s,' % (r, i['domain'])            else:                domains = i['domain'] if  i['record'][0] == '@' else '%s.%s' % (i['record'][0], i['domain'])            if not os.path.isfile(csrFile):                if system == 'Windows':                    command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile, '-config', self.config['ops_cnf']]                else:                    command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile]                proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)                proc.communicate()    def loadAccountKey(self):        command = [self.ops, "rsa", "-in", self.accountkey, "-noout", "-text"]        proc = Popen(command,stdin=PIPE, stdout=PIPE, stderr=PIPE)        out, err = proc.communicate()        if proc.returncode != 0:            raise IOError("OpenSSL Error: {0}".format(err))        pub_hex, pub_exp = re.search(            r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",            out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()        pub_exp = "{0:x}".format(int(pub_exp))        pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp        self.header = {            "alg": "RS256",            "jwk": {                "e": GetCerts.__b64(binascii.unhexlify(pub_exp.encode("utf-8"))),                "kty": "RSA",                "n": GetCerts.__b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),            },        }        account_key_json = json.dumps(self.header['jwk'], sort_keys=True, separators=(',', ':'))        self.thumbprint = GetCerts.__b64(hashlib.sha256(account_key_json.encode('utf8')).digest())    def __loadPrivateKey(self, domain):        self.dns = dnspod if domain['dns'] == 'dnspod' else alidns        self.dns.ID = domain['id']        self.dns.TOKEN = domain['token']        csrFile = '%s%s.csr' % (sourcePath, domain['domain'])        command = [self.ops, "req", "-in", csrFile, "-noout", "-text"]        proc = Popen(command, stdout = PIPE, stderr = PIPE)        out, err = proc.communicate()        if proc.returncode != 0:            raise IOError("Error loading {0}: {1}".format(csrFile, err))        domains = set([])        common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8'))        if common_name is not None:            domains.add(common_name.group(1))        subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)        if subject_alt_names is not None:            for san in subject_alt_names.group(1).split(", "):                if san.startswith("DNS:"):                    domains.add(san[4:])        print('domains',domains)        for i in domains:            print(i)            code, result = self.__sendSignedRequest(CA + "/acme/new-authz", {                "resource": "new-authz",                "identifier": {"type": "dns", "value": i},            })            log.debug("Requesting challenges: {0} {1}".format(code, result))            if code != 201:                raise ValueError("Error requesting challenges: {0} {1}".format(code, result))            challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "dns-01"][0]            token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])            keyauthorization = "{0}.{1}".format(token, self.thumbprint)            dnstoken = GetCerts.__b64(hashlib.sha256(keyauthorization.encode('utf8')).digest())            ndd = i.split(".")            if len(ndd) == 2:                subdomain = "_acme-challenge"                basedomain = ndd[0] + "." + ndd[1]            else:                subdomain = "_acme-challenge." + ndd[0]                basedomain = ndd[1] + "." + ndd[2]            self.dns.updateRecord(basedomain, subdomain, dnstoken, 'TXT')            try:                is_deployed = GetCerts.__waitChallengeDeployed(i)                if is_deployed:                    code, result = self.__sendSignedRequest(challenge['uri'], {                        "resource": "challenge",                        "keyAuthorization": keyauthorization,                    })                    # print('code:',code,'result:',result)                    if code != 202:                        raise ValueError("Error triggering challenge: {0} {1}".format(code, result))                    while True:                        try:                            resp = urlopen(challenge['uri'])                            challenge_status = json.loads(resp.read().decode('utf8'))                            # print('challenge_status:',challenge_status)                            log.debug(challenge_status)                        except IOError as e:                            raise ValueError("Error checking challenge: {0} {1}".format(                                e.code, json.loads(e.read().decode('utf8'))))                        if challenge_status['status'] == "pending":                            log.debug("Pending")                            time.sleep(1)                        elif challenge_status['status'] == "valid":                            log.debug("{0} verified!".format(i))                            break                        else:                            raise ValueError("{0} challenge did not pass: {1}".format(                                i, challenge_status))            finally:                pass        log.info("Signing certificate...")        command = [self.ops, "req", "-in", csrFile, "-outform", "DER"]        proc = Popen(command, stdout=PIPE, stderr=PIPE)        csr_der, err = proc.communicate()        code, result = self.__sendSignedRequest(CA + "/acme/new-cert", {            "resource": "new-cert",            "csr": self.__b64(csr_der),        })        if code != 201:            raise ValueError("Error signing certificate: {0} {1}".format(code, result))        sign_cert = """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(            "\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))        sign_cert_file = open(certsPath+domain['domain']+'.crt', 'w')        sign_cert_file.write(sign_cert)        sign_cert_file.close()        log.info("Certificate signed %s", domain['domain']+'.crt')        shutil.copy(certsPath+domain['domain']+'.crt', certsPath+domain['domain']+'.pem')        chain_cert = urlopen("https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem")        with open(certsPath+domain['domain']+'.pem', 'ab') as output:            output.write(chain_cert.read())        log.info("Certificate chain signed %s", domain['domain']+'.chained.pem')    def __sendSignedRequest(self, url, payload):        payload64 = GetCerts.__b64(json.dumps(payload).encode('utf8'))        protected = copy.deepcopy(self.header)        protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']        protected64 = GetCerts.__b64(json.dumps(protected).encode('utf8'))        command = [self.ops, "dgst", "-sha256", "-sign", self.accountkey]        proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)        out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))        if proc.returncode != 0:            raise IOError("OpenSSL Error: {0}".format(err))        data = json.dumps({            "header": self.header, "protected": protected64,            "payload": payload64, "signature": GetCerts.__b64(out),        })        try:            resp = urlopen(url, data.encode('utf8'))            return resp.getcode(), resp.read()        except IOError as e:            return getattr(e, "code", None), getattr(e, "read", e.__str__)()    def regAccount(self):        log.debug("Registering account...")        code, result = self.__sendSignedRequest(CA + "/acme/new-reg", {            "resource": "new-reg",            "agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"        })        if code == 201:            log.info("Account registered!")        elif code == 409:            log.debug("Already registered!")        else:            raise ValueError("Error registering: {0} {1}".format(code, result))    def sign(self):        passif __name__ == '__main__':    gc=GetCerts()

还有参数文件的格式如下:

{"ops":"d:/Wamp/Apache24/bin/openssl.exe","ops_cnf":"d:/Wamp/Apache24/conf/openssl.cnf","CA":"https://acme-v01.api.letsencrypt.org/directory","IP_VERSION":"","CONTACT_EMAIL":"","RENEW_DAYS":"30","KEY_ALGO":"rsa","KEYSIZE":"4096","LICENSE":"https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"}

域名的配置文件:

[{"dns": "dnspod", "id": "你的id", "token": "你的token", "domain": "你的域名(不要W)", "record": ["www"]}]

代码保存下来,剩下的工作就是制定执行运行任务了,当然可以直接把输出目录直接定义到Apache的证书目录去,每次自动更新后Apache再次载入就可以了,这时候的网站就出现了一把绿色的钥匙而不是大大红色的叉了。