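Tablestore provides a serverless table storage service for massive structured data, and offers IoTstore, a one-stop solution deeply optimized for IoT scenarios.
It suits structured data storage in scenarios such as large-volume billing, IM messages, IoT, Internet of Vehicles, risk control, and recommendation, providing low-cost storage for massive data, millisecond-level online queries and retrieval, and flexible data analysis.
Updating a row in Tablestore requires its primary key (PK).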
private static List<Row> getAllRows(SyncClient client, String tableName, String indexName, SearchQuery searchQuery, SearchRequest.ColumnsToGet columnsToGet) {
    if (Objects.isNull(columnsToGet) || CollectionUtils.isEmpty(columnsToGet.getColumns())) {
        log.warn("SearchRequest.ColumnsToGet must not be null or empty when calling getAllRows()");
        throw new RuntimeException("SearchRequest.ColumnsToGet must not be null or empty when calling getAllRows()");
    }
    searchQuery.setGetTotalCount(true);
    SearchRequest searchRequest = new SearchRequest(tableName, indexName, searchQuery);
    searchRequest.setColumnsToGet(columnsToGet);
    List<Row> rows = new ArrayList<>();
    // Read page by page until NextToken is null. The rows of the last page
    // (the one whose NextToken is null) must be collected too.
    while (true) {
        SearchResponse response = client.search(searchRequest);
        if (!response.isAllSuccess()) {
            log.error("Search did not fully succeed, requestId:{}, searchRequest:{}", response.getRequestId(), JSON.toJSONString(searchRequest));
            throw new RuntimeException("not all SUCCESS");
        }
        rows.addAll(response.getRows());
        if (response.getNextToken() == null) {
            break;
        }
        // Pass the token to the next request.
        searchRequest.getSearchQuery().setToken(response.getNextToken());
    }
    return rows;
}
/**
 * For batch-update scenarios, where only the primary keys are needed.
 *
 * @param client      Tablestore client
 * @param tableName   table to search
 * @param indexName   search index to use
 * @param searchQuery query to run
 * @return primary keys of all matching rows
 */
private static List<PrimaryKey> getAllPrimaryKeys(SyncClient client, String tableName, String indexName, SearchQuery searchQuery) {
    searchQuery.setGetTotalCount(true);
    SearchRequest searchRequest = new SearchRequest(tableName, indexName, searchQuery);
    SearchRequest.ColumnsToGet columnsToGet = new SearchRequest.ColumnsToGet();
    columnsToGet.setColumns(Collections.singletonList("id"));
    searchRequest.setColumnsToGet(columnsToGet);
    List<PrimaryKey> resultPrimaryKeys = new ArrayList<>();
    // Read page by page until NextToken is null. The rows of the last page
    // (the one whose NextToken is null) must be collected too.
    while (true) {
        SearchResponse response = client.search(searchRequest);
        if (!response.isAllSuccess()) {
            log.error("Search did not fully succeed, requestId:{}, searchRequest:{}", response.getRequestId(), JSON.toJSONString(searchRequest));
            throw new RuntimeException("not all SUCCESS");
        }
        response.getRows().stream().map(Row::getPrimaryKey).forEach(resultPrimaryKeys::add);
        if (response.getNextToken() == null) {
            break;
        }
        // Pass the token to the next request.
        searchRequest.getSearchQuery().setToken(response.getNextToken());
    }
    return resultPrimaryKeys;
}
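The token loop used by the two helpers above can be sketched without the SDK. This is a minimal illustration, not the Tablestore API: `Page`, `fetchAll` and the fake backend are hypothetical names introduced here. It shows the one detail that is easy to get wrong, namely that the page whose next token is null still carries rows that must be collected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TokenPaging {
    /** One page of results plus the token of the next page (null when there is none). */
    static final class Page<T> {
        final List<T> items;
        final String nextToken;

        Page(List<T> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    /** Collects every page, including the last one whose nextToken is null. */
    static <T> List<T> fetchAll(Function<String, Page<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        String token = null; // null requests the first page
        do {
            Page<T> page = fetchPage.apply(token);
            all.addAll(page.items);
            token = page.nextToken;
        } while (token != null);
        return all;
    }

    public static void main(String[] args) {
        // Hypothetical backend with three pages: null -> "t1" -> "t2" -> end.
        Function<String, Page<Integer>> backend = token -> {
            if (token == null) return new Page<>(Arrays.asList(1, 2), "t1");
            if (token.equals("t1")) return new Page<>(Arrays.asList(3, 4), "t2");
            return new Page<>(Arrays.asList(5), null);
        };
        System.out.println(fetchAll(backend)); // prints [1, 2, 3, 4, 5]
    }
}
```

A do-while fits here because at least one request is always needed, and the exit test runs only after the current page has been consumed.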
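Summary:
The product itself may not be mature yet and has few users, so problems are hard to get resolved.
Problems encountered:
(1) There is no GUI tool, so the barrier to entry is high.
(2) GetRange queries are inconvenient, and printing the returned data with System.out.println produces garbled output.
(3) Errors on batch insert, and their explanation: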
Tablestore limits: https://help.aliyun.com/knowledge_detail/38573.html
BatchGetRow, rows read per request: at most 100
BatchWriteRow, rows written per request: at most 200
BatchWriteRow, data size per request: at most 4 MB
GetRange, data returned per call: 5000 rows or 4 MB. When the result exceeds 5000 rows or 4 MB, the data beyond the limit is truncated at row granularity and the primary key of the next row is returned.
Request Body of a single HTTP request: at most 5 MB
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore</artifactId>
<version>4.2.1</version>
</dependency>
2017-05-18 18:14:55.520 ERROR [http-nio-5000-exec-1]com.xiaoyi.app.tool.car.service.BatchService -/862442030078001_2378171_1.2.0_1493620263150.tar.gz,contentLength:2238,Rows count exceeds the upper limit: 200.
com.alicloud.openservices.tablestore.TableStoreException: Rows count exceeds the upper limit: 200.
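One way to avoid the "Rows count exceeds the upper limit: 200" error above is to split the rows into chunks before building each batch request. The sketch below is SDK-free; `BatchSplitter` and `split` are hypothetical names, and only the row-count limit is handled. The 4 MB size limit per request would need a separate check.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    // Row-count cap per BatchWriteRow request, taken from the limits listed above.
    static final int MAX_ROWS_PER_BATCH = 200;

    /** Splits rows into consecutive chunks of at most maxPerBatch elements. */
    static <T> List<List<T>> split(List<T> rows, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += maxPerBatch) {
            int to = Math.min(from + maxPerBatch, rows.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 450; i++) {
            rows.add(i);
        }
        // 450 rows become batches of 200, 200 and 50.
        for (List<Integer> batch : split(rows, MAX_ROWS_PER_BATCH)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

Each chunk would then be turned into its own batch request and sent separately.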
How to store massive GPS data efficiently: https://yq.aliyun.com/articles/74460
From SQL to NoSQL: how to use Tablestore https://yq.aliyun.com/articles/64411
A concise guide to accessing TableStore (OTS) from MaxCompute https://yq.aliyun.com/articles/72075
Tablestore data model and query operations https://yq.aliyun.com/articles/38621
sdk:
Table operations: https://help.aliyun.com/document_detail/43012.html?spm=5176.doc43017.6.692.FqI5C4
Multi-row data operations: https://help.aliyun.com/document_detail/43017.html
Auto-increment primary key column: https://help.aliyun.com/document_detail/47731.html?spm=5176.doc43012.6.697.MeAkYn
Best practices, table operations: https://help.aliyun.com/document_detail/27356.html?spm=5176.2020520106.122.1.mc3CbL
Best practices, filters: https://helpcdn.aliyun.com/document_detail/35193.html?spm=5176.doc35194.6.594.A3mFNT
API, multi-row data operations: https://help.aliyun.com/document_detail/43017.html?spm=5176.doc27283.6.694.tZydZm
API, Tablestore GetRange: https://help.aliyun.com/document_detail/27309.html?spm=5176.doc27304.6.629.S7OXV5
import com.alicloud.openservices.tablestore.ClientConfiguration;
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.filter.SingleColumnValueFilter;
import com.xiaoyi.app.business.car.biz.CarGpsBiz;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Created by tang.cheng on 2017/5/18.
*/
@Component
public class TableStoreUtil {
private static final String TABLE_NAME = "tt_test";
private static final String PRIMARY_KEY_NAME = "pk_id";
private static final Logger LOGGER = LoggerFactory.getLogger(TableStoreUtil.class);
public static void main(String[] args) throws InterruptedException {
final String endPoint = "";
final String accessKeyId = "";
final String accessKeySecret = "";
final String instanceName = "tt_test_instance";
// ClientConfiguration offers many options; only a few are shown here.
ClientConfiguration clientConfiguration = new ClientConfiguration();
// Connection timeout.
clientConfiguration.setConnectionTimeoutInMillisecond(5000);
// Socket timeout.
clientConfiguration.setSocketTimeoutInMillisecond(5000);
// Retry strategy; the default strategy is used if none is set.
clientConfiguration.setRetryStrategy(new AlwaysRetryStrategy());
SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret,
instanceName, clientConfiguration);
try {
// createTable(client, CarGpsBiz.TABLE_NAME);
batchRange(client, CarGpsBiz.TABLE_NAME);
// deleteTable(client, CarGpsBiz.TABLE_NAME);
/* createTable(client);
TimeUnit.SECONDS.sleep(10);*/
// listTable(client);
// describeTable(client);
// deleteTable(client,"sampleTable");
// deleteTable(client,"sampleTable2");
/* batchWriteRow(client);
System.out.println("==============batchGetRow==============");
batchGetRow(client);*/
} catch (TableStoreException e) {
System.err.println("Operation failed, details: " + e.getMessage());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("Request failed, details: " + e.getMessage());
} catch (Exception e) {
e.printStackTrace();
} finally {
// deleteTable(client);
// Release the client's resources.
client.shutdown();
}
}
public static void batchRange(SyncClient client, String tableName) {
// The start and end keys below run from INF_MIN to INF_MAX on every primary key column,
// so this is a range read over the whole table, not a lookup of one key.
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(tableName);
// Set the start primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.INF_MIN);
primaryKeyBuilder.addPrimaryKeyColumn("imei", PrimaryKeyValue.INF_MIN);
primaryKeyBuilder.addPrimaryKeyColumn("time", PrimaryKeyValue.INF_MIN);
primaryKeyBuilder.addPrimaryKeyColumn("tid", PrimaryKeyValue.INF_MIN);
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
// Set the end primary key.
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.INF_MAX);
primaryKeyBuilder.addPrimaryKeyColumn("imei", PrimaryKeyValue.INF_MAX);
primaryKeyBuilder.addPrimaryKeyColumn("time", PrimaryKeyValue.INF_MAX);
primaryKeyBuilder.addPrimaryKeyColumn("tid", PrimaryKeyValue.INF_MAX);
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
// Read only the latest version.
rangeRowQueryCriteria.setMaxVersions(1);
// All attribute columns are read by default.
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
List<Row> rows = getRangeResponse.getRows();
System.out.println("rows size:" + rows.size());
// Log at most the first two rows; the result may contain fewer.
for (int i = 0; i < Math.min(2, rows.size()); i++) {
for (Column column : rows.get(i).getColumns()) {
LOGGER.info("{},{}", column.getName(), column.getValue());
}
}
}
public static void createTable(SyncClient client, String tableName) {
TableMeta tableMeta = new TableMeta(tableName);
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING));
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("imei", PrimaryKeyType.STRING));
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("time", PrimaryKeyType.INTEGER));
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("tid", PrimaryKeyType.INTEGER));
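int timeToLive = 182 * 24 * 3600; // Data expiration time in seconds (here about half a year); -1 means never expire. One year would be 365 * 24 * 3600.
int maxVersions = 3; // Maximum number of versions kept; 3 means each column keeps at most the 3 latest versions.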
TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
client.createTable(request);
}
public static void listTable(SyncClient client) {
ListTableResponse response = client.listTable();
System.out.println("Tables:");
for (String tableName : response.getTableNames()) {
System.out.println(tableName);
}
}
public static void deleteTable(SyncClient client, String tableName) {
DeleteTableRequest request = new DeleteTableRequest(tableName);
client.deleteTable(request);
}
public static void describeTable(SyncClient client) {
DescribeTableRequest request = new DescribeTableRequest(TABLE_NAME);
DescribeTableResponse response = client.describeTable(request);
TableMeta tableMeta = response.getTableMeta();
System.out.println("Table name: " + tableMeta.getTableName());
System.out.println("Primary key:");
for (PrimaryKeySchema primaryKeySchema : tableMeta.getPrimaryKeyList()) {
System.out.println(primaryKeySchema);
}
TableOptions tableOptions = response.getTableOptions();
System.out.println("TTL: " + tableOptions.getTimeToLive());
System.out.println("MaxVersions: " + tableOptions.getMaxVersions());
ReservedThroughputDetails reservedThroughputDetails = response.getReservedThroughputDetails();
System.out.println("Reserved read throughput: "
+ reservedThroughputDetails.getCapacityUnit().getReadCapacityUnit());
System.out.println("Reserved write throughput: "
+ reservedThroughputDetails.getCapacityUnit().getWriteCapacityUnit());
}
public static void batchWriteRow(SyncClient client) {
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
// Build rowPutChange1.
PrimaryKeyBuilder pk1Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk1Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk1"));
RowPutChange rowPutChange1 = new RowPutChange(TABLE_NAME, pk1Builder.build());
// Add some columns.
for (int i = 0; i < 10; i++) {
rowPutChange1.addColumn(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowPutChange1);
// Build rowPutChange2.
PrimaryKeyBuilder pk2Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk2Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk2"));
RowPutChange rowPutChange2 = new RowPutChange(TABLE_NAME, pk2Builder.build());
// Add some columns.
for (int i = 0; i < 10; i++) {
rowPutChange2.addColumn(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowPutChange2);
// Build rowUpdateChange.
PrimaryKeyBuilder pk3Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk3Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk3"));
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, pk3Builder.build());
// Add some columns.
for (int i = 0; i < 10; i++) {
rowUpdateChange.put(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Delete one column.
rowUpdateChange.deleteColumns("Col10");
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowUpdateChange);
// Build rowDeleteChange.
PrimaryKeyBuilder pk4Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk4Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk4"));
RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, pk4Builder.build());
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowDeleteChange);
BatchWriteRowResponse response = client.batchWriteRow(batchWriteRowRequest);
System.out.println("All succeeded: " + response.isAllSucceed());
if (!response.isAllSucceed()) {
for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
System.out.println("Failed row: " + batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
System.out.println("Reason: " + rowResult.getError());
}
/*
 * createRequestForRetry builds a new request that retries only the failed rows. Only the construction of the retry request is shown here.
 * The recommended approach is the SDK's custom retry strategy, which supports retrying partial row failures of batch operations. Once a retry strategy is set, no retry code is needed at the call site.
 */
BatchWriteRowRequest retryRequest = batchWriteRowRequest.createRequestForRetry(response.getFailedRows());
}
}
private static void batchGetRow(SyncClient client) {
MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria(TABLE_NAME);
// Add 10 rows to read.
for (int i = 0; i < 10; i++) {
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk" + i));
PrimaryKey primaryKey = primaryKeyBuilder.build();
multiRowQueryCriteria.addRow(primaryKey);
}
// Add conditions.
multiRowQueryCriteria.setMaxVersions(1);
multiRowQueryCriteria.addColumnsToGet("Col0");
multiRowQueryCriteria.addColumnsToGet("Col1");
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
singleColumnValueFilter.setPassIfMissing(false);
multiRowQueryCriteria.setFilter(singleColumnValueFilter);
BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
// batchGetRow can read from multiple tables; each MultiRowQueryCriteria holds the query conditions for one table, and several can be added.
batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);
BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);
System.out.println("All succeeded: " + batchGetRowResponse.isAllSucceed());
if (!batchGetRowResponse.isAllSucceed()) {
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
System.out.println("Failed row: " + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
System.out.println("Reason: " + rowResult.getError());
}
/*
 * createRequestForRetry builds a new request that retries only the failed rows. Only the construction of the retry request is shown here.
 * The recommended approach is the SDK's custom retry strategy, which supports retrying partial row failures of batch operations. Once a retry strategy is set, no retry code is needed at the call site.
 */
BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
}
}
}
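The batch methods above build a retry request for the failed rows but never send it. A bounded resend loop could look roughly like the sketch below. It is SDK-free and every name in it (`FailedRowRetry`, `sendWithRetry`, the fake sender) is hypothetical: the sender stands in for a batch call that reports which items failed. No backoff is included, and the SDK's own retry strategy remains the approach the original comments recommend.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FailedRowRetry {
    /**
     * Sends the items, then resends only those reported as failed,
     * for at most maxAttempts sends in total.
     * Returns the items that are still failing afterwards.
     */
    static <T> List<T> sendWithRetry(List<T> items,
                                     Function<List<T>, List<T>> sender,
                                     int maxAttempts) {
        List<T> pending = new ArrayList<>(items);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            // The sender returns the subset of the batch that failed.
            pending = new ArrayList<>(sender.apply(pending));
        }
        return pending;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Hypothetical sender: odd numbers fail on the first call only.
        Function<List<Integer>, List<Integer>> sender = batch -> {
            calls[0]++;
            List<Integer> failed = new ArrayList<>();
            if (calls[0] == 1) {
                for (Integer i : batch) {
                    if (i % 2 != 0) {
                        failed.add(i);
                    }
                }
            }
            return failed;
        };
        List<Integer> left = sendWithRetry(Arrays.asList(1, 2, 3, 4), sender, 3);
        System.out.println("calls=" + calls[0] + ", still failing=" + left);
    }
}
```

Resending only the failed subset keeps successful rows from being written twice.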
demo1:
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.*;
/**
* Created by yizheng on 16/4/28.
*/
public class TableOperationSample {
/**
 * This example creates a table named sampleTable2 with a single primary key column named pk.
*/
private static final String TABLE_NAME = "sampleTable2";
private static final String PRIMARY_KEY_NAME = "pk";
public static void main(String[] args) {
final String endPoint = "";
final String accessId = "";
final String accessKey = "";
final String instanceName = "";
SyncClient client = new SyncClient(endPoint, accessId, accessKey,
instanceName);
try {
// Create the table.
createTable(client);
// List the tables.
listTable(client);
// Show the table's properties.
describeTable(client);
// Update the table's properties.
updateTable(client);
// Show the table's properties after the update.
describeTable(client);
// List the tables.
listTable(client);
} catch (TableStoreException e) {
System.err.println("Operation failed, details: " + e.getMessage());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("Request failed, details: " + e.getMessage());
}
client.shutdown();
}
public static void createTable(SyncClient client) {
TableMeta tableMeta = new TableMeta(TABLE_NAME);
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(PRIMARY_KEY_NAME, PrimaryKeyType.STRING));
int timeToLive = -1; // Data expiration time in seconds; -1 means never expire. One year would be 365 * 24 * 3600.
int maxVersions = 3; // Maximum number of versions kept; 3 means each column keeps at most the 3 latest versions.
TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
client.createTable(request);
}
public static void updateTable(SyncClient client) {
int timeToLive = -1;
int maxVersions = 5; // Raise the maximum number of versions to 5.
TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
UpdateTableRequest request = new UpdateTableRequest(TABLE_NAME);
request.setTableOptionsForUpdate(tableOptions);
client.updateTable(request);
}
public static void describeTable(SyncClient client) {
DescribeTableRequest request = new DescribeTableRequest(TABLE_NAME);
DescribeTableResponse response = client.describeTable(request);
TableMeta tableMeta = response.getTableMeta();
System.out.println("Table name: " + tableMeta.getTableName());
System.out.println("Primary key:");
for (PrimaryKeySchema primaryKeySchema : tableMeta.getPrimaryKeyList()) {
System.out.println(primaryKeySchema);
}
TableOptions tableOptions = response.getTableOptions();
System.out.println("TTL: " + tableOptions.getTimeToLive());
System.out.println("MaxVersions: " + tableOptions.getMaxVersions());
ReservedThroughputDetails reservedThroughputDetails = response.getReservedThroughputDetails();
System.out.println("Reserved read throughput: "
+ reservedThroughputDetails.getCapacityUnit().getReadCapacityUnit());
System.out.println("Reserved write throughput: "
+ reservedThroughputDetails.getCapacityUnit().getWriteCapacityUnit());
}
public static void deleteTable(SyncClient client) {
DeleteTableRequest request = new DeleteTableRequest(TABLE_NAME);
client.deleteTable(request);
}
public static void listTable(SyncClient client) {
ListTableResponse response = client.listTable();
System.out.println("Tables:");
for (String tableName : response.getTableNames()) {
System.out.println(tableName);
}
}
}
demo2:
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.condition.ColumnCondition;
import com.alicloud.openservices.tablestore.model.condition.SingleColumnValueCondition;
import com.alicloud.openservices.tablestore.model.filter.SingleColumnValueFilter;
import java.util.Iterator;
public class PkAutoIncrSample {
/**
 * This example creates a table named sampleTable_pk with two primary key columns, pk1 and pk2.
*/
private static final String TABLE_NAME = "sampleTable_pk";
private static final String PRIMARY_KEY_NAME_1 = "pk1";
private static final String PRIMARY_KEY_NAME_2 = "pk2";
public static void main(String[] args) {
final String endPoint = "";
final String accessId = "";
final String accessKey = "";
final String instanceName = "";
SyncClient client = new SyncClient(endPoint, accessId, accessKey,
instanceName);
try {
// Create the table.
createTable(client);
System.out.println("create table succeeded.");
// Wait for the table to finish loading.
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// putRow
PrimaryKey pk = putRow(client);
System.out.println("put row succeeded,pk:" + pk.toString());
// getRow
getRow(client, pk);
// updateRow
updateRow(client, pk);
getRowWithFilter(client, pk);
// Increment a column using a condition.
updateRowWithCondition(client, pk);
// getRow
getRow(client, pk);
// Write two more rows.
putRow(client);
putRow(client);
// getRange
getRange(client, "a", "z");
// getRange using an iterator.
getRangeByIterator(client, "a", "z");
batchWriteRow(client);
} catch (TableStoreException e) {
System.err.println("Operation failed, details: " + e.getMessage());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("Request failed, details: " + e.getMessage());
} finally {
// For safety the table is not deleted by default; uncomment the next line to delete it.
// deleteTable(client);
}
client.shutdown();
}
private static void createTable(SyncClient client) {
TableMeta tableMeta = new TableMeta(TABLE_NAME);
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(PRIMARY_KEY_NAME_1, PrimaryKeyType.STRING));
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(PRIMARY_KEY_NAME_2, PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT));
int timeToLive = -1; // Data expiration time in seconds; -1 means never expire. One year would be 365 * 24 * 3600.
int maxVersions = 1; // Maximum number of versions kept; 1 means each column keeps only its latest version.
TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
client.createTable(request);
}
private static void deleteTable(SyncClient client) {
DeleteTableRequest request = new DeleteTableRequest(TABLE_NAME);
client.deleteTable(request);
}
private static PrimaryKey putRow(SyncClient client) {
// Build the primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString("chengdu"));
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.AUTO_INCRMENT);
PrimaryKey primaryKey = primaryKeyBuilder.build();
RowPutChange rowPutChange = new RowPutChange(TABLE_NAME, primaryKey);
rowPutChange.setReturnType(ReturnType.RT_PK);
// Add some attribute columns.
long ts = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 3; j++) {
rowPutChange.addColumn(new Column("Col" + i, ColumnValue.fromLong(j), ts + j));
}
}
PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange));
// Print the consumed capacity units (CU).
CapacityUnit cu = response.getConsumedCapacity().getCapacityUnit();
System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit());
System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit());
// Print the returned PK columns.
PrimaryKey pk = response.getRow().getPrimaryKey();
System.out.println("PrimaryKey:" + pk.toString());
return pk;
}
private static void updateRow(SyncClient client, PrimaryKey pk) {
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, pk);
// Update some columns.
for (int i = 0; i < 10; i++) {
rowUpdateChange.put(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Delete one version of a column.
rowUpdateChange.deleteColumn("Col10", 1465373223000L);
// Delete a whole column.
rowUpdateChange.deleteColumns("Col11");
rowUpdateChange.setCondition(new Condition(RowExistenceExpectation.EXPECT_EXIST));
client.updateRow(new UpdateRowRequest(rowUpdateChange));
}
private static void deleteRow(SyncClient client, PrimaryKey pk) {
RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, pk);
client.deleteRow(new DeleteRowRequest(rowDeleteChange));
}
private static void batchWriteRow(SyncClient client) {
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
// Build rowPutChange1.
PrimaryKeyBuilder pk1Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk1Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString("Hangzhou"));
pk1Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.AUTO_INCRMENT);
RowPutChange rowPutChange1 = new RowPutChange(TABLE_NAME, pk1Builder.build());
rowPutChange1.setReturnType(ReturnType.RT_PK);
// Add some columns.
rowPutChange1.addColumn(new Column("Column_0", ColumnValue.fromLong(99)));
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowPutChange1);
// Build rowPutChange2.
PrimaryKeyBuilder pk2Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk2Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString("Hangzhou"));
pk2Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.AUTO_INCRMENT);
RowPutChange rowPutChange2 = new RowPutChange(TABLE_NAME, pk2Builder.build());
rowPutChange2.setReturnType(ReturnType.RT_PK);
// Add some columns.
rowPutChange2.addColumn(new Column("Column_0", ColumnValue.fromLong(100)));
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowPutChange2);
// Build rowUpdateChange.
PrimaryKeyBuilder pk3Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk3Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString("Hangzhou"));
pk3Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.AUTO_INCRMENT);
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, pk3Builder.build());
rowUpdateChange.setReturnType(ReturnType.RT_PK);
// Add one column.
rowUpdateChange.put(new Column("Column_0", ColumnValue.fromLong(101)));
// Delete one column.
rowUpdateChange.deleteColumns("Column_1");
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowUpdateChange);
// Build rowDeleteChange.
PrimaryKeyBuilder pk4Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk4Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString("Hangzhou"));
pk4Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.fromLong(1));
RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, pk4Builder.build());
rowDeleteChange.setReturnType(ReturnType.RT_PK);
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowDeleteChange);
BatchWriteRowResponse response = client.batchWriteRow(batchWriteRowRequest);
System.out.println("All succeeded: " + response.isAllSucceed());
if (!response.isAllSucceed()) {
for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
System.out.println("Failed row: " + batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
System.out.println("Reason: " + rowResult.getError());
}
/*
 * createRequestForRetry builds a new request that retries only the failed rows. Only the construction of the retry request is shown here.
 * The recommended approach is the SDK's custom retry strategy, which supports retrying partial row failures of batch operations. Once a retry strategy is set, no retry code is needed at the call site.
 */
BatchWriteRowRequest retryRequest = batchWriteRowRequest.createRequestForRetry(response.getFailedRows());
} else {
for (BatchWriteRowResponse.RowResult rowResult : response.getSucceedRows()) {
PrimaryKey pk = rowResult.getRow().getPrimaryKey();
System.out.println("Return PK:" + pk.jsonize());
}
}
}
private static void getRow(SyncClient client, PrimaryKey pk) {
// Read one row.
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(TABLE_NAME, pk);
// Read only the latest version.
criteria.setMaxVersions(1);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("Read finished, result: ");
System.out.println(row);
// Read only selected columns.
criteria.addColumnsToGet("Col0");
getRowResponse = client.getRow(new GetRowRequest(criteria));
row = getRowResponse.getRow();
System.out.println("Read finished, result: ");
System.out.println(row);
}
private static void getRowWithFilter(SyncClient client, PrimaryKey pk) {
// Read one row.
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(TABLE_NAME, pk);
// Read only the latest version.
criteria.setMaxVersions(1);
// Set a filter: return the row only when Col0 equals 0.
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
// If the Col0 column does not exist, do not return the row either.
singleColumnValueFilter.setPassIfMissing(false);
// Evaluate only the latest version.
singleColumnValueFilter.setLatestVersionsOnly(true);
criteria.setFilter(singleColumnValueFilter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("Read finished, result: ");
System.out.println(row);
}
// Optimistic locking via Condition: increment one column.
private static void updateRowWithCondition(SyncClient client, PrimaryKey pk) {
// Read one row.
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(TABLE_NAME, pk);
criteria.setMaxVersions(1);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
long col0Value = row.getLatestColumn("Col0").getValue().asLong();
// Conditionally update Col0, incrementing its value by 1.
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, pk);
Condition condition = new Condition(RowExistenceExpectation.EXPECT_EXIST);
ColumnCondition columnCondition = new SingleColumnValueCondition("Col0", SingleColumnValueCondition.CompareOperator.EQUAL, ColumnValue.fromLong(col0Value));
condition.setColumnCondition(columnCondition);
rowUpdateChange.setCondition(condition);
rowUpdateChange.put(new Column("Col0", ColumnValue.fromLong(col0Value + 1)));
try {
client.updateRow(new UpdateRowRequest(rowUpdateChange));
} catch (TableStoreException ex) {
System.out.println(ex.toString());
}
}
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(TABLE_NAME);
// Set the start primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString(startPkValue));
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.fromLong(0));
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
// Set the end primary key.
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString(endPkValue));
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.INF_MAX);
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange result:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
// If nextStartPrimaryKey is not null, keep reading from it.
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter(TABLE_NAME);
// Set the start primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString(startPkValue));
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.fromLong(0));
rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
// Set the end primary key.
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_1, PrimaryKeyValue.fromString(endPkValue));
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME_2, PrimaryKeyValue.INF_MAX);
rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeIteratorParameter.setMaxVersions(1);
Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);
System.out.println("GetRange result using an Iterator:");
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}
}
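The updateRowWithCondition method above attempts the conditional write once and only prints the exception if the condition fails. Optimistic locking is usually paired with a read, compare, write loop that retries on conflict. The sketch below illustrates that loop without the SDK: a ConcurrentMap stands in for the table, its replace(key, old, new) call stands in for the conditional update, and `OptimisticIncrement` and `increment` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class OptimisticIncrement {
    /**
     * Increments the value under key by 1 using compare-and-set.
     * Returns false if the key is missing or every attempt lost the race.
     */
    static boolean increment(ConcurrentMap<String, Long> store, String key, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Long current = store.get(key);
            if (current == null) {
                return false; // the row must already exist
            }
            // Succeeds only if the stored value is still the one just read.
            if (store.replace(key, current, current + 1)) {
                return true;
            }
            // Another writer changed the value in between: read again and retry.
        }
        return false;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> store = new ConcurrentHashMap<>();
        store.put("Col0", 0L);
        increment(store, "Col0", 5);
        increment(store, "Col0", 5);
        System.out.println("Col0=" + store.get("Col0")); // prints Col0=2
    }
}
```

Bounding the number of attempts keeps a heavily contended column from spinning forever.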
demo3:
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.condition.ColumnCondition;
import com.alicloud.openservices.tablestore.model.condition.SingleColumnValueCondition;
import com.alicloud.openservices.tablestore.model.filter.SingleColumnValueFilter;
import java.util.Iterator;
public class DataOperationSample {
/**
 * This example creates a table named sampleTable with a single primary key column named pk.
*/
private static final String TABLE_NAME = "sampleTable";
private static final String PRIMARY_KEY_NAME = "pk";
public static void main(String[] args) {
final String endPoint = "";
final String accessId = "";
final String accessKey = "";
final String instanceName = "";
SyncClient client = new SyncClient(endPoint, accessId, accessKey, instanceName);
try {
// Create the table.
createTable(client);
// Wait for the table to finish loading.
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// putRow
putRow(client, "pkValue");
// getRow
getRow(client, "pkValue");
// updateRow
updateRow(client, "pkValue");
// Increment a column using a condition.
updateRowWithCondition(client, "pkValue");
// getRow
getRow(client, "pkValue");
// Write two more rows.
putRow(client, "aaa");
putRow(client, "bbb");
// getRange
getRange(client, "a", "z");
// getRange using an iterator.
getRangeByIterator(client, "a", "z");
batchWriteRow(client);
batchGetRow(client);
} catch (TableStoreException e) {
System.err.println("Operation failed, details: " + e.getMessage());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("Request failed, details: " + e.getMessage());
} finally {
// For safety the table is not deleted by default; uncomment the next line to delete it.
// deleteTable(client);
}
client.shutdown();
}
private static void createTable(SyncClient client) {
TableMeta tableMeta = new TableMeta(TABLE_NAME);
tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(PRIMARY_KEY_NAME, PrimaryKeyType.STRING));
int timeToLive = -1; // Data time-to-live in seconds; -1 means the data never expires. For one year, use 365 * 24 * 3600.
int maxVersions = 1; // Maximum number of versions to keep; 1 means each column keeps only its latest version.
TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
client.createTable(request);
}
private static void deleteTable(SyncClient client) {
DeleteTableRequest request = new DeleteTableRequest(TABLE_NAME);
client.deleteTable(request);
}
private static void putRow(SyncClient client, String pkValue) {
// Build the primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
RowPutChange rowPutChange = new RowPutChange(TABLE_NAME, primaryKey);
// Add some attribute columns.
long ts = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 3; j++) {
rowPutChange.addColumn(new Column("Col" + i, ColumnValue.fromLong(j), ts + j));
}
}
client.putRow(new PutRowRequest(rowPutChange));
}
private static void updateRow(SyncClient client, String pkValue) {
// Build the primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, primaryKey);
// Update some columns.
for (int i = 0; i < 10; i++) {
rowUpdateChange.put(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Delete one specific version of a column.
rowUpdateChange.deleteColumn("Col10", 1465373223000L);
// Delete an entire column.
rowUpdateChange.deleteColumns("Col11");
client.updateRow(new UpdateRowRequest(rowUpdateChange));
}
private static void deleteRow(SyncClient client, String pkValue) {
// Build the primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, primaryKey);
client.deleteRow(new DeleteRowRequest(rowDeleteChange));
}
public static void batchWriteRow(SyncClient client) {
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
// Build rowPutChange1.
PrimaryKeyBuilder pk1Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk1Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk1"));
RowPutChange rowPutChange1 = new RowPutChange(TABLE_NAME, pk1Builder.build());
// Add some columns.
for (int i = 0; i < 10; i++) {
rowPutChange1.addColumn(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowPutChange1);
// Build rowPutChange2.
PrimaryKeyBuilder pk2Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk2Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk2"));
RowPutChange rowPutChange2 = new RowPutChange(TABLE_NAME, pk2Builder.build());
// Add some columns.
for (int i = 0; i < 10; i++) {
rowPutChange2.addColumn(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowPutChange2);
// Build rowUpdateChange.
PrimaryKeyBuilder pk3Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk3Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk3"));
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, pk3Builder.build());
// Add some columns.
for (int i = 0; i < 10; i++) {
rowUpdateChange.put(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// Delete a column.
rowUpdateChange.deleteColumns("Col10");
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowUpdateChange);
// Build rowDeleteChange.
PrimaryKeyBuilder pk4Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk4Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk4"));
RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, pk4Builder.build());
// Add it to the batch operation.
batchWriteRowRequest.addRowChange(rowDeleteChange);
BatchWriteRowResponse response = client.batchWriteRow(batchWriteRowRequest);
System.out.println("All succeeded: " + response.isAllSucceed());
if (!response.isAllSucceed()) {
for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
System.out.println("Failed row: " + batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
System.out.println("Failure reason: " + rowResult.getError());
}
/**
* createRequestForRetry builds a new request that retries only the failed rows. Only the construction of the retry request is shown here.
* The recommended approach is the SDK's custom retry strategy, which supports retrying the partially failed rows of a batch operation. Once a retry strategy is configured, the call site needs no extra retry code.
*/
BatchWriteRowRequest retryRequest = batchWriteRowRequest.createRequestForRetry(response.getFailedRows());
}
}
private static void getRow(SyncClient client, String pkValue) {
// Build the primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
// Read one row.
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(TABLE_NAME, primaryKey);
// Read only the latest version.
criteria.setMaxVersions(1);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("Read complete. Result: ");
System.out.println(row);
// Read only specific columns.
criteria.addColumnsToGet("Col0");
getRowResponse = client.getRow(new GetRowRequest(criteria));
row = getRowResponse.getRow();
System.out.println("Read complete. Result: ");
System.out.println(row);
}
private static void getRowWithFilter(SyncClient client, String pkValue) {
// Build the primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
// Read one row.
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(TABLE_NAME, primaryKey);
// Read only the latest version.
criteria.setMaxVersions(1);
// Set a filter: return the row only when Col0 equals 0.
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
// If the row has no Col0 column, do not return it either.
singleColumnValueFilter.setPassIfMissing(false);
// Evaluate only the latest version.
singleColumnValueFilter.setLatestVersionsOnly(true);
criteria.setFilter(singleColumnValueFilter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("Read complete. Result: ");
System.out.println(row);
}
private static void batchGetRow(SyncClient client) {
MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria(TABLE_NAME);
// Add 10 rows to read.
for (int i = 0; i < 10; i++) {
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk" + i));
PrimaryKey primaryKey = primaryKeyBuilder.build();
multiRowQueryCriteria.addRow(primaryKey);
}
// Add query conditions.
multiRowQueryCriteria.setMaxVersions(1);
multiRowQueryCriteria.addColumnsToGet("Col0");
multiRowQueryCriteria.addColumnsToGet("Col1");
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
singleColumnValueFilter.setPassIfMissing(false);
multiRowQueryCriteria.setFilter(singleColumnValueFilter);
BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
// batchGetRow can read from multiple tables. Each MultiRowQueryCriteria holds the query conditions for one table, and several can be added.
batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);
BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);
System.out.println("All succeeded: " + batchGetRowResponse.isAllSucceed());
if (!batchGetRowResponse.isAllSucceed()) {
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
System.out.println("Failed row: " + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
System.out.println("Failure reason: " + rowResult.getError());
}
/**
* createRequestForRetry builds a new request that retries only the failed rows. Only the construction of the retry request is shown here.
* The recommended approach is the SDK's custom retry strategy, which supports retrying the partially failed rows of a batch operation. Once a retry strategy is configured, the call site needs no extra retry code.
*/
BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
}
}
// Implement optimistic locking with a Condition to increment a column.
private static void updateRowWithCondition(SyncClient client, String pkValue) {
// Build the primary key.
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
// Read one row.
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria(TABLE_NAME, primaryKey);
criteria.setMaxVersions(1);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
long col0Value = row.getLatestColumn("Col0").getValue().asLong();
// Conditionally update Col0, setting it to the value just read plus 1.
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, primaryKey);
Condition condition = new Condition(RowExistenceExpectation.EXPECT_EXIST);
ColumnCondition columnCondition = new SingleColumnValueCondition("Col0", SingleColumnValueCondition.CompareOperator.EQUAL, ColumnValue.fromLong(col0Value));
condition.setColumnCondition(columnCondition);
rowUpdateChange.setCondition(condition);
rowUpdateChange.put(new Column("Col0", ColumnValue.fromLong(col0Value + 1)));
try {
client.updateRow(new UpdateRowRequest(rowUpdateChange));
} catch (TableStoreException ex) {
System.out.println(ex.toString());
}
}
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(TABLE_NAME);
// Set the start primary key (inclusive).
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(startPkValue));
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
// Set the end primary key (exclusive).
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(endPkValue));
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange results:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
// If nextStartPrimaryKey is not null, continue reading from it.
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter(TABLE_NAME);
// Set the start primary key (inclusive).
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(startPkValue));
rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
// Set the end primary key (exclusive).
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(endPkValue));
rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeIteratorParameter.setMaxVersions(1);
Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);
System.out.println("Results of GetRange using an iterator:");
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}
}
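updateRowWithCondition above performs a single read, check, and write, and only prints the exception when the condition fails. In practice an optimistic-lock update is usually wrapped in a retry loop. The sketch below shows that loop against an in-memory ConcurrentHashMap, whose replace(key, expected, updated) stands in for the conditional update. ConditionalIncrementSketch, COL0, incrementWithRetry, and maxAttempts are hypothetical names for illustration and do not come from the Tablestore SDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConditionalIncrementSketch {
    // Hypothetical in-memory stand-in for the Col0 column, keyed by primary key.
    static final ConcurrentMap<String, Long> COL0 = new ConcurrentHashMap<>();

    // Reads the current value, then writes value + 1 only if the stored value is
    // still the one that was read. Retries up to maxAttempts times on conflict.
    static boolean incrementWithRetry(String pk, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Long current = COL0.get(pk);
            if (current == null) {
                // The row does not exist, similar to an EXPECT_EXIST condition failing.
                return false;
            }
            // replace succeeds only when the stored value still equals `current`.
            if (COL0.replace(pk, current, current + 1)) {
                return true;
            }
            // Another writer changed the value in between: re-read and try again.
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        COL0.put("pkValue", 0L);
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    incrementWithRetry("pkValue", Integer.MAX_VALUE);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        // No increment is lost, because a conflicting write forces a fresh read.
        System.out.println(COL0.get("pkValue"));
    }
}
```

With the real client, the same shape would presumably catch the condition-check failure from updateRow and repeat the getRow plus conditional updateRow pair, ideally with a bounded number of attempts.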