diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/pickleshare/py2/pickleshare.py | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
download | ydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz |
add ydb deps
Diffstat (limited to 'contrib/python/pickleshare/py2/pickleshare.py')
-rw-r--r-- | contrib/python/pickleshare/py2/pickleshare.py | 352 |
1 files changed, 352 insertions, 0 deletions
diff --git a/contrib/python/pickleshare/py2/pickleshare.py b/contrib/python/pickleshare/py2/pickleshare.py new file mode 100644 index 0000000000..086f84f6ea --- /dev/null +++ b/contrib/python/pickleshare/py2/pickleshare.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python + +""" PickleShare - a small 'shelve' like datastore with concurrency support + +Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike +shelve, many processes can access the database simultaneously. Changing a +value in database is immediately visible to other processes accessing the +same database. + +Concurrency is possible because the values are stored in separate files. Hence +the "database" is a directory where *all* files are governed by PickleShare. + +Example usage:: + + from pickleshare import * + db = PickleShareDB('~/testpickleshare') + db.clear() + print "Should be empty:",db.items() + db['hello'] = 15 + db['aku ankka'] = [1,2,313] + db['paths/are/ok/key'] = [1,(5,46)] + print db.keys() + del db['aku ankka'] + +This module is certainly not ZODB, but can be used for low-load +(non-mission-critical) situations where tiny code size trumps the +advanced features of a "real" object database. + +Installation guide: pip install pickleshare + +Author: Ville Vainio <vivainio@gmail.com> +License: MIT open source license. + +""" + +from __future__ import print_function + + +__version__ = "0.7.5" + +try: + from pathlib import Path +except ImportError: + # Python 2 backport + from pathlib2 import Path + +import os,stat,time +try: + import collections.abc as collections_abc +except ImportError: + import collections as collections_abc +try: + import cPickle as pickle +except ImportError: + import pickle +import errno +import sys + +if sys.version_info[0] >= 3: + string_types = (str,) +else: + string_types = (str, unicode) + +def gethashfile(key): + return ("%02x" % abs(hash(key) % 256))[-2:] + +_sentinel = object() + +class PickleShareDB(collections_abc.MutableMapping): + """ The main 'connection' object for PickleShare database """ + def __init__(self,root): + """ Return a db object that will manage the specied directory""" + if not isinstance(root, string_types): + root = str(root) + root = os.path.abspath(os.path.expanduser(root)) + self.root = Path(root) + if not self.root.is_dir(): + # catching the exception is necessary if multiple processes are concurrently trying to create a folder + # exists_ok keyword argument of mkdir does the same but only from Python 3.5 + try: + self.root.mkdir(parents=True) + except OSError as e: + if e.errno != errno.EEXIST: + raise + # cache has { 'key' : (obj, orig_mod_time) } + self.cache = {} + + + def __getitem__(self,key): + """ db['key'] reading """ + fil = self.root / key + try: + mtime = (fil.stat()[stat.ST_MTIME]) + except OSError: + raise KeyError(key) + + if fil in self.cache and mtime == self.cache[fil][1]: + return self.cache[fil][0] + try: + # The cached item has expired, need to read + with fil.open("rb") as f: + obj = pickle.loads(f.read()) + except: + raise KeyError(key) + + self.cache[fil] = (obj,mtime) + return obj + + def __setitem__(self,key,value): + """ db['key'] = 5 """ + fil = self.root / key + parent = fil.parent + if parent and not parent.is_dir(): + parent.mkdir(parents=True) + # We specify protocol 2, so that we can mostly go between Python 2 + # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete. + with fil.open('wb') as f: + pickle.dump(value, f, protocol=2) + try: + self.cache[fil] = (value, fil.stat().st_mtime) + except OSError as e: + if e.errno != errno.ENOENT: + raise + + def hset(self, hashroot, key, value): + """ hashed set """ + hroot = self.root / hashroot + if not hroot.is_dir(): + hroot.mkdir() + hfile = hroot / gethashfile(key) + d = self.get(hfile, {}) + d.update( {key : value}) + self[hfile] = d + + + + def hget(self, hashroot, key, default = _sentinel, fast_only = True): + """ hashed get """ + hroot = self.root / hashroot + hfile = hroot / gethashfile(key) + + d = self.get(hfile, _sentinel ) + #print "got dict",d,"from",hfile + if d is _sentinel: + if fast_only: + if default is _sentinel: + raise KeyError(key) + + return default + + # slow mode ok, works even after hcompress() + d = self.hdict(hashroot) + + return d.get(key, default) + + def hdict(self, hashroot): + """ Get all data contained in hashed category 'hashroot' as dict """ + hfiles = self.keys(hashroot + "/*") + hfiles.sort() + last = len(hfiles) and hfiles[-1] or '' + if last.endswith('xx'): + # print "using xx" + hfiles = [last] + hfiles[:-1] + + all = {} + + for f in hfiles: + # print "using",f + try: + all.update(self[f]) + except KeyError: + print("Corrupt",f,"deleted - hset is not threadsafe!") + del self[f] + + self.uncache(f) + + return all + + def hcompress(self, hashroot): + """ Compress category 'hashroot', so hset is fast again + + hget will fail if fast_only is True for compressed items (that were + hset before hcompress). + + """ + hfiles = self.keys(hashroot + "/*") + all = {} + for f in hfiles: + # print "using",f + all.update(self[f]) + self.uncache(f) + + self[hashroot + '/xx'] = all + for f in hfiles: + p = self.root / f + if p.name == 'xx': + continue + p.unlink() + + + + def __delitem__(self,key): + """ del db["key"] """ + fil = self.root / key + self.cache.pop(fil,None) + try: + fil.unlink() + except OSError: + # notfound and permission denied are ok - we + # lost, the other process wins the conflict + pass + + def _normalized(self, p): + """ Make a key suitable for user's eyes """ + return str(p.relative_to(self.root)).replace('\\','/') + + def keys(self, globpat = None): + """ All keys in DB, or all keys matching a glob""" + + if globpat is None: + files = self.root.rglob('*') + else: + files = self.root.glob(globpat) + return [self._normalized(p) for p in files if p.is_file()] + + def __iter__(self): + return iter(self.keys()) + + def __len__(self): + return len(self.keys()) + + def uncache(self,*items): + """ Removes all, or specified items from cache + + Use this after reading a large amount of large objects + to free up memory, when you won't be needing the objects + for a while. + + """ + if not items: + self.cache = {} + for it in items: + self.cache.pop(it,None) + + def waitget(self,key, maxwaittime = 60 ): + """ Wait (poll) for a key to get a value + + Will wait for `maxwaittime` seconds before raising a KeyError. + The call exits normally if the `key` field in db gets a value + within the timeout period. + + Use this for synchronizing different processes or for ensuring + that an unfortunately timed "db['key'] = newvalue" operation + in another process (which causes all 'get' operation to cause a + KeyError for the duration of pickling) won't screw up your program + logic. + """ + + wtimes = [0.2] * 3 + [0.5] * 2 + [1] + tries = 0 + waited = 0 + while 1: + try: + val = self[key] + return val + except KeyError: + pass + + if waited > maxwaittime: + raise KeyError(key) + + time.sleep(wtimes[tries]) + waited+=wtimes[tries] + if tries < len(wtimes) -1: + tries+=1 + + def getlink(self,folder): + """ Get a convenient link for accessing items """ + return PickleShareLink(self, folder) + + def __repr__(self): + return "PickleShareDB('%s')" % self.root + + + +class PickleShareLink: + """ A shortdand for accessing nested PickleShare data conveniently. + + Created through PickleShareDB.getlink(), example:: + + lnk = db.getlink('myobjects/test') + lnk.foo = 2 + lnk.bar = lnk.foo + 5 + + """ + def __init__(self, db, keydir ): + self.__dict__.update(locals()) + + def __getattr__(self,key): + return self.__dict__['db'][self.__dict__['keydir']+'/' + key] + def __setattr__(self,key,val): + self.db[self.keydir+'/' + key] = val + def __repr__(self): + db = self.__dict__['db'] + keys = db.keys( self.__dict__['keydir'] +"/*") + return "<PickleShareLink '%s': %s>" % ( + self.__dict__['keydir'], + ";".join([Path(k).basename() for k in keys])) + +def main(): + import textwrap + usage = textwrap.dedent("""\ + pickleshare - manage PickleShare databases + + Usage: + + pickleshare dump /path/to/db > dump.txt + pickleshare load /path/to/db < dump.txt + pickleshare test /path/to/db + """) + DB = PickleShareDB + import sys + if len(sys.argv) < 2: + print(usage) + return + + cmd = sys.argv[1] + args = sys.argv[2:] + if cmd == 'dump': + if not args: args= ['.'] + db = DB(args[0]) + import pprint + pprint.pprint(db.items()) + elif cmd == 'load': + cont = sys.stdin.read() + db = DB(args[0]) + data = eval(cont) + db.clear() + for k,v in db.items(): + db[k] = v + elif cmd == 'testwait': + db = DB(args[0]) + db.clear() + print(db.waitget('250')) + elif cmd == 'test': + test() + stress() + +if __name__== "__main__": + main() + + |