0
点赞
收藏
分享

微信扫一扫

kafka消费者类百搭

大明宫 2022-04-18 阅读 49
java kafka
kafka消费者类。运用JSON.parseObject(String text, Class<T> clazz)将JSON格式的text反序列化成clazz类型的实体类，然后逐条处理数据，再把处理结果发送到kafka并注册发送结果的监听回调，最后手动提交ack。
   @KafkaListener(topicPattern = Channels.PROPERTY_POST_CHANNEL + "-.*", groupId = "xiot-alarm")
    public void recvPropertyPost(List<ConsumerRecord<String, String>> records, Acknowledgment ack){

        List<PropertyPostMessage> messageList = records.stream().map(e -> JSON.parseObject(e.value(), PropertyPostMessage.class)).collect(Collectors.toList());
        log.info("topic:property-post 收到消息:" + JSON.toJSONString(messageList, SerializerFeature.WriteMapNullValue));

        try{
            //逐条解析消息
            for (PropertyPostMessage message : messageList){
                SiddhiInput siddhiInput = new SiddhiInput();
                siddhiInput.setProdNo(message.getProductNo());
                siddhiInput.setDevNo(message.getDeviceNo());
                siddhiInput.setTimestamp(message.getTimestamp());

                Map<String, Object> properties = message.getProperties();
                for (Map.Entry<String, Object> entry : properties.entrySet()) {
                    siddhiInput.setMetric(entry.getKey());
                    siddhiInput.setValue(entry.getValue().toString());

                    //发送到topic:alarm-siddhi-input
                    ListenableFuture<SendResult<String, String>> future =
                            kafkaTemplate.send(Channels.ALARM_SIDDHI_INPUT_CHANNEL, JSON.toJSONString(siddhiInput));
                    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            log.error("向topic:alarm-siddhi-input 发送消息失败:" + ex.getMessage());
                        }

                        @Override
                        public void onSuccess(SendResult<String, String> output) {
                            log.info("向topic:alarm-siddhi-input 发送消息成功:" + output);
                        }
                    });
                }
            }

            //手动ack
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
            throw e;
        
举报

相关推荐

0 条评论