0
点赞
收藏
分享

微信扫一扫

ZooKeeper : Curator框架之共享计数器DistributedAtomicLong


DistributedAtomicLong

原子增量操作的计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的​​InterProcessMutex​​(悲观锁)进行增量操作。 对于乐观锁和悲观锁,重试策略都用于重试增量操作。

各种增量方法都会返回一个​​AtomicValue​​​实例,通过调用​​AtomicValue​​​实例的​​succeeded()​​​可以查询增量操作是否执行成功,除了​​get()​​ 外,其他任何方法都不保证一定成功。

​AtomicValue​​接口源码(原子操作返回值的抽象):

public interface AtomicValue<T>
{
/**
* 如果操作成功,则返回true
* 如果返回false,则操作失败
*/
public boolean succeeded();

/**
* 返回操作前计数器的值
*/
public T preValue();

/**
* 返回操作后计数器的值
*/
public T postValue();

/**
* 返回操作的调试统计信息,比如乐观锁、悲观锁尝试的次数与时间
*/
public AtomicStats getStats();
}

​DistributedAtomicLong​​​类中的内部类​​AtomicLong​​​实现了​​AtomicValue​​​接口,但实际上只是起到封装的作用,所有的调用都委托给了​​bytes​​属性(其他实现类的实例)。

private class AtomicLong implements AtomicValue<Long>
{
private AtomicValue<byte[]> bytes;

private AtomicLong(AtomicValue<byte[]> bytes)
{
this.bytes = bytes;
}

@Override
public boolean succeeded()
{
return bytes.succeeded();
}

@Override
public Long preValue()
{
return bytesToValue(bytes.preValue());
}

@Override
public Long postValue()
{
return bytesToValue(bytes.postValue());
}

@Override
public AtomicStats getStats()
{
return bytes.getStats();
}
}

​DistributedAtomicLong​​​类实现了​​DistributedAtomicNumber​​​接口,并且​​DistributedAtomicLong​​​将各种原子操作的执行委托给了​​DistributedAtomicValue​​。

public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
{
private final DistributedAtomicValue value;
...
}

​DistributedAtomicNumber​​接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。

public interface DistributedAtomicNumber<T>
{
public AtomicValue<T> get() throws Exception;
public AtomicValue<T> compareAndSet(T expectedValue, T newValue) throws Exception;
public AtomicValue<T> trySet(T newValue) throws Exception;
public boolean initialize(T value) throws Exception;
public void forceSet(T newValue) throws Exception;
public AtomicValue<T> increment() throws Exception;
public AtomicValue<T> decrement() throws Exception;
public AtomicValue<T> add(T delta) throws Exception;
public AtomicValue<T> subtract(T delta) throws Exception;
}

目前​​DistributedAtomicNumber​​​接口有两种实现,除了​​DistributedAtomicLong​​​类,还有​​DistributedAtomicInteger​​类。

ZooKeeper : Curator框架之共享计数器DistributedAtomicLong_初始化


并且​​DistributedAtomicInteger​​​也是将各种原子操作的执行委托给了​​DistributedAtomicValue​​,所以这两种实现是类似的,只不过表示的数值类型不同而已。

public class DistributedAtomicInteger implements DistributedAtomicNumber<Integer>
{
private final DistributedAtomicValue value;
...
}

​DistributedAtomicValue​​​是原子操作真正的执行者,因此可以知道内部类​​AtomicLong​​​的​​bytes​​​属性是​​MutableAtomicValue​​实例。

public AtomicValue<byte[]>     get() throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
...
return result;
}

测试

​pom.xml​​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>

​CuratorFrameworkProperties​​​类(提供​​CuratorFramework​​​需要的一些配置信息,以及创建​​CuratorFramework​​实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

public static CuratorFramework getCuratorFramework() {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}

​DistributedAtomicLongRunnable​​​类(实现了​​Runnable​​接口,模拟分布式节点操作分布式原子长整型):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;

public class DistributedAtomicLongRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

// 共享计数器的路径
String counterPath = "/kaven";

// 创建DistributedAtomicLong实例,用于操作分布式原子长整型
// new RetryNTimes(100, 5)是乐观锁的重试策略实例
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));

// 初始化
boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
}

// 比较再设置,当Zookeeper中的值与期望值相等时才能设置新值
AtomicValue<Long> longAtomicValue = atomicLong.compareAndSet(100L, 501L);
if(longAtomicValue.succeeded()) {
System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
}
else {
System.out.println(Thread.currentThread().getName() + " compareAndSet 失败");
}
}
}

启动类:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

public static void main(String[] args) throws Exception {
// 分布式节点处理业务
for (int i = 0; i < 15; i++) {
EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
}
}
}

模拟​​15​​个分布式节点操作分布式原子长整型,输出如下所示:

pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-8 compareAndSet 失败
pool-1-thread-14 compareAndSet 失败
pool-1-thread-10 compareAndSet 失败
pool-1-thread-6 compareAndSet 失败
pool-1-thread-15 compareAndSet 失败
pool-1-thread-13 compareAndSet 失败
pool-1-thread-7 compareAndSet 失败
pool-1-thread-9 compareAndSet 失败
pool-1-thread-11 compareAndSet 失败
pool-1-thread-5 compareAndSet 失败
pool-1-thread-12 compareAndSet 失败
pool-1-thread-1 compareAndSet 失败
pool-1-thread-3 compareAndSet 成功
pool-1-thread-4 compareAndSet 失败
pool-1-thread-2 compareAndSet 失败

输出是符合预期的,两种操作都只有一个节点执行成功。​​DistributedAtomicValue​​​类的​​initialize​​​和​​compareAndSet​​​方法如下所示,其实就是创建​​Zookeeper​​节点(只有一个服务能创建成功)和基于版本设置节点的值(在博主的测试程序中,也只能有一个服务将该操作执行成功),而这两种操作并没有使用锁(乐观锁和悲观锁)。

public boolean initialize(byte[] value) throws Exception
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, value);
}
catch ( KeeperException.NodeExistsException ignore )
{
// ignore
return false;
}
return true;
}

public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
{
Stat stat = new Stat();
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
boolean createIt = getCurrentValue(result, stat);
if ( !createIt && Arrays.equals(expectedValue, result.preValue) )
{
try
{
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.succeeded = true;
result.postValue = newValue;
}
catch ( KeeperException.BadVersionException dummy )
{
result.succeeded = false;
}
catch ( KeeperException.NoNodeException dummy )
{
result.succeeded = false;
}
}
else
{
result.succeeded = false;
}
return result;
}

​increment​​​、​​decrement​​​、​​add​​​以及​​subtract​​​这四种操作是类似的,博主只演示​​increment​​操作。

DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));

boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
}

for (int i = 0; i < 1000; i++) {
Thread.sleep(5);
atomicLong.increment();
}
System.out.println(Thread.currentThread().getName() + "操作成功");
System.out.println(Thread.currentThread().getName() + "当前的值为" + atomicLong.get().postValue());

输出如下所示:

pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-15操作成功
pool-1-thread-15当前的值为14289
pool-1-thread-3操作成功
pool-1-thread-3当前的值为14305
pool-1-thread-13操作成功
pool-1-thread-13当前的值为14420
pool-1-thread-2操作成功
pool-1-thread-2当前的值为14681
pool-1-thread-4操作成功
pool-1-thread-4当前的值为14876
pool-1-thread-1操作成功
pool-1-thread-1当前的值为14906
pool-1-thread-5操作成功
pool-1-thread-5当前的值为14953
pool-1-thread-8操作成功
pool-1-thread-8当前的值为14972
pool-1-thread-14操作成功
pool-1-thread-14当前的值为15001
pool-1-thread-7操作成功
pool-1-thread-7当前的值为15020
pool-1-thread-10操作成功
pool-1-thread-10当前的值为15051
pool-1-thread-11操作成功
pool-1-thread-11当前的值为15053
pool-1-thread-9操作成功
pool-1-thread-9当前的值为15060
pool-1-thread-12操作成功
pool-1-thread-12当前的值为15093
pool-1-thread-6操作成功
pool-1-thread-6当前的值为15100

最后的值为​​15100​​​符合预期。​​increment​​​、​​decrement​​​、​​add​​​以及​​subtract​​​这四个方法都调用​​worker​​方法来完成。

@Override
public AtomicValue<Long> increment() throws Exception
{
return worker(1L);
}

@Override
public AtomicValue<Long> decrement() throws Exception
{
return worker(-1L);
}

@Override
public AtomicValue<Long> add(Long delta) throws Exception
{
return worker(delta);
}

@Override
public AtomicValue<Long> subtract(Long delta) throws Exception
{
return worker(-1 * delta);
}

​DistributedAtomicLong​​​类的​​worker​​​方法则是调用​​DistributedAtomicValue​​​类的​​trySet​​方法来完成。

private AtomicValue<Long>   worker(final Long addAmount) throws Exception
{
Preconditions.checkNotNull(addAmount, "addAmount cannot be null");

MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
long previousValue = (previous != null) ? bytesToValue(previous) : 0;
long newValue = previousValue + addAmount;
return valueToBytes(newValue);
}
};

AtomicValue<byte[]> result = value.trySet(makeValue);
return new AtomicLong(result);
}

​DistributedAtomicValue​​​类的​​trySet​​​方法尝试以原子方式将计数器的值设置为给定值,首先尝试使用乐观锁进行操作,如果失败,则采用可选的​​InterProcessMutex​​(悲观锁)进行操作。

// 尝试以原子方式将计数器的值设置为给定值
public AtomicValue<byte[]> trySet(final byte[] newValue) throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
return newValue;
}
};
// 尝试使用乐观锁
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
// 如果在乐观锁下执行不成功,并且有悲观锁
// 尝试使用悲观锁
tryWithMutex(result, makeValue);
}

return result;
}

​DistributedAtomicLong​​​类的​​trySet​​​方法用于尝试设置计数器的值,也是通过调用​​DistributedAtomicValue​​​类的​​trySet​​方法来完成。

@Override
public AtomicValue<Long> trySet(Long newValue) throws Exception
{
return new AtomicLong(value.trySet(valueToBytes(newValue)));
}

​DistributedAtomicLong​​​类的​​forceSet​​​方法用于强制设置计数器的值,通过调用​​DistributedAtomicValue​​​类的​​forceSet​​方法来完成。

@Override
public void forceSet(Integer newValue) throws Exception
{
value.forceSet(valueToBytes(newValue));
}

​DistributedAtomicValue​​​类的​​forceSet​​​方法如下所示,就是直接设置​​Zookeeper​​节点的值。

/**
* 强制设置值
*/
public void forceSet(byte[] newValue) throws Exception
{
try
{
client.setData().forPath(path, newValue);
}
catch ( KeeperException.NoNodeException dummy )
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
}
catch ( KeeperException.NodeExistsException dummy2 )
{
client.setData().forPath(path, newValue);
}
}
}

这些方法比较简单,博主就不演示了。

给​​DistributedAtomicLong​​设置悲观锁可以如下所示进行操作:

PromotedToLock promotedToLock = PromotedToLock.builder()
// 用于分布式锁的Zookeeper路径
.lockPath("/lock")
// 锁的重试策略
.retryPolicy(new RetryNTimes(100, 5))
// 锁的超时时间
.timeout(10000, TimeUnit.SECONDS)
.build();

DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5),
promotedToLock);

​Curator​​​框架的共享计数器​​DistributedAtomicLong​​就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。


举报

相关推荐

0 条评论