0
点赞
收藏
分享

微信扫一扫

Flink连接Phoenix操作Hbase数据库

谷中百合517 2022-02-10 阅读 65

需要导入的pom依赖:

<phoenix.version>5.0.0-HBase-2.0</phoenix.version>
<dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>${phoenix.version}</version>
</dependency>

写测试类进行Phoenix连接测试:

import java.sql.{DriverManager, ResultSet}
import com.zw.bigdata.common.pojo.LandRecord
import scala.reflect.classTag
import scala.reflect.runtime.universe._


object  PhoenixUtil {

//加载配置信息:
val driverName = "org.apache.phoenix.jdbc.PhoenixDriver"

//我的测试集群分别为test001,test002,test003
//phoenix对应的测试库为hbase
val phoenixUrl = "jdbc:phoenix:test001,test002,test003:2181:/hbase;autocommit=true"

Class.forName(driverName)

//创建连接
val conn = DriverManager.getConnection(phoenixUrl)
val statement = conn.createStatement()


//声明一个查询Phoenix的工具类
//传入的参数是泛型(这里准备的是样例类)和sql
def getQueryResult[T](querySql:String)(implicit m: Manifest[T]):Option[T] = {
try{
  //执行查询语句
  val result:ResultSet = statement.executeQuery(querySql)
  while (result.next()){
    val recordClass = classTag[T].runtimeClass
    val record:T = recordClass.getConstructor().newInstance().asInstanceOf[T]
    typeOf[T].members.withFilter(!_.isMethod).map(row=>{
//          println(result.getString(row.name.toString.trim))
      val method = recordClass.getMethod(s"set${row.name.toString.trim.capitalize}",classOf[String])
      method.invoke(record,result.getString(row.name.toString.trim))
    })
    return Some(record)
  }
  result.close()
  return None
}catch {
  case ex:Exception=>ex.printStackTrace()
    release()
    return None
   }
}


def main(args: Array[String]): Unit = {
   //测试sql
   val selectSql = s"""select "book_id","book_name" from "phoenix_test_bookName0209" where "ROW" = '${book_id}' """
   
   //println(getQueryResult[LandpageBookRecord](selectSql ))
   //这里传入的是 LandpageBookRecord 样例类
   //此时返回的value值是被some() 所包裹着(整条样例类数据)
   //如果想去掉some()的包裹,可以用value.getOrElse() 去掉some()
   val value = getQueryResult[LandpageBookRecord](selectSql )
   
   //个人业务需要,需要将获取到样例类个别字段数据
   //用到模式匹配
   val landPvUvRecord =
      value match {
         //如果不为空的话,可以通过lbRecord打点的方式进行值获取
         //(lbRecord自定义变量),为空则用(无业务逻辑的)默认值填充
        case Some(lbRecord) => (lbRecord.book_name,lbRecord.book_id)
        case None=>("-99","-99")
      }
      
    val book_name_id = landPvUvRecord._1
    println(s"book_name_id is:${book_name_id}")
    val book_name = landPvUvRecord._2
    println(s"book_name  is :${book_name}")
    
     }
}      

样例类:

import scala.beans.BeanProperty
class LandpageBookRecord {
//我的业务逻辑是向hbase表中进行数据查询,使用Phoenix作为中间件吗,
//便于sql书写,LandpageBook表对应的字段目前只有三个
//所以样例类也就是三个字段,但是业务上只需要我查询出来两个字段即可
//这里又注释掉一个字段,并且实现了toString(方法有返回值将业务需要的两字段带了出去)
//  @BeanProperty
//  var ROW:String=_
@BeanProperty
var book_id:String=_
@BeanProperty
var book_name:String=_

override def toString: String = {
return s"LandpageBookRecord[book_id=${book_id},book_name=${book_name}]"
  }
}
举报

相关推荐

0 条评论