环境:PYTHON 2.7
import json
import urllib2
from urllib2 import URLError
import sys
import time
import datetime
reload(sys)
sys.setdefaultencoding('utf-8')
class ZabbixTools:
def __init__(self):
self.url = 'http://192.168.243.99//zabbix/api_jsonrpc.php'
self.header = {'User-agent' : 'Mozilla/5.0 (Windows NT 6.2; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0',"Content-Type":"application/json"}
def user_login(self):
data = json.dumps({
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"user": "username",
"password": "pwd"
},
"id": 0
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print ("Auth Failed, please Check your name and password:", e.code)
else:
response = json.loads(result.read())
result.close()
self.authID = response['result']
return self.authID
def host_getbyip(self,hostip):
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.get",
"params":{
"output":["hostid","name","status","available"],
"filter": {"ip": hostip}
},
"auth":self.user_login(),
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
lens=len(response['result'])
if lens > 0:
print hostip,response['result'][0]['available'] +" "+response['result'][0]['name'] + " "+response['result'][0]['hostid']
return response['result'][0]['hostid']
else:
print "error "+hostip
return "error"
def host_updateproxyid(self,hostips,proxyids):
hostid=self.host_getbyip(hostips)
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.update",
"params": {
"hostid": hostid,
"proxy_hostid": proxyids
},
"auth": self.user_login(),
"id":1
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
result.close()
print "update" + hostips + " okay"
def host_getbyiptags(self,hostip):
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.get",
"params":{
"output":["hostid","name","status","available"],
"selectTags": ["tag","value"],
"filter": {"ip": hostip}
},
"auth":self.user_login(),
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
lens=len(response['result'])
if lens > 0:
print response['result'][0]["tags"],len(response['result'][0]["tags"])
if len(response['result'][0]["tags"]) > 0:
return response['result'][0]["tags"]
else:
return None
else:
print "error"
def create_host_tags(self,ips,lists):
host_id=self.host_getbyip(ips)
oldtag = self.host_getbyiptags(ips)
if oldtag != None:
for items in oldtag:
lists.append(items)
print lists
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.update",
"params":{
"hostid": host_id,
"tags": lists
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
print "tags is created!"
def retimes(self,times):
timeArray = time.strptime(times, "%Y-%m-%d %H:%M:%S")
timestamp = time.mktime(timeArray)
return timestamp
def create_host_maintenance(self,names,ips,starttimes,stoptimes):
starts=self.retimes(starttimes)
stops=self.retimes(stoptimes)
data = json.dumps({
"jsonrpc":"2.0",
"method":"maintenance.create",
"params": {
"name": names,
"active_since": starts,
"active_till": stops,
#"hostids": [ "37587" ],
"hostids": ips,
"timeperiods": [
{
"timeperiod_type": 0,
"start_date": starts,
"start_time": 0,
"period": int(int(stops)-int(starts))
}
]
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
print "maintenance is created!"
def maintenanceiplist(self):
lists = []
myfile = open("m1.txt", 'r')
for line in myfile:
con = line.split()
ipaddr = con[0].strip()
hostids = test.host_getbyip(ipaddr)
if hostids != "error":
lists.append(hostids)
return lists
if __name__ == '__main__':
test = ZabbixTools()
test.user_login()
#创建维护,将准备维护的主机写在同一目录的m1.txt
lists=test.maintenanceiplist()
nowtime = str(time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())))
test.create_host_maintenance(nowtime, lists, "2021-11-29 19:40:00", "2021-11-29 20:30:00")
#创建标签
# test.host_getbyiptags("10.1.0.1")
# test.create_host_tags("10.1.0.1","tagname2","tagvalue2")
# lists=[{"tag":"tglnamest","value":"valuelistst"},{"tag":"tglname2st","value":"valuelist2st"},{"tag":"tglname3st","value":"valuelist3st"}]