利用动态IP绑定一级域名做Web服务器
其实用动态IP绑定一级域名倒不是难题,问问度娘或者谷哥都可以找到答案。真正的问题是营运商封锁了80端口,这才是巧妇难为无米之炊的关键。前段时间想了一条思路,需要一个独立的服务器,这个想法实验了半年,证实是不行的,但是折腾的结果是做站还是可以的。
好了,不故弄玄虚了,其实我也没办法解决掉80端口被封的问题,只是换了个思路利用443端口做https的网站。本文所需记录的关键也就是这里了,因为https需要证书的,付费的证书咱没考虑,而选择的Let's Encrypt免费证书有时间限制的,期限已到就需要重新申请。所以有两样需要计划任务,一个是重拨号之后IP地址的更新,一个是证书到期后证书的重新申请。
关于IP解析,目前国内好用的有Dnspod的和阿里的,并且这两家都提供有解析更新API,所以很是方便,我这次用的是Dnspod的解析,他的开发文档地址是:https://www.dnspod.cn/docs/index.html,按照他的手册来并不复杂,就不多说了。重点来说说Let's Encrypt免费证书的申请,Let's Encrypt对证书的时效是90天,就是说90天后证书就会时效,但这个可以用计划任务来处理,脚本写好后90天运行一次就可以了。但它还有另外一个问题,就是Let's Encrypt需要你向它证明这个域名是属于你的,这有两种方式,第一是在你服务器里建立一个文件,Let's Encrypt会通过访问传统http来访问到这个文件就可以了,对于这个方式那么问题就来了,80端口已经被封了如何能访问得到呢?所以此路是不通的。第二种方式就是在dns服务器发送一个记录类型为txt的自定义解析,记录值就是Let's Encrypt计算得出的字符串,由Let's Encrypt去访问,能访问得到并且一样就OK通过了,这个没有问题,并且动态IP解析本来就要做一样的工作。
申请的代码如下:
import os
import sys
import json
import copy
import dns.resolver
from subprocess import Popen, PIPE
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
import platform
import re
import binascii
import base64
import hashlib
import shutil
import time
import textwrap
import logging
import dnspod # 域名解析模块
CA = "https://acme-v01.api.letsencrypt.org"
system = platform.system()
sourcePath = './source/'
certsPath = './certs/'
log = logging.getLogger(__name__)
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
class GetCerts(object):
"""docstring for GetCerts"""
def __init__(self):
super(GetCerts, self).__init__()
self.config = self.__loadConfig()
self.domain = self.__loadDomain()
self.__cearteDir()
self.accountkey = 'account.key'
self.ops = self.config['ops'] if system == 'Windows' else 'openssl'
self.cearteAccountKey()
self.cearteDomainSource()
self.loadAccountKey()
self.regAccount()
for i in self.domain:
self.__loadPrivateKey(i)
@staticmethod
def __b64(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
@staticmethod
def __waitChallengeDeployed(domain):
count = 0
is_deployed = GetCerts.__checkDomain(domain)
while count < 240 and not is_deployed:
time.sleep(10)
count += 1
is_deployed = GetCerts.__checkDomain(domain)
return is_deployed
@staticmethod
def __checkDomain(domain):
try:
answers = dns.resolver.query('_acme-challenge.{0}'.format(domain), 'TXT')
if len(answers) > 0:
return True
except dns.exception.DNSException as e:
log.debug("TXT not found: %s", e)
except:
log.error("Unexpected error: %s", sys.exc_info()[0])
raise
return False
# 建立账号证书文件夹
def __cearteDir(self):
if not os.path.exists(certsPath):
os.makedirs(certsPath)
if not os.path.exists(sourcePath):
os.makedirs(sourcePath)
# 载入程序设置文件
def __loadConfig(self):
try:
file = open('config.json', 'r')
except:
log.debug("缺少配置文件,请配置基本参数")
return False
else:
return json.loads(file.read())
# 载入域名列表文件
def __loadDomain(self):
try:
file = open('domain.json', 'r')
except:
log.debug("缺少域名文件,请配置域名参数")
return False
else:
return json.loads(file.read())
# 建立账号证书文件
def cearteAccountKey(self):
self.regKey = False
if os.path.isfile(self.accountkey):
command = [self.ops,'rsa','-in',self.accountkey,'-check']
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
proc.communicate()
if proc.returncode != 0:
command = [self.ops,'genrsa', '-out',self.accountkey,'4096']
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
proc.communicate()
self.regKey = True
else:
command = [self.ops,'genrsa', '-out',self.accountkey,'4096']
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
proc.communicate()
self.regKey = True
# 建立域名私钥和请求证书
def cearteDomainSource(self):
for i in self.domain:
csrFile = '%s%s.csr' % (sourcePath, i['domain'])
keyFile = '%s%s.key' % (sourcePath, i['domain'])
if os.path.isfile(keyFile):
command = [self.ops, 'rsa', '-in', keyFile, '-check']
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
proc.communicate()
if proc.returncode != 0:
command = [self.ops,'genrsa', '-out', keyFile, '4096']
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
proc.communicate()
else:
command = [self.ops, 'genrsa', '-out', keyFile, '4096']
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
proc.communicate()
domains = str()
if len(i['record']) > 1:
for r in i['record']:
domains += 'DNS:%s,' % i['domain'] if r == '@' else 'DNS:%s.%s,' % (r, i['domain'])
else:
domains = i['domain'] if i['record'][0] == '@' else '%s.%s' % (i['record'][0], i['domain'])
if not os.path.isfile(csrFile):
if system == 'Windows':
command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile, '-config', self.config['ops_cnf']]
else:
command = [self.ops, "req", "-new", "-sha256", "-nodes", "-keyout", keyFile,"-subj", "/CN={0}".format(domains), "-out", csrFile]
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
proc.communicate()
def loadAccountKey(self):
command = [self.ops, "rsa", "-in", self.accountkey, "-noout", "-text"]
proc = Popen(command,stdin=PIPE, stdout=PIPE, stderr=PIPE)
out, err = proc.communicate()
if proc.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
self.header = {
"alg": "RS256",
"jwk": {
"e": GetCerts.__b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"kty": "RSA",
"n": GetCerts.__b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
}
account_key_json = json.dumps(self.header['jwk'], sort_keys=True, separators=(',', ':'))
self.thumbprint = GetCerts.__b64(hashlib.sha256(account_key_json.encode('utf8')).digest())
def __loadPrivateKey(self, domain):
self.dns = dnspod if domain['dns'] == 'dnspod' else alidns
self.dns.ID = domain['id']
self.dns.TOKEN = domain['token']
csrFile = '%s%s.csr' % (sourcePath, domain['domain'])
command = [self.ops, "req", "-in", csrFile, "-noout", "-text"]
proc = Popen(command, stdout = PIPE, stderr = PIPE)
out, err = proc.communicate()
if proc.returncode != 0:
raise IOError("Error loading {0}: {1}".format(csrFile, err))
domains = set([])
common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", out.decode('utf8'))
if common_name is not None:
domains.add(common_name.group(1))
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
domains.add(san[4:])
print('domains',domains)
for i in domains:
print(i)
code, result = self.__sendSignedRequest(CA + "/acme/new-authz", {
"resource": "new-authz",
"identifier": {"type": "dns", "value": i},
})
log.debug("Requesting challenges: {0} {1}".format(code, result))
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "dns-01"][0]
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token'])
keyauthorization = "{0}.{1}".format(token, self.thumbprint)
dnstoken = GetCerts.__b64(hashlib.sha256(keyauthorization.encode('utf8')).digest())
ndd = i.split(".")
if len(ndd) == 2:
subdomain = "_acme-challenge"
basedomain = ndd[0] + "." + ndd[1]
else:
subdomain = "_acme-challenge." + ndd[0]
basedomain = ndd[1] + "." + ndd[2]
self.dns.updateRecord(basedomain, subdomain, dnstoken, 'TXT')
try:
is_deployed = GetCerts.__waitChallengeDeployed(i)
if is_deployed:
code, result = self.__sendSignedRequest(challenge['uri'], {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
# print('code:',code,'result:',result)
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
while True:
try:
resp = urlopen(challenge['uri'])
challenge_status = json.loads(resp.read().decode('utf8'))
# print('challenge_status:',challenge_status)
log.debug(challenge_status)
except IOError as e:
raise ValueError("Error checking challenge: {0} {1}".format(
e.code, json.loads(e.read().decode('utf8'))))
if challenge_status['status'] == "pending":
log.debug("Pending")
time.sleep(1)
elif challenge_status['status'] == "valid":
log.debug("{0} verified!".format(i))
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
i, challenge_status))
finally:
pass
log.info("Signing certificate...")
command = [self.ops, "req", "-in", csrFile, "-outform", "DER"]
proc = Popen(command, stdout=PIPE, stderr=PIPE)
csr_der, err = proc.communicate()
code, result = self.__sendSignedRequest(CA + "/acme/new-cert", {
"resource": "new-cert",
"csr": self.__b64(csr_der),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
sign_cert = """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
"\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
sign_cert_file = open(certsPath+domain['domain']+'.crt', 'w')
sign_cert_file.write(sign_cert)
sign_cert_file.close()
log.info("Certificate signed %s", domain['domain']+'.crt')
shutil.copy(certsPath+domain['domain']+'.crt', certsPath+domain['domain']+'.pem')
chain_cert = urlopen("https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.pem")
with open(certsPath+domain['domain']+'.pem', 'ab') as output:
output.write(chain_cert.read())
log.info("Certificate chain signed %s", domain['domain']+'.chained.pem')
def __sendSignedRequest(self, url, payload):
payload64 = GetCerts.__b64(json.dumps(payload).encode('utf8'))
protected = copy.deepcopy(self.header)
protected["nonce"] = urlopen(CA + "/directory").headers['Replay-Nonce']
protected64 = GetCerts.__b64(json.dumps(protected).encode('utf8'))
command = [self.ops, "dgst", "-sha256", "-sign", self.accountkey]
proc = Popen(command, stdin = PIPE, stdout = PIPE, stderr = PIPE)
out, err = proc.communicate("{0}.{1}".format(protected64, payload64).encode('utf8'))
if proc.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
data = json.dumps({
"header": self.header, "protected": protected64,
"payload": payload64, "signature": GetCerts.__b64(out),
})
try:
resp = urlopen(url, data.encode('utf8'))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
def regAccount(self):
log.debug("Registering account...")
code, result = self.__sendSignedRequest(CA + "/acme/new-reg", {
"resource": "new-reg",
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"
})
if code == 201:
log.info("Account registered!")
elif code == 409:
log.debug("Already registered!")
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
def sign(self):
pass
if __name__ == '__main__':
gc=GetCerts()
还有参数文件的格式如下:
{
"ops":"d:/Wamp/Apache24/bin/openssl.exe",
"ops_cnf":"d:/Wamp/Apache24/conf/openssl.cnf",
"CA":"https://acme-v01.api.letsencrypt.org/directory",
"IP_VERSION":"",
"CONTACT_EMAIL":"",
"RENEW_DAYS":"30",
"KEY_ALGO":"rsa",
"KEYSIZE":"4096",
"LICENSE":"https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf"
}
域名的配置文件:
[{"dns": "dnspod", "id": "你的id", "token": "你的token", "domain": "你的域名(不要W)", "record": ["www"]}]
代码保存下来,剩下的工作就是制定执行运行任务了,当然可以直接把输出目录直接定义到Apache的证书目录去,每次自动更新后Apache再次载入就可以了,这时候的网站就出现了一把绿色的钥匙而不是大大红色的叉了。
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。