背景

​ 由于某些设计,系统不得不频繁读取整个用户表的数据做统计过滤使用,包括但不限于按部门找匹配员工、按项目找匹配员工等等等,直接查询表,sql优化的情况下查询倒是很快也不过是几百毫秒,But但是由于这些需求本身是服务于底层的比如权限功能,所以是特别高频的调用。直接读取数据库势必会造成数据库的连接暴增,很难保证DB的稳定性,说不定哪天就扛不住挂了。因此前人将他放到Redis的String结构上了,形成了一个大Value。Redis就不用多说了,单线程的,每秒读写都是上万的,但是这种大Value以及大Key都是很容易阻塞Redis的。

​ 实测过Redis读取这样一个9000条(7~8M大小)且不含大文本的表数据表时==getstring获取7~8M左右的大value耗时还可以接受==,只是set进去耗时长点2s多点

改进方案是改成hash(key -field-value,分别是表名-用户id-用户实体),但是hash结构适合查询单个用户id ,想要查询全部fields对应的value就不靠谱了, 用getvalues官方都不推荐,影响性能阻塞Redis,推荐使用HScan用游标分段遍历,我封装过也不适合没有性能,只是用HScan目的是保证Redis高可用不阻塞。适合用于批量的异步删除掉hash中的fields。

那么如何才能设计出一个适合以上场景的高可用方案呢?引出下文,哈哈~

==如果有幸正在阅读的你有过这个思考,并且有更好的方案,欢迎指教~==

目的

解决负载均衡中的服务器内存缓存同步更新问题,保存各服务器内存始终最新且一致

思想:

​ 数据每发生改变时,就通过reids设置版本号(incrby),
​ 每个服务器尝试写入ip到这个版本号的set集合中,
​ 写入成功表示没有被同步过,执行同步动作更新本地的内存缓存。
​ 否则跳过直接取内存缓存用

小缺点

但是可以改进

变化频率高的时候, redis存的数量也多,不过可以优化,版本号自增后就删掉旧版本的缓存

封装了一个内存全局协助同步辅助类

==三部曲==

  1. 通过这个GlobleAssistMemoryHelper.SetVersion来设置每次改动后的自增型版本
  2. 再配合GlobleAssistMemoryHelper.SetLocalReceivedTag来写入这次新改动的版本号改动所在的服务器的ip到Set集合中
  3. 获取内存缓存时调用GlobleAssistMemoryHelper.SetLocalReceivedTag写入当前服务器ip,如果写入失败说明已同步过最新的,直接读内存缓存,否则刷新最新数据到内存缓存

以上三步,如此便保证了每个服务器获取内存时始终是最新的那份!

附上源码

// <copyright file="GlobleAssistMemoryHelper.cs" company="PlaceholderCompany">
// Copyright (c) PlaceholderCompany. All rights reserved.
// </copyright>

#pragma warning disable SA1124,SA1202
namespace CCM.Console.Common
{
    using System;
    using System.Collections.Generic;
    using System.Text;

    /// <summary>
    /// author : eric.ding 西柚
    /// blog : https://www.dcmickey.cn
    /// 【非线程安全】内存全局协助同步类
    /// 背景:
    /// 解决负载均衡中的服务器内存缓存同步更新问题
    /// 思想:
    /// 数据每发生改变时,就通过reids设置版本号,
    /// 每个服务器尝试写入ip到这个版本号的set集合中,
    /// 写入成功表示没有被同步过,执行同步动作更新本地的内存缓存。
    /// 否则跳过直接取内存缓存用
    /// 缺点:
    /// 变化频率高的时候, redis存的数量也多,不过可以优化,版本号自增后就删掉旧版本的缓存
    /// </summary>
    public class GlobleAssistMemoryHelper
    {
        /// <summary>
        /// 枚举
        /// </summary>
        public enum GlobleAssistRateTypeEnum
        {
            /// <summary>
            /// 永久缓存
            /// </summary>
            Forever = 0,

            /// <summary>
            /// 每日缓存
            /// </summary>
            Daily = 1,
        }

        #region 设置配置版本

        /// <summary>
        /// 设置配置当前版本
        /// </summary>
        /// <param name="configName">配置标识名</param>
        /// <param name="rateType">缓存级别类型</param>
        public static void SetVersion(string configName, GlobleAssistRateTypeEnum rateType)
        {
            CSRedisHelper.IncrBy(configName);
            if (rateType == GlobleAssistRateTypeEnum.Daily)
            {
                // 每天类的版本控制 以0点为起点
                CSRedisHelper.KeyExpire(configName, DateTime.Now.TodayLeftSeconds());
            }
        }

        /// <summary>
        /// 设置当前配置版本
        /// </summary>
        /// <param name="configName">配置标识名</param>
        /// <returns>版本号</returns>
        private static long GetVersion(string configName)
        {
            return CSRedisHelper.GetStr<long>(configName);
        }
        #endregion

        #region  每个版本提供的的服务api

        /// <summary>
        /// 给当前版本打上标记
        /// 返回true表示 设置成功, 返回false表示已存在,重复添加
        /// </summary>
        /// <param name="configName">配置业务标识</param>
        /// <param name="rateType">配置频率范围</param>
        /// <param name="ipAddress">ip地址 不传会自动获取</param>
        /// <param name="version">版本 不传会自动获取</param>
        /// <returns>返回true表示 设置成功, 返回false表示已存在,重复添加</returns>
        public static bool SetLocalReceivedTag(string configName, GlobleAssistRateTypeEnum rateType, string ipAddress = null, long version = -1)
        {
            if (version <= 0)
            {
                version = GetVersion(configName);
            }

            if (ipAddress == null)
            {
                ipAddress = IPAddressHelper.GetIpAddress();
            }

            var expireTime = -1;
            if (rateType == GlobleAssistRateTypeEnum.Daily)
            {
                expireTime = DateTime.Now.TodayLeftSeconds();
            }

            var execCount = CSRedisHelper.SAdd($"{configName}:{version}", ipAddress, expireTime);
            return execCount == 1;
        }

        #endregion
    }
}

更上一层楼

封装了可以直接使用的内存服务,也就是内部已经包含调用上一步的接口动作

存入和读取内存的对象时都是通过json正反序列化后的对象,避免共享内存导致数据被修改

/// <summary>
    /// Author: eric.ding
    /// 分布式协同内存
    /// 通过服务器ip和redis的incr以及set数据结构来保证分布式系统内存统一
    /// </summary>
    public static class DistributeAssistMemoryCacheHelper
    {
        /// <summary>
        /// 全局负载下的同步锁,当数据修改后,通过此key来通知到各个负载机器更新内存缓存数据
        /// key作为string类型是是增量版本号,每次内容变化都需要incr提升版本
        /// 然后SetLocalReceivedTag中会用此key拼接$"{key}:{version}"来作为set结构的key,用来存放已处理的服务器ip
        /// 以此保证内存在分布式系统中一致性
        /// </summary>
        /// <param name="cacheKey">缓存的key</param>
        /// <returns>全局唯一锁</returns>
        private static string GlobleLockKey(string cacheKey) => $"{cacheKey}.lock";

        /// <summary>
        /// 设置数据缓存
        /// </summary>
        /// <param name="cacheKey">缓存Key</param>
        /// <param name="objObject">缓存内容</param>
        /// <param name="seconds">过期秒</param>
        /// <param name="isNeedNotifyAll">是否需要通知分布式系统其他机器,比如内容变化的则要通知给true, 本地内存失效重新写入的则不需要给false</param>
        /// <param name="priority">缓存项的优先级,你可以设置缓存项的到期策略一样,你还可以为缓存项赋予优先级。如果服务器内存紧缺的话,就会基于此优先级对缓存项进行清理以回收内存</param>
        public static void SetCache<T>(string cacheKey, T objObject, long seconds, bool isNeedNotifyAll, CacheItemPriority priority = CacheItemPriority.Low)
        {
            var value = JsonConvert.DeserializeObject<T>(objObject.PackJson());
            MemoryCacheCoreHelper.SetCache(cacheKey, value, seconds, priority);

            // 当数据修改后,需要通过redis来通知到各个负载机器更新内存缓存数据
            if (isNeedNotifyAll)
            {
                GlobleAssistMemoryHelper.SetVersion(GlobleLockKey(cacheKey), GlobleAssistMemoryHelper.GlobleAssistRateTypeEnum.Daily);
            }

            GlobleAssistMemoryHelper.SetLocalReceivedTag(GlobleLockKey(cacheKey), GlobleAssistMemoryHelper.GlobleAssistRateTypeEnum.Daily);
        }

        /// <summary>
        /// 获取数据缓存
        /// 深度拷贝 否则会操作同一个内存地址
        /// </summary>
        /// <param name="cacheKey">键</param>
        /// <returns>结果</returns>
        public static T GetCache<T>(string cacheKey)
        {
            try
            {
                // set 不进去表示处理过,缓存是最新的,直接取用
                if (!GlobleAssistMemoryHelper.SetLocalReceivedTag(GlobleLockKey(cacheKey), GlobleAssistMemoryHelper.GlobleAssistRateTypeEnum.Daily))
                {
                    var result = MemoryCacheCoreHelper.GetCache<T>(cacheKey);
                    return JsonConvert.DeserializeObject<T>(result.PackJson());
                }

                return default(T);
            }
            catch (Exception)
            {
                return default(T);
            }
        }

        /// <summary>
        /// 删除数据缓存
        /// </summary>
        /// <param name="cacheKey">键</param>
        public static void DelCache(string cacheKey)
        {
            MemoryCacheCoreHelper.DelCache(cacheKey);
        }
    }

食用方法:

// 设置缓存,只需要10毫秒
DistributeAssistMemoryCacheHelper.SetCache<List<RedisUser>>(key, usrslist, 24 * 60 * 60 - 500, isfromChange); 
// 读取缓存,几乎不耗时
result = DistributeAssistMemoryCacheHelper.GetCache<List<USROrganization>>(key);
// 删除缓存
DistributeAssistMemoryCacheHelper.DelCache(key);