0
点赞
收藏
分享

微信扫一扫

2020-09-10 influxdb


InfluxDB 是一个时序(实时)数据库,用来存储大量的实时数据。经过测试,十万条数据的存储只需要七八十毫秒,写入和查询速度都非常快,吞吐量极高;并且它本身具有归档功能(即:编写 CQ 连续查询,可以取一段时间的平均值并自动存储),对数据存储/统计非常友好。

与传统数据库比较

无须安装,一键运行

链接:https://pan.baidu.com/s/1BxWXkgDhyOFWXbJ2C5nQ7g

提取码:o6l0

概念      MySQL      InfluxDB

数据库(同)  database    database

表(不同)  table  measurement

列(不同)  column  tag(带索引的,非必须)、field(不带索引)、timestamp(唯一主键)

retention policy:数据存储策略(默认策略为autogen)InfluxDB没有删除数据操作,规定数据的保留时间达到清除数据的目的;

常用命令

-- 查看所有的数据库

show databases;

-- 使用特定的数据库

use database_name;

-- 查看所有的measurement

show measurements;

-- 查询10条数据

select * from measurement_name limit 10;

-- 数据中的时间字段默认显示的是一个纳秒时间戳,改成可读格式

precision rfc3339; -- 之后再查询,时间就是rfc3339标准格式

-- 或可以在连接数据库的时候,直接带该参数

influx -precision rfc3339

-- 查看一个measurement中所有的tag key

show tag keys

-- 查看一个measurement中所有的field key

show field keys

-- 查看一个measurement中所有的保存策略(可以有多个,一个标识为default)

show retention policies;


-- 查看当前数据库的Retention Policies

SHOW RETENTION POLICIES ON "testDB"

-- 创建新的Retention Policies

CREATE RETENTION POLICY "rp_name" ON "db_name" DURATION 30d REPLICATION 1 DEFAULT

-- rp_name:策略名

-- db_name:具体的数据库名

-- 30d:保存30天,30天之前的数据将被删除

-- 它具有各种时间参数,比如:h(小时),w(星期)

-- REPLICATION 1:副本个数,这里填1就可以了

-- DEFAULT 设为默认的策略

-- 修改策略

ALTER RETENTION POLICY "2_hours" ON "telegraf" DURATION 4h DEFAULT

-- 删除策略

DROP RETENTION POLICY "2_hours" ON "telegraf"

-- # 创建表

-- # 直接在插入数据的时候指定表名(weather就是表名)

insert weather,altitude=1000,area=北 temperature=11,humidity=-4


-- # 删除表

DROP MEASUREMENT "measurementName"


--  显示用户

SHOW USERS

-- # 创建用户

CREATE USER "username" WITH PASSWORD 'password'

-- # 创建管理员权限的用户

CREATE USER "username" WITH PASSWORD 'password' WITH ALL PRIVILEGES


-- # 删除用户

DROP USER "username"


-- 创建数据库

CREATE DATABASE mydb


-- 登陆

auth

用户名

密码

-- 插入数据

insert orders,phone=20 website=90

-- 查询所有的cq

show continuous queries  

-- 删除一个cq

DROP CONTINUOUS QUERY <cq_name> ON <database_name>

默认端口8086

建表语句

create database water_database

insert calculat_db,createTime="2020-08-20",dataName="A_por_6654" dataValue=23,reportTime="2020-08-20"

insert problem_db,problemName="A_S_234",problemValue=66 itemMinValue=23,itemMaxValue=64

insert warndata_db,createTime="2020-08-20",dataName="A_por_6654" dataValue=16,itemWarnValueMin=20,itemWarnValueMax=53,areaCode="ZW001",areaName="中维",areaPointId="adfdsfduiedfijisf",deviceName="中维设备",content="xxx数据项低于最低阈值4"

CREATE CONTINUOUS QUERY cq_1m ON water_database BEGIN SELECT mean(dataValue) AS meanValue INTO water_database.rp_ten_year.cal_mean_db FROM water_database.rp_ten_year.calculat_db GROUP BY time(1m),dataName END
SELECT mean(dataValue) AS meanValue INTO cal_mean_db FROM calculat_db GROUP BY time(1m),dataName

备注

1 也可以使用 Linux 版的,没有什么差别。它如同大多数 NoSQL 数据库一样,可以封装一个 util 去操作它

2 其中归档功能必须是有实时数据后,才会触发归档(当时写好连续查询一直以为有问题,坑了我好久)

3 influxdb 的分页 SELECT * FROM "calculat_db" limit 20 OFFSET 0  ====>> limit 代表每页条数;offset 代表要跳过的记录条数(从 0 开始),而不是页码,即 offset = (页码 - 1) × 每页条数,例如第 2 页为 limit 20 OFFSET 20

下面分享下我项目中百度到的工具类

TsdbService
```
import com.jovision.sfwl.modules.datacenter.influxdb.bean.UniteMetricData;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.QueryResult;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public interface TsdbService {
void createDatabase(String database);
    void dropDatabase(String database);
    boolean databaseExist(String database);
    void createRetentionPolicy();
    void createRetentionPolicy(String database, String policyName, String duration, int replication, Boolean isDefault);
    void dropRetentionPolicy();
    void dropRetentionPolicy(String database, String retentionPolicy);
    void createContinuousQuery(String measurement, String extendPolicy);
    boolean continuousQueryExists(String measurement);
    boolean continuousQueryExists(String database, String cqName);
    void dropContinuousQuery(String databaseName, String cqName);
    boolean measurementsExists(String measurement);
    boolean measurementsExists(String database, String measurement);
    QueryResultquery(String command);
    QueryResultdataQuery(String command);
    void insert(Point point1);
    void insert(String measurement, TimeUnit timeUnit, UniteMetricData data);
    void batchInsert(BatchPoints batchPoints);
    PointpointBuilder(String measurement,
                      Map tags,
                      Map fields,
                      long time,
                      TimeUnit timeunit);
    BatchPointsbuildBatchPoints();
    BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision);
    BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision, String retentionPolicy);
}
```

TsdbServiceImpl

```
import com.jovision.sfwl.modules.datacenter.influxdb.bean.UniteMetricData;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Service("tsdbService")
public class TsdbServiceImplimplements TsdbService {
private static final Loggerlogger = LoggerFactory.getLogger(TsdbServiceImpl.class);
    private static final InfluxDB.ConsistencyLevelCONSISTENCY_LEVEL = InfluxDB.ConsistencyLevel.ANY;
    private static final TimeUnitPRECESION = TimeUnit.SECONDS;
    @Value("${tsdb.server.url}")
private Stringurl;
    /**
    * 用户名
    */
    @Value("${tsdb.server.username}")
private Stringusername;
    /**
    * 密码
    */
    @Value("${tsdb.server.password}")
private Stringpassword;
    /**
    * 数据库
    */
    @Value("${tsdb.server.database}")
private Stringdatabase;
    /**
    * 保留策略
    */
    @Value("${tsdb.server.retentionpolicy}")
private StringretentionPolicy;
    private InfluxDBinfluxDB;
    @PostConstruct
    public void init() {
/* List serverAddressList = new ArrayList<>();
for (String host : hosts.split(",")) {
serverAddressList.add(String.format("%s:%s", host, port));
}
influxDB = InfluxDBFactory.connect(serverAddressList, username, password);
*/
        influxDB = InfluxDBFactory.connect(url, username, password);
        try {
// 如果指定的数据库不存在,则新建一个新的数据库,并新建一个默认的数据保留规则
            if (!this.databaseExist(database)) {
createDatabase(database);
                createRetentionPolicy();
            }
}catch (Exception e) {
// 该数据库可能设置动态代理,不支持创建数据库
            logger.error("[TsdbService] occur error when init tsdb, err msg: {}", e);
        }finally {
influxDB.setRetentionPolicy(retentionPolicy);
        }
influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
        // Flush every 1000 Points, at least every 100ms
// bufferLimit represents the maximum number of points can stored in the retry buffer
// exceptionHandler represents a consumer function to handle asynchronous errors
// threadFactory represents the ThreadFactory instance to be used
        influxDB.enableBatch(BatchOptions.DEFAULTS
                .actions(1000)
.flushDuration(100)
.bufferLimit(10)
.exceptionHandler((points, e) -> {
List target =new ArrayList<>();
                    points.forEach(target::add);
                    String msg = String.format("failed to write points:%s\n", target.toString().substring(0, 10000));
                    logger.error(msg, e);
                })
.threadFactory(
Executors.defaultThreadFactory()
));
    }
/**
    * 测试连接是否正常
    *
    * @return true 正常
    */
    public boolean ping() {
boolean isConnected =false;
        Pong pong;
        try {
pong =influxDB.ping();
            if (pong !=null) {
isConnected =true;
            }
}catch (Exception e) {
e.printStackTrace();
        }
return isConnected;
    }
@Override
    public void createDatabase(String database) {
influxDB.query(new Query("CREATE DATABASE " + database, ""));
    }
@Override
    public void dropDatabase(String database) {
influxDB.query(new Query("DROP DATABASE " + database, ""));
    }
@Override
    public boolean databaseExist(String database) {
return influxDB.databaseExists(database);
    }
@Override
    public void createRetentionPolicy() {
String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "default_policy", database, "90d", 3);
        this.query(command);
    }
/**
    * @return void
    * @Description
    * @Author 寂寞旅行
    * @Date 10:48 2020/8/20
    * @Param [database, policyName, duration, replication, isDefault]
    * 数据库    策略名        时间      1            是否为默认策略
    **/
    @Override
    public void createRetentionPolicy(String database, String policyName, String duration, int replication, Boolean isDefault) {
String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,
                database, duration, replication);
        if (isDefault) {
sql = sql.concat(" DEFAULT");
        }
this.query(sql);
    }
@Override
    public void dropRetentionPolicy() {
this.dropRetentionPolicy(database, retentionPolicy);
    }
@Override
    public void dropRetentionPolicy(String database, String retentionPolicy) {
String sql = String.format("DROP RETENTION POLICY %s ON %s", retentionPolicy, database);
        this.query(sql);
    }
@Override
    public void createContinuousQuery(String measurement, String extendPolicy) {
String cqName = String.format("cq_%s", measurement);
        String originMeasurement = String.format("%s.%s.%s", database, retentionPolicy, measurement);
        String cqMeasurement = String.format("%s.%s.%s_hour", database, extendPolicy, measurement);
        String sql = String.format("CREATE CONTINUOUS QUERY \"%s\" ON %s RESAMPLE EVERY 1h FOR 2h BEGIN SELECT MEAN(*) INTO %s FROM %s GROUP BY time(1h),* FILL(none) END",
                cqName, database, cqMeasurement, originMeasurement);
        this.query(sql);
    }
@Override
    public boolean continuousQueryExists(String measurement) {
String cqName = String.format("cq_%s", measurement);
        return continuousQueryExists(database, cqName);
    }
@Override
    public boolean continuousQueryExists(String database, String cqName) {
String sql ="SHOW CONTINUOUS QUERIES";
        QueryResult result = query(sql);
        List seriesList = result.getResults().get(0).getSeries();
        if (seriesList !=null) {
for (QueryResult.Series series : seriesList) {
if (database.equals(series.getName())) {
List> continuousQueryList = series.getValues();
                    if (continuousQueryList ==null) {
return false;
                    }else {
for (List queryResult : continuousQueryList) {
if (cqName.equals(queryResult.get(0))) {
return true;
                            }
}
}
}
}
}
return false;
    }
@Override
    public void dropContinuousQuery(String databaseName, String cqName) {
String sql = String.format("DROP CONTINUOUS QUERY %s ON %s", cqName, databaseName);
        QueryResult result = query(sql);
    }
@Override
    public boolean measurementsExists(String measurement) {
return measurementsExists(database, measurement);
    }
@Override
    public boolean measurementsExists(String database, String measurement) {
String sql = String.format("SHOW MEASUREMENTS ON %s", database);
        QueryResult result = query(sql);
        if (result !=null) {
List seriesList = result.getResults().get(0).getSeries();
            if (seriesList !=null) {
QueryResult.Series series = seriesList.get(0);
                List> valueList = series.getValues();
                for (List value : valueList) {
if (measurement.equals(value.get(0))) {
return true;
                    }
}
}
}
return false;
    }
@Override
    public QueryResultquery(String command) {
return influxDB.query(new Query(command, database));
    }
@Override
    public QueryResultdataQuery(String command) {
return influxDB.query(new Query(command, database), TimeUnit.MILLISECONDS);
    }
@Override
    public void insert(Point point1) {
influxDB.write(point1);
    }
@Override
    public void insert(String measurement, TimeUnit timeUnit, UniteMetricData data) {
timeUnit = timeUnit ==null ? TimeUnit.MILLISECONDS : timeUnit;
        Point point = pointBuilder(measurement, data.getTags(), data.getFields(), data.getTimestamp(), timeUnit);
        influxDB.write(database, retentionPolicy, point);
    }
@Override
    public void batchInsert(BatchPoints batchPoints) {
influxDB.write(batchPoints);
    }
@Override
    public PointpointBuilder(String measurement,
                              Map tags,
                              Map fields,
                              long time,
                              TimeUnit timeunit) {
Point point = Point.measurement(measurement).time(time, timeunit).tag(tags).fields(fields).build();
        return point;
    }
@Override
    public BatchPointsbuildBatchPoints() {
return this.batchPointsBuilder(database, CONSISTENCY_LEVEL, PRECESION);
    }
@Override
    public BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision) {
return batchPointsBuilder(database, level, precision, null);
    }
@Override
    public BatchPointsbatchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision, String retentionPolicy) {
return BatchPoints.database(database).consistency(level).precision(precision).retentionPolicy(retentionPolicy).build();
    }
}
```

UniteMetricData

```
import lombok.Data;
import java.util.Map;
@Data
public class UniteMetricData {
private static final long serialVersionUID =8968059029015805484L;
    private Maptags;
    private Mapfields;
    private long timestamp;
    public UniteMetricData(Map tags, Map fields, long timestamp) {
this.tags = tags;
        this.fields = fields;
        this.timestamp = timestamp;
    }
public MapgetTags() {
return tags;
    }
public void setTags(Map tags) {
this.tags = tags;
    }
public MapgetFields() {
return fields;
    }
public void setFields(Map fields) {
this.fields = fields;
    }
public long getTimestamp() {
return timestamp;
    }
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
    }
}   
```

linux下安装

```

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.5.3.x86_64.rpm

sudo yum localinstall influxdb-1.5.3.x86_64.rpm

```

安装好后更改 influxdb.conf 配置,使其开启用户认证

vim /etc/influxdb/influxdb.conf

将 [http] 配置段中的 auth-enabled = false 改为 auth-enabled = true

保存退出(esc +:wq )

然后启动 influxdb

```

sudo service influxdb start

```

举报

相关推荐

0 条评论