0
点赞
收藏
分享

微信扫一扫

SSL证书检查与报警

不管是iOS APP,微信小程序,还是浏览器,随着对安全要求的提高,HTTPS已经几乎是所有网站的标配的。作为运维,监控并保证SSL证书的可用性就成了一个很现实的问题。

在早期,域名不是很多的情况下,使用的SSL证书数量有限。可以采用手动录入,定期检查的方式来确保SSL证书在有效期内。但是随着业务的增长,手动录入到列表的方式可能会出现遗漏,于是采用了从域名解析的角度来处理。

不管是何种情况,证书检查的核心是可以通过域名(域名获取),自动判断SSL证书的有效期(证书有效性判断),并对即将到期的证书通过适当的方式进行报警通知(报警通知)。

证书有效性判断

这个是整个流程中的核心内容,在这里主要是用python代码实现,原理就是创建socket连接访问域名的443端口,获取证书信息。

代码如下

import socket
import ssl

def get_ssl_certificate_info(domain):
    try:
        # 建立一个与目标域名的SSL连接,并设置连接超时时间为5秒
        context = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as ssock:
                # 获取SSL证书
                cert = ssock.getpeercert()

        # 打印证书信息
        print("Subject:", cert['subject'])
        print("Issuer:", cert['issuer'])
        print("Valid From:", cert['notBefore'])
        print("Valid Until:", cert['notAfter'])
    except (socket.gaierror, ConnectionRefusedError, ssl.CertificateError, socket.timeout) as e:
        print(f"Error connecting to {domain}: {e}")

if __name__ == "__main__":
    domain = "example.com"  # 将此处替换为你要查询的域名
    get_ssl_certificate_info(domain)

一般来说443端口如果不通,可能是这个域名没有配置https服务,所以超时时间就值设定了5s。

SSL证书检查与报警_报警

域名获取

在上文提到过,在早期如果域名比较少,可以手动维护域名列表,但是随着业务的扩展,难免会出现遗留。这里就换个思路从DNS方面获取所有域名。

根据各个DNS服务商的SDK一般都可以获取到域名列表,这里以阿里云为例,通过python SDK 获取域名列表。

在以下代码中 

DescribeDomains 用于获取主域名

DescribeDomainRecords 用于获取域名记录

调用阿里云SDK需要ccess_key_id  access_key_secret

from alibabacloud_alidns20150109 import models as alidns_20150109_models
from alibabacloud_alidns20150109.client import Client as Alidns20150109Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient

class DescribeDomains:
    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
    ) -> Alidns20150109Client:
        """
        使用AK&SK初始化账号Client
        @param access_key_id:
        @param access_key_secret:
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # 必填,您的 AccessKey ID,
            access_key_id=access_key_id,
            # 必填,您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # Endpoint 请参考 https://api.aliyun.com/product/Alidns
        config.endpoint = f'alidns.cn-hangzhou.aliyuncs.com'
        return Alidns20150109Client(config)

    @staticmethod
    def main(
        accessKeyId,
        accessKeySecret,
    ):
        # 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 工程代码泄露可能会导致 AccessKey
        # 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS
        # 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
        client = DescribeDomains.create_client(accessKeyId, accessKeySecret)
        describe_domains_request = alidns_20150109_models.DescribeDomainsRequest()
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            res = client.describe_domains_with_options(describe_domains_request, runtime)
            body = res.to_map()["body"]
            domains = []
            # 获取主域名列表并返回
            for d in body["Domains"]["Domain"]:
                domains.append(d["DomainName"])
            return True, domains
        except Exception as error:
            # 如有需要,请打印 error
            res = UtilClient.assert_as_string(error.message)
            return False, str(res)


class DescribeDomainRecords:
    def __init__(self):
        # 初始化查询参数
        self.page_number = 1
        self.page_size = 100
        self.total_records = []

    @staticmethod
    def create_client(
            access_key_id: str,
            access_key_secret: str,
    ) -> Alidns20150109Client:
        """
        使用AK&SK初始化账号Client
        @param access_key_id:
        @param access_key_secret:
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config(
            # 必填,您的 AccessKey ID,
            access_key_id=access_key_id,
            # 必填,您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # Endpoint 请参考 https://api.aliyun.com/product/Alidns
        config.endpoint = f'alidns.cn-hangzhou.aliyuncs.com'
        return Alidns20150109Client(config)

    def main(
        self,
        accessKeyId,
        accessKeySecret,
        domain_name,
    ):
        while True:
            # 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。 工程代码泄露可能会导致 AccessKey
            # 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS
            # 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
            client = DescribeDomainRecords.create_client(accessKeyId, accessKeySecret)
            describe_domain_records_request = alidns_20150109_models.DescribeDomainRecordsRequest(
                domain_name=domain_name,
                page_number=self.page_number,
                page_size=self.page_size
            )
            # 创建RuntimeOptions 实例并设置超时时间为3分钟
            runtime = util_models.RuntimeOptions()
            runtime.read_timeout = 180000
            try:
                # 复制代码运行请自行打印 API 的返回值
                res = client.describe_domain_records_with_options(describe_domain_records_request, runtime)
                body = res.to_map()["body"]
                records = body["DomainRecords"]["Record"]
                for d in records:
                    sub_domain = d["RR"] + "." + d["DomainName"]
                    self.total_records.append(sub_domain)

                # 判断是否还有下一页
                if len(records) < self.page_size:
                    return True, self.total_records
                else:
                    self.page_number += 1

            except Exception as error:
                # 如有需要,请打印 error
                res = UtilClient.assert_as_string(error.message)
                return False, str(res)

if __name__ == "__main__":
    key = "xxxx"
    secret = "yyyy"
    success, domains = DescribeDomains.main(key, secret)
    if success:
        for domain in domains:
            success, total_records = DescribeDomainRecords().main(key, secret, domain)
            if success:
                print(total_records)
            

报警通知

通知方式有多种,不管是钉钉还是微信,原理上大同小异。这里就不做过多赘述。


以上三部分组合在一起,再部署到crontab或者celery中就是一个完整的SSL证书有效期报警流程了。

举报

相关推荐

0 条评论