0
点赞
收藏
分享

微信扫一扫

python 实现radius认证服务-作为vmware 的双因子认证服务器-实现Horizon外网接入准入控制


 通过pyrad模块,实现一个简单的radius论证服务。准入规则从数据库获取。

此开发是为了辅助 vmware connection server 作双因子认证,从而实现对安全服务器接入的链接服务器加多一层认证,实现外网连入云桌面的准入控制。

准入规则的修改是简单的表操作,可以结合其它系统来管理(我这边案例是用了泛微的OA流程审批,从而实现准入经过流程审批后生效)

程序运行后,即会开启一个认证服务

python 实现radius认证服务-作为vmware 的双因子认证服务器-实现Horizon外网接入准入控制_oracle

 设置vmware connection server  的认证

python 实现radius认证服务-作为vmware 的双因子认证服务器-实现Horizon外网接入准入控制_开发语言_02

 

python 实现radius认证服务-作为vmware 的双因子认证服务器-实现Horizon外网接入准入控制_.net_03

 

 通过上面设置,经过此链接服务器的云桌面客户端就会多加了一层认证,因为我这里只做准入,所以验证对用户是透明的,只是一次录入,通行码即原来ad用户密码。 如下图所示,可以看到有个通行码的认证了。

python 实现radius认证服务-作为vmware 的双因子认证服务器-实现Horizon外网接入准入控制_oracle_04

 

 

 

 

代码相对简洁:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Project: radius
# File : server.py
# Author : Long.Xu <fangkailove@yeah.net>
# http://gnolux.blog.csdn.net
# QQ:26564303 weixin:wxgnolux
# Time : 2022/3/17 20:29
# Copyright 2022 Long.Xu All rights Reserved.

from __future__ import print_function

import logging
import os.path

import cx_Oracle

import poll
from pyrad import dictionary, packet, server

poll.install()

logging.basicConfig(filename="radius.log", level="DEBUG",
format="%(asctime)s [%(levelname)-8s] %(message)s")


class FakeServer(server.Server):

def HandleAuthPacket(self, pkt):
# logging.debug("Received an authentication request")
# for attr in pkt.keys():
# logging.debug("%s: %s" % (attr, pkt[attr]))

reply = self.CreateReplyPacket(pkt)
try:
reply.code = packet.AccessReject
if pkt["User-Name"]:
userid = pkt["User-Name"][0]
#取数据库数据作用户验证
db = cx_Oracle.connect('user', 'pwd', '192.168.123.3:1521/ecology')
cur = db.cursor()
sql = """
select GH, XM,kssj, JSSJ, QZSX from ecology.uf_clouddesktop
where gh=:userid
and trunc(sysdate,'dd') between to_date(kssj,'yyyy-mm-dd') and to_date(jssj, 'yyyy-mm-dd')
and qzsx = 1
"""
# sql = "select * from ecology.hrmresource where loginid = :userid"

cur.execute(sql, userid=userid)
rows = cur.fetchall()
if len(rows) > 0:
reply.code = packet.AccessAccept
logging.debug("%s,AccessAccept" % userid)
else:
logging.debug("%s,AccessReject" % userid)
cur.close()
db.close()
except Exception as e:
logging.error(str(e))
self.SendReplyPacket(pkt.fd, reply)

def HandleAcctPacket(self, pkt):

print("Received an accounting request")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))

reply = self.CreateReplyPacket(pkt)
self.SendReplyPacket(pkt.fd, reply)

def HandleCoaPacket(self, pkt):

print("Received an coa request")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))

reply = self.CreateReplyPacket(pkt)
self.SendReplyPacket(pkt.fd, reply)

def HandleDisconnectPacket(self, pkt):

print("Received an disconnect request")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))

reply = self.CreateReplyPacket(pkt)
# COA NAK
reply.code = 45
self.SendReplyPacket(pkt.fd, reply)


if __name__ == '__main__':
# create server and read dictionary
dicfile = os.path.join(os.path.dirname(__file__), 'dictionary')
srv = FakeServer(dict=dictionary.Dictionary(dicfile), coa_enabled=True)

# add clients (address, secret, name)
srv.hosts["127.0.0.1"] = server.RemoteHost("127.0.0.1", b"Kah3choteereethiejeimaeziecumi", "localhost")
srv.BindToAddress("0.0.0.0")

print('*' * 70)
print('simple radius server for vmware horizon access control')
print('develop by xulong ,weixin:wxgnolux,e-mail:fangkailove@yeah.net')
print('*' * 70)
print('ip:127.0.0.1,port:1821, secret:xxxxxxxxxxxxxxxxx')
print('>>>>>>>>service is running>>>>>>>>>>> ')
print('*' * 70)
# start server
srv.Run()

 

 

举报

相关推荐

0 条评论