BaseInfluxDB.java
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.commons.beanutils.BeanUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Repository
@Slf4j
public class BaseInfluxDB<T> {
@Autowired
private InfluxDB influxDB;
//配置数据库和数据保存策略
private String dataBaseName="water_database";
// private String dataBaseName="engine_demo";
private String retentionPolicyName="";
public boolean insert(String measurement,Map<String, String> tags, Map<String, Object> fields){
boolean flag=false;
try{
Point.Builder builder = Point.measurement(measurement); //表名字
builder.tag(tags);
builder.fields(fields);
influxDB.write(dataBaseName, retentionPolicyName, builder.build()); //数据库名字和 策略名字
flag=true;
}catch (Exception exception){
exception.printStackTrace();
}
return flag;
}
//基本查询
public List<T> query(String command ,Class<T> clazz){
List<T> resultList = new ArrayList<T>();
try{
QueryResult queryResult = influxDB.query(new Query(command, dataBaseName)); //数据库和查询语句
List<QueryResult.Result> results = queryResult.getResults(); //查询结果
for(QueryResult.Result result :results){
List<QueryResult.Series> seriess = result.getSeries();//有结果组装返回对象
if(seriess!=null){
//直接返回对象
for(QueryResult.Series series :seriess){
String name = series.getName();
List<List<Object>> value = series.getValues();
List<String> key = series.getColumns();
for(List<Object> object :value){
Map<String,Object> beanProperMap =new HashMap<>();
//匹配属性和属性值
for(int i=0;i<object.size();i++){
String k=key.get(i);
Object v =object.get(i);
beanProperMap.put(k,v);
}
//创建对象并且赋值
T bean =clazz.newInstance();
try {
BeanUtils.populate(bean,beanProperMap);
} catch (Exception e) {
e.printStackTrace();
}
//保存集合
resultList.add(bean);
}
}
}
}
}catch (Exception exception){
exception.printStackTrace();
}
return resultList;
}
}
上面是基本工具类
下面是具体的measurement 对应的mapper
CalculatDataMapper.java
import com.jovision.sfwl.modules.datacenter.entity.influxdb.CalculatData;
import lombok.Data;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Map;
@Repository
@Data
public class CalculatDataMapper extends BaseInfluxDB<CalculatData> {
private String measurement="calculat_db"; //要查询的表名
public boolean insertCalculatDataToInfluxDB(Map<String, String> tags, Map<String, Object> fields){
//influxdb 默认要求存在一个field 可以没有tag
if(fields.size()<=0){
return false;
}
boolean result = super.insert(measurement, tags, fields);
return result;
}
public List<CalculatData> queryAllCalculatDataFormInfluxDB(){
List<CalculatData> resultList = super.query("select * from "+measurement,CalculatData.class);
return resultList;
}
}
下面是具体方法中的 sql语句拼接
@Override
public JsonResult queryAnalysisDataFromInfluxDB(DataBaseQuery query) {
try{
Integer currentPage = query.getPageIndex();//当前页
Integer pageSize = query.getPageSize();//显示条数
if(pageSize<=0){
pageSize=10;
}
String datectionItem=query.getDetectionItemId();//数据项
String startTime=query.getStartTime();
String endTime=query.getEndTime();
//基本查询sql
StringBuilder baseSql = new StringBuilder(sql);
//查询总量sql
StringBuilder countSql = new StringBuilder("select count(dataValue) as count from "+calculatDataMapper.getMeasurement());
//拼装查询条件
StringBuilder appendSql = new StringBuilder();
appendSql.append(" where 1=1 ");
if(EmptyUtils.isNotEmpty(datectionItem)){
appendSql.append(" and dataName='"+datectionItem+"'");
}
if(EmptyUtils.isNotEmpty(startTime)){
appendSql.append(" and time >='"+startTime+"'");
}
if(EmptyUtils.isNotEmpty(endTime)){
appendSql.append(" and time <='"+endTime+"'");
}
appendSql.append(" ORDER BY time DESC ");
String baseSqlStr =baseSql.append(appendSql).append(" limit ").append(pageSize).append(" OFFSET ").append((currentPage-1)*pageSize).append(" tz('Asia/Shanghai')").toString();
appendSql.append(" tz('Asia/Shanghai')");
log.info("---------解析数据InfluxDBsql-------->"+baseSqlStr);
List<CalculatData> count = calculatDataMapper.query(countSql.append(appendSql).toString(),CalculatData.class); //查询总条数
Map<String,Object> resultMap = new HashMap<>();
if(count.size()>0){
int total =count.get(0).getCount();//总条数
int totalPageNum = (total + pageSize - 1) / pageSize; //总页数
List<CalculatData> analysisData = calculatDataMapper.query(baseSqlStr,CalculatData.class);
resultMap.put("total",total);
resultMap.put("size",pageSize);
resultMap.put("current",currentPage);
resultMap.put("pages",totalPageNum);
resultMap.put("records",analysisData);
return JsonResult.success("查询成功",resultMap);
}else{
//查询条件查不到时返回
resultMap.put("total",0);
resultMap.put("size",pageSize);
resultMap.put("current",currentPage);
resultMap.put("pages",0);
resultMap.put("records",null);
return JsonResult.success("查询成功",resultMap);
}
}catch (Exception exception){
exception.printStackTrace();
}
return JsonResult.error("查询失败");
}
pom.xml 引入
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.19</version>
</dependency>