通过pyrad模块,实现一个简单的radius论证服务。准入规则从数据库获取。
此开发是为了辅助 vmware connection server 作双因子认证,从而实现对安全服务器接入的链接服务器加多一层认证,实现外网连入云桌面的准入控制。
准入规则的修改是简单的表操作,可以结合其它系统来管理(我这边案例是用了泛微的OA流程审批,从而实现准入经过流程审批后生效)
程序运行后,即会开启一个认证服务
设置vmware connection server 的认证
通过上面设置,经过此链接服务器的云桌面客户端就会多加了一层认证,因为我这里只做准入,所以验证对用户是透明的,只是一次录入,通行码即原来ad用户密码。 如下图所示,可以看到有个通行码的认证了。
代码相对简洁:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Project: radius
# File : server.py
# Author : Long.Xu <fangkailove@yeah.net>
# http://gnolux.blog.csdn.net
# QQ:26564303 weixin:wxgnolux
# Time : 2022/3/17 20:29
# Copyright 2022 Long.Xu All rights Reserved.
from __future__ import print_function
import logging
import os.path
import cx_Oracle
import poll
from pyrad import dictionary, packet, server
poll.install()
logging.basicConfig(filename="radius.log", level="DEBUG",
format="%(asctime)s [%(levelname)-8s] %(message)s")
class FakeServer(server.Server):
def HandleAuthPacket(self, pkt):
# logging.debug("Received an authentication request")
# for attr in pkt.keys():
# logging.debug("%s: %s" % (attr, pkt[attr]))
reply = self.CreateReplyPacket(pkt)
try:
reply.code = packet.AccessReject
if pkt["User-Name"]:
userid = pkt["User-Name"][0]
#取数据库数据作用户验证
db = cx_Oracle.connect('user', 'pwd', '192.168.123.3:1521/ecology')
cur = db.cursor()
sql = """
select GH, XM,kssj, JSSJ, QZSX from ecology.uf_clouddesktop
where gh=:userid
and trunc(sysdate,'dd') between to_date(kssj,'yyyy-mm-dd') and to_date(jssj, 'yyyy-mm-dd')
and qzsx = 1
"""
# sql = "select * from ecology.hrmresource where loginid = :userid"
cur.execute(sql, userid=userid)
rows = cur.fetchall()
if len(rows) > 0:
reply.code = packet.AccessAccept
logging.debug("%s,AccessAccept" % userid)
else:
logging.debug("%s,AccessReject" % userid)
cur.close()
db.close()
except Exception as e:
logging.error(str(e))
self.SendReplyPacket(pkt.fd, reply)
def HandleAcctPacket(self, pkt):
print("Received an accounting request")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))
reply = self.CreateReplyPacket(pkt)
self.SendReplyPacket(pkt.fd, reply)
def HandleCoaPacket(self, pkt):
print("Received an coa request")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))
reply = self.CreateReplyPacket(pkt)
self.SendReplyPacket(pkt.fd, reply)
def HandleDisconnectPacket(self, pkt):
print("Received an disconnect request")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))
reply = self.CreateReplyPacket(pkt)
# COA NAK
reply.code = 45
self.SendReplyPacket(pkt.fd, reply)
if __name__ == '__main__':
# create server and read dictionary
dicfile = os.path.join(os.path.dirname(__file__), 'dictionary')
srv = FakeServer(dict=dictionary.Dictionary(dicfile), coa_enabled=True)
# add clients (address, secret, name)
srv.hosts["127.0.0.1"] = server.RemoteHost("127.0.0.1", b"Kah3choteereethiejeimaeziecumi", "localhost")
srv.BindToAddress("0.0.0.0")
print('*' * 70)
print('simple radius server for vmware horizon access control')
print('develop by xulong ,weixin:wxgnolux,e-mail:fangkailove@yeah.net')
print('*' * 70)
print('ip:127.0.0.1,port:1821, secret:xxxxxxxxxxxxxxxxx')
print('>>>>>>>>service is running>>>>>>>>>>> ')
print('*' * 70)
# start server
srv.Run()