0
点赞
收藏
分享

微信扫一扫

SQLServerBulkCopy大容量插入、更新操作

春意暖洋洋 2022-02-16 阅读 86

说明

pro环境在程序启动时需要运行50w update sql
1)分批1000每次,批量更新需要花时间120s左右;
2)采用下面类似第二个测试用例(先把数据插入临时表、临时表同步主表、删除临时表),最终只花了34s,其中插入花了14s,临时表更新到主表花了18s多

代码

package com.ydfind.driver.ok;

import com.microsoft.sqlserver.jdbc.SQLServerBulkCSVFileRecord;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;
import com.microsoft.sqlserver.jdbc.SQLServerException;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class MyBulkCopyTest {

    // 1w数据7332ms
    @Test
    public void testBulkInsert() throws SQLException {
        // drd notes:
        long time = System.currentTimeMillis();

        String connectionUrl = "jdbc:sqlserver://localhost:1433;databaseName=dev-data;user=sa;password=465628578";
        String tableName = "bulkCopyTest";
        String deleteSql = "if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].["
                + tableName + "]') and OBJECTPROPERTY(id, N'IsUserTable') = 1) DROP TABLE [" + tableName + "]";
        String createSql = "create table " + tableName + " (c1 bigint, c2 varchar(20))";
        // 要插入的数据
        List<Object[]> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Object[] objects = new Object[2];
            objects[0] = Long.valueOf(i);
            objects[1] = "name_" + i;
            list.add(objects);
        }
        MySQLServerBulkRecord bulkRecord = new MySQLServerBulkRecord(list);
        bulkRecord.addColumnMetadata(1, "c1", Types.BIGINT, 19, 0);
//        bulkRecord.addColumnMetadata(1, "c1", Types.BIGINT, 19, 0);
        bulkRecord.addColumnMetadata(2, "c2", Types.VARCHAR, 20, 0);

        try (Connection connection = DriverManager.getConnection(connectionUrl);
             Statement statement = connection.createStatement();){
            connection.setAutoCommit(false);
            // 卸了旧的表
            statement.execute(deleteSql);
            // 创建新表
            statement.execute(createSql);

            // BULK INSERT
            SQLServerBulkCopyOptions options = new SQLServerBulkCopyOptions();
            options.setTableLock(true);
//            options.setBatchSize(10_0000);
            SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(connection);
            bulkCopy.setBulkCopyOptions(options);
            bulkCopy.setDestinationTableName(tableName);

            bulkCopy.writeToServer(bulkRecord);

            connection.commit();
            bulkCopy.close();
        }
        System.out.println("cost time " + (System.currentTimeMillis() - time));
    }

    public class MySQLServerBulkRecord extends SQLServerBulkCSVFileRecord {
        private List<Object[]> dataList;
        private int cur;
        public MySQLServerBulkRecord(List<Object[]> dataList) throws SQLServerException {
            super(new ByteArrayInputStream(new byte[]{}), "UTF-8", ",", false);
            this.dataList = dataList;
            cur = -1;
        }

        @Override
        public boolean next() throws SQLServerException {
            return ++cur < dataList.size();
        }

        @Override
        public Object[] getRowData() throws SQLServerException {
            if (cur >= dataList.size()) {
                return null;
            }
            return dataList.get(cur);
        }

    }

    // 更新到bulkCopyTest:先插入到临时表bulkCopyTestTemp,再update到bulkCopyTest,再删除临时表
    // 1w数据cost time 4362
    @Test
    public void testBulkUpdate() throws SQLException {
        // drd notes:
        long time = System.currentTimeMillis();

        String connectionUrl = "jdbc:sqlserver://localhost:1433;databaseName=dev-data;user=sa;password=465628578";
        String tableName = "bulkCopyTestTemp";
        String tableNameTarget = "bulkCopyTest";
        String deleteSql = "if exists (select * from dbo.sysobjects where id = object_id(N'[dbo].["
                + tableName + "]') and OBJECTPROPERTY(id, N'IsUserTable') = 1) DROP TABLE [" + tableName + "]";
        String createSql = "create table " + tableName + " (c1 bigint, c2 varchar(20))";
        String updateSql = "update " + tableNameTarget + " with(TABLOCK)  set c2 = t2.c2 from " + tableName + " t2 with(nolock) where " + tableNameTarget + ".c1 = t2.c1";
        // 要插入的数据
        List<Object[]> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Object[] objects = new Object[2];
            objects[0] = Long.valueOf(i);
            objects[1] = "name1_" + i;
            list.add(objects);
        }
        MySQLServerBulkRecord bulkRecord = new MySQLServerBulkRecord(list);
        bulkRecord.addColumnMetadata(1, "c1", Types.BIGINT, 19, 0);
//        bulkRecord.addColumnMetadata(1, "c1", Types.BIGINT, 19, 0);
        bulkRecord.addColumnMetadata(2, "c2", Types.VARCHAR, 20, 0);

        try (Connection connection = DriverManager.getConnection(connectionUrl);
             Statement statement = connection.createStatement();){
            connection.setAutoCommit(false);
            // 临时表
            statement.execute(deleteSql);
            statement.execute(createSql);

            SQLServerBulkCopyOptions options = new SQLServerBulkCopyOptions();
            options.setTableLock(true);
            options.setBatchSize(10_0000);
            SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(connection);
            bulkCopy.setBulkCopyOptions(options);
            bulkCopy.setDestinationTableName(tableName);

            bulkCopy.writeToServer(bulkRecord);

            // 从临时表更新到主表
            statement.execute(updateSql);
            // 删除临时表
            statement.execute(deleteSql);

            connection.commit();
            bulkCopy.close();
        }
        // 1w数据7332ms
        System.out.println("cost time " + (System.currentTimeMillis() - time));
    }
}

分析

第一个测试用例后,结果:
在这里插入图片描述
第二个测试用例结果
在这里插入图片描述

举报

相关推荐

0 条评论