一 Leaf号段模式设计
二 Leaf号段模式解析
- 初始化数据源和Dao层
/**
 * Creates the default segment-mode {@link IdGen} bean.
 *
 * <p>The bean is only registered when no other {@code IdGen} bean is present. It wires a Druid
 * data source from the configured JDBC properties, builds a MyBatis {@code SqlSessionFactory}
 * with {@code SegmentMapper} registered, and hands the resulting {@code SegmentDao} to a
 * {@code SegmentIdGenImpl} whose cache is initialized before it is returned.
 *
 * @return an initialized segment-mode ID generator
 * @throws IllegalStateException if the data source cannot be initialized
 */
@Bean
@Order(1)
@ConditionalOnMissingBean({IdGen.class})
public IdGen idGen() {
    DruidDataSource dataSource = new DruidDataSource();
    dataSource.setUrl(properties.getJdbcUrl());
    dataSource.setUsername(properties.getUsername());
    dataSource.setPassword(properties.getPassword());
    try {
        dataSource.init();
    } catch (SQLException exception) {
        // Fail fast: continuing with an uninitialized data source would only defer the failure
        // to the first DAO call and hide the root cause.
        throw new IllegalStateException(
                "Failed to initialize the data source for the segment ID generator", exception);
    }

    TransactionFactory transactionFactory = new JdbcTransactionFactory();
    Environment environment = new Environment("development", transactionFactory, dataSource);
    org.apache.ibatis.session.Configuration configuration =
            new org.apache.ibatis.session.Configuration(environment);
    configuration.addMapper(SegmentMapper.class);
    SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);

    SegmentDao segmentDao = new SegmentDao();
    segmentDao.setSqlSessionFactory(sqlSessionFactory);

    SegmentIdGenImpl idGen = new SegmentIdGenImpl();
    idGen.setSegmentDao(segmentDao);
    // Load the tag cache and start the periodic refresh before exposing the bean.
    idGen.init();
    return idGen;
}
- idGen.init() 方法初始化缓存
/**
 * Prepares the generator for use.
 *
 * <p>Loads the tag cache once via {@code updateIdCacheFromDB()}, starts the periodic refresh
 * via {@code scheduledUpdateIdCacheFromDB()}, and only then sets {@code idCacheInitOk}, the
 * flag that {@code genId} checks before serving requests.
 */
@Override
public void init() {
// Initial synchronous load of the tags.
updateIdCacheFromDB();
// Keep the cache in sync with the database in the background.
scheduledUpdateIdCacheFromDB();
// Mark the cache as usable only after the initial load has run.
idCacheInitOk = true;
logger.info("IdCache initialized...");
}
/**
 * Synchronizes the in-memory buffer cache with the business tags stored in the database.
 *
 * <p>Tags present in the database but not in the cache get a fresh, not-yet-initialized
 * {@code SegmentBuffer}; tags present in the cache but no longer in the database are evicted.
 * When the DAO returns {@code null} or no tags at all, the cache is left untouched.
 *
 * <p>The list returned by the DAO is never modified.
 */
protected void updateIdCacheFromDB() {
    List<String> dbTags = segmentDao.getAllTags();
    if (dbTags == null || dbTags.isEmpty()) {
        return;
    }

    // Add a placeholder buffer for every tag the cache does not know yet.
    for (String tag : dbTags) {
        if (cache.containsKey(tag)) {
            continue;
        }
        // A new SegmentBuffer holds two empty Segments (the current one and the one to preload),
        // starts at position 0 and is flagged as neither initialized nor next-ready.
        SegmentBuffer segmentBuffer = new SegmentBuffer();
        segmentBuffer.setTag(tag);
        // Zeroed placeholder values; the real range is loaded on the first genId call.
        Segment segment = segmentBuffer.getCurrent();
        segment.setMaxId(0);
        segment.setStep(0);
        segment.setValue(new AtomicLong(0));
        cache.put(tag, segmentBuffer);
    }

    // Evict tags that no longer exist in the database.
    Set<String> staleTags = new HashSet<>(cache.keySet());
    staleTags.removeAll(new HashSet<>(dbTags));
    for (String staleTag : staleTags) {
        cache.remove(staleTag);
    }
}
/**
 * Starts a single daemon thread that re-runs {@code updateIdCacheFromDB()} with a fixed delay:
 * first after 60 seconds, then 300 seconds after each run completes.
 *
 * <p>Failures of an individual refresh are logged and swallowed on purpose: an exception that
 * escapes a task scheduled with {@code scheduleWithFixedDelay} suppresses all subsequent
 * executions, so a single transient database error would otherwise stop the refresh for good.
 */
protected void scheduledUpdateIdCacheFromDB() {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r);
        t.setName("Update-IdCache-Thread");
        t.setDaemon(true);
        return t;
    });
    scheduler.scheduleWithFixedDelay(() -> {
        try {
            updateIdCacheFromDB();
        } catch (RuntimeException e) {
            logger.warn("update IdCache from db exception", e);
        }
    }, 60, 300, TimeUnit.SECONDS);
}
- 获取ID逻辑
- 第一次先从数据库刷新最新的Segment号段
/**
 * Returns the next ID for the given business tag.
 *
 * <p>On the first request for a tag, the buffer's current segment is loaded from the database
 * while holding the buffer's monitor; the init flag is re-checked inside the monitor so that
 * only one thread performs the load.
 *
 * @param bizTag business tag to generate an ID for
 * @return the next ID of that tag
 * @throws RuntimeException if the cache has not been initialized yet, or if the tag has no
 *     buffer in the cache
 */
@Override
public Long genId(String bizTag) {
    if (!idCacheInitOk) {
        throw new RuntimeException("IdCache is not ready.");
    }
    // Single lookup: a separate containsKey + get could observe the tag being evicted by the
    // background refresh in between and dereference null.
    SegmentBuffer segmentBuffer = cache.get(bizTag);
    if (segmentBuffer == null) {
        throw new RuntimeException("SegmentBuffer is not ready.");
    }
    if (!segmentBuffer.isInitOk()) {
        synchronized (segmentBuffer) {
            if (!segmentBuffer.isInitOk()) {
                updateSegmentBufferFromDB(segmentBuffer.getTag(), segmentBuffer.getCurrent());
                segmentBuffer.setInitOk(true);
            }
        }
    }
    return getIdFromSegmentBuffer(segmentBuffer);
}
/**
 * Allocates a new segment for {@code tag} in the database and copies it into {@code current}.
 *
 * <p>After the copy, the segment's counter is positioned at {@code maxId - step}, i.e. at the
 * start of the freshly allocated range.
 *
 * @param tag business tag whose segment is advanced
 * @param current in-memory segment that receives the new range
 * @throws IllegalStateException if the segment cannot be loaded; the failure is propagated so
 *     that callers do not mark a segment as ready when it was never filled
 */
protected void updateSegmentBufferFromDB(String tag, Segment current) {
    try {
        // Each call advances max_id in the database and returns the new segment.
        Segment segment = segmentDao.updateMaxIdAndGetSegment(tag);
        current.setBizTag(segment.getBizTag());
        current.setMaxId(segment.getMaxId());
        current.setStep(segment.getStep());
        current.getValue().set(current.getMaxId() - current.getStep());
    } catch (Exception e) {
        logger.warn("update segment {} from db {}", tag, current, e);
        throw new IllegalStateException("Failed to update segment from db for tag " + tag, e);
    }
}
- 获取ID以及双Segment设计
/**
 * Takes the next ID from the buffer's current segment, preloading and switching to the second
 * segment as the current one runs out.
 *
 * <p>Fast path (read lock): increment the current segment's counter and return the value if it
 * is still below {@code maxId}. While on this path, once less than half of the step is idle and
 * the next segment is not ready, exactly one thread (guarded by the buffer's atomic flag)
 * submits a background task that loads the next segment.
 *
 * <p>Slow path (write lock): after waiting for an in-flight preload, retry on the current
 * segment, then switch to the next segment if it is ready; otherwise log a warning and loop.
 *
 * @param buffer the per-tag double buffer to draw from
 * @return the next ID
 */
protected Long getIdFromSegmentBuffer(final SegmentBuffer buffer) {
    while (true) {
        buffer.rLock().lock();
        try {
            final Segment segment = buffer.getCurrent();
            // Lazily load the second segment. The compareAndSet lets only one thread submit
            // the preload while it is in flight.
            if (!buffer.isNextReady()
                    && segment.getIdle() < 0.5 * segment.getStep()
                    && buffer.getAtomicBoolean().compareAndSet(false, true)) {
                // NOTE(review): `service` is an executor declared outside this snippet.
                service.execute(() -> {
                    Segment next = buffer.getSegments()[buffer.nextPos()];
                    boolean updateOK = false;
                    try {
                        updateSegmentBufferFromDB(buffer.getTag(), next);
                        updateOK = true;
                    } catch (Exception e) {
                        // Wrap the exception itself; e.getCause() may be null and would
                        // discard the failure details.
                        throw new RuntimeException(e);
                    } finally {
                        if (updateOK) {
                            // Must be mutually exclusive with buffer.switchPos() below, hence
                            // the write lock taken from this separate worker thread.
                            buffer.wLock().lock();
                            try {
                                buffer.setNextReady(true);
                                buffer.getAtomicBoolean().set(false);
                            } finally {
                                buffer.wLock().unlock();
                            }
                        } else {
                            buffer.getAtomicBoolean().set(false);
                        }
                    }
                });
            }
            long value = segment.getValue().getAndIncrement();
            if (value < segment.getMaxId()) {
                return value;
            }
        } finally {
            buffer.rLock().unlock();
        }

        // If a preload is in progress, give it a chance to finish.
        waitAndSleep(buffer);

        // Only one thread at a time may perform the switch.
        buffer.wLock().lock();
        try {
            // Another thread may already have switched; retry on the (possibly new) current.
            final Segment segment = buffer.getCurrent();
            long value = segment.getValue().getAndIncrement();
            if (value < segment.getMaxId()) {
                return value;
            }
            if (buffer.isNextReady()) {
                buffer.switchPos();
                buffer.setNextReady(false);
            } else {
                // Both segments are exhausted: the step is too small for the current load.
                logger.warn("Both segment is not ready, step must be increased!");
            }
        } finally {
            buffer.wLock().unlock();
        }
    }
}
/**
 * Waits while the buffer's atomic flag is set, i.e. while a preload of the next segment is in
 * flight.
 *
 * <p>Spins on the flag; after more than 10000 spins it sleeps for 10 ms once and returns,
 * whether or not the flag has been cleared. If the sleep is interrupted, the thread's interrupt
 * status is restored before returning so that callers can still observe it.
 *
 * @param buffer buffer whose preload flag is polled
 */
protected void waitAndSleep(SegmentBuffer buffer) {
    int roll = 0;
    while (buffer.getAtomicBoolean().get()) {
        roll += 1;
        if (roll > 10000) {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                // Restore the flag instead of silently clearing it.
                Thread.currentThread().interrupt();
                logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
            }
            break;
        }
    }
}
三 使用方式
项目地址 ID生成器号段模式Starter
一 创建数据库和表
-- Schema for the segment-mode ID generator: one row per business tag.
CREATE DATABASE IF NOT EXISTS segment;
USE segment;
CREATE TABLE `segment`
(
    `biz_tag`     varchar(128) NOT NULL DEFAULT '', -- your biz unique name
    `max_id`      bigint(20)   NOT NULL DEFAULT '1', -- upper bound of the most recently allocated segment
    `step`        int(11)      NOT NULL,             -- number of IDs handed out per segment
    `description` varchar(256)          DEFAULT NULL,
    `update_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;

-- Seed a business tag together with its initial max_id and step.
insert into segment(biz_tag, max_id, step, description)
values ('segment-test', 1, 10, '系统测试');
二 配置yml
# ID generator starter settings.
# NOTE(review): nesting restored from the flattened listing; confirm the prefix
# (component.idworker) against the starter's properties class.
component:
  idworker:
    enabled: true
    jdbc-url: jdbc:mysql://127.0.0.1:3306/segment?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false&nullCatalogMeansCurrent=true&useInformationSchema=false
    # Sample credentials for a local database only; replace them outside of local testing.
    username: root
    password: qwer1234
三 并发测试
/**
 * Concurrency smoke test for the segment-mode ID generator.
 *
 * <p>Starts many threads that all request IDs for the same business tag at the same moment and
 * verifies that every requested ID was issued and that no ID was issued twice.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class OsComponentIdWorkerApplicationTests {

    /** Number of worker threads requesting IDs concurrently. */
    private static final int THREAD_COUNT = 1000;

    /** Number of IDs each worker requests. */
    private static final int IDS_PER_THREAD = 100;

    /** Upper bound on how long the test waits for all workers to finish. */
    private static final long TIMEOUT_SECONDS = 60;

    @Autowired
    private SegmentService segmentService;

    /**
     * Requests {@code THREAD_COUNT * IDS_PER_THREAD} IDs concurrently and checks uniqueness.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void contextLoads() throws InterruptedException {
        // One gate released once, so that all workers really start together.
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(THREAD_COUNT);
        java.util.Set<Long> issuedIds = java.util.concurrent.ConcurrentHashMap.newKeySet();
        java.util.concurrent.atomic.AtomicInteger duplicates =
                new java.util.concurrent.atomic.AtomicInteger();

        for (int i = 0; i < THREAD_COUNT; i++) {
            Thread thread = new Thread(() -> {
                try {
                    startGate.await();
                    for (int j = 0; j < IDS_PER_THREAD; j++) {
                        Long id = segmentService.getSegmentId("segment-test");
                        System.out.println(id);
                        if (!issuedIds.add(id)) {
                            duplicates.incrementAndGet();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            });
            thread.setName("" + i);
            thread.start();
        }

        startGate.countDown();

        // Wait for the workers instead of sleeping for a fixed time.
        boolean completed =
                finished.await(TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
        if (!completed) {
            throw new AssertionError(
                    "Workers did not finish within " + TIMEOUT_SECONDS + " seconds");
        }
        if (duplicates.get() != 0) {
            throw new AssertionError("Duplicate IDs issued: " + duplicates.get());
        }
        int expected = THREAD_COUNT * IDS_PER_THREAD;
        if (issuedIds.size() != expected) {
            // A worker that failed part-way leaves fewer IDs than requested.
            throw new AssertionError(
                    "Expected " + expected + " IDs but got " + issuedIds.size());
        }
    }
}