Module idict.persistence.locker
Expand source code
# Copyright (c) 2022. Davi Pereira dos Santos
# This file is part of the i-dict project.
# Please respect the license - more about this in the section (*) below.
#
# i-dict is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# i-dict is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with i-dict. If not, see <http://www.gnu.org/licenses/>.
#
# (*) Removing authorship by any means, e.g. by distribution of derived
# works or verbatim, obfuscated, compiled or rewritten versions of any
# part of this work is illegal and it is unethical regarding the effort and
# time spent here.
#
import shelve
from contextlib import contextmanager
from datetime import datetime, timedelta
from functools import partial
from random import random
from threading import Thread
from idict.persistence.shelchemy import sopen
from temporenc import packb, unpackb
from time import sleep
def ping(ctx, item, timeout, stop):
    """
    Heartbeat loop: periodically refresh the timestamp stored for 'item'.

    'ctx' is a zero-argument callable returning a context manager that yields the dict-like storage.
    'item' is the storage key whose value is overwritten with the packed current time.
    'timeout' is the expiration time in seconds; the timestamp is rewritten about every timeout/2 seconds.
        'None', zero or a negative value means there is nothing to refresh: the function returns at once.
    'stop' is a one-element list used as a mutable flag; a truthy stop[0] ends the loop.
    """
    # Guard before dividing: 'None / 2' raises TypeError, and a negative
    # timeout would skip the wait below and rewrite the storage in a busy loop.
    if timeout is None or timeout <= 0:
        return
    with ctx() as dic:
        while not stop[0]:
            t = timeout / 2
            # Sleep in slices of at most 0.2s so a stop request is noticed quickly.
            while not stop[0] and t > 0:
                sleep(min(0.2, t))
                t -= 0.2
            dic[item] = packb(datetime.now())
def alive(val, timeout):
    """
    Tell whether the job whose last heartbeat is 'val' should still be considered running.

    'val' is a packed timestamp (as written with 'packb').
    'timeout' is the expiration time in seconds; 'None' means a started job never expires,
        so 'val' is not even decoded in that case.
    """
    if timeout is None:
        # No expiration: a started job stays 'started' until it is marked as done.
        return True
    return datetime.now() < unpackb(val).datetime() + timedelta(seconds=timeout)
def locker(iterable, dict__url=None, timeout=None, logstep=1):
    """
    Generator that skips items from 'iterable' already processed before or still being processed

    Item processing is restarted if 'timeout' expires.
    'dict__url' is a dict-like object or a sqlalchemy url to store and query each item status.
        A zero-argument callable returning a context manager that yields the storage is also accepted.
        'None' means a shelve file at "/tmp/locker.db".
    'logstep' is the frequency of printed messages, 'None' means 'no logs'.
    'timeout' is the expiration time in seconds of a started item.
        With 'None' (or zero) no heartbeat thread is started; whether a started item is then
        considered still running is decided by 'alive'.

    An item is marked as done (b'd') only when the consumer asks for the next item.
    If the consumer abandons the generator (break, exception), the heartbeat is stopped and
    the item is left as 'started', so it can expire and be restarted later.

    # TODO: improve avoidance of race condition adopting a pre-started state

    >>> from time import sleep
    >>> names = ["a","b","c","d","e"]
    >>> storage = {}
    >>> for name in locker(names, dict__url=storage, timeout=10):
    ...    print(f"Processing {name}")
    ...    sleep(0.1)
    ...    print(f"{name} processed!")
    'a' is new, started
    Processing a
    a processed!
    'a' done
    'b' is new, started
    Processing b
    b processed!
    'b' done
    'c' is new, started
    Processing c
    c processed!
    'c' done
    'd' is new, started
    Processing d
    d processed!
    'd' done
    'e' is new, started
    Processing e
    e processed!
    'e' done
    >>> storage
    {'a': b'd', 'b': b'd', 'c': b'd', 'd': b'd', 'e': b'd'}
    >>> for name in locker(names, dict__url=storage, timeout=1):
    ...    print(f"Processing {name}")
    ...    sleep(0.1)
    ...    print(f"{name} processed!")
    'a' already done, skipping
    'b' already done, skipping
    'c' already done, skipping
    'd' already done, skipping
    'e' already done, skipping
    """
    if dict__url is None:
        ctx = partial(shelve.open, "/tmp/locker.db")
    elif isinstance(dict__url, str):
        ctx = partial(sopen, dict__url, autopack=False)
    elif isinstance(dict__url, dict) or (hasattr(dict__url, "__contains__") and not callable(dict__url)):
        # Any dict-like container (not only 'dict' subclasses) is wrapped, so it
        # can be used through the same 'with ctx() as dic' protocol.
        @contextmanager
        def ctx():
            yield dict__url
    else:
        # Assumed to be a zero-argument callable returning a context manager.
        ctx = dict__url

    for c, item in enumerate(iterable):
        with ctx() as dic:
            while True:
                if item in dic:
                    val = dic[item]
                    if val == b'd':
                        status, action = 'already done', "skipping"
                    elif not alive(val, timeout):
                        status, action = "expired", "restarted"
                    else:
                        status, action = 'already started', "skipping"
                else:
                    status, action = "is new", "started"
                if action == "skipping":
                    break

                # Check for race condition: wait a little and look again; if another
                # worker took (or finished) the item meanwhile, classify it again.
                now = packb(datetime.now())
                sleep((random() + 1) / 1000)  # ~1ms
                if item not in dic:
                    break
                current = dic[item]
                # b'd' is not a packed timestamp, so it must not reach 'alive'.
                if current != b'd' and not alive(current, timeout):
                    break

        verbose = logstep is not None and c % logstep == 0
        if verbose:
            print(f"'{item}' {status}, {action}")
        if action == "skipping":
            continue

        with ctx() as dic:
            # Mark as started, as early as possible.
            dic[item] = now

        stop = [False]
        heartbeat = None
        if timeout is not None and timeout > 0:
            # Without a positive timeout there is no timestamp worth refreshing.
            heartbeat = Thread(target=ping, args=(ctx, item, timeout, stop), daemon=True)
            heartbeat.start()
        try:
            yield item
        finally:
            # Also reached when the consumer abandons the generator; otherwise
            # the heartbeat would keep the item looking alive forever.
            stop[0] = True
            if heartbeat is not None:
                heartbeat.join()

        with ctx() as dic:
            dic[item] = b'd'
        if verbose:
            print(f"'{item}' done")
Functions
def alive(val, timeout)
-
Expand source code
def alive(val, timeout):
    """
    Tell whether the job whose last heartbeat is 'val' should still be considered running.

    'val' is a packed timestamp (as written with 'packb').
    'timeout' is the expiration time in seconds; 'None' means a started job never expires,
        so 'val' is not even decoded in that case.
    """
    if timeout is None:
        # No expiration: a started job stays 'started' until it is marked as done.
        return True
    return datetime.now() < unpackb(val).datetime() + timedelta(seconds=timeout)
def locker(iterable, dict__url=None, timeout=None, logstep=1)
-
Generator that skips items from 'iterable' already processed before or still being processed.
Item processing is restarted if 'timeout' expires. 'dict__url' is a dict-like object or a sqlalchemy url used to store and query each item's status. 'logstep' is the frequency of printed messages; 'None' means 'no logs'. 'timeout'=None keeps the job status as 'started' forever (or until it finishes).
TODO: improve avoidance of race condition adopting a pre-started state
>>> from time import sleep >>> names = ["a","b","c","d","e"] >>> storage = {} >>> for name in locker(names, dict__url=storage, timeout=10): ... print(f"Processing {name}") ... sleep(0.1) ... print(f"{name} processed!") 'a' is new, started Processing a a processed! 'a' done 'b' is new, started Processing b b processed! 'b' done 'c' is new, started Processing c c processed! 'c' done 'd' is new, started Processing d d processed! 'd' done 'e' is new, started Processing e e processed! 'e' done >>> storage {'a': b'd', 'b': b'd', 'c': b'd', 'd': b'd', 'e': b'd'} >>> for name in locker(names, dict__url=storage, timeout=1): ... print(f"Processing {name}") ... sleep(0.1) ... print(f"{name} processed!") 'a' already done, skipping 'b' already done, skipping 'c' already done, skipping 'd' already done, skipping 'e' already done, skipping
Expand source code
def locker(iterable, dict__url=None, timeout=None, logstep=1):
    """
    Generator that skips items from 'iterable' already processed before or still being processed

    Item processing is restarted if 'timeout' expires.
    'dict__url' is a dict-like object or a sqlalchemy url to store and query each item status.
        A zero-argument callable returning a context manager that yields the storage is also accepted.
        'None' means a shelve file at "/tmp/locker.db".
    'logstep' is the frequency of printed messages, 'None' means 'no logs'.
    'timeout' is the expiration time in seconds of a started item.
        With 'None' (or zero) no heartbeat thread is started; whether a started item is then
        considered still running is decided by 'alive'.

    An item is marked as done (b'd') only when the consumer asks for the next item.
    If the consumer abandons the generator (break, exception), the heartbeat is stopped and
    the item is left as 'started', so it can expire and be restarted later.

    # TODO: improve avoidance of race condition adopting a pre-started state

    >>> from time import sleep
    >>> names = ["a","b","c","d","e"]
    >>> storage = {}
    >>> for name in locker(names, dict__url=storage, timeout=10):
    ...    print(f"Processing {name}")
    ...    sleep(0.1)
    ...    print(f"{name} processed!")
    'a' is new, started
    Processing a
    a processed!
    'a' done
    'b' is new, started
    Processing b
    b processed!
    'b' done
    'c' is new, started
    Processing c
    c processed!
    'c' done
    'd' is new, started
    Processing d
    d processed!
    'd' done
    'e' is new, started
    Processing e
    e processed!
    'e' done
    >>> storage
    {'a': b'd', 'b': b'd', 'c': b'd', 'd': b'd', 'e': b'd'}
    >>> for name in locker(names, dict__url=storage, timeout=1):
    ...    print(f"Processing {name}")
    ...    sleep(0.1)
    ...    print(f"{name} processed!")
    'a' already done, skipping
    'b' already done, skipping
    'c' already done, skipping
    'd' already done, skipping
    'e' already done, skipping
    """
    if dict__url is None:
        ctx = partial(shelve.open, "/tmp/locker.db")
    elif isinstance(dict__url, str):
        ctx = partial(sopen, dict__url, autopack=False)
    elif isinstance(dict__url, dict) or (hasattr(dict__url, "__contains__") and not callable(dict__url)):
        # Any dict-like container (not only 'dict' subclasses) is wrapped, so it
        # can be used through the same 'with ctx() as dic' protocol.
        @contextmanager
        def ctx():
            yield dict__url
    else:
        # Assumed to be a zero-argument callable returning a context manager.
        ctx = dict__url

    for c, item in enumerate(iterable):
        with ctx() as dic:
            while True:
                if item in dic:
                    val = dic[item]
                    if val == b'd':
                        status, action = 'already done', "skipping"
                    elif not alive(val, timeout):
                        status, action = "expired", "restarted"
                    else:
                        status, action = 'already started', "skipping"
                else:
                    status, action = "is new", "started"
                if action == "skipping":
                    break

                # Check for race condition: wait a little and look again; if another
                # worker took (or finished) the item meanwhile, classify it again.
                now = packb(datetime.now())
                sleep((random() + 1) / 1000)  # ~1ms
                if item not in dic:
                    break
                current = dic[item]
                # b'd' is not a packed timestamp, so it must not reach 'alive'.
                if current != b'd' and not alive(current, timeout):
                    break

        verbose = logstep is not None and c % logstep == 0
        if verbose:
            print(f"'{item}' {status}, {action}")
        if action == "skipping":
            continue

        with ctx() as dic:
            # Mark as started, as early as possible.
            dic[item] = now

        stop = [False]
        heartbeat = None
        if timeout is not None and timeout > 0:
            # Without a positive timeout there is no timestamp worth refreshing.
            heartbeat = Thread(target=ping, args=(ctx, item, timeout, stop), daemon=True)
            heartbeat.start()
        try:
            yield item
        finally:
            # Also reached when the consumer abandons the generator; otherwise
            # the heartbeat would keep the item looking alive forever.
            stop[0] = True
            if heartbeat is not None:
                heartbeat.join()

        with ctx() as dic:
            dic[item] = b'd'
        if verbose:
            print(f"'{item}' done")
def ping(ctx, item, timeout, stop)
-
Expand source code
def ping(ctx, item, timeout, stop):
    """
    Heartbeat loop: periodically refresh the timestamp stored for 'item'.

    'ctx' is a zero-argument callable returning a context manager that yields the dict-like storage.
    'item' is the storage key whose value is overwritten with the packed current time.
    'timeout' is the expiration time in seconds; the timestamp is rewritten about every timeout/2 seconds.
        'None', zero or a negative value means there is nothing to refresh: the function returns at once.
    'stop' is a one-element list used as a mutable flag; a truthy stop[0] ends the loop.
    """
    # Guard before dividing: 'None / 2' raises TypeError, and a negative
    # timeout would skip the wait below and rewrite the storage in a busy loop.
    if timeout is None or timeout <= 0:
        return
    with ctx() as dic:
        while not stop[0]:
            t = timeout / 2
            # Sleep in slices of at most 0.2s so a stop request is noticed quickly.
            while not stop[0] and t > 0:
                sleep(min(0.2, t))
                t -= 0.2
            dic[item] = packb(datetime.now())
def random()
-
random() -> x in the interval [0, 1).