文章目录
- 1. 数据准备
- 2. AlertOperateController - 安全事件批量标记为待流转
- 3. AlertServiceIml - 将事件批量标记为待流转
- 1. IRiskEventService - 已经标记过的事件当作标记失败处理
- 2. IAletDao - 处置完成的事件当作标记失败处理
- 3. IAletDao - 级联的事件当作标记失败处理
- 4. IAletDao - 查询剩余的未标记的安全事件数据
- 4.1 NgsocEsTemplate - 处理查询的一个hit,获取source内容,并反序列化为指定类型的对象
- 4.2 JsonTranscodeUtil - 将一个object转化为json String,然后将该json String转化为tClass的一个实例
- 4.3 JsonUtil - 获取用于处理json的ObjectMapper
- 5. AlertServiceIml - 将未标记的安全事件标记为待流转
1. 数据准备
查询es中的数据:
GET /futurex-incident-index/_search
查询结果:
{
"took" : 4,
"timed_out" : false,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "futurex-incident.2021-07-16",
"_type" : "_doc",
"_id" : "00178319-88c0-40c3-860e-20b6c70a83ad",
"_score" : 1.0,
"_source" : {
"uuId" : "00178319-88c0-40c3-860e-20b6c70a83ad",
"dealTime" : "1970-01-01 00:00:00",
"usecaseId" : "107005",
"hostBranchId" : 0,
"dealStatus" : 1,
"whiteListIds" : [ ],
"whiteStatus" : "未加白",
"dealAction" : "待处置",
"updateTime" : "2022-04-22 01:19:40"
}
},
{
"_index" : "futurex-incident.2021-07-16",
"_type" : "_doc",
"_id" : "662ebc14-8c30-41dd-8d12-afb14f50b0ee",
"_score" : 1.0,
"_source" : {
"uuId" : "662ebc14-8c30-41dd-8d12-afb14f50b0ee",
"dealTime" : "1970-01-01 00:00:00",
"usecaseId" : "30009",
"hostBranchId" : 0,
"dealStatus" : 2,
"whiteListIds" : [ ],
"whiteStatus" : "未加白",
"dealAction" : "处置中",
"updateTime" : "2022-04-22 01:19:40"
}
},
{
"_index" : "futurex-incident.2021-07-16",
"_type" : "_doc",
"_id" : "b969929e-6406-41bc-a6d6-47063970c5b9",
"_score" : 1.0,
"_source" : {
"uuId" : "b969929e-6406-41bc-a6d6-47063970c5b9",
"dealTime" : "1970-01-01 00:00:00",
"usecaseId" : "70034",
"hostBranchId" : 0,
"dealStatus" : 1,
"whiteListIds" : [ ],
"whiteStatus" : "未加白",
"dealAction" : "待处置",
"updateTime" : "2022-04-22 01:19:40"
}
}
]
}
}
2. AlertOperateController - 安全事件批量标记为待流转
/**
* 安全事件批量标记为待流转
* 1. 调用安全运营接口查询有哪些已经被标记
* 2. 调用安全运营接口将未标记的事件标记为待流转
*
* @param toBeTransferOperateWithLogQo 输入参数
* @throws IncidentException 业务流程失败,抛出此异常
*/
@PreAuthorize("hasAnyAuthority('superAdmin','incidentEdit')")
@PostMapping(value = "/transfer")
@ApiOperation("批量标记为待流转")
@CheckValidateAble
public ApiResponse<BatchOperateResult> toBeTransferred(
@RequestBody @Validated UpdateWithLogQo<ToBeTransferOperateQo> toBeTransferOperateWithLogQo
) throws IncidentException {
BatchOperateResult batchOperateResult = this.alertService.toBeTransferred(toBeTransferOperateWithLogQo.getData());
return this.dealBatchResultOrThrow(batchOperateResult);
}
3. AlertServiceIml - 将事件批量标记为待流转
/**
* 将事件批量标记为待流转
*
* @param toBeTransferOperateQo 标记为待流转输入参数
* @return 标记成功的数量
* @throws IncidentException 失败时抛出异常
*/
BatchOperateResult toBeTransferred(ToBeTransferOperateQo toBeTransferOperateQo) throws IncidentException;
@Override
public BatchOperateResult toBeTransferred(ToBeTransferOperateQo toBeTransferOperateQo) throws IncidentException {
try {
log.info("共有{}个事件期望被标记为待流转", toBeTransferOperateQo.getIds().size());
// 1. 获取没有被标记的安全事件
// TODO 如果Feign的全局配置解析能够解决判断ApiResponse的code,则此处不再需要ApiResponseUtils
HashMap<String, Boolean> result = riskEventService.whetherCircle(toBeTransferOperateQo.getIds());
List<String> notAddIds = result.entrySet().stream()
.filter(item -> !item.getValue())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
int success = 0;
// 已经标记过的事件在此处当作标记失败处理
int fail = toBeTransferOperateQo.getIds().size() - notAddIds.size();
log.info("其中{}个事件已经标记为待流转", fail);
// 2. 查询处置完成的事件并当作标记失败处理
List<String> disposedIds = this.alertDao.getDisposedIds(notAddIds, null);
log.info("其中{}个事件已经为处置完成", disposedIds.size());
fail += disposedIds.size();
notAddIds.removeAll(disposedIds);
// 3. 查询级联的事件当作标记失败处理
List<String> cascadeIds = this.alertDao.getCascadeIds(notAddIds, null);
log.info("其中{}个事件为级联上报的事件", cascadeIds.size());
fail += cascadeIds.size();
notAddIds.removeAll(cascadeIds);
if (notAddIds.size() > 0) {
// 4. 查询剩余的未标记的安全事件数据
List<Alert> alerts = this.alertDao.getAlerts(notAddIds);
// 5. 将未标记的安全事件标记为待流转
List<AlertNotificationDto> alertList = createAlertNotificationList(alerts);
BatchOperateResult addResult = new BatchOperateResult();
BeanUtils.copyProperties(riskEventService.saveBatch(alertList), addResult);
success = addResult.getSuccess();
fail += addResult.getFail();
}
log.info("{}个事件标记成功, {}个事件标记失败", success, fail);
// 6. 构建返回值,标记多少个,未标记多少个
return BatchOperateResult.builder().fail(fail).success(success).build();
} catch (IOException | JsonSerializeException | NotificationException e) {
throw new IncidentException(
I18nUtils.i18n(I18nConstant.AlertOperateConstant.EXCEPTION_TO_BE_TRANSFERRED), e
);
}
}
1. IRiskEventService - 已经标记过的事件当作标记失败处理
将一个安全事件标为待流转后,这个安全事件会进入t_risk_event这张表中,表中关联了eventId,eventName等安全事件相关的类型,同时还有一个字段status,标识了安全事件的流转状态。
CREATE TABLE `t_risk_event` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '唯一id',
`eventId` varchar(255) NOT NULL COMMENT '事件id',
`eventName` varchar(255) DEFAULT '' COMMENT '事件名称',
`eventType` varchar(255) DEFAULT '' COMMENT '事件类型',
`branchId` varchar(255) DEFAULT '' COMMENT '资产组id',
`assetId` varchar(255) DEFAULT '' COMMENT '资产id',
`ip` varchar(255) DEFAULT '' COMMENT '主机ip',
`riskSource` varchar(255) DEFAULT NULL COMMENT '事件来源',
`level` varchar(32) DEFAULT NULL COMMENT '事件等级',
`tag` varchar(255) DEFAULT '' COMMENT '事件标签',
`createTime` timestamp(3) NULL DEFAULT NULL COMMENT '创建时间',
`firstTime` timestamp(3) NULL DEFAULT NULL COMMENT '首次发生时间',
`attr` varchar(255) DEFAULT '' COMMENT '附属信息',
`status` varchar(255) DEFAULT NULL COMMENT '流转状态',
`processDefinitionId` varchar(255) DEFAULT NULL COMMENT '通报剧本',
`version` int(11) DEFAULT NULL COMMENT '版本号 发起流程时版本号递增\r\n0 未流转版本号\r\n1 已流转版本号\r\n2 已流转版本号\r\n...',
`startTime` timestamp(3) NULL DEFAULT NULL COMMENT '发起流程时间',
`processDefinition` varchar(255) DEFAULT NULL COMMENT '剧本名称',
`latestTime` timestamp(3) NULL DEFAULT NULL COMMENT '最近发生时间',
`updateTime` timestamp(3) NULL DEFAULT NULL COMMENT '更新时间',
`judgeTime` timestamp(3) NULL DEFAULT NULL COMMENT '无需流转研判时间',
`branch` varchar(255) DEFAULT NULL COMMENT '资产组',
`assetOwner` varchar(255) DEFAULT NULL COMMENT '资产责任人',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_eventId_riskSource` (`eventId`,`riskSource`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d4muMk7i-1650797350064)(img/image-20220424162321813.png)]
/**
* 判断是否流转
*
* @param eventIds 事件id
* @return hashmap true false
*/
HashMap<String, Boolean> whetherCircle(List<String> eventIds);
@Override
public HashMap<String, Boolean> whetherCircle(List<String> eventIds) {
if (CollectionUtil.isEmpty(eventIds)) {
return new HashMap<>(0);
}
List<String> notCircleIdList;
// 构造查询条件queryWrapper
LambdaQueryWrapper<RiskEventEntity> queryWrapper = new LambdaQueryWrapper<>();
// 要查询的字段:eventName,eventId
queryWrapper.select(RiskEventEntity::getEventName, RiskEventEntity::getEventId);
// 查询已经被标记的安全事件,被标记的安全事件会进入t_risk_event表
queryWrapper.in(RiskEventEntity::getEventId, eventIds);
List<RiskEventEntity> riskEventEntityList = this.list(queryWrapper);
// 键-安全事件id, 值-是否被标记为待流转
HashMap<String, Boolean> hashMap = new HashMap<>(16);
for (RiskEventEntity riskEventEntity : riskEventEntityList) {
hashMap.put(riskEventEntity.getEventId(), true);
}
if (CollectionUtil.isNotEmpty(riskEventEntityList)) {
notCircleIdList = riskEventEntityList.stream().map(RiskEventEntity::getEventId).collect(Collectors.toList());
// 排除已经被标记为待流转的安全事件
eventIds.removeAll(notCircleIdList);
for (String eventId : eventIds) {
hashMap.put(eventId, false);
}
} else {
for (String eventId : eventIds) {
hashMap.put(eventId, false);
}
}
return hashMap;
}
2. IAletDao - 处置完成的事件当作标记失败处理
/**
* 从给定的事件id列表中获取处置完成的事件id
*
* @param ids 给定的id列表
* @param indices 指定索引,如果指定为空,则默认为futurex-incident-index
* @return 处置完成的id列表
* @throws IOException es查询失败
*/
List<String> getDisposedIds(List<String> ids, String[] indices) throws IOException;
// 枚举常量:DISPOSED(3, "alert.deal.status.disposed");
@Override
public List<String> getDisposedIds(List<String> ids, String[] indices) throws IOException {
return this.getUuIdByEqCondition(ids, indices, "dealStatus", DealStatusEnum.DISPOSED.getStatusCode());
}
/**
*
* @param ids 安全事件id
* @param indices es索引
* @param field es索引中的字段
* @param value 待处置(标为误报)
* @return 返回满足条件的事件id
* @throws IOException
*/
private List<String> getUuIdByEqCondition(List<String> ids, String[] indices, String field, Object value) throws IOException {
// ids[1,2,3],那么查询结果就是uuid=1,uuid=2,uuid=3的文档
QueryBuilder idsQueryBuilder = QueryBuilders.termsQuery("uuId", ids);
// 查询 “dealStatus=处置完成” 的文档
QueryBuilder conditionQueryBuilder = QueryBuilders.termQuery(field, value);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(idsQueryBuilder);
boolQueryBuilder.must(conditionQueryBuilder);
return getUuId(boolQueryBuilder, ids, indices);
}
3. IAletDao - 级联的事件当作标记失败处理
/**
* 从给定的事件id列表中获取级联的事件id
*
* @param ids 给定的id列表
* @param indices 指定索引,如果指定为空,则默认为futurex-incident-index
* @return 处置完成的id列表
* @throws IOException es查询失败
*/
List<String> getCascadeIds(List<String> ids, String[] indices) throws IOException;
@Override
public List<String> getCascadeIds(List<String> ids, String[] indices) throws IOException {
return getUuIdByRangeCondition(ids, indices, "hostBranchId", CascadeUtil.MIN_CASCADE_BID, null);
}
/**
* 从一组id列表中查询出满足条件的id列表,使用range查询
*
* @param ids 事件id列表
* @param indices 查询的索引
* @param field 过滤的字段名 hostBranchId
* @param from range查询的起始值,表示查询大于等于该值的文档
* @param to range查询的结束值,表示查询小于等于该值的文档
* @return 返回满足条件的事件id
* @throws IOException es查询失败
*/
private List<String> getUuIdByRangeCondition(List<String> ids, String[] indices, String field, Object from, Object to) throws IOException {
// ids[1,2,3],那么查询结果就是uuid=1,uuid=2,uuid=3的文档
QueryBuilder idsQueryBuilder = QueryBuilders.termsQuery("uuId", ids);
RangeQueryBuilder cascadeQueryBuilder = QueryBuilders.rangeQuery(field);
if (from != null) {
cascadeQueryBuilder.gte(from);
}
if (to != null) {
cascadeQueryBuilder.lte(to);
}
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(idsQueryBuilder);
boolQueryBuilder.must(cascadeQueryBuilder);
return getUuId(boolQueryBuilder, ids, indices);
}
4. IAletDao - 查询剩余的未标记的安全事件数据
/**
* 批量查询事件信息
*
* @param ids 事件id列表
* @return 事件列表
* @throws IOException es查询失败抛出此异常
* @throws JsonSerializeException es查询结果反序列化失败抛出此异常
*/
List<Alert> getAlerts(List<String> ids) throws IOException, JsonSerializeException;
@Override
public List<Alert> getAlerts(List<String> ids) throws IOException, JsonSerializeException {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.termsQuery("uuId", ids));
builder.size(ids.size());
SearchRequest search = new SearchRequest();
search.source(builder);
search.indices(INCIDENT_INDEX);
// 处理返回值
List<Alert> alerts = new ArrayList<>();
SearchResponse response = this.restHighLevelClient.search(search, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
alerts.add(NgsocEsTemplate.getSourceFromSearchHit(hit, Alert.class));
}
return alerts;
}
4.1 NgsocEsTemplate - 处理查询的一个hit,获取source内容,并反序列化为指定类型的对象
/**
* 处理查询的一个hit,获取source内容,并反序列化为指定类型的对象
*
* @param hit hit对象
* @param cls 查询返回的对象类型
* @param <T> 查询返回的对象类型
* @return 指定类型对象
*/
public static <T> T getSourceFromSearchHit(SearchHit hit, Class<T> cls) throws JsonSerializeException {
String source = hit.getSourceAsString();
try {
return JsonTranscodeUtil.transcode(source, cls);
} catch (JsonProcessingException e) {
throw new JsonSerializeException(
String.format(
"failed to deserialize json data to class [%s] object, json data: %s", cls.getName(), source
), e
);
}
}
4.2 JsonTranscodeUtil - 将一个object转化为json String,然后将该json String转化为tClass的一个实例
/**
* 将一个object转化为json String,然后将该json String转化为tClass的一个实例
*
* @param inputObject 原始对象
* @param tClass 返回值类型
* @param <T> tClass
* @return 只在inputObject为null的时候返回值为null
* @throws JsonProcessingException 转化失败时
*/
@Nullable
public static <T> T transcode(@Nullable Object inputObject, @NotNull Class<T> tClass) throws JsonProcessingException {
// 如果inputObject是null则返回null
if (inputObject == null) {
return null;
}
// 如果inputObject是tClass实例,则不转了,直接返回
if (tClass.isInstance(inputObject)) {
return (T) inputObject;
}
// 从inputObject转换为pojoString
String pojoString;
if (!(inputObject instanceof CharSequence)) {
pojoString = getObjectMapper().writeValueAsString(inputObject);
} else {
pojoString = inputObject.toString();
}
// 如果tClass是String的父类,那么直接返回String
if (tClass.isAssignableFrom(String.class)) {
// noinspection unchecked
return (T) pojoString;
}
// 将pojoString转换为tClass对象
T t = getObjectMapper().readValue(pojoString, tClass);
return t;
}
4.3 JsonUtil - 获取用于处理json的ObjectMapper
/**
* 获取用于处理json的ObjectMapper
* 根据文档,ObjectMapper类是线程安全的
* 同时存在多个ObjectMapper并不会导致任何问题(除了浪费的内存/创建时间)
* 所以不需要加同步
*
* @return objectMapper
*/
@NotNull
public static ObjectMapper getObjectMapper() {
if (OBJECT_MAPPER == null) {
ObjectMapper localObjectMapper = new ObjectMapper();
localObjectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
localObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
OBJECT_MAPPER = localObjectMapper;
}
return OBJECT_MAPPER;
}
5. AlertServiceIml - 将未标记的安全事件标记为待流转
5.1 AlertServiceIml - 保存流转事件请求Vo
将查询到的数据保存到t_risk_event表中,这个表保存的就是标记为待流转的安全事件,首先构造插入数据:
private List<AlertNotificationDto> createAlertNotificationList(List<Alert> alerts) {
List<AlertNotificationDto> list = new ArrayList<>();
for (Alert alert : alerts) {
AlertNotificationDto alertNotificationDto = AlertNotificationDto.builder()
.id(alert.getUuId())
.name(alert.getName())
.srcIp(alert.getSrcIp())
.srcPort(alert.getSrcPort())
.dstIp(alert.getDstIp())
.dstPort(alert.getDstPort())
.hostIp(alert.getHostIp())
.hostAssetId(alert.getHostAssetId())
.hostBranchId(alert.getHostBranchId())
.attackType(alert.getAttackType())
.riskTag(alert.getRiskTag())
.severity(alert.getSeverity())
.url(alert.getUrl())
.firstTime(alert.getFirstTime())
.lastTime(alert.getLastTime())
.riskSource(RISK_SOURCE)
.build();
list.add(alertNotificationDto);
}
return list;
}
5.2 RiskEventServiceImpl - 保存流转事件
/**
* 创建流转事件
*
* @param alertNotificationDtos qo
* @return 成功和失败数量
* @throws NotificationException
*/
BatchOperateResult saveBatch(List<AlertNotificationDto> alertNotificationDtos) throws NotificationException;
@Override
public BatchOperateResult saveBatch(List<AlertNotificationDto> alertList) throws NotificationException {
if (CollectionUtil.isEmpty(alertList)) {
return BatchOperateResult.builder().success(0).fail(0).build();
}
// 給请求Vo设置资产组branchName和资产责任人assetOwner
iSafeAssetService.alertSetAssetOwnerAndBranch(alertList);
// 从List<AlertNotificationDto ---> eventIds
List<String> eventIds = alertList.stream().map(AlertNotificationDto::getId).collect(Collectors.toList());
// 构造查询条件
LambdaQueryWrapper<RiskEventEntity> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.select(RiskEventEntity::getEventId);
queryWrapper.in(RiskEventEntity::getEventId, eventIds);
// 获得ids列表--已存在
List<String> eventIdsExist = this.list(queryWrapper).stream().map(RiskEventEntity::getEventId).collect(Collectors.toList());
// 获得ids列表--不存在 eventIds
eventIds.removeAll(eventIdsExist);
// eventIds ---> List<AlertNotificationDto>
List<AlertNotificationDto> list = alertList.stream()
.filter(alertNotificationQo -> eventIds.contains(alertNotificationQo.getId())).collect(Collectors.toList());
List<RiskEventEntity> riskEventEntityList = new ArrayList<>();
for (AlertNotificationDto alertNotificationDto : list) {
RiskEventEntity riskEventEntity = new RiskEventEntity(alertNotificationDto);
riskEventEntityList.add(riskEventEntity);
}
// 保存安全事件
return getBatchOperateResult(alertList, riskEventEntityList);
}
5.2.1 ISafeAssetServiceImpl - 给请求vo设置资产责任人和资产组名称
/**
* 给Vo设置资产责任人和资产名称
*
* @param alertNotificatonDtos qos
* @throws NotificationException 自定义异常
*/
void alertSetAssetOwnerAndBranch(List<AlertNotificationDto> alertNotificatonDtos) throws NotificationException;
@Override
public void alertSetAssetOwnerAndBranch(List<AlertNotificationDto> alertNotificatonDtos) throws NotificationException {
List<Long> hostAssetIds = new ArrayList<>();
for (AlertNotificationDto alertNotificatonDto : alertNotificatonDtos) {
Long hostAssetId = alertNotificatonDto.getHostAssetId();
hostAssetIds.add(hostAssetId);
}
// 根据主机资产hostAssetIds获取资产信息
AssetQo assetQo = AssetQo.builder().assetIds(hostAssetIds).expandEnable(true).build();
HashMap<Long, AssetVo> map = new HashMap<>(16);
PageData<AssetVo> pageData = getAssetVo(assetQo);
List<AssetVo> assetVoList = pageData.getData();
for (AssetVo assetVo : assetVoList) {
map.put(assetVo.getId(), assetVo);
}
for (AlertNotificationDto alertNotificatonDto : alertNotificatonDtos) {
Long hostAssetId = alertNotificatonDto.getHostAssetId();
if (map.containsKey(hostAssetId)) {
// 设置资产组名称branchName
alertNotificatonDto.setBranchName(map.get(hostAssetId).getBranch());
// 设置资产责任人
List<AssetOwnerDto> assetOwnerDtoList = map.get(hostAssetId).getAssetOwner();
if (!CollectionUtils.isEmpty(assetOwnerDtoList)) {
List<String> asserOwners = assetOwnerDtoList.stream().map(AssetOwnerDto::getName).collect(Collectors.toList());
alertNotificatonDto.setAssetOwner(String.join(",", asserOwners));
}
}
}
}
5.2.2 保存安全事件
/**
* 获得成功和失败的数量
*
* @param alertList qos
* @param riskEventEntityList 事件列表
* @return 成功失败的数量
*/
private BatchOperateResult getBatchOperateResult(List<AlertNotificationDto> alertList, List<RiskEventEntity> riskEventEntityList) {
// 保存安全事件
boolean save = this.saveBatch(riskEventEntityList);
BatchOperateResult batchOperateResult = new BatchOperateResult();
if (save && CollectionUtil.isNotEmpty(riskEventEntityList)) {
batchOperateResult.setSuccess(riskEventEntityList.size());
batchOperateResult.setFail(alertList.size() - riskEventEntityList.size());
} else {
batchOperateResult.setSuccess(0);
batchOperateResult.setFail(alertList.size());
}
return batchOperateResult;
}