#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : meta.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 01/18/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.
import functools
import operator
import six
import collections
import threading
import contextlib
__all__ = [
"gofor",
"run_once",
"try_run",
"map_exec",
"filter_exec",
"first_n",
"stmap",
"method2func",
"map_exec_method",
"decorator_with_optional_args",
"cond_with",
"cond_with_group",
"merge_iterable",
"dict_deep_update",
"dict_deep_kv",
"dict_deep_keys",
"assert_instance",
"assert_none",
"assert_notnone",
"notnone_property",
"synchronized",
"make_dummy_func",
"Singleton",
]
[docs]def gofor(v):
if isinstance(v, collections.Mapping):
return v.items()
assert_instance(v, collections.Iterable)
return enumerate(v)
[docs]def run_once(func):
has_run = False
@synchronized
@functools.wraps(func)
def new_func(*args, **kwargs):
nonlocal has_run
if not has_run:
has_run = True
return func(*args, **kwargs)
else:
return
return new_func
[docs]def try_run(lambda_):
try:
return lambda_()
except Exception:
return None
[docs]def map_exec(func, *iterables):
return list(map(func, *iterables))
[docs]def filter_exec(func, iterable):
return list(filter(func, iterable))
[docs]def first_n(iterable, n=10):
def gen():
it = iter(iterable)
for i in range(n):
try:
yield next(it)
except StopIteration:
return
return list(gen())
[docs]def stmap(func, iterable):
if isinstance(iterable, six.string_types):
return func(iterable)
elif isinstance(iterable, (collections.abc.Sequence, collections.UserList)):
return [stmap(func, v) for v in iterable]
elif isinstance(iterable, collections.abc.Set):
return {stmap(func, v) for v in iterable}
elif isinstance(iterable, (collections.abc.Mapping, collections.UserDict)):
return {k: stmap(func, v) for k, v in iterable.items()}
else:
return func(iterable)
[docs]def method2func(method_name):
return lambda x: getattr(x, method_name)()
[docs]def map_exec_method(method_name, iterable):
return list(map(method2func(method_name), iterable))
[docs]def decorator_with_optional_args(func=None, *, is_method=False):
def wrapper(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
if is_method:
if len(args) == 1:
return f(*args, **kwargs)
elif len(args) == 2:
return f(args[0], **kwargs)(args[1])
else:
raise ValueError(
"Decorator supports 0 or 1 positional arguments as the function to be wrapped."
)
else:
if len(args) == 0:
return f(**kwargs)
elif len(args) == 1:
return f(**kwargs)(args[0])
else:
raise ValueError(
"Decorator supports 0 or 1 positional arguments as the function to be wrapped."
)
return wrapped
if func is not None:
return wrapper(func)
else:
return wrapper
[docs]@contextlib.contextmanager
def cond_with(with_statement, cond):
if cond:
with with_statement as res:
yield res
else:
yield
[docs]@contextlib.contextmanager
def cond_with_group(cond, *with_statement):
if cond:
with contextlib.ExitStack() as stack:
res = [stack.enter_context(ctx) for ctx in with_statement]
yield res
else:
yield
[docs]def merge_iterable(v1, v2):
assert issubclass(type(v1), type(v2)) or issubclass(type(v2), type(v1))
if isinstance(v1, (dict, set)):
v = v1.copy().update(v2)
return v
return v1 + v2
[docs]def dict_deep_update(a, b):
for key in b:
if key in a and type(b[key]) is dict:
dict_deep_update(a[key], b[key])
else:
a[key] = b[key]
[docs]def dict_deep_kv(d, sort=True, sep=".", allow_dict=False):
# Not using collections.Sequence to avoid infinite recursion.
assert isinstance(d, (tuple, list, collections.Mapping))
result = list()
def _dfs(current, prefix=None):
for key, value in gofor(current):
current_key = key if prefix is None else prefix + sep + str(key)
if isinstance(current[key], (tuple, list, collections.Mapping)):
if allow_dict:
result.append((current_key, value))
_dfs(current[key], current_key)
else:
result.append((current_key, value))
_dfs(d)
if sort:
result.sort(key=operator.itemgetter(0))
return result
[docs]def dict_deep_keys(d, sort=True, sep=".", allow_dict=True):
kv = dict_deep_kv(d, sort=sort, sep=sep, allow_dict=allow_dict)
return [i[0] for i in kv]
[docs]def assert_instance(ins, clz, msg=None):
msg = msg or "{} (of type{}) is not of type {}".format(ins, type(ins), clz)
assert isinstance(ins, clz), msg
[docs]def assert_none(ins, msg=None):
msg = msg or "{} is not None".format(ins)
assert ins is None, msg
[docs]def assert_notnone(ins, msg=None, name="instance"):
msg = msg or "{} is None".format(name)
assert ins is not None, msg
[docs]class notnone_property:
def __init__(self, fget):
self.fget = fget
self.__module__ = fget.__module__
self.__name__ = fget.__name__
self.__doc__ = fget.__doc__
self.__prop_key = "{}_{}".format(fget.__name__, id(fget))
def __get__(self, instance, owner):
if instance is None:
return self.fget
v = self.fget(instance)
assert v is not None, "{}.{} can not be None, maybe not set yet".format(
type(instance).__name__, self.__name__
)
return v
[docs]@decorator_with_optional_args
def synchronized(mutex=None):
if mutex is None:
mutex = threading.Lock()
def wrapper(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
with mutex:
return func(*args, **kwargs)
wrapped_func.__sync_mutex__ = mutex
return wrapped_func
return wrapper
[docs]def make_dummy_func(message=None):
def func(*args, **kwargs):
raise NotImplementedError(message)
return func
[docs]class Singleton(type):
"https://stackoverflow.com/a/55629949/7672954"
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._locked_call(*args, **kwargs)
return cls._instances[cls]
@synchronized
def _locked_call(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
@property
def ready(cls):
if cls in cls._instances:
return True
return False
[docs] @classmethod
def clear(cls):
cls._instances.clear()