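Introduction to Redis
Redis is an open-source key-value database written in ANSI C. It supports networking, keeps data in memory with optional log-based persistence, and provides APIs for many languages. Compared with Memcached it supports more data types (strings, hashes, sets, sorted sets, lists, GEO). Redis executes commands on a single thread, so each individual command is atomic. Redis development has been sponsored by VMware since March 15, 2010, and by Pivotal since May 2013.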
Lettuce
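Lettuce and Jedis are both client libraries for connecting to a Redis server. Jedis connects to the Redis server directly, and a Jedis instance is not thread-safe in a multithreaded environment; the usual remedy is a connection pool that gives each Jedis instance its own physical connection. Lettuce is built on Netty, and its connection instance (StatefulRedisConnection) is thread-safe and can be accessed concurrently from multiple threads. Its design is also scalable: if one connection instance is not enough, more can be added as needed.
Adding the dependency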
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.4.RELEASE</version>
        </dependency>
Testing in standalone mode
package testRedis;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStringCommands;
public class BasicUsage {
    public static void main(String[] args) {
        // client
        RedisClient client = RedisClient.create("redis://localhost");
        // connection: a thread-safe, long-lived connection that reconnects automatically when the link is lost, until close() is called.
        StatefulRedisConnection<String, String> connection = client.connect();
        // sync: the default timeout is 60 s.
        RedisStringCommands<String, String> sync = connection.sync();
        sync.set("host", "note.abeffect.com");
        String value = sync.get("host");
        System.out.println(value);
        // close connection
        connection.close();
        // shutdown
        client.shutdown();
    }
}
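Cluster mode
import java.util.ArrayList;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
public class LettuceClusterClient {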
 
    public static void main(String[] args) {  
        ArrayList<RedisURI> list = new ArrayList<>();  
        list.add(RedisURI.create("redis://192.168.37.128:7000"));  
        list.add(RedisURI.create("redis://192.168.37.128:7001"));  
        list.add(RedisURI.create("redis://192.168.37.128:7002"));  
        list.add(RedisURI.create("redis://192.168.37.128:7003"));  
        list.add(RedisURI.create("redis://192.168.37.128:7004"));  
        list.add(RedisURI.create("redis://192.168.37.128:7005"));  
        RedisClusterClient client = RedisClusterClient.create(list);  
        //RedisClusterClient client = RedisClusterClient.create("redis://192.168.37.128:7000");  
        StatefulRedisClusterConnection<String, String> connect = client.connect();  
          
        /* Commands executed synchronously */
        RedisAdvancedClusterCommands<String, String> commands = connect.sync();  
        String str = commands.get("test2");  
        System.out.println(str);  
          
        /* Commands executed asynchronously */
//      RedisAdvancedClusterAsyncCommands<String, String> commands= connect.async();  
//      RedisFuture<String> future = commands.get("test2");  
//      try {  
//          String str = future.get();  
//          System.out.println(str);  
//      } catch (InterruptedException e) {  
//          e.printStackTrace();  
//      } catch (ExecutionException e) {  
//          e.printStackTrace();  
//      }  
          
        connect.close();  
        client.shutdown();  
    }  
} 
Other synchronous usage
Setting the timeout to 20 s
        RedisClient client = RedisClient.create(RedisURI.create("localhost", 6379));
        client.setDefaultTimeout(Duration.ofSeconds(20));
Using RedisURI
        RedisURI redisUri = RedisURI.Builder.redis("localhost").withPassword("authentication").withDatabase(2).build();
        RedisClient client = RedisClient.create(redisUri);
Or
        RedisURI redisUri = RedisURI.create("redis://authentication@localhost/2");
        RedisClient client = RedisClient.create(redisUri);
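The two forms above describe the same target: the part before `@` is the password, and the path segment is the database index. As a rough illustration of that layout only, the sketch below pulls the pieces apart with the JDK's `java.net.URI`. The class name `RedisUriLayout` and the method `describe` are hypothetical, and this is not how Lettuce itself parses the string. It also assumes database 0 when no path is given, which matches the Redis default.

```java
import java.net.URI;

public class RedisUriLayout {

    // Splits a redis://[password@]host[/database] string into its parts.
    // Illustration only: Lettuce has its own parser in RedisURI.create(...).
    static String describe(String uri) {
        URI parsed = URI.create(uri);
        String password = parsed.getUserInfo(); // null when no password is present
        String host = parsed.getHost();
        String path = parsed.getPath();         // "/2" or "" when absent
        int database = (path == null || path.length() <= 1)
                ? 0                              // assumed default database
                : Integer.parseInt(path.substring(1));
        return "host=" + host + ", password=" + password + ", database=" + database;
    }

    public static void main(String[] args) {
        System.out.println(describe("redis://authentication@localhost/2"));
    }
}
```

Each field lines up with a call in the builder form: `redis("localhost")`, `withPassword("authentication")`, and `withDatabase(2)`.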
Asynchronous usage
Asynchronous calls avoid wasting CPU time waiting on network and disk I/O, which improves resource utilization.
Basic example
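To make the benefit concrete, here is a minimal sketch that uses only the JDK. Lettuce's `RedisFuture` implements `CompletionStage`, so a plain `CompletableFuture` serves as a stand-in for a Redis reply here. The class `NonBlockingSketch`, the method `fakeGet`, and the 50 ms simulated latency are assumptions for illustration, not Lettuce API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class NonBlockingSketch {

    // Hypothetical stand-in for async.get(key): completes on another thread
    // after a simulated network round trip.
    static CompletableFuture<String> fakeGet(String key) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "value-of-" + key;
        });
    }

    // Issues both requests before waiting on either one, so the two
    // simulated round trips can overlap instead of running back to back.
    static String fetchBoth(String firstKey, String secondKey) {
        CompletableFuture<String> first = fakeGet(firstKey);
        CompletableFuture<String> second = fakeGet(secondKey);
        return first.thenCombine(second, (a, b) -> a + "," + b).join();
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth("host", "port"));
    }
}
```

The calling thread blocks only once, at `join()`, after both requests are already in flight. With a real connection the same shape applies to two `async.get(...)` calls.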
package testRedis;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
public class AsynchronousAPI {
    public static void main(String[] args) {
        // client
        RedisClient client = RedisClient.create("redis://localhost");
        // connect
        StatefulRedisConnection<String, String> connection = client.connect();
        // async
        RedisStringAsyncCommands<String, String> async = connection.async();
        RedisFuture<String> future = async.get("host");
        try {
            String value = future.get(60, TimeUnit.SECONDS);
            System.out.println(value);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
        connection.close();
        client.shutdown();
    }
}
Using a Consumer listener
        RedisFuture<String> future = async.get("host");
        future.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String value) {
                System.out.println(value);
            }
        });
Using a lambda expression (here, a method reference)
        RedisFuture<String> future = async.get("host");        
        future.thenAccept(System.out::println);
Using a separate thread pool
To avoid blocking the default thread pool, you can run the callback on a separate thread pool.
        Executor sharedExecutor = Executors.newFixedThreadPool(1);
        RedisFuture<String> future = async.get("host");
        future.thenAcceptAsync(System.out::println, sharedExecutor);
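The effect of passing an executor can be checked by asking the callback which thread it runs on. The sketch below again uses a JDK `CompletableFuture` as a stand-in for `RedisFuture` (both are `CompletionStage` implementations). The class name `CallbackExecutorSketch` and the thread name `callback-pool` are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackExecutorSketch {

    // Runs the callback on a dedicated single-thread pool and returns the
    // name of the thread that actually executed it.
    static String callbackThreadName() {
        ExecutorService callbackPool =
                Executors.newFixedThreadPool(1, task -> new Thread(task, "callback-pool"));
        try {
            // Stand-in for a reply that has already arrived.
            CompletableFuture<String> reply =
                    CompletableFuture.completedFuture("note.abeffect.com");
            return reply
                    .thenApplyAsync(value -> Thread.currentThread().getName(), callbackPool)
                    .join();
        } finally {
            // Pool threads are non-daemon; shut down so the JVM can exit.
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```

One detail worth carrying over to real code: a pool created with `Executors.newFixedThreadPool` keeps the JVM alive until `shutdown()` is called, so it should be closed alongside the connection and client.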
For more examples, see the official Asynchronous API documentation.
Reactive usage
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
public class ReactiveAPI {
    public static void main(String[] args) {
        // client
        RedisClient client = RedisClient.create("redis://localhost");
        // connect
        StatefulRedisConnection<String, String> connection = client.connect();
        // reactive
        RedisStringReactiveCommands<String, String> reactive = connection.reactive();
        reactive.get("host").subscribe(System.out::println);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        connection.close();
        client.shutdown();
    }
}
For more, see the official Reactive API documentation or other material on reactive programming.
Pub/Sub
import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
public class PubSubApi {
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://localhost");
        // listener
        RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {
            @Override
            public void message(String channel, String message) {
                System.out.println("message:" + channel + "," + message);
            }
            @Override
            public void message(String pattern, String channel, String message) {
                System.out.println("message:" + pattern + "," + channel + "," + message);
            }
            @Override
            public void psubscribed(String pattern, long count) {
                System.out.println("psub:" + pattern + "," + count);
            }
            @Override
            public void punsubscribed(String pattern, long count) {
                System.out.println("punsub:" + pattern + "," + count);
            }
            @Override
            public void subscribed(String channel, long count) {
                System.out.println("sub:" + channel + "," + count);
            }
            @Override
            public void unsubscribed(String channel, long count) {
                System.out.println("unsub:" + channel + "," + count);
            }
        };
        StatefulRedisPubSubConnection<String, String> pubSubConnection = client.connectPubSub();
        pubSubConnection.addListener(listener);
        RedisPubSubCommands<String, String> connection = pubSubConnection.sync();
        connection.subscribe("channel");
        try {
            Thread.sleep(100000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        pubSubConnection.close();
        client.shutdown();
    }
}
While the program is running, execute the following in the Redis CLI:
127.0.0.1:6379> PUBLISH channel 1
(integer) 1
127.0.0.1:6379> PUBLISH channel 2
(integer) 1
127.0.0.1:6379> PUBLISH channel 3
(integer) 1
Output
[DEBUG] (main) Using Console logging
[DEBUG] (main) Starting UnsafeSupport init in Java 1.8
[TRACE] (main) sun.misc.Unsafe.theUnsafe ok
[TRACE] (main) sun.misc.Unsafe.copyMemory ok
[TRACE] (main) java.nio.Buffer.address ok
[DEBUG] (main) Unsafe is available
Aug 19, 2018 4:41:34 PM io.lettuce.core.EpollProvider <clinit>
信息: Starting without optional epoll library
Aug 19, 2018 4:41:34 PM io.lettuce.core.KqueueProvider <clinit>
信息: Starting without optional kqueue library
sub: channel, 1
message: channel, 1
message: channel, 2
message: channel, 3