废话不多说,直接上脚本。首先是 ZooKeeper 的安装脚本:
cat /root/zookeeper_install.py
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import getopt
import sys
import os
import tarfile
import subprocess
import socket
import logging
from tqdm import tqdm
import glob
# Send every record (DEBUG and above) to /tmp/zookeeper.log; filemode 'w'
# truncates the file each time the script starts.
logging.basicConfig(
    filename='/tmp/zookeeper.log',
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
)
def gprint(text):
    """Print *text* to stdout wrapped in the ANSI green colour escape."""
    green_on, colour_off = '\033[32m', '\033[0m'
    print(''.join((green_on, text, colour_off)))
def rprint(text):
    """Print *text* to stdout wrapped in the ANSI red colour escape."""
    red_on, colour_off = '\033[31m', '\033[0m'
    print(''.join((red_on, text, colour_off)))
def usage():
    """Print the command-line help for this script in green."""
    script_path = os.path.abspath(__file__)
    # The leading and trailing empty entries reproduce the blank first line
    # and the final newline of the help text.
    help_lines = (
        '',
        '-p Port',
        '-i IPs (comma-separated)',
        '-v Version (default: 3.8.0)',
        f'[ usage: {script_path} -p port -i ip1,ip2,ip3,ip4,ip5 -v 3.8.0 ] default port 2181 version 3.8.0',
        '',
    )
    gprint('\n'.join(help_lines))
def get_host_ip():
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 purely so the OS selects the
    outgoing interface; the socket's own address is then returned.

    Returns:
        The local IP address as a dotted-quad string.

    Raises:
        OSError: If the socket cannot be created or connected.
    """
    # The context manager closes the socket on every path. The previous
    # try/finally called s.close() even when socket.socket() itself had
    # failed, which replaced the real error with an UnboundLocalError.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
class ZookeeperInstaller:
    """Install, configure and start one ZooKeeper node on the local host.

    The script entry point calls the steps in this order: ``port_check``,
    ``install_zookeeper``, ``configure_zookeeper``, ``create_myid_file``,
    ``create_service_file``, ``start_zookeeper``.

    Args:
        port: Client port written to ``zoo.cfg`` (string or int).
        ip_list: Ordered list of ensemble member IPs. The 1-based position of
            an address is used both for its ``server.N`` line and as the
            local node's ``myid``.
        version: ZooKeeper release to install, e.g. ``'3.8.0'``.
    """

    def __init__(self, port, ip_list, version):
        self.port = port
        self.ip_list = ip_list
        self.version = version
        self.base_dir = '/usr/local/zookeeper'
        self.data_dir = '/data/zookeeper'
        print(f"Zookeeper version to install: {self.version}")

    def port_check(self):
        """Exit with status 1 if something already listens on the client port.

        Errors raised by the probe itself are logged and otherwise ignored
        (best effort), matching the original behaviour.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                result = sock.connect_ex(('127.0.0.1', int(self.port)))
        except Exception as e:
            logging.error(f"Error checking port: {e}")
            return
        # connect_ex returns 0 when the connection succeeded, i.e. the port
        # is already taken.
        if result == 0:
            rprint(f'Port {self.port} is already in use')
            logging.error(f'Port {self.port} is already in use')
            sys.exit(1)

    def install_zookeeper(self):
        """Unpack ZooKeeper into ``base_dir`` unless that directory exists.

        A tarball in the current directory is preferred; otherwise it is
        downloaded with ``wget``. Exits with status 1 if the download fails.
        """
        if os.path.isdir(self.base_dir):
            logging.info('Zookeeper base directory already exists, skipping installation')
            return

        filename = f'apache-zookeeper-{self.version}-bin.tar.gz'
        if os.path.isfile(filename):
            gprint(f'Using local zookeeper package: {filename}')
            self.extract_and_install(filename)
            return

        gprint('Downloading zookeeper package')
        cmd = f'wget https://archive.apache.org/dist/zookeeper/zookeeper-{self.version}/apache-zookeeper-{self.version}-bin.tar.gz -P .'
        status, result = subprocess.getstatusoutput(cmd)
        if status != 0:
            logging.error(f'Failed to download zookeeper package: {result}')
            sys.exit(1)
        self.extract_and_install(filename)

    def extract_and_install(self, filename):
        """Extract *filename* into the current directory and move it to ``base_dir``.

        Args:
            filename: Path to an ``apache-zookeeper-<version>-bin.tar.gz`` archive.

        Exits with status 1 if extraction or the move fails.
        """
        # Imported locally so this method does not depend on a file-level import.
        import shutil

        try:
            with tarfile.open(filename) as tar:
                tar.extractall(path='.')
            bin_name = os.path.basename(filename).replace('.tar.gz', '')
            # shutil.move falls back to copy + delete when the working
            # directory and /usr/local are on different filesystems, where
            # os.rename would fail with EXDEV.
            shutil.move(bin_name, self.base_dir)
            logging.info('Zookeeper package installed successfully')
            # May fail silently if the zookeeper user does not exist yet;
            # setup_user_and_permissions() repeats the chown afterwards.
            subprocess.getstatusoutput(f'chown -R zookeeper:zookeeper {self.base_dir}')
        except Exception as e:
            logging.error(f"Error extracting and installing zookeeper: {e}")
            sys.exit(1)

    def _render_config(self):
        """Return the text written to ``conf/zoo.cfg`` for this node."""
        server_configs = '\n'.join(
            f'server.{idx}={ip}:2888:3888'
            for idx, ip in enumerate(self.ip_list, start=1)
        )
        # The leading and trailing empty entries keep the blank first line
        # and final newline of the original template.
        lines = [
            '',
            'tickTime=2000',
            'initLimit=20',
            'syncLimit=5',
            f'dataDir={self.data_dir}',
            f'clientPort={self.port}',
            'autopurge.snapRetainCount=30',
            'autopurge.purgeInterval=72',
            server_configs,
            '',
        ]
        return '\n'.join(lines)

    def configure_zookeeper(self):
        """Create the data directory, write ``zoo.cfg`` and fix ownership.

        Exits with status 1 if the data directory already exists.
        """
        if os.path.isdir(self.data_dir):
            rprint(f'{self.data_dir} already exists, please check')
            logging.error(f'{self.data_dir} already exists, please check')
            sys.exit(1)

        gprint('Starting initial configuration')
        os.makedirs(self.data_dir, exist_ok=True)
        with open(f'{self.base_dir}/conf/zoo.cfg', 'w') as f:
            f.write(self._render_config())
        self.setup_user_and_permissions()

    def setup_user_and_permissions(self):
        """Ensure the zookeeper user and group exist and own both directories."""
        subprocess.getstatusoutput('groupadd -f zookeeper')
        # getstatusoutput runs the command through /bin/sh, so use the POSIX
        # redirect. The bash-only '&>' is parsed by other shells as
        # "background the command", which skips the useradd fallback.
        subprocess.getstatusoutput(
            'id -u zookeeper >/dev/null 2>&1 || useradd -Ms /sbin/nologin -g zookeeper zookeeper'
        )
        subprocess.getstatusoutput(f'chown -R zookeeper:zookeeper {self.data_dir}')
        subprocess.getstatusoutput(f'chown -R zookeeper:zookeeper {self.base_dir}')

    def _server_id(self, host_ip):
        """Return the 1-based position of *host_ip* in ``ip_list``.

        Raises:
            ValueError: If *host_ip* is not one of the configured servers.
        """
        return self.ip_list.index(host_ip) + 1

    def create_myid_file(self):
        """Write this node's id to ``<data_dir>/myid`` unless the file exists.

        Exits with status 1 if the local IP is not in ``ip_list``, because no
        id can be derived for this host in that case.
        """
        myid_file = f'{self.data_dir}/myid'
        host_ip = get_host_ip()
        try:
            zookeeper_id = self._server_id(host_ip)
        except ValueError:
            message = f'Local IP {host_ip} is not in the server list {self.ip_list}'
            rprint(message)
            logging.error(message)
            sys.exit(1)

        if os.path.isfile(myid_file):
            rprint(f'{myid_file} already exists')
            logging.error(f'{myid_file} already exists')
            return

        with open(myid_file, 'w') as f:
            f.write(str(zookeeper_id))
        subprocess.getstatusoutput(f'chown zookeeper:zookeeper {myid_file}')

    def create_service_file(self):
        """Install a systemd unit for ZooKeeper unless one already exists."""
        service_file = '/usr/lib/systemd/system/zookeeper.service'
        if os.path.isfile(service_file):
            rprint(f'{service_file} service name already exists')
            logging.error(f'{service_file} service name already exists')
            return

        service_txt = '\n'.join([
            '[Unit]',
            'Description=Zookeeper Service',
            'After=network.target',
            '[Service]',
            'Type=forking',
            'User=zookeeper',
            'Group=zookeeper',
            f'ExecStart={self.base_dir}/bin/zkServer.sh start',
            f'ExecStop={self.base_dir}/bin/zkServer.sh stop',
            'LimitNOFILE=1024000',
            'LimitMEMLOCK=65536',
            '[Install]',
            'WantedBy=multi-user.target',
        ])
        with open(service_file, 'w') as f:
            f.write(service_txt)
        subprocess.getstatusoutput('systemctl daemon-reload')

    def start_zookeeper(self):
        """Start ZooKeeper as the zookeeper user and log diagnostics on failure."""
        # NOTE(review): the server is launched directly through sudo rather
        # than through the systemd unit written by create_service_file();
        # confirm whether 'systemctl start zookeeper' is the intended path.
        cmd = f'sudo -u zookeeper {self.base_dir}/bin/zkServer.sh start'
        status, output = subprocess.getstatusoutput(cmd)
        if status == 0:
            gprint('Zookeeper started successfully')
            logging.debug('Zookeeper started successfully')
            return

        rprint('Failed to start zookeeper, check log for details')
        # Failures are logged at ERROR so they stand out from routine output.
        logging.error(f'Failed to start zookeeper, status: {status}, output: {output}')
        _, output_info = subprocess.getstatusoutput('systemctl status zookeeper.service')
        logging.debug(f'Zookeeper status info: {output_info}')
        _, journal_output = subprocess.getstatusoutput('journalctl -xe | tail -n 50')
        logging.debug(f'Journalctl output: {journal_output}')
        self.debug_logs()

    def debug_logs(self):
        """Copy every ``*.log`` file under ``<base_dir>/logs`` into the script log."""
        zk_log_path = f'{self.base_dir}/logs'
        for log_file in glob.glob(f'{zk_log_path}/*.log'):
            logging.debug(f'Log file: {log_file}')
            try:
                # errors='replace' keeps a stray non-UTF-8 byte from aborting
                # the diagnostics dump.
                with open(log_file, 'r', errors='replace') as f:
                    log_contents = f.read()
            except OSError as e:
                logging.debug(f'Could not read {log_file}: {e}')
                continue
            logging.debug(f'Contents of {log_file}:\n{log_contents}')
if __name__ == '__main__':
    # Entry point: parse -p/-i/-v, then run every installation step in order.
    if len(sys.argv) < 2:
        rprint("No parameters provided!")
        usage()
        sys.exit(1)

    # Only the getopt call can raise GetoptError, so only it is guarded.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "p:i:v:", ["port=", "ip=", "version="])
    except getopt.GetoptError as e:
        rprint(f"Parameter error: {e}")
        usage()
        sys.exit(1)

    # Defaults match what usage() advertises (port 2181, version 3.8.0).
    zookeeperport = '2181'
    ip_list = []
    version = '3.8.0'
    for name, value in opts:
        if name in ('-p', '--port'):
            zookeeperport = value
        elif name in ('-i', '--ip'):
            # Tolerate spaces and stray commas in the list, e.g. "a, b,".
            ip_list = [ip.strip() for ip in value.split(',') if ip.strip()]
        elif name in ('-v', '--version'):
            version = value

    if not zookeeperport or not ip_list:
        rprint("Missing required parameters!")
        usage()
        sys.exit(1)

    for port in tqdm(zookeeperport.split(',')):
        installer = ZookeeperInstaller(port=port, ip_list=ip_list, version=version)
        installer.port_check()
        installer.install_zookeeper()
        installer.configure_zookeeper()
        installer.create_myid_file()
        installer.create_service_file()
        installer.start_zookeeper()
安装时,可以执行如下命令:
python3 zookeeper_install.py -p 2181 -v 3.8.0 -i 172.16.128.58,172.16.128.59,172.16.128.60
接着是 Kafka 的安装脚本(kafka_install.py):
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import getopt
import sys
import os
import tarfile
import subprocess
import socket
import logging
from tqdm import tqdm
# Send every record (DEBUG and above) to /tmp/kafka.log; filemode 'w'
# truncates the file each time the script starts.
logging.basicConfig(
    filename='/tmp/kafka.log',
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
)
def gprint(text):
    """Write *text* to stdout in green using ANSI escape codes."""
    pieces = ('\033[32m', text, '\033[0m')
    print(''.join(pieces))
def rprint(text):
    """Write *text* to stdout in red using ANSI escape codes."""
    pieces = ('\033[31m', text, '\033[0m')
    print(''.join(pieces))
def usage():
    """Print the command-line help for this script in green."""
    script_path = os.path.abspath(__file__)
    # Each option line is followed by a blank line, and the usage line ends
    # with a single trailing space, exactly as in the original template.
    help_text = (
        '\n'
        '-p Port \n\n'
        '-i ip \n\n'
        '[ usage: ' + script_path + ' -p port -i ip1,ip2,ip3,ip4,ip5 -v 3.6.0] default port 9092 version 3.6.0 '
    )
    gprint(help_text)
def get_host_ip():
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 purely so the OS selects the
    outgoing interface; the socket's own address is then returned.

    :return: the local IP address as a dotted-quad string
    :raises OSError: if the socket cannot be created or connected
    """
    # The context manager closes the socket on every path. The previous
    # try/finally called s.close() even when socket.socket() itself had
    # failed, which replaced the real error with an UnboundLocalError.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
class kafkaWork:
    """Install, configure and start one Kafka broker on the local host.

    The script entry point calls the steps in this order: ``portCheck``,
    ``baseDirCheck``, ``dataDirCheck``, ``serviceCheck``, ``kafkaSetup``,
    ``kafkaStart``.

    Args:
        port: Listener port written to ``server.properties``.
        ip_list: ZooKeeper host IPs; each is used as ``<ip>:2181`` in
            ``zookeeper.connect``.
        version: Kafka release to install (Scala 2.13 build), e.g. ``'3.6.0'``.
        host_ip: Address to advertise in ``listeners``. Defaults to the
            result of ``get_host_ip()``, resolved when the config is rendered.
        broker_id: Value for ``broker.id``. Defaults to the last dot-separated
            field of ``host_ip``.
    """

    def __init__(self, port, ip_list, version, host_ip=None, broker_id=None):
        self.port = port
        self.ip_list = ip_list
        self.version = version
        # Held on the instance so the class does not rely on module globals
        # that only exist when the file runs as a script.
        self.host_ip = host_ip
        self.broker_id = broker_id

    def portCheck(self):
        """Exit with status 1 if something already listens on the broker port.

        Errors raised by the probe itself are logged and otherwise ignored.
        """
        try:
            # 'with' closes the probe socket; it used to be leaked.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                result = sock.connect_ex(('127.0.0.1', int(self.port)))
        except Exception as e:
            logging.error(e)
            return
        if result == 0:
            rprint('%s Current port to survive' % self.port)
            logging.error('%s Current port to survive' % self.port)
            # Non-zero status so callers and shell scripts see the failure.
            sys.exit(1)

    def baseDirCheck(self):
        """Install Kafka into /usr/local/kafka/ unless that directory exists.

        A tarball in the current directory is preferred; otherwise it is
        downloaded into /opt/ with ``wget``. Exits with status 1 if the
        download fails, since later steps need the installed tree.
        """
        baseDir = '/usr/local/kafka/'
        if os.path.isdir(baseDir):
            logging.info('kafka cluster basedir exists, continue')
            return

        filename = 'kafka_2.13-%s.tgz' % self.version
        if os.path.exists(filename):
            gprint(f'Using local kafka package: {filename}')
            self.extract_and_install(filename, baseDir)
            return

        cmd = 'wget https://archive.apache.org/dist/kafka/{v}/kafka_2.13-{v}.tgz -P /opt/'.format(v=self.version)
        status, result = subprocess.getstatusoutput(cmd)
        if status != 0:
            rprint('Failed to download kafka package')
            logging.error('Failed to download kafka package: %s' % result)
            sys.exit(1)
        self.extract_and_install('/opt/kafka_2.13-%s.tgz' % self.version, baseDir)

    def extract_and_install(self, filename, baseDir):
        """Extract *filename* under /opt/ and move the tree to *baseDir*.

        Args:
            filename: Path to a ``kafka_2.13-<version>.tgz`` archive.
            baseDir: Destination directory; expected not to exist yet.

        Exits with status 1 if extraction or the move fails.
        """
        # Imported locally so this method does not depend on a file-level import.
        import shutil

        extracted = '/opt/kafka_2.13-%s' % self.version
        try:
            with tarfile.open(filename) as tar:
                tar.extractall(path='/opt/')
            # An absolute source path replaces the old os.chdir('/opt/'),
            # which changed the working directory of the whole process.
            # shutil.move also copes with /opt and /usr/local being on
            # different filesystems, where os.rename fails with EXDEV.
            shutil.move(extracted, os.path.normpath(baseDir))
            logging.info('kafka package installed successfully')
        except Exception as e:
            rprint('Failed to extract and install kafka package')
            logging.error(e)
            sys.exit(1)
        # May fail silently if the kafka user does not exist yet;
        # dataDirCheck() repeats the chown once the user has been created.
        subprocess.getstatusoutput('chown -R kafka:kafka %s' % baseDir)

    def _render_config(self):
        """Return the text written to ``config/server.properties``."""
        if self.host_ip is None:
            self.host_ip = get_host_ip()
        if self.broker_id is None:
            self.broker_id = self.host_ip.split('.')[-1]
        zookeeper_connect = ','.join(f"{ip}:2181" for ip in self.ip_list)
        lines = [
            f'broker.id={self.broker_id}',
            f'port={self.port}',
            f'listeners=PLAINTEXT://{self.host_ip}:{self.port}',
            'auto.create.topics.enable=false',
            'unclean.leader.election.enable=false',
            'auto.leader.rebalance.enable=false',
            'num.network.threads=3',
            'num.io.threads=8',
            'message.max.bytes=10000120',
            'socket.send.buffer.bytes=102400',
            'socket.receive.buffer.bytes=102400',
            'socket.request.max.bytes=104857600',
            'log.dirs=/data/kafka/logs',
            'num.partitions=10',
            'num.recovery.threads.per.data.dir=1',
            'offsets.topic.replication.factor=3',
            'transaction.state.log.replication.factor=3',
            'transaction.state.log.min.isr=2',
            'log.retention.hours=120',
            'default.replication.factor=3',
            'log.segment.bytes=1073741824',
            'log.retention.check.interval.ms=300000',
            f'zookeeper.connect={zookeeper_connect}',
            'zookeeper.connection.timeout.ms=6000',
            'group.initial.rebalance.delay.ms=0',
            'min.insync.replicas=2',
            '#request.required.acks=-1',
            'default.replication.factor=3',
            'num.replica.fetchers=2',
            'controlled.shutdown.enable=true',
        ]
        return '\n'.join(lines) + '\n'

    def _ensure_user(self):
        """Create the kafka group and user if they do not exist yet."""
        # 'groupadd -f' succeeds when the group already exists, and 'id -u'
        # checks the exact user name. The previous 'grep kafka' matched any
        # line containing the substring, and the user check compared the
        # command output with 0, so it never detected an existing user.
        subprocess.getstatusoutput('groupadd -f kafka')
        subprocess.getstatusoutput(
            'id -u kafka >/dev/null 2>&1 || useradd -Ms /sbin/nologin -g kafka kafka'
        )

    def dataDirCheck(self):
        """Create /data/kafka, write server.properties and tune the start script.

        Exits with status 1 if the data directory already exists.
        """
        dataDir = '/data/kafka/'
        if os.path.isdir(dataDir):
            rprint('%s already exists, please check' % dataDir)
            logging.error('[%s already exists, please check]' % dataDir)
            sys.exit(1)

        # makedirs also creates /data when it is missing; mkdir would raise.
        os.makedirs(dataDir)
        fileName = '/usr/local/kafka/config/server.properties'
        with open(fileName, 'w') as f:
            f.write(self._render_config())

        self._ensure_user()

        # Raise the JVM heap from 1G to 10G and append a JMX_PORT export
        # after line 29 of the start script.
        changefile1 = 'sed -i s/-Xmx1G/-Xmx10G/g /usr/local/kafka/bin/kafka-server-start.sh'
        changefile2 = 'sed -i s/-Xms1G/-Xms10G/g /usr/local/kafka/bin/kafka-server-start.sh'
        addfile1 = "sed -i '29 a\\\texport JMX_PORT=9999' /usr/local/kafka/bin/kafka-server-start.sh"
        for command in (changefile1, changefile2, addfile1):
            subprocess.getstatusoutput(command)

        # The kafka user exists from here on, so this chown takes effect
        # even when the service file step is skipped.
        subprocess.getstatusoutput('chown -R kafka:kafka /usr/local/kafka/')

    def serviceCheck(self):
        """Install a systemd unit for Kafka unless one already exists."""
        serviceFile = '/usr/lib/systemd/system/kafka.service'
        if os.path.isfile(serviceFile):
            rprint('%s service name already exists' % serviceFile)
            logging.error('[%s service name already exists]' % serviceFile)
            return

        configTxt = '\n'.join([
            '[Unit]',
            'Description=kafka.service',
            'After=network.target remote-fs.target zookeeper.service',
            '[Service]',
            'Type=forking',
            'User=kafka',
            'Group=kafka',
            'ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties',
            'ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh',
            'LimitNOFILE=1024000',
            'LimitMEMLOCK=65536',
            '[Install]',
            'WantedBy=multi-user.target',
        ])
        with open(serviceFile, 'w') as f:
            f.write(configTxt)
        subprocess.getstatusoutput('systemctl daemon-reload')
        subprocess.getstatusoutput('chown -R kafka:kafka /usr/local/kafka/')

    def kafkaSetup(self):
        """Create the Kafka log directory and hand it to the kafka user."""
        kafkaLogDir = '/data/kafka/logs'
        os.makedirs(kafkaLogDir, exist_ok=True)
        subprocess.getstatusoutput('chown kafka:kafka %s' % kafkaLogDir)

    def kafkaStart(self):
        """Start the kafka systemd service and report the outcome."""
        status, output = subprocess.getstatusoutput('systemctl start kafka')
        if status == 0:
            gprint('\t \033[5;1;32;40m kafka start success!\033[0m')
            # The terminal escape codes are kept out of the log file.
            logging.debug('kafka start success!')
        else:
            rprint('kafka start failed, please check log file')
            logging.error('kafka start failed, status: %s, output: %s' % (status, output))
if __name__ == '__main__':
    # Entry point: parse -p/-i/-v, then run every installation step in order.
    if len(sys.argv) < 3:
        rprint("No Enter parameter!!!")
        usage()
        sys.exit(1)

    try:
        options, args = getopt.getopt(sys.argv[1:], "p:i:v:", ["port=", "ip=", "version="])
    except getopt.GetoptError:
        rprint("Please check if the parameters are incorrect")
        usage()
        sys.exit(1)

    # Defaults match what usage() advertises (port 9092, version 3.6.0).
    # Without them, omitting -p or -v raised NameError further down.
    kafkaport = '9092'
    version = '3.6.0'
    ip_list = []
    for name, value in options:
        # getopt reports long options with two dashes ('--port'); the old
        # single-dash comparisons never matched, so long options were ignored.
        if name in ('-p', '--port'):
            kafkaport = value
        elif name in ('-i', '--ip'):
            ip_list = [ip.strip() for ip in value.split(',') if ip.strip()]
        elif name in ('-v', '--version'):
            version = value

    if len(ip_list) < 3:
        rprint("At least 3 IP addresses are required")
        usage()
        sys.exit(1)

    # Module-level values read by kafkaWork.dataDirCheck() when rendering
    # server.properties; resolved only after the arguments are valid.
    host_ip = get_host_ip()
    broker_id = host_ip.split('.')[3]

    for port in tqdm(kafkaport.split(',')):
        workflow = kafkaWork(port=port, ip_list=ip_list, version=version)
        workflow.portCheck()
        workflow.baseDirCheck()
        workflow.dataDirCheck()
        workflow.serviceCheck()
        workflow.kafkaSetup()
        workflow.kafkaStart()
安装效果图
python3 kafka_install.py -p 9092 -i 172.16.128.58,172.16.128.59,172.16.128.60 -v 3.4.0