配置详解
input {
jdbc {
# mysql相关jdbc配置
jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
jdbc_user => "root"
jdbc_password => "123456"
# jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
jdbc_driver_library => "./config/mysql-connector-java-5.1.46.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => "50000"
jdbc_default_timezone =>"Asia/Shanghai"
# mysql文件, 也可以直接写SQL语句在此处,后面一定跟上排序,否则有可能遗漏数据或者一直处理同一批数据,如下:
statement => "select * from user where update_time >= :sql_last_value order by update_time asc,id asc;"
#statement_filepath => "./config/jdbc.sql"
#启用追踪,则需要指定tracking_column,默认是timestamp()
use_column_value => true
# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
tracking_column => "update_time"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型()
tracking_column_type => "timestamp"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true
last_run_metadata_path => "./logstash_capital_last_id"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
#是否将 字段(column) 名称转小写
lowercase_column_names => false
# 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
schedule => "* * * * *"
#如果配置多个数据源需要用type来区分
type => "user"
}
}
filter {}
output {
#使用if语句判断type来指定输出的块()
if[type]=="testdb"{
elasticsearch {
hosts => "localhost:9200"
index => "user"
document_id => "%{id}"
template_overwrite => true
}
}
# 这里输出调试,正式运行时可以注释掉
stdout {
codec => json_lines
}
}
注意事项
后面一定跟上排序update_time asc,id asc,并开启分页,否则在update_time相等的数据过多时,有可能一直处理同一批数据,造成死循环,如下:
jdbc_paging_enabled => true
jdbc_page_size => "50000"
statement => "select * from user where update_time >= :sql_last_value order by update_time asc,id asc;"