# -*- coding: utf-8 -*-
import json
import re
from datetime import datetime

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.functions import FlatMapFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor
# Create the StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar",
             "file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")
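# Note: set_commit_offsets_on_checkpoints(True) on the Kafka source below only takes effect
# when checkpointing is enabled. A minimal sketch, left commented out; the 60s interval is an
# assumed example value, not part of the original job.
# env.enable_checkpointing(60000)
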
TEST_KAFKA_SERVERS = "127.0.0.1:9092"
TEST_KAFKA_TOPIC = "topic_elink"
TEST_GROUP_ID = "pyflink_group"
def get_kafka_consumer_properties(kafka_servers: str, group_id: str):
    properties = {
        "bootstrap.servers": kafka_servers,
        "fetch.max.bytes": "67108864",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        # Disable Kafka auto-commit; the value must be a string here, passing a bool raises an error
        "enable.auto.commit": "false",
        "group.id": group_id,
    }
    return properties


properties = get_kafka_consumer_properties(TEST_KAFKA_SERVERS, TEST_GROUP_ID)
class LogEvent:
    bus_seq = None
    message = None
    index_name = None

    def __init__(self, bus_seq, message, index_name):
        self.bus_seq = bus_seq
        self.message = message
        self.index_name = index_name

    def to_dict(self):
        return {
            "bus_seq": self.bus_seq,
            "message": self.message,
            "index_name": self.index_name
        }

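# MyMapFunction runs on the keyed stream (keyed by process id). When a "<Serial>" line arrives
# it stores process_id -> bus_seq in keyed map state; every later line from the same process is
# looked up in that state and emitted as a dict for the Elasticsearch sink.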
class MyMapFunction(FlatMapFunction):
    def open(self, runtime_context: RuntimeContext):
        # Keyed state mapping a process id to the business sequence number from its <Serial> line
        self.process_id_to_bus_seq = runtime_context.get_map_state(
            MapStateDescriptor('process_id', Types.STRING(), Types.STRING()))

    def close(self):
        pass

    def flat_map(self, line):
        bus_seq = ''
        process_id = ''
        message = line.replace("\n", "")
        line = json.loads(message)['message']
        if not line.startswith("ES"):
            return
        if '<Serial>' in line:
            try:
                pat = re.compile(r"<Serial>(\d+)</Serial>")
                bus_seq = pat.search(line).group(1)
                process_id = line.split()[1]
                self.process_id_to_bus_seq.put(process_id, bus_seq)
            except Exception:
                return
        process_id = line.split()[1]
        if len(process_id) != 6:
            process_id = line.split()[2]
        bus_seq = self.process_id_to_bus_seq.get(process_id)
        if not bus_seq:
            bus_seq = '0'
        date_str = datetime.now().strftime("%Y-%m-%d")
        index_name = 'flink-test' + date_str
        try:
            log_event = LogEvent(bus_seq, line, index_name)
        except Exception:
            return
        yield log_event.to_dict()


data_stream = env.add_source(
    FlinkKafkaConsumer(topics=TEST_KAFKA_TOPIC,
                       properties=properties,
                       deserialization_schema=SimpleStringSchema())
    .set_commit_offsets_on_checkpoints(True)
    .set_start_from_latest()
).name(f"consume data from topic {TEST_KAFKA_TOPIC}")
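
# Elasticsearch sink: ElasticsearchEmitter.dynamic_index('index_name') routes each record to the
# index named by its 'index_name' field; flushing after every single action keeps latency low at
# the cost of bulk throughput.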
es7_sink = Elasticsearch7SinkBuilder() \
     .set_bulk_flush_max_actions(1) \
     .set_emitter(ElasticsearchEmitter.dynamic_index('index_name')) \
     .set_hosts(['127.0.0.1:9200']) \
     .build()
def get_line_key(line):
    """Key selector: extract the process id from a raw log line."""
    message = line.replace("\n", "")
    line = json.loads(message)['message']
    try:
        process_id = line.split()[1]
        if len(process_id) != 6:
            process_id = line.split()[2]
    except Exception:
        process_id = '9999'
    return process_id
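
# key_by(get_line_key) groups lines by process id, so the "<Serial>" line and the later lines from
# the same process land on the same parallel instance and share the keyed map state in MyMapFunction.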
data_stream.key_by(get_line_key) \
    .flat_map(MyMapFunction(), output_type=Types.MAP(Types.STRING(), Types.STRING())) \
    .sink_to(es7_sink)
# Execute the job
env.execute('Add "bus_seq" to each line')