0
点赞
收藏
分享

微信扫一扫

基于python3.6.8 自动化安装kafka集群

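下面直接给出脚本。两个脚本都依赖第三方库 tqdm(可用 pip3 install tqdm 安装),并且需要以 root 身份在集群的每台机器上运行(脚本会写入 /usr/local、/data 和 /usr/lib/systemd/system)。首先是 zookeeper 的安装脚本: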

cat /root/zookeeper_install.py

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import getopt
import sys
import os
import tarfile
import subprocess
import socket
import logging
from tqdm import tqdm
import glob

# Send every record (DEBUG and above) to /tmp/zookeeper.log; filemode 'w'
# truncates the file, so each run starts with a fresh log.
logging.basicConfig(
    filename='/tmp/zookeeper.log',
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
)

def gprint(text):
    """Print *text* in green (ANSI SGR 32), then reset the terminal colour."""
    print(''.join(('\033[32m', text, '\033[0m')))

def rprint(text):
    """Print *text* in red (ANSI SGR 31), then reset the terminal colour."""
    print(''.join(('\033[31m', text, '\033[0m')))

def usage():
    """Print the command-line help for the ZooKeeper installer in green."""
    script_path = os.path.abspath(__file__)
    help_lines = [
        '',
        '    -p Port',
        '    -i IPs (comma-separated)',
        '    -v Version (default: 3.8.0)',
        f'    [ usage: {script_path} -p port -i ip1,ip2,ip3,ip4,ip5 -v 3.8.0 ] default port 2181 version 3.8.0',
        '    ',
    ]
    gprint('\n'.join(help_lines))

def get_host_ip(probe_addr=('8.8.8.8', 80)):
    """Return the local IPv4 address of the interface that routes to *probe_addr*.

    A UDP ``connect`` only selects a route and local address (no data is
    sent), so the probe host never has to answer. Hosts without a route to
    8.8.8.8 can pass an internal address instead.

    :param probe_addr: ``(host, port)`` used to pick the outgoing interface.
    :return: the local IP address as a dotted-quad string.
    :raises OSError: if the socket cannot be created or no route exists.
    """
    # The context manager closes the socket on every path and never touches
    # an unbound name if socket() itself fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(probe_addr)
        return s.getsockname()[0]

class ZookeeperInstaller:
    """Install, configure and start one ZooKeeper ensemble member on this host.

    Paths are fixed: binaries go to ``base_dir`` (/usr/local/zookeeper),
    data and ``myid`` to ``data_dir`` (/data/zookeeper), and the systemd unit
    to /usr/lib/systemd/system/zookeeper.service. Shell steps run through
    ``subprocess.getstatusoutput``, i.e. ``/bin/sh``, so commands must stay
    POSIX-sh compatible (no bash-only syntax).
    """

    def __init__(self, port, ip_list, version):
        """
        :param port: client port written to ``clientPort`` (str or int).
        :param ip_list: IPs of every ensemble member. The 1-based position of
            an IP becomes its ``server.N`` number and ``myid``, so every node
            must be given the list in the same order.
        :param version: ZooKeeper release to install, e.g. ``'3.8.0'``.
        """
        self.port = port
        self.ip_list = ip_list
        self.version = version
        self.base_dir = '/usr/local/zookeeper'
        self.data_dir = '/data/zookeeper'
        print(f"Zookeeper version to install: {self.version}")

    def port_check(self):
        """Exit with status 1 if the port is invalid or something listens on it locally."""
        try:
            port = int(self.port)
            if not 0 < port < 65536:
                raise ValueError(self.port)
        except ValueError:
            # An invalid port would be written verbatim into zoo.cfg; stop here.
            rprint(f'Invalid port: {self.port}')
            logging.error(f'Invalid port: {self.port}')
            sys.exit(1)
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                result = sock.connect_ex(('127.0.0.1', port))
        except OSError as e:
            # Best effort: a failed probe is logged but does not block the install.
            logging.error(f"Error checking port: {e}")
            return
        if result == 0:
            rprint(f'Port {self.port} is already in use')
            logging.error(f'Port {self.port} is already in use')
            sys.exit(1)

    def install_zookeeper(self):
        """Unpack ZooKeeper into base_dir unless that directory already exists.

        A tarball in the current directory is used when present; otherwise it
        is downloaded from archive.apache.org. Exits with status 1 if the
        download fails.
        """
        if os.path.isdir(self.base_dir):
            logging.info('Zookeeper base directory already exists, skipping installation')
            return
        filename = f'apache-zookeeper-{self.version}-bin.tar.gz'
        if os.path.isfile(filename):
            gprint(f'Using local zookeeper package: {filename}')
        else:
            gprint('Downloading zookeeper package')
            cmd = f'wget https://archive.apache.org/dist/zookeeper/zookeeper-{self.version}/{filename} -P .'
            status, result = subprocess.getstatusoutput(cmd)
            if status != 0:
                rprint('Failed to download zookeeper package, check log for details')
                logging.error(f'Failed to download zookeeper package: {result}')
                sys.exit(1)
        self.extract_and_install(filename)

    def extract_and_install(self, filename):
        """Extract *filename* into the current directory and move it to base_dir.

        Exits with status 1 on any failure.
        """
        import shutil  # function-scope import keeps the script header unchanged
        try:
            with tarfile.open(filename) as tar:
                # NOTE: extractall trusts member paths; only feed it archives
                # from a trusted source (archive.apache.org or a vetted copy).
                tar.extractall(path='.')
            bin_name = os.path.basename(filename).replace('.tar.gz', '')
            # shutil.move falls back to copy+delete when the current directory
            # and base_dir live on different filesystems (os.rename would fail).
            shutil.move(bin_name, self.base_dir)
            logging.info('Zookeeper package installed successfully')
            # The zookeeper user may not exist yet at this point;
            # setup_user_and_permissions repeats this chown after creating it.
            subprocess.getstatusoutput(f'chown -R zookeeper:zookeeper {self.base_dir}')
        except Exception as e:
            rprint(f"Error extracting and installing zookeeper: {e}")
            logging.error(f"Error extracting and installing zookeeper: {e}")
            sys.exit(1)

    def _render_config(self):
        """Return the zoo.cfg text for this node, with one server.N line per IP."""
        server_configs = '\n'.join(
            f'server.{idx}={ip}:2888:3888' for idx, ip in enumerate(self.ip_list, start=1))
        return f'''
tickTime=2000
initLimit=20
syncLimit=5
dataDir={self.data_dir}
clientPort={self.port}
autopurge.snapRetainCount=30
autopurge.purgeInterval=72
{server_configs}
'''

    def configure_zookeeper(self):
        """Create data_dir, write conf/zoo.cfg and set up the zookeeper account.

        Exits with status 1 if data_dir already exists, to avoid clobbering an
        existing node.
        """
        if os.path.isdir(self.data_dir):
            rprint(f'{self.data_dir} already exists, please check')
            logging.error(f'{self.data_dir} already exists, please check')
            sys.exit(1)
        gprint('Starting initial configuration')
        os.makedirs(self.data_dir, exist_ok=True)
        with open(f'{self.base_dir}/conf/zoo.cfg', 'w') as f:
            f.write(self._render_config())
        self.setup_user_and_permissions()

    def setup_user_and_permissions(self):
        """Create the zookeeper group/user if missing and hand both directories to it."""
        subprocess.getstatusoutput('groupadd -f zookeeper')
        # '>/dev/null 2>&1' rather than bash's '&>': when /bin/sh is dash,
        # '&>' parses as "run in background", and the useradd branch is skipped.
        subprocess.getstatusoutput('id -u zookeeper >/dev/null 2>&1 || useradd -Ms /sbin/nologin -g zookeeper zookeeper')
        subprocess.getstatusoutput(f'chown -R zookeeper:zookeeper {self.data_dir}')
        subprocess.getstatusoutput(f'chown -R zookeeper:zookeeper {self.base_dir}')

    def _myid_for(self, host_ip):
        """Return the 1-based position of *host_ip* in ip_list, or None if absent."""
        try:
            return self.ip_list.index(host_ip) + 1
        except ValueError:
            return None

    def create_myid_file(self):
        """Write data_dir/myid containing this host's ensemble id.

        Exits with status 1 when the local IP is not in ip_list, since the
        node would otherwise have no valid id.
        """
        myid_file = f'{self.data_dir}/myid'
        if os.path.isfile(myid_file):
            rprint(f'{myid_file} already exists')
            logging.error(f'{myid_file} already exists')
            return
        host_ip = get_host_ip()
        zookeeper_id = self._myid_for(host_ip)
        if zookeeper_id is None:
            rprint(f'Local IP {host_ip} is not in the -i list {self.ip_list}')
            logging.error(f'Local IP {host_ip} is not in the -i list {self.ip_list}')
            sys.exit(1)
        with open(myid_file, 'w') as f:
            f.write(str(zookeeper_id))
        subprocess.getstatusoutput(f'chown zookeeper:zookeeper {myid_file}')

    def create_service_file(self):
        """Write the zookeeper systemd unit (unless present) and reload systemd."""
        service_file = '/usr/lib/systemd/system/zookeeper.service'
        if os.path.isfile(service_file):
            rprint(f'{service_file} service name already exists')
            logging.error(f'{service_file} service name already exists')
            return
        service_txt = '''[Unit]
Description=Zookeeper Service
After=network.target

[Service]
Type=forking
User=zookeeper
Group=zookeeper
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
LimitNOFILE=1024000
LimitMEMLOCK=65536

[Install]
WantedBy=multi-user.target'''
        with open(service_file, 'w') as f:
            f.write(service_txt)
        subprocess.getstatusoutput('systemctl daemon-reload')

    def start_zookeeper(self):
        """Start ZooKeeper as the zookeeper user; on failure, dump diagnostics to the log."""
        # NOTE(review): this starts zkServer.sh directly, not via the systemd
        # unit written above, so systemd will not track this process.
        cmd = 'sudo -u zookeeper /usr/local/zookeeper/bin/zkServer.sh start'
        status, output = subprocess.getstatusoutput(cmd)
        if status == 0:
            gprint('Zookeeper started successfully')
            logging.debug('Zookeeper started successfully')
            return
        rprint('Failed to start zookeeper, check log for details')
        logging.error(f'Failed to start zookeeper, status: {status}, output: {output}')
        _, output_info = subprocess.getstatusoutput('systemctl status zookeeper.service')
        logging.debug(f'Zookeeper status info: {output_info}')
        _, journal_output = subprocess.getstatusoutput('journalctl -xe | tail -n 50')
        logging.debug(f'Journalctl output: {journal_output}')
        self.debug_logs()

    def debug_logs(self):
        """Copy the contents of ZooKeeper's log files into this installer's log."""
        zk_log_path = f'{self.base_dir}/logs'
        # Also pick up *.out files (presumably zkServer.sh's daemon output —
        # verify for the installed version).
        zk_logs = sorted(glob.glob(f'{zk_log_path}/*.log') + glob.glob(f'{zk_log_path}/*.out'))
        for log_file in zk_logs:
            logging.debug(f'Log file: {log_file}')
            # errors='replace': a stray non-UTF-8 byte must not abort diagnostics.
            with open(log_file, 'r', errors='replace') as f:
                log_contents = f.read()
            logging.debug(f'Contents of {log_file}:\n{log_contents}')

if __name__ == '__main__':
    # Parse -p/-i/-v, then run the full install sequence once per port.
    if len(sys.argv) < 2:
        rprint("No parameters provided!")
        usage()
        sys.exit(1)
    try:
        opts, args = getopt.getopt(sys.argv[1:], "p:i:v:", ["port=", "ip=", "version="])
    except getopt.GetoptError as e:
        rprint(f"Parameter error: {e}")
        usage()
        sys.exit(1)
    # Defaults match usage(): port 2181, version 3.8.0. Only -i is mandatory.
    zookeeperport = '2181'
    ip_list = []
    version = '3.8.0'
    for name, value in opts:
        if name in ('-p', '--port'):
            zookeeperport = value
        elif name in ('-i', '--ip'):
            # Tolerate "a, b, c" and trailing commas.
            ip_list = [ip.strip() for ip in value.split(',') if ip.strip()]
        elif name in ('-v', '--version'):
            version = value
    if not zookeeperport or not ip_list:
        rprint("Missing required parameters!")
        usage()
        sys.exit(1)
    for port in tqdm(zookeeperport.split(',')):
        installer = ZookeeperInstaller(port=port, ip_list=ip_list, version=version)
        installer.port_check()
        installer.install_zookeeper()
        installer.configure_zookeeper()
        installer.create_myid_file()
        installer.create_service_file()
        installer.start_zookeeper()

安装时,在每台 zookeeper 节点上执行如下命令。注意 -i 中 IP 的顺序决定各节点的 myid,所有节点必须使用相同的顺序:

python3 zookeeper_install.py -p 2181 -v 3.8.0 -i 172.16.128.58,172.16.128.59,172.16.128.60

接着是 kafka 的安装脚本(保存为 kafka_install.py,同样在每台机器上执行):

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import getopt
import sys
import os
import tarfile
import subprocess
import socket
import logging
from tqdm import tqdm

# Send every record (DEBUG and above) to /tmp/kafka.log; filemode 'w'
# truncates the file, so each run starts with a fresh log.
logging.basicConfig(
    filename='/tmp/kafka.log',
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
)

def gprint(text):
    """Write *text* to stdout coloured green via ANSI escape codes."""
    start, reset = '\033[32m', '\033[0m'
    print(start + text + reset)

def rprint(text):
    """Write *text* to stdout coloured red via ANSI escape codes."""
    start, reset = '\033[31m', '\033[0m'
    print(start + text + reset)

def usage():
    """Print the command-line help for the Kafka installer in green.

    Lists every option the parser accepts, including -v. Lines are separated
    by real newlines only, so the output is not double-spaced.
    """
    filePath = os.path.abspath(__file__)
    gprint('''
    -p Port
    -i IPs (comma-separated, at least 3)
    -v Version
    [ usage: %s -p port -i ip1,ip2,ip3,ip4,ip5 -v 3.6.0] default port 9092 version 3.6.0 ''' % filePath)

def get_host_ip(probe_addr=('8.8.8.8', 80)):
    """Return this machine's IPv4 address on the interface that routes to *probe_addr*.

    A UDP ``connect`` only selects a route and local address (no data is
    sent), so the probe host never has to answer. Hosts without a route to
    8.8.8.8 can pass an internal address instead.

    :param probe_addr: ``(host, port)`` used to pick the outgoing interface.
    :return: the local IP address as a dotted-quad string.
    :raises OSError: if the socket cannot be created or no route exists.
    """
    # The context manager closes the socket on every path and never touches
    # an unbound name if socket() itself fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(probe_addr)
        return s.getsockname()[0]

class kafkaWork:
    """Install, configure and start one Kafka broker on this host.

    Paths are fixed: binaries in /usr/local/kafka, broker data in
    /data/kafka/logs, and the systemd unit in
    /usr/lib/systemd/system/kafka.service. The broker registers with
    ZooKeeper on port 2181 of every IP in ``ip_list``. Shell steps run
    through ``subprocess.getstatusoutput`` (``/bin/sh``).

    NOTE(review): the generated config uses ZooKeeper mode
    (``zookeeper.connect``); confirm the requested Kafka version supports it.
    """

    def __init__(self, port, ip_list, version):
        """
        :param port: broker listener port (str or int).
        :param ip_list: ZooKeeper ensemble IPs used to build zookeeper.connect.
        :param version: Kafka release of the Scala 2.13 build, e.g. ``'3.4.0'``.
        """
        self.port = port
        self.ip_list = ip_list
        self.version = version

    def portCheck(self):
        """Exit with status 1 if the port is invalid or something listens on it locally."""
        try:
            port = int(self.port)
            if not 0 < port < 65536:
                raise ValueError(self.port)
        except ValueError:
            rprint('Invalid port: %s' % self.port)
            logging.error('Invalid port: %s' % self.port)
            sys.exit(1)
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                result = sock.connect_ex(('127.0.0.1', port))
        except OSError as e:
            # Best effort: a failed probe is logged but does not block the install.
            logging.error(e)
            return
        if result == 0:
            rprint('Port %s is already in use' % self.port)
            logging.error('Port %s is already in use' % self.port)
            sys.exit(1)

    def baseDirCheck(self):
        """Install Kafka into /usr/local/kafka unless that directory already exists.

        A tarball in the current directory is preferred; otherwise it is
        downloaded to /opt. Exits with status 1 if the download fails, because
        every later step needs the unpacked distribution.
        """
        baseDir = '/usr/local/kafka/'
        if os.path.isdir(baseDir):
            logging.info('kafka cluster basedir exists, continue')
            return
        filename = 'kafka_2.13-%s.tgz' % self.version
        if os.path.exists(filename):
            gprint(f'Using local kafka package: {filename}')
        else:
            cmd = 'wget https://archive.apache.org/dist/kafka/{v}/kafka_2.13-{v}.tgz -P /opt/'.format(v=self.version)
            status, result = subprocess.getstatusoutput(cmd)
            if status != 0:
                rprint('Failed to download kafka package')
                logging.error('Failed to download kafka package: %s' % result)
                sys.exit(1)
            filename = '/opt/kafka_2.13-%s.tgz' % self.version
        self.extract_and_install(filename, baseDir)

    def extract_and_install(self, filename, baseDir):
        """Extract *filename* under /opt and move the tree to *baseDir*.

        Exits with status 1 on failure. The process working directory is not
        changed.
        """
        import shutil  # function-scope import keeps the script header unchanged
        extracted = os.path.join('/opt', 'kafka_2.13-%s' % self.version)
        try:
            with tarfile.open(filename) as tar:
                # NOTE: extractall trusts member paths; only feed it archives
                # from a trusted source.
                tar.extractall(path='/opt/')
            # shutil.move also works when /opt and /usr/local are on
            # different filesystems (os.rename would raise).
            shutil.move(extracted, baseDir.rstrip('/'))
            logging.info('kafka package installed successfully')
        except (OSError, tarfile.TarError) as e:
            rprint('Failed to install kafka package: %s' % e)
            logging.error(e)
            sys.exit(1)
        # The kafka user may not exist yet; serviceCheck repeats this chown.
        subprocess.getstatusoutput('chown -R kafka:kafka %s' % baseDir)

    def _build_config(self, broker_id, host_ip):
        """Return the server.properties text for this broker.

        :param broker_id: value written to ``broker.id``.
        :param host_ip: address the PLAINTEXT listener binds to.
        """
        zookeeper_connect = ','.join('%s:2181' % ip for ip in self.ip_list)
        # The listener port lives only in `listeners`; the standalone `port`
        # key is not emitted (it was removed in Kafka 3.0).
        return '''broker.id={broker_id}
listeners=PLAINTEXT://{host_ip}:{port}
auto.create.topics.enable=false
unclean.leader.election.enable=false
auto.leader.rebalance.enable=false
num.network.threads=3
num.io.threads=8
message.max.bytes=10000120
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=120
default.replication.factor=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect={zookeeper_connect}
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
min.insync.replicas=2
#request.required.acks=-1
num.replica.fetchers=2
controlled.shutdown.enable=true
'''.format(broker_id=broker_id, port=self.port, host_ip=host_ip,
           zookeeper_connect=zookeeper_connect)

    @staticmethod
    def _ensure_kafka_account():
        """Create the kafka group and user when they do not exist yet."""
        # getent/id match the exact name; grepping /etc/group or /etc/passwd
        # for "kafka" would also match e.g. "kafka-admin".
        gstatus, _ = subprocess.getstatusoutput('getent group kafka')
        if gstatus != 0:
            subprocess.getstatusoutput('groupadd kafka')
        ustatus, _ = subprocess.getstatusoutput('id -u kafka')
        if ustatus != 0:
            subprocess.getstatusoutput('useradd -Ms /sbin/nologin -g kafka kafka')

    @staticmethod
    def _tune_start_script():
        """Raise the broker heap to 10G and export JMX_PORT=9999 in kafka-server-start.sh."""
        script = '/usr/local/kafka/bin/kafka-server-start.sh'
        subprocess.getstatusoutput('sed -i s/-Xmx1G/-Xmx10G/g %s' % script)
        subprocess.getstatusoutput('sed -i s/-Xms1G/-Xms10G/g %s' % script)
        # NOTE(review): inserts after a hard-coded line 29; confirm that line
        # is still a sensible spot in the installed Kafka version.
        subprocess.getstatusoutput("sed -i '29 a\\\texport JMX_PORT=9999' %s" % script)

    def dataDirCheck(self):
        """Create /data/kafka, write server.properties and prepare the kafka account.

        Exits with status 1 if /data/kafka already exists.
        """
        dataDir = '/data/kafka/'
        if os.path.isdir(dataDir):
            rprint('%s already exists, please check' % dataDir)
            logging.error('[%s already exists, please check]' % dataDir)
            sys.exit(1)
        # makedirs also creates /data when it is missing.
        os.makedirs(dataDir)
        # broker.id is the last octet of the local IP; it is unique only among
        # hosts whose addresses share the first three octets.
        host_ip = get_host_ip()
        broker_id = host_ip.split('.')[3]
        with open('/usr/local/kafka/config/server.properties', 'w') as f:
            f.write(self._build_config(broker_id, host_ip))
        self._ensure_kafka_account()
        self._tune_start_script()

    def serviceCheck(self):
        """Write the kafka systemd unit (unless present), reload systemd and chown the install."""
        serviceFile = '/usr/lib/systemd/system/kafka.service'
        if os.path.isfile(serviceFile):
            rprint('%s service name already exists' % serviceFile)
            logging.error('[%s service name already exists]' % serviceFile)
            return
        configTxt = '''[Unit]
Description=kafka.service
After=network.target remote-fs.target zookeeper.service

[Service]
Type=forking
User=kafka
Group=kafka
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
LimitNOFILE=1024000
LimitMEMLOCK=65536

[Install]
WantedBy=multi-user.target'''
        with open(serviceFile, 'w') as f:
            f.write(configTxt)
        subprocess.getstatusoutput('systemctl daemon-reload')
        # 'user:group' instead of the deprecated 'user.' form.
        subprocess.getstatusoutput('chown -R kafka:kafka /usr/local/kafka/')

    def kafkaSetup(self):
        """Ensure /data/kafka/logs (the broker's log.dirs) exists and is owned by kafka."""
        kafkaLogDir = '/data/kafka/logs'
        os.makedirs(kafkaLogDir, exist_ok=True)
        subprocess.getstatusoutput('chown kafka:kafka %s' % kafkaLogDir)

    def kafkaStart(self):
        """Start the kafka systemd unit and report the outcome."""
        status, output = subprocess.getstatusoutput('systemctl start kafka')
        if status == 0:
            gprint('\t \033[5;1;32;40m kafka start success!\033[0m')
            # Plain text in the log file: ANSI escapes only make sense on a terminal.
            logging.debug('kafka start success!')
        else:
            rprint('kafka start failed, please check log file')
            logging.error('kafka start failed, status: %s, output: %s' % (status, output))


if __name__ == '__main__':
    # host_ip and broker_id stay module-level because kafkaWork.dataDirCheck
    # may read them as globals.
    host_ip = get_host_ip()
    broker_id = host_ip.split('.')[3]
    if len(sys.argv) < 3:
        rprint("No Enter parameter!!!")
        usage()
        sys.exit(1)
    try:
        options, args = getopt.getopt(sys.argv[1:], "p:i:v:", ["port=", "ip=", "version="])
    except getopt.GetoptError:
        rprint("Please check if the parameters are incorrect")
        usage()
        sys.exit(1)
    # Defaults advertised by usage(). getopt reports long options as '--name'.
    kafkaport = '9092'
    version = '3.6.0'
    ip_list = []
    for name, value in options:
        if name in ('-p', '--port'):
            kafkaport = value
        elif name in ('-i', '--ip'):
            ip_list = [ip.strip() for ip in value.split(',') if ip.strip()]
        elif name in ('-v', '--version'):
            version = value
    # Replication factors in server.properties are 3, so fewer nodes cannot work.
    if len(ip_list) < 3:
        rprint("At least 3 IP addresses are required")
        usage()
        sys.exit(1)
    for port in tqdm(kafkaport.split(',')):
        workflow = kafkaWork(port=port, ip_list=ip_list, version=version)
        workflow.portCheck()
        workflow.baseDirCheck()
        workflow.dataDirCheck()
        workflow.serviceCheck()
        workflow.kafkaSetup()
        workflow.kafkaStart()

安装命令如下(同样在每台 kafka 节点上执行),效果图见下方:

python3 kafka_install.py -p 9092 -i 172.16.128.58,172.16.128.59,172.16.128.60 -v 3.4.0

基于python3.6.8 自动化安装kafka集群_python

举报

相关推荐

0 条评论