DistributedAtomicLong
原子增量操作的计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的InterProcessMutex
(悲观锁)进行增量操作。 对于乐观锁和悲观锁,重试策略都用于重试增量操作。
各种增量方法都会返回一个AtomicValue
实例,通过调用AtomicValue
实例的succeeded()
可以查询增量操作是否执行成功,除了get()
外,其他任何方法都不保证一定成功。
AtomicValue
接口源码(原子操作返回值的抽象):
public interface AtomicValue<T>
{
/**
* 如果操作成功,则返回true
* 如果返回false,则操作失败
*/
public boolean succeeded();
/**
* 返回操作前计数器的值
*/
public T preValue();
/**
* 返回操作后计数器的值
*/
public T postValue();
/**
* 返回操作的调试统计信息,比如乐观锁、悲观锁尝试的次数与时间
*/
public AtomicStats getStats();
}
DistributedAtomicLong
类中的内部类AtomicLong
实现了AtomicValue
接口,但实际上只是起到封装的作用,所有的调用都委托给了bytes
属性(其他实现类的实例)。
private class AtomicLong implements AtomicValue<Long>
{
private AtomicValue<byte[]> bytes;
private AtomicLong(AtomicValue<byte[]> bytes)
{
this.bytes = bytes;
}
@Override
public boolean succeeded()
{
return bytes.succeeded();
}
@Override
public Long preValue()
{
return bytesToValue(bytes.preValue());
}
@Override
public Long postValue()
{
return bytesToValue(bytes.postValue());
}
@Override
public AtomicStats getStats()
{
return bytes.getStats();
}
}
DistributedAtomicLong
类实现了DistributedAtomicNumber
接口,并且DistributedAtomicLong
将各种原子操作的执行委托给了DistributedAtomicValue
。
public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
{
private final DistributedAtomicValue value;
...
}
DistributedAtomicNumber
接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。
public interface DistributedAtomicNumber<T>
{
public AtomicValue<T> get() throws Exception;
public AtomicValue<T> compareAndSet(T expectedValue, T newValue) throws Exception;
public AtomicValue<T> trySet(T newValue) throws Exception;
public boolean initialize(T value) throws Exception;
public void forceSet(T newValue) throws Exception;
public AtomicValue<T> increment() throws Exception;
public AtomicValue<T> decrement() throws Exception;
public AtomicValue<T> add(T delta) throws Exception;
public AtomicValue<T> subtract(T delta) throws Exception;
}
目前DistributedAtomicNumber
接口有两种实现,除了DistributedAtomicLong
类,还有DistributedAtomicInteger
类。
并且DistributedAtomicInteger
也是将各种原子操作的执行委托给了DistributedAtomicValue
,所以这两种实现是类似的,只不过表示的数值类型不同而已。
public class DistributedAtomicInteger implements DistributedAtomicNumber<Integer>
{
private final DistributedAtomicValue value;
...
}
DistributedAtomicValue
是原子操作真正的执行者,因此可以知道内部类AtomicLong
的bytes
属性是MutableAtomicValue
实例。
public AtomicValue<byte[]> get() throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
...
return result;
}
测试
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
类(提供CuratorFramework
需要的一些配置信息,以及创建CuratorFramework
实例的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
DistributedAtomicLongRunnable
类(实现了Runnable
接口,模拟分布式节点操作分布式原子长整型):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;
public class DistributedAtomicLongRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 共享计数器的路径
String counterPath = "/kaven";
// 创建DistributedAtomicLong实例,用于操作分布式原子长整型
// new RetryNTimes(100, 5)是乐观锁的重试策略实例
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
// 初始化
boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
}
// 比较再设置,当Zookeeper中的值与期望值相等时才能设置新值
AtomicValue<Long> longAtomicValue = atomicLong.compareAndSet(100L, 501L);
if(longAtomicValue.succeeded()) {
System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
}
else {
System.out.println(Thread.currentThread().getName() + " compareAndSet 失败");
}
}
}
启动类:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式节点处理业务
for (int i = 0; i < 15; i++) {
EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
}
}
}
模拟15
个分布式节点操作分布式原子长整型,输出如下所示:
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-8 compareAndSet 失败
pool-1-thread-14 compareAndSet 失败
pool-1-thread-10 compareAndSet 失败
pool-1-thread-6 compareAndSet 失败
pool-1-thread-15 compareAndSet 失败
pool-1-thread-13 compareAndSet 失败
pool-1-thread-7 compareAndSet 失败
pool-1-thread-9 compareAndSet 失败
pool-1-thread-11 compareAndSet 失败
pool-1-thread-5 compareAndSet 失败
pool-1-thread-12 compareAndSet 失败
pool-1-thread-1 compareAndSet 失败
pool-1-thread-3 compareAndSet 成功
pool-1-thread-4 compareAndSet 失败
pool-1-thread-2 compareAndSet 失败
输出是符合预期的,两种操作都只有一个节点执行成功。DistributedAtomicValue
类的initialize
和compareAndSet
方法如下所示,其实就是创建Zookeeper
节点(只有一个服务能创建成功)和基于版本设置节点的值(在博主的测试程序中,也只能有一个服务将该操作执行成功),而这两种操作并没有使用锁(乐观锁和悲观锁)。
public boolean initialize(byte[] value) throws Exception
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, value);
}
catch ( KeeperException.NodeExistsException ignore )
{
// ignore
return false;
}
return true;
}
public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
{
Stat stat = new Stat();
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
boolean createIt = getCurrentValue(result, stat);
if ( !createIt && Arrays.equals(expectedValue, result.preValue) )
{
try
{
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.succeeded = true;
result.postValue = newValue;
}
catch ( KeeperException.BadVersionException dummy )
{
result.succeeded = false;
}
catch ( KeeperException.NoNodeException dummy )
{
result.succeeded = false;
}
}
else
{
result.succeeded = false;
}
return result;
}
increment
、decrement
、add
以及subtract
这四种操作是类似的,博主只演示increment
操作。
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
}
for (int i = 0; i < 1000; i++) {
Thread.sleep(5);
atomicLong.increment();
}
System.out.println(Thread.currentThread().getName() + "操作成功");
System.out.println(Thread.currentThread().getName() + "当前的值为" + atomicLong.get().postValue());
输出如下所示:
pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-15操作成功
pool-1-thread-15当前的值为14289
pool-1-thread-3操作成功
pool-1-thread-3当前的值为14305
pool-1-thread-13操作成功
pool-1-thread-13当前的值为14420
pool-1-thread-2操作成功
pool-1-thread-2当前的值为14681
pool-1-thread-4操作成功
pool-1-thread-4当前的值为14876
pool-1-thread-1操作成功
pool-1-thread-1当前的值为14906
pool-1-thread-5操作成功
pool-1-thread-5当前的值为14953
pool-1-thread-8操作成功
pool-1-thread-8当前的值为14972
pool-1-thread-14操作成功
pool-1-thread-14当前的值为15001
pool-1-thread-7操作成功
pool-1-thread-7当前的值为15020
pool-1-thread-10操作成功
pool-1-thread-10当前的值为15051
pool-1-thread-11操作成功
pool-1-thread-11当前的值为15053
pool-1-thread-9操作成功
pool-1-thread-9当前的值为15060
pool-1-thread-12操作成功
pool-1-thread-12当前的值为15093
pool-1-thread-6操作成功
pool-1-thread-6当前的值为15100
最后的值为15100
符合预期。increment
、decrement
、add
以及subtract
这四个方法都调用worker
方法来完成。
@Override
public AtomicValue<Long> increment() throws Exception
{
return worker(1L);
}
@Override
public AtomicValue<Long> decrement() throws Exception
{
return worker(-1L);
}
@Override
public AtomicValue<Long> add(Long delta) throws Exception
{
return worker(delta);
}
@Override
public AtomicValue<Long> subtract(Long delta) throws Exception
{
return worker(-1 * delta);
}
DistributedAtomicLong
类的worker
方法则是调用DistributedAtomicValue
类的trySet
方法来完成。
private AtomicValue<Long> worker(final Long addAmount) throws Exception
{
Preconditions.checkNotNull(addAmount, "addAmount cannot be null");
MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
long previousValue = (previous != null) ? bytesToValue(previous) : 0;
long newValue = previousValue + addAmount;
return valueToBytes(newValue);
}
};
AtomicValue<byte[]> result = value.trySet(makeValue);
return new AtomicLong(result);
}
DistributedAtomicValue
类的trySet
方法尝试以原子方式将计数器的值设置为给定值,首先尝试使用乐观锁进行操作,如果失败,则采用可选的InterProcessMutex
(悲观锁)进行操作。
// 尝试以原子方式将计数器的值设置为给定值
public AtomicValue<byte[]> trySet(final byte[] newValue) throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
return newValue;
}
};
// 尝试使用乐观锁
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
// 如果在乐观锁下执行不成功,并且有悲观锁
// 尝试使用悲观锁
tryWithMutex(result, makeValue);
}
return result;
}
DistributedAtomicLong
类的trySet
方法用于尝试设置计数器的值,也是通过调用DistributedAtomicValue
类的trySet
方法来完成。
@Override
public AtomicValue<Long> trySet(Long newValue) throws Exception
{
return new AtomicLong(value.trySet(valueToBytes(newValue)));
}
DistributedAtomicLong
类的forceSet
方法用于强制设置计数器的值,通过调用DistributedAtomicValue
类的forceSet
方法来完成。
@Override
public void forceSet(Integer newValue) throws Exception
{
value.forceSet(valueToBytes(newValue));
}
DistributedAtomicValue
类的forceSet
方法如下所示,就是直接设置Zookeeper
节点的值。
/**
* 强制设置值
*/
public void forceSet(byte[] newValue) throws Exception
{
try
{
client.setData().forPath(path, newValue);
}
catch ( KeeperException.NoNodeException dummy )
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
}
catch ( KeeperException.NodeExistsException dummy2 )
{
client.setData().forPath(path, newValue);
}
}
}
这些方法比较简单,博主就不演示了。
给DistributedAtomicLong
设置悲观锁可以如下所示进行操作:
PromotedToLock promotedToLock = PromotedToLock.builder()
// 用于分布式锁的Zookeeper路径
.lockPath("/lock")
// 锁的重试策略
.retryPolicy(new RetryNTimes(100, 5))
// 锁的超时时间
.timeout(10000, TimeUnit.SECONDS)
.build();
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5),
promotedToLock);
Curator
框架的共享计数器DistributedAtomicLong
就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。