其实用动态IP绑定一级域名倒不是难题,问问度娘或者谷哥都可以找到答案。真正的问题是营运商封锁了80端口,这才是巧妇难为无米之炊的关键。前段时间想了一条思路,需要一个独立的服务器,这个想法实验了半年,证实是不行的,但是折腾的结果是做站还是可以的。
好了,不故弄玄虚了,其实我也没办法解决掉80端口被封的问题,只是换了个思路利用443端口做https的网站。本文所需记录的关键也就是这里了,因为https需要证书的,付费的证书咱没考虑,而选择的Let's Encrypt免费证书有时间限制的,期限已到就需要重新申请。所以有两样需要计划任务,一个是重拨号之后IP地址的更新,一个是证书到期后证书的重新申请。
关于IP解析,目前国内好用的有Dnspod的和阿里的,并且这两家都提供有解析更新API,所以很是方便,我这次用的是Dnspod的解析,他的开发文档地址是:https://www.dnspod.cn/docs/index.html,按照他的手册来并不复杂,就不多说了。重点来说说Let's Encrypt免费证书的申请,Let's Encrypt对证书的时效是90天,就是说90天后证书就会时效,但这个可以用计划任务来处理,脚本写好后90天运行一次就可以了。但它还有另外一个问题,就是Let's Encrypt需要你向它证明这个域名是属于你的,这有两种方式,第一是在你服务器里建立一个文件,Let's Encrypt会通过访问传统http来访问到这个文件就可以了,对于这个方式那么问题就来了,80端口已经被封了如何能访问得到呢?所以此路是不通的。第二种方式就是在dns服务器发送一个记录类型为txt的自定义解析,记录值就是Let's Encrypt计算得出的字符串,由Let's Encrypt去访问,能访问得到并且一样就OK通过了,这个没有问题,并且动态IP解析本来就要做一样的工作。
申请的代码如下:
import osimport sysimport jsonimport copyimport dns.resolverfrom subprocess import Popen, PIPEtry: from urllib.request import urlopen # Python 3except ImportError: from urllib2 import urlopen # Python 2import platformimport reimport binasciiimport base64import hashlibimport shutilimport timeimport textwrapimport loggingimport dnspod # 域名解析模块CA = "https://acme-v01.api.letsencrypt.org"system = platform.system()sourcePath = './source/'certsPath = './certs/'log = logging.getLogger(__name__)log.addHandler(logging.StreamHandler())log.setLevel(logging.INFO)class GetCerts(object): """docstring for GetCerts""" def __init__(self): super(GetCerts, self).__init__() self.config = self.__loadConfig() self.domain = self.__loadDomain() self.__cearteDir() self.accountkey = 'account.key' self.ops = self.config['ops'] if system == 'Windows' else 'openssl' self.cearteAccountKey() self.cearteDomainSource() self.loadAccountKey() self.regAccount() for i in self.domain: self.__loadPrivateKey(i) @staticmethod def __b64(b): return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") @staticmethod def __waitChallengeDeployed(domain): count = 0 is_deployed = GetCerts.__checkDomain(domain) while count < 240 and not is_deployed: time.sleep(10) count += 1 is_deployed = GetCerts.__checkDomain(domain) return is_deployed @staticmethod def __checkDomain(domain): try: answers = dns.resolver.query('_acme-challenge.{0}'.format(domain), 'TXT') if len(answers) > 0: return True except dns.exception.DNSException as e: log.debug("TXT not found: %s", e) except: log.error("Unexpected error: %s", sys.exc_info()[0]) raise return False # 建立账号证书文件夹 def __cearteDir(self): if not os.path.exists(certsPath): os.makedirs(certsPath) if not os.path.exists(sourcePath): os.makedirs(sourcePath) # 载入程序设置文件 def __loadConfig(self): try: file = open('config.json', 'r') except: log.debug("缺少配置文件,请配置基本参数") return False else: return json.loads(file.read()) # 载入域名列表文件 def __loadDomain(self): try: file = open('domain.json', 'r') except: log.debug("缺少域名文件,请配置域名参数") return False else: return json.loads(file.read()) # 建立账号证书文件 def cearteAccountKey(self): self.regKey = False if os.path.isfile(self.accountkey): command = [self.ops,'rsa','-in',self.accountkey,'-check'] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) proc.communicate() if proc.returncode != 0: command = [self.ops,'genrsa', '-out',self.accountkey,'4096'] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) proc.communicate() self.regKey = True else: command = [self.ops,'genrsa', '-out',self.accountkey,'4096'] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) proc.communicate() self.regKey = True # 建立域名私钥和请求证书 def cearteDomainSource(self): for i in self.domain: csrFile = '%s%s.csr' % (sourcePath, i['domain']) keyFile = '%s%s.key' % (sourcePath, i['domain']) if os.path.isfile(keyFile): command = [self.ops, 'rsa', '-in', keyFile, '-check'] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) proc.communicate() if proc.returncode != 0: command = [self.ops,'genrsa', '-out', keyFile, '4096'] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) proc.communicate() else: command = [self.ops, 'genrsa', '-out', keyFile, '4096'] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) proc.communicate() domains = str() if len(i['record']) > 1: for r in i['record']: domains += 'DNS:%s,' % i['domain'] if r == '@' else 'DNS:%s.%s,' % (r, i['domain']) else: domains = i['domain'] if i['record'][0] == '@' else '%s.%s' % (i['record'][0], i['domain']) if not os.path.isfile(csrFile): if system == 'Windows': command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile, '-config', self.config['ops_cnf']] else: command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) proc.communicate() def loadAccountKey(self): command = [self.ops, "rsa", "-in", self.accountkey, "-noout", "-text"] proc = Popen(command,stdin=PIPE, stdout=PIPE, stderr=PIPE) out, err = proc.communicate() if proc.returncode != 0: raise IOError("OpenSSL Error: {0}".format(err)) pub_hex, pub_exp = re.search( r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)", out.decode('utf8'), re.MULTILINE | re.DOTALL).groups() pub_exp = "{0:x}".format(int(pub_exp)) pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp self.header = { "alg": "RS256", "jwk": { "e": GetCerts.__b64(binascii.unhexlify(pub_exp.encode("utf-8"))), "kty": "RSA", "n": GetCerts.__b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), }, } account_key_json = json.dumps(self.header['jwk'], sort_keys=True, separators=(',', ':')) self.thumbprint = GetCerts.__b64(hashlib.sha256(account_key_json.encode('utf8')).digest()) def __loadPrivateKey(self, domain): self.dns = dnspod if domain['dns'] == 'dnspod' else alidns self.dns.ID = domain['id'] self.dns.TOKEN = domain['token'] csrFile = '%s%s.csr' % (sourcePath, domain['domain']) command = [self.ops, "req", "-in", csrFile, "-noout", "-text"] proc = Popen(command, stdout = PIPE, stderr = PIPE) out, err = proc.communicate() if proc.returncode != 0: raise IOError("Error loading {0}: {1}".format(csrFile, err)) domains = set([]) common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8')) if common_name is not None: domains.add(common_name.group(1)) subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL) if subject_alt_names is not None: for san in subject_alt_names.group(1).split(", "): if san.startswith("DNS:"): domains.add(san[4:]) print('domains',domains) for i in domains: print(i) code, result = self.__sendSignedRequest(CA + "/acme/new-authz", { "resource": "new-authz", "identifier": {"type": "dns", "value": i}, }) log.debug("Requesting challenges: {0} {1}".format(code, result)) if code != 201: raise ValueError("Error requesting challenges: {0} {1}".format(code, result)) challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "dns-01"][0] token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) keyauthorization = "{0}.{1}".format(token, self.thumbprint) dnstoken = GetCerts.__b64(hashlib.sha256(keyauthorization.encode('utf8')).digest()) ndd = i.split(".") if len(ndd) == 2: subdomain = "_acme-challenge" basedomain = ndd[0] + "." + ndd[1] else: subdomain = "_acme-challenge." + ndd[0] basedomain = ndd[1] + "." + ndd[2] self.dns.updateRecord(basedomain, subdomain, dnstoken, 'TXT') try: is_deployed = GetCerts.__waitChallengeDeployed(i) if is_deployed: code, result = self.__sendSignedRequest(challenge['uri'], { "resource": "challenge", "keyAuthorization": keyauthorization, }) # print('code:',code,'result:',result) if code != 202: raise ValueError("Error triggering challenge: {0} {1}".format(code, result)) while True: try: resp = urlopen(challenge['uri']) challenge_status = json.loads(resp.read().decode('utf8')) # print('challenge_status:',challenge_status) log.debug(challenge_status) except IOError as e: raise ValueError("Error checking challenge: {0} {1}".format( e.code, json.loads(e.read().decode('utf8')))) if challenge_status['status'] == "pending": log.debug("Pending") time.sleep(1) elif challenge_status['status'] == "valid": log.debug("{0} verified!".format(i)) break else: raise ValueError("{0} challenge did not pass: {1}".format( i, challenge_status)) finally: pass log.info("Signing certificate...") command = [self.ops, "req", "-in", csrFile, "-outform", "DER"] proc = Popen(command, stdout=PIPE, stderr=PIPE) csr_der, err = proc.communicate() code, result = self.__sendSignedRequest(CA + "/acme/new-cert", { "resource": "new-cert", "csr": self.__b64(csr_der), }) if code != 201: raise ValueError("Error signing certificate: {0} {1}".format(code, result)) sign_cert = """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format( "\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64))) sign_cert_file = open(certsPath+domain['domain']+'.crt', 'w') sign_cert_file.write(sign_cert) sign_cert_file.close() log.info("Certificate signed %s", domain['domain']+'.crt') shutil.copy(certsPath+domain['domain']+'.crt', certsPath+domain['domain']+'.pem') chain_cert = urlopen("https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem") with open(certsPath+domain['domain']+'.pem', 'ab') as output: output.write(chain_cert.read()) log.info("Certificate chain signed %s", domain['domain']+'.chained.pem') def __sendSignedRequest(self, url, payload): payload64 = GetCerts.__b64(json.dumps(payload).encode('utf8')) protected = copy.deepcopy(self.header) protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce'] protected64 = GetCerts.__b64(json.dumps(protected).encode('utf8')) command = [self.ops, "dgst", "-sha256", "-sign", self.accountkey] proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE) out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8')) if proc.returncode != 0: raise IOError("OpenSSL Error: {0}".format(err)) data = json.dumps({ "header": self.header, "protected": protected64, "payload": payload64, "signature": GetCerts.__b64(out), }) try: resp = urlopen(url, data.encode('utf8')) return resp.getcode(), resp.read() except IOError as e: return getattr(e, "code", None), getattr(e, "read", e.__str__)() def regAccount(self): log.debug("Registering account...") code, result = self.__sendSignedRequest(CA + "/acme/new-reg", { "resource": "new-reg", "agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf" }) if code == 201: log.info("Account registered!") elif code == 409: log.debug("Already registered!") else: raise ValueError("Error registering: {0} {1}".format(code, result)) def sign(self): passif __name__ == '__main__': gc=GetCerts()
还有参数文件的格式如下:
{"ops":"d:/Wamp/Apache24/bin/openssl.exe","ops_cnf":"d:/Wamp/Apache24/conf/openssl.cnf","CA":"https://acme-v01.api.letsencrypt.org/directory","IP_VERSION":"","CONTACT_EMAIL":"","RENEW_DAYS":"30","KEY_ALGO":"rsa","KEYSIZE":"4096","LICENSE":"https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"}
域名的配置文件:
[{"dns": "dnspod", "id": "你的id", "token": "你的token", "domain": "你的域名(不要W)", "record": ["www"]}]
代码保存下来,剩下的工作就是制定执行运行任务了,当然可以直接把输出目录直接定义到Apache的证书目录去,每次自动更新后Apache再次载入就可以了,这时候的网站就出现了一把绿色的钥匙而不是大大红色的叉了。