1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| class CacheGroup: """ 处理依赖数据库的缓存更新问题,name使用表名字,如果修改了对应表,调用这里的refresh,注册进来的此表相关的缓存就都会刷新 """ def __init__(self): self._name_map_cache = defaultdict(lambda: [])
async def add(self, name, **kwargs): cache = Cache(**kwargs) await cache.prepare() self._name_map_cache[name].append(cache)
async def refresh(self, name): if name not in self._name_map_cache: return caches = self._name_map_cache[name] for cache in caches: await cache.refresh()
class Cache: def __init__(self, gen_func, app_key: str, app: web.Application, refresh_min_seconds: int = 60*30, refresh_max_seconds: Optional[int] = None, *args, **kwargs): """ 通过 实例.get(key) 方法获取缓存值
:param gen_func: 生产缓存数据的函数,返回值作为value赋值给app的对应app_key属性, 正因为传入了这个,在需要刷新时再调用下即可了 :param app_key: :param app: aiohttp的app实例 :param refresh_min_seconds: 击穿缓存的refresh_min_seconds时间内,认为缓存是绝对有效的,不更新缓存。若为0,则每次击穿都新查 :param refresh_max_seconds: 在此时间后,进行一次更新,若为None,表示不更新 """ if isinstance(refresh_min_seconds, int) and isinstance(refresh_max_seconds, int): if refresh_max_seconds < refresh_min_seconds: raise Exception(f'refresh_max_seconds {refresh_max_seconds} should not < refresh_min_seconds {refresh_min_seconds}')
self.__class__.app = app self.key = app_key self._gen_func = gen_func self.refresh_max_seconds = refresh_max_seconds self.refresh_min_seconds = refresh_min_seconds
self.args = args self.kwargs = kwargs
self._last_refresh = TimeUtil.now()
@property def alive_seconds(self): return (TimeUtil.now() - self._last_refresh).seconds
@property def data(self): return self._result
async def get(self, key, default=None): if ( isinstance(self.refresh_max_seconds, int) and self.refresh_max_seconds > 0 and self.alive_seconds > self.refresh_max_seconds ): await self.refresh()
if self._result.get(key): return self._result[key]
if self.refresh_min_seconds > 0 and self.alive_seconds < self.refresh_min_seconds: return default
await self.refresh() return self._result.get(key, default)
def __getattr__(self, item): warnings.warn('if u want to visit cache, use "await ins.get(key)"')
def __len__(self): return len(self._result)
def __contains__(self, item): asyncio.ensure_future(self.get(item)) return (item in self.data)
async def _get_result(self): try: resp = await self._gen_func(self.app, *self.args, **self.kwargs) except: return None return resp
async def refresh(self): self._last_refresh = TimeUtil.now() result = await self._get_result() if not result: return self._result = result
async def prepare(self): result = await self._get_result() assert result, f'{self.key}, prepare cache failed' assert isinstance(result, dict), f'{self.key}, value must be dict like object (Key-Value)'
self._result = result self.app[self.key] = self
def __str__(self): return f'cache obj to app key {self.key}, alive {TimeUtil.beautyDuration(self.alive_seconds)[0]} '
|