建表:
// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()
// 设置表名
val tableName = "kudu_test"
// 创建列
val colums = List[ColumnSchema]((new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(true).nullable(false).build()),
(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT64).nullable(true).build()),
(new ColumnSchema.ColumnSchemaBuilder("city", Type.STRING).nullable(true).build()))
val schema: Schema = new Schema(colums.asJava)
// 设置hash分区
val cto: CreateTableOptions = new CreateTableOptions()
cto.setRangePartitionColumns(List("name").asJava).setNumReplicas(3)
// 执行建表语句
kuduClient.createTable(tableName, schema, cto)
// 关闭kudu连接
kuduClient.close()
kuduClient API 对表插入数据
// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()
// 设置表名
val tableName = "kudu_test"
// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)
// 开启一个会话
val session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)
// 创建插入对象并设置插入数据
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0, "Ronnie")
row.addInt(1, 21)
row.addString(2, "beijing")
//执行插入语句
session.apply(insert)
// 同步数据,关闭会话
session.flush()
session.close()
kuduClient API 对表修改数据
// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()
// 设置表名
val tableName = "kudu_test"
// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)
// 开启一个会话
val session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)
// 创建updata对象
val update = kuduTable.newUpdate()
val rowUpdata = update.getRow()
rowUpdata.addString("name", "nnnn")
rowUpdata.addInt("age", 22)
rowUpdata.addString("city", "ddddd")
// 同步数据并关闭会话
session.apply(update)
session.flush()
// 关闭kudu连接
kuduClient.close()
kuduClient API 对表执行upsert操作
如果该主键存在则执行updata操作,即修改表数据;如果该主键不存在,则执行insert操作,即插入数据。
// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()
// 设置表名
val tableName = "kudu_test"
// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)
// 开启一个会话
val session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)
// 创建upsert对象
val upsert = kuduTable.newUpsert()
val rowUpsert = upsert.getRow()
rowUpsert.addString("name", "nnnn")
rowUpsert.addInt("age", 19)
rowUpsert.addString("city", "mmmm")
// 执行upsert操作
session.apply(upsert)
// 同步数据并关闭会话
session.flush()
session.close()
// 关闭kudu连接
kuduClient.close()
// 关闭kudu连接
kuduClient.close()
kuduClient API 对表执行删除数据操作
// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()
// 设置表名
val tableName = "kudu_test"
// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)
// 开启一个会话
val session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)
// 创建删除对象并指定要删除的行
val delete = kuduTable.newDelete()
delete.getRow().addString("name", "Ronnie")
// 执行删除操作
session.apply(delete)
// 同步数据并关闭会话
session.flush()
session.close()
// 关闭kudu连接
kuduClient.close()