一瞥之见

0%

一个合格的缓存管理

讲述了我在实际工程中对于缓存管理的认识

要点

缓存是我们再熟悉不过的事情了。以如下我遇到的一种情况为例:数据库存储的基本是一些配置,要求要多个表进行全表缓存(因为行数不多,基本几百行。行数若太多请辩证的看待下面的想法),修改频率也不频繁,但是修改后有些表的缓存需要能够立即更新,有些则对实时性要求没有那么高。下面是整理的一些需求要点:

  1. 能够提供最大刷新时间,对于一些修改后也不需要及时生效的可以通过最大刷新时间进行刷新
  2. 正确且灵活的处理缓存击穿的问题(比如在一定时间内,同一个key击穿的只确认一次是否更新)
  3. 有机制能及时的主动更新某表缓存
  4. 若backend数据库故障后全部以缓存的为准(到达最大刷新时间也不更新)【辩证的看待此条,只是在我的场景里适用】
  5. 一个进程、同一个表只持有一份缓存【进程单例】(这个有点和缓存无关,只是顺便说一下我下面的实现是绑定到app这个各个流程都能访问到的变量的原因)

实现

下面以我实际在一个aiohttp server项目中用的缓存管理数据结构为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
class CacheGroup:
"""
处理依赖数据库的缓存更新问题,name使用表名字,如果修改了对应表,调用这里的refresh,注册进来的此表相关的缓存就都会刷新
"""
def __init__(self):
self._name_map_cache = defaultdict(lambda: [])

async def add(self, name, **kwargs):
cache = Cache(**kwargs)
await cache.prepare()
self._name_map_cache[name].append(cache)

async def refresh(self, name):
if name not in self._name_map_cache: return
caches = self._name_map_cache[name]
for cache in caches:
await cache.refresh()


class Cache:
def __init__(self, gen_func, app_key: str, app: web.Application,
refresh_min_seconds: int = 60*30, refresh_max_seconds: Optional[int] = None,
*args, **kwargs):
"""
通过 实例.get(key) 方法获取缓存值

:param gen_func: 生产缓存数据的函数,返回值作为value赋值给app的对应app_key属性, 正因为传入了这个,在需要刷新时再调用下即可了
:param app_key:
:param app: aiohttp的app实例
:param refresh_min_seconds: 击穿缓存的refresh_min_seconds时间内,认为缓存是绝对有效的,不更新缓存。若为0,则每次击穿都新查
:param refresh_max_seconds: 在此时间后,进行一次更新,若为None,表示不更新
"""
if isinstance(refresh_min_seconds, int) and isinstance(refresh_max_seconds, int):
if refresh_max_seconds < refresh_min_seconds:
raise Exception(f'refresh_max_seconds {refresh_max_seconds} should not < refresh_min_seconds {refresh_min_seconds}')

self.__class__.app = app
self.key = app_key
self._gen_func = gen_func
self.refresh_max_seconds = refresh_max_seconds
self.refresh_min_seconds = refresh_min_seconds

self.args = args
self.kwargs = kwargs

self._last_refresh = TimeUtil.now()

@property
def alive_seconds(self):
return (TimeUtil.now() - self._last_refresh).seconds

@property
def data(self):
return self._result

async def get(self, key, default=None):
# 最大过期时间刷新
if (
isinstance(self.refresh_max_seconds, int) and self.refresh_max_seconds > 0 and
self.alive_seconds > self.refresh_max_seconds
):
await self.refresh()

# 缓存击中
if self._result.get(key):
return self._result[key]

# 在有效时间内
if self.refresh_min_seconds > 0 and self.alive_seconds < self.refresh_min_seconds:
return default

await self.refresh()
return self._result.get(key, default)

def __getattr__(self, item):
warnings.warn('if u want to visit cache, use "await ins.get(key)"')

def __len__(self):
return len(self._result)

def __contains__(self, item):
# 注意这里,时间还是调用了get,从而触发了最大过期时间...的检测
asyncio.ensure_future(self.get(item))
return (item in self.data)

async def _get_result(self):
try:
resp = await self._gen_func(self.app, *self.args, **self.kwargs)
except:
return None
return resp

async def refresh(self):
self._last_refresh = TimeUtil.now()
result = await self._get_result()
if not result: return # 防御。避免组件出错时,无法获取数据,导致一直刷新数据引崩。此时使用老的缓存数据
self._result = result

async def prepare(self):
result = await self._get_result()
assert result, f'{self.key}, prepare cache failed'
assert isinstance(result, dict), f'{self.key}, value must be dict like object (Key-Value)'

self._result = result
self.app[self.key] = self

def __str__(self):
return f'cache obj to app key {self.key}, alive {TimeUtil.beautyDuration(self.alive_seconds)[0]} '

可以看到的是,上面的结构已经实现了要点里面的1、2、4、5四个点了,那么第3点该如何实现呢?

  1. 首先我们明确的知道,可以通过await app[‘cache_group’].refresh(‘xxx_key’)来刷新xxx_key代表的某表

  2. 其次问题就是如果让在多台机器的各个进程都能够能够及时的知道xxx_key代表的数据库表发生了修改

    这里我同样还是使用了基于redis pub/sub的方式进行,因为首先合适的修改方式肯定不是手动去直接改数据库(风险太高),那么别的方式肯定是很容易同时触发一个pub xxx_key到相应channel的,这样每个进程收到后再调用await app[‘cache_group’].refresh(‘xxx_key’)即可完成“及时”的刷新