分享一段前段时间写的模拟Redis扩容做的Hash一致性闭环算法
前置条件
Java语言 采用Map 模拟Redis环境
适用场景
Redis扩容采用Hash闭环、Nginx多级缓存采用Hash闭环+Lua手动负载器 等等其他需要做扩容的方案
一致性hash解决的问题
分布式缓存中如果使用简单hash,节点增加和减少对于系统影响非常大。而一致性hash则高效地解决了这个问题。一致性hash不是说节点增加和减少完全没有影响,而是相比简单hash大大降低了这个影响。
一致性hash算法
一致性hash算法也是通过hash取模,只不过取模的值不再是节点个数这样一个可变的量,而是一个相对而言比较大的不变值,例如int型最大值。我们将0和int型最大值首尾相连这样就形成了一个闭环,我们称其为hash环
如果当前有某个节点宕机,那么受影响的数据,只是当前那一个区间的数据
只需要将这一个区间内的数据 重新
源码
import org.apache.commons.lang3.StringUtils;
import java.util.*;
import java.util.regex.Pattern;
/**
* Created Date by 2019/12/27 0027.
* 模拟 一致性hash算法 前半部分 redis 分布区域选择
* 这里用随机数代替hash
*
* @author Parker
*/
public class TestHash {
private static String default2 = "default2";
/** 生成所有指令 */
private static Map<String, Map<String, String>> instructions = new HashMap<>();
/**上下文指令 */
private static Stack<String> contextKeys = new Stack<>();
/**上下文指令 */
private static Stack<String> instructionKeys = new Stack<>();
/** redis 集合 */
private static Map<Integer, String> redisMap = new TreeMap<>(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
});
/** redis 仓库 */
private static Map<String, List<Integer>> redisRepository = new HashMap<>();
/**
* 静态块
*/
static{
contextKeys.add("system");
instructions = TestHash.createInstructions();
// 添加三台redis服务器 在一个闭环
TestHash.createRedisMap("xxoo1-redis",1456);
TestHash.createRedisMap("xxoo2-redis",5437);
TestHash.createRedisMap("xxoo3-redis",8524);
// 初始化数据
TestHash.addNum(1000);
TestHash.addNum(2000);
TestHash.addNum(5000);
TestHash.addNum(6000);
TestHash.addNum(7000);
TestHash.addNum(8000);
TestHash.addNum(10000);
TestHash.addNum(15000);
TestHash.addNum(20000);
TestHash.addNum(25000);
TestHash.addNum(40000);
}
public static void main(String[] args) {
System.out.println("欢迎使用 redis一致性hash算法 模拟系统 !");
System.out.println();
// 输入检测
TestHash.invokeScanner(contextKeys,null,false);
}
/**
* 生成Redis 仓库
*
* 生成仓库 则 锁定类 这里用 synchronized 代替
* ☆☆☆☆☆☆☆☆☆☆☆☆☆
* 现实环境中 需要做分布式锁 避免在迁移数据过程中 又有新的数据 做改动 ****** ☆
* @return
*/
public synchronized static void createRedisMap(String redisCode,Integer num) {
Set<Integer> integers = redisMap.keySet();
Iterator<Integer> iterator = integers.iterator();
List<Integer> keys = new ArrayList<>(integers.size());
while (iterator.hasNext()){
keys.add(iterator.next());
}
//正序
keys.sort(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
});
// 调用闭环算法
Integer key = null;
if(keys != null && !keys.isEmpty()){
key = TestHash.closedLoopAlgorithm(num,keys);
}
// 根据 闭环算法 获得数据
List<Integer> datas = null;
if(null != key){
datas = redisRepository.get(redisMap.get(key));
}
// 创建集合
List<Integer> list = new ArrayList<>();
redisRepository.put(redisCode,list);
redisMap.put(num,redisCode);
// 因为上方有新增redis 获取 新的 keys
integers = redisMap.keySet();
iterator = integers.iterator();
List<Integer> keysT = new ArrayList<>(integers.size());
while (iterator.hasNext()){
keysT.add(iterator.next());
}
// 数据迁移
if(datas != null && !datas.isEmpty()){
List<Integer> newDatas = new ArrayList<>();
// 循环所有数据 查找出 要存放在新容器里的数据
for (Integer data : datas) {
// 调用闭环算法
Integer keyT = TestHash.closedLoopAlgorithm(data,keysT);
if(keyT.equals(num)){
newDatas.add(data);
}
}
// 删除 旧数据
for (Integer newData : newDatas) {
// 传入keys 因为 keys已做更改 传入旧keys
TestHash.deleteOldData(newData,keys);
}
// 添加新数据
List<Integer> newQueue = redisRepository.get(redisMap.get(num));
for (Integer newData : newDatas) {
newQueue.add(newData);
}
}
}
/**
* 生成Redis 数据
* @return
*/
public static void addNum(Integer num) {
Integer key = 0;
Set<Integer> integers = redisMap.keySet();
Iterator<Integer> iterator = integers.iterator();
List<Integer> keys = new ArrayList<>(integers.size());
while (iterator.hasNext()){
keys.add(iterator.next());
}
//正序
keys.sort(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
});
// 调用闭环算法
key = TestHash.closedLoopAlgorithm(num,keys);
List<Integer> datas = redisRepository.get(redisMap.get(key));
datas.add(num);
List<Integer> nums = new ArrayList<>();
for (Integer integer : keys) {
if(redisRepository.get(redisMap.get(integer)) != null){
nums.addAll(redisRepository.get(redisMap.get(integer)));
}
}
System.out.println();
System.out.println("当前数据:");
System.out.println(nums.toString());
for (Integer integer : keys) {
if(redisRepository.get(redisMap.get(integer)) != null){
System.out.println(redisMap.get(integer)+":"+redisRepository.get(redisMap.get(integer)).toString());
}
}
System.out.println();
}
/**
* 查看Redis 所有数据
* @return
*/
public static void findAll() {
Set<Integer> integers = redisMap.keySet();
Iterator<Integer> iterator = integers.iterator();
List<Integer> keys = new ArrayList<>(integers.size());
while (iterator.hasNext()){
keys.add(iterator.next());
}
//正序
keys.sort(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
});
List<Integer> nums = new ArrayList<>();
for (Integer integer : keys) {
if(redisRepository.get(redisMap.get(integer)) != null){
nums.addAll(redisRepository.get(redisMap.get(integer)));
}
}
System.out.println();
System.out.println("当前数据:");
System.out.println(nums.toString());
for (Integer integer : keys) {
if(redisRepository.get(redisMap.get(integer)) != null){
System.out.println(redisMap.get(integer)+":"+redisRepository.get(redisMap.get(integer)).toString());
}
}
System.out.println();
}
/**
* 删除Redis数据
* @return
*/
public static void deleteData(Integer num) {
Integer key = 0;
Set<Integer> integers = redisMap.keySet();
Iterator<Integer> iterator = integers.iterator();
List<Integer> keys = new ArrayList<>(integers.size());
while (iterator.hasNext()){
keys.add(iterator.next());
}
//正序
keys.sort(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
});
// 调用闭环算法
key = TestHash.closedLoopAlgorithm(num,keys);
List<Integer> datas = redisRepository.get(redisMap.get(key));
boolean deFlag = false;
int count = 0;
for (int i = 0; i < datas.size(); i++) {
if(num.equals(datas.get(i))){
deFlag = true;
count = i;
break;
}
}
if(deFlag){
datas.remove(count);
}
List<Integer> nums = new ArrayList<>();
for (Integer integer : keys) {
if(redisRepository.get(redisMap.get(integer)) != null){
nums.addAll(redisRepository.get(redisMap.get(integer)));
}
}
System.out.println();
System.out.println("当前数据:");
System.out.println(nums.toString());
for (Integer integer : keys) {
if(redisRepository.get(redisMap.get(integer)) != null){
System.out.println(redisMap.get(integer)+":"+redisRepository.get(redisMap.get(integer)).toString());
}
}
System.out.println();
}
/**
* 删除Redis数据
* @return
*/
public synchronized static void deleteOldData(Integer num,List<Integer> keys) {
Integer key = 0;
//正序
keys.sort(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
});
// 调用闭环算法
key = TestHash.closedLoopAlgorithm(num,keys);
List<Integer> datas = redisRepository.get(redisMap.get(key));
boolean deFlag = false;
int count = 0;
for (int i = 0; i < datas.size(); i++) {
if(num.equals(datas.get(i))){
deFlag = true;
count = i;
break;
}
}
if(deFlag){
datas.remove(count);
}
List<Integer> nums = new ArrayList<>();
for (Integer integer : keys) {
if(redisRepository.get(redisMap.get(integer)) != null){
nums.addAll(redisRepository.get(redisMap.get(integer)));
}
}
}
/**
* 闭环算法 核心算法
* ☆☆☆☆☆☆☆☆☆☆
* @param num
* @param keys
* @return
*/
public static Integer closedLoopAlgorithm(Integer num,List<Integer> keys){
Integer key = 0;
if(num > keys.get(keys.size()-1) || num <= keys.get(0)){
key = keys.get(0);
}else {
for (int i = keys.size() - 2; i >= 0; i--) {
if(num > keys.get(i)){
key = keys.get(i+1);
break;
}
}
}
return key;
}
/**
* 下方代码 是输入 相关 与 一致性算法无关
*/
/*-----------------------------------------------------------------------------------------------------------------------------------------------*/
/**
* 创建随机数
* @param
*/
public static int createRandom(){
// 最大数
Integer max = 10000;
return new Random().nextInt(max);
}
/**
* 输入方案
*/
public static void invokeScanner(Stack<String> contextKey,String instruction,boolean errorF){
try{
String key = contextKey.peek();
String[] keys = key.split("_");
List<String> keyList = Arrays.asList(keys);
// 如果输入错误 进行排查
boolean errorFlag = true;
if(errorF && keyList.size() > 1){
List<String> tempList = new ArrayList<>();
for (int i = 0; i < keyList.size()-1; i++) {
tempList.add(keyList.get(i));
}
keyList = tempList;
errorFlag = false;
}
// 顶级
if("system".equals(keys[0])){
// 一级
if(keyList.size() == 1){
if(errorFlag){
contextKey.add(contextKey.peek()+"_top");
}
instruction = TestHash.validataScanner(instructions.get(TestHash.createInstructionKey()));
// 验证指令输入
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
TestHash.invokeScanner(contextKey,instructionKeyTemp,false);
}else{
// 一级
if("top".equals(keyList.get(1))){
// 查询当前所有redis集群
if("1".equals(instruction)){
if(errorFlag){
contextKey.add(contextKey.peek()+"_getRedis");
}
// 打印当前 redis 服务器数量
TestHash.printRedis(redisMap);
// 发起二次输入
// 验证指令输入
if(errorFlag){
contextKey.add(contextKey.peek()+"_back");
}
instruction = TestHash.validataScanner(instructions.get(default2));
TestHash.invokeScanner(contextKey,instruction,false);
}
// 增加redis集群
else if("2".equals(instruction)){
if(errorFlag){
contextKey.add(contextKey.peek()+"_addRedis");
}
String code = "";
Integer num = 0;
System.out.println();
System.out.println("------------------------------");
System.out.println("请输入redis编号:");
boolean fl = true;
while (fl){
// 开启输入模式
Scanner scanner = new Scanner(System.in);
String str = scanner.nextLine();
String pattern = "[\\w\\d]+";
boolean matches = Pattern.matches(pattern, str);
if(matches){
List<Integer> integers = redisRepository.get(str);
if(integers != null){
System.out.println("------------------------------");
System.out.println();
System.out.println("[ "+str+" ] 编号已存在");
System.out.println();
System.out.println("请输入redis编号:");
System.out.println();
continue;
}
code = str;
fl = false;
}else{
System.out.println("------------------------------");
System.out.println();
System.out.println("[ "+str+" ] 只能包含字母、数字与_");
System.out.println();
System.out.println("请输入redis编号:");
System.out.println();
}
}
System.out.println();
System.out.println("------------------------------");
System.out.println("请输入redis数据范围:");
boolean fl1 = true;
while (fl1){
// 开启输入模式
Scanner scanner = new Scanner(System.in);
String str = scanner.nextLine();
String pattern = "[\\d]+";
boolean matches = Pattern.matches(pattern, str);
if(matches){
num = Integer.parseInt(str);
String s = redisMap.get(num);
if(!StringUtils.isEmpty(s)){
System.out.println("------------------------------");
System.out.println();
System.out.println("[ "+str+" ] 数据范围已存在");
System.out.println();
System.out.println("请输入redis数据范围:");
System.out.println();
continue;
}
fl1 = false;
}else{
System.out.println("------------------------------");
System.out.println();
System.out.println("[ "+str+" ] 只能包含数字");
System.out.println();
System.out.println("请输入redis数据范围:");
System.out.println();
}
}
TestHash.createRedisMap(code,num);
// 打印当前 redis 服务器数量
TestHash.printRedis(redisMap);
// 发起二次输入
// 验证指令输入
if(errorFlag){
contextKey.add(contextKey.peek()+"_add");
}
instruction = TestHash.validataScanner(instructions.get(default2));
TestHash.invokeScanner(contextKey,instruction,false);
}
// 增加redis数据
else if("3".equals(instruction)){
if(errorFlag){
contextKey.add(contextKey.peek()+"_addRedisData");
}
System.out.println();
System.out.println("------------------------------");
System.out.println("请输入数值:");
Integer num = 0;
boolean fl = true;
while (fl){
// 开启输入模式
Scanner scanner = new Scanner(System.in);
String str = scanner.nextLine();
String pattern = "[\\d]+";
boolean matches = Pattern.matches(pattern, str);
if(matches){
num = Integer.parseInt(str);
fl = false;
}else{
System.out.println("------------------------------");
System.out.println();
System.out.println("[ "+str+" ] 只能包含数字");
System.out.println();
System.out.println("请输入数值:");
System.out.println();
}
}
TestHash.addNum(num);
// 打印当前 redis 数值
//TestHash.printRedisData();
// 发起二次输入
// 验证指令输入
if(errorFlag){
contextKey.add(contextKey.peek()+"_add");
}
instruction = TestHash.validataScanner(instructions.get(default2));
TestHash.invokeScanner(contextKey,instruction,false);
}
// 查询当前所有redis数据
if("4".equals(instruction)){
if(errorFlag){
contextKey.add(contextKey.peek()+"_findAll");
}
// 打印当前 redis 所有数据
TestHash.findAll();
// 发起二次输入
// 验证指令输入
if(errorFlag){
contextKey.add(contextKey.peek()+"_findAll");
}
instruction = TestHash.validataScanner(instructions.get(default2));
TestHash.invokeScanner(contextKey,instruction,false);
}
// 删除redis数据
else if("5".equals(instruction)){
if(errorFlag){
contextKey.add(contextKey.peek()+"_deleteRedisData");
}
System.out.println();
System.out.println("------------------------------");
System.out.println("请输入数值:");
Integer num = 0;
boolean fl = true;
while (fl){
// 开启输入模式
Scanner scanner = new Scanner(System.in);
String str = scanner.nextLine();
String pattern = "[\\d]+";
boolean matches = Pattern.matches(pattern, str);
if(matches){
num = Integer.parseInt(str);
fl = false;
}else{
System.out.println("------------------------------");
System.out.println();
System.out.println("[ "+str+" ] 只能包含数字");
System.out.println();
System.out.println("请输入数值:");
System.out.println();
}
}
TestHash.deleteData(num);
// 打印当前 redis 数值
//TestHash.printRedisData();
// 发起二次输入
// 验证指令输入
if(errorFlag){
contextKey.add(contextKey.peek()+"_delete");
}
instruction = TestHash.validataScanner(instructions.get(default2));
TestHash.invokeScanner(contextKey,instruction,false);
}
else{
//二级
if(keyList.size() > 2){
// 查询当前所有redis集群
if("getRedis".equals(keyList.get(2))){
// 返回上一层
if("0".equals(instruction)){
// 取两次 包含本次的正确 指令
contextKey.pop();
contextKey.pop();
instructionKeys.pop();
instructionKeys.pop();
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
TestHash.invokeScanner(contextKey,instructionKeyTemp,true);
}
}else if("addRedis".equals(keyList.get(2))){
// 返回上一层
if("0".equals(instruction)){
// 取两次 包含本次的正确 指令
contextKey.pop();
contextKey.pop();
instructionKeys.pop();
instructionKeys.pop();
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
TestHash.invokeScanner(contextKey,instructionKeyTemp,true);
}
}else if("addRedisData".equals(keyList.get(2))){
// 返回上一层
if("0".equals(instruction)){
// 取两次 包含本次的正确 指令
contextKey.pop();
contextKey.pop();
instructionKeys.pop();
instructionKeys.pop();
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
TestHash.invokeScanner(contextKey,instructionKeyTemp,true);
}
}else if("findAll".equals(keyList.get(2))){
// 返回上一层
if("0".equals(instruction)){
// 取两次 包含本次的正确 指令
contextKey.pop();
contextKey.pop();
instructionKeys.pop();
instructionKeys.pop();
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
TestHash.invokeScanner(contextKey,instructionKeyTemp,true);
}
}else if("deleteRedisData".equals(keyList.get(2))){
// 返回上一层
if("0".equals(instruction)){
// 取两次 包含本次的正确 指令
contextKey.pop();
contextKey.pop();
instructionKeys.pop();
instructionKeys.pop();
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
TestHash.invokeScanner(contextKey,instructionKeyTemp,true);
}
}
}
}
}
}
}
// 如果没有拦截到 则狗咬狗尾巴
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
TestHash.invokeScanner(contextKey,instructionKeyTemp,true);
}catch (Exception e){
//e.printStackTrace();
System.out.println(e.getMessage());
// 错误
TestHash.errorScanner(e.getMessage());
}
}
/**
* 判断输入
* @param map 指令集合
*/
public static String validataScanner(Map<String, String> map) throws Exception{
// 打印指令
TestHash.printScannerKeys(map);
// 开启输入模式
Scanner scanner = new Scanner(System.in);
String str = scanner.nextLine();
String obj = map.get(str);
if(obj != null){
instructionKeys.add(str);
return str;
}
// 否则就报错
throw new Exception(str);
}
/**
* 生成指令Key
* @return
*/
public static String createInstructionKey() {
return contextKeys.peek();
}
/**
* 生成所有指令
* @return
*/
public static Map<String,Map<String, String>> createInstructions() {
//总 指令
Map<String, Map<String, String>> map = new HashMap<>();
//第一层指令
Map<String, String> allMap = new TreeMap<>(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return o1.compareTo(o2);
}
});
allMap.put("1","查询当前所有redis集群");
allMap.put("2","增加集群");
allMap.put("3","添加数据");
allMap.put("4","查看所有数据");
allMap.put("5","删除数据");
map.put("system_top",allMap);
//默认二级命令
Map<String, String> default2Map = new TreeMap<>(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return o1.compareTo(o2);
}
});
default2Map.put("0","返回上级菜单");
map.put("default2",default2Map);
return map;
}
/**
* 输入错误
*/
public static void errorScanner(String str) {
String strT = "";
if(!StringUtils.isEmpty(str)){
strT = "[ "+str+" ] ";
}
System.out.println();
System.out.println();
System.out.println();
System.out.println();
System.out.println();
System.out.println();
System.out.println(strT + "输入参数有误,请重新输入!");
System.out.println();
String instructionKeyTemp = null;
if(instructionKeys.size() > 0){
instructionKeyTemp = instructionKeys.peek();
}
// 回调
TestHash.invokeScanner(contextKeys,instructionKeyTemp,true);
}
/**
* 打印指令
*/
public static void printScannerKeys(Map<String, String> map) {
// 打印命令
Set<String> keySet = map.keySet();
Iterator<String> iter = keySet.iterator();
System.out.println();
System.out.println("------------------------------");
while (iter.hasNext()) {
String key = iter.next();
System.out.println("指令 [ "+key+" ]" + " " + map.get(key));
}
System.out.println();
System.out.println("请输入指令:");
}
/**
* 打印当前redis服务器
* @param map
*/
public static void printRedis(Map<Integer, String> map){
Set<Integer> keySet = map.keySet();
Iterator<Integer> iter = keySet.iterator();
System.out.println();
while (iter.hasNext()) {
Integer key = iter.next();
System.out.println(key + ":" + map.get(key));
}
System.out.println();
}
}