from __future__ import print_function
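# Likely from the SignalError exception class (its __init__(self, signum, frame) appears in the
# index below); builds the human-readable message for a caught signal.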
return "Caught signal number "+str(self.signum)
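# Fragments apparently from mkdirPermissions(path, mode): stop at an empty or root path, then
# split off the last path component.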
if not path or path == "/":
(head, tail) = os.path.split(path)
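# Fragments apparently from cacheUpToDate(cache_path, net_path): stat both copies, treat a missing
# file (errno 2, i.e. ENOENT) as "not cached"/"nothing to cache", and otherwise require the cached
# copy to be at least as new as the /net copy and the same size.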
try: cache = os.stat(cache_path)
if e.errno == 2: return False
try: net = os.stat(net_path)
if e.errno == 2: return True
return cache.st_mtime >= net.st_mtime and cache.st_size == net.st_size
globbed = glob.glob(f)
return path.startswith("/net/")
return os.path.join(cache_root, path[5:])
return max(s.st_ctime, s.st_mtime, s.st_atime)
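# Small helper fragments: wildcard expansion, detection of /net/ paths, mapping a /net path into
# the cache root (path[5:] strips the leading "/net/"), and picking the newest of a file's
# ctime/mtime/atime. The owning functions are not shown in this listing.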
globbed = glob.glob(arg)
expanded_args.append(arg)
inv_file_map = dict((cached, net) for net, cached in file_map.iteritems())
for arg in expanded_args:
command.append(file_map[arg])
command.append(cache_path)
inv_file_map[cache_path] = arg
return command, inv_file_map
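# Fragments apparently from mapFiles(command, file_map): expand wildcards in the arguments, build
# the inverse cache-to-net map, and rewrite arguments that refer to /net files to their cached
# paths.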
print("Copying "+src+" to "+dst+"\n")
shutil.copy(src, dst)
os.utime(src, (now, now))
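# Copy fragments, possibly from syncCache(net_path, cache_path): report the copy, perform it, and
# update the source file's timestamps.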
path = os.path.join(root, f)
if path in file_map.itervalues(): continue
if mod_time < oldest_mod_time or not found_file:
oldest_mod_time = mod_time
if time.time()-oldest_mod_time <= 86400.:
print("Deleting "+oldest_path+" from cache\n")
try: os.remove(oldest_path)
while oldest_path != "/" and oldest_path != "":
try: os.rmdir(oldest_path)
finally: oldest_path = os.path.dirname(oldest_path)
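# The fragments above appear to come from removeOldCache(file_map): walk the cache, skip files
# currently mapped by the running job, find the least recently modified file, leave anything
# touched within the last day (86400 s) alone, delete the oldest file, and prune any directories
# left empty on the way up.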
src_size = os.stat(src).st_size * 2
avail = du.f_bsize*du.f_bavail
while avail-src_size < min_free:
if not removed_file: return
avail = du.f_bsize*du.f_bavail
print("Caching "+src+" to "+dst+"\n")
shutil.copy(src, dst)
os.utime(dst, (now, now))
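# The fragments above appear to come from cacheCopy(src, dst, min_free, file_map, no_delete):
# budget twice the source size, keep evicting old cache files until enough space is free (giving
# up if nothing could be removed), then copy the file into the cache and set its timestamps.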
os.utime(cache_path, (now, now))
cache_m_time = os.path.getmtime(cache_path)
now = max(cache_m_time, time.time())
os.utime(cache_path, (now, now))
os.remove(cache_path)
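# Timestamp fragments, possibly from syncCache(net_path, cache_path): touch the cached copy,
# re-read its modification time, bump the timestamp forward, and finally remove the cached file.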
inv_file_map = dict()
command, inv_file_map = mapFiles(command, file_map)
inv_file_map = dict((cached, net) for net, cached in file_map.iteritems())
if len(command) <= 0: return
args = ["run/wrapper.sh"]
args.append(a.lstrip())
old_mod_times = dict()
before_time = round(time.time()-2.)
for f in inv_file_map.iterkeys():
os.utime(f, (before_time, before_time))
old_mod_times[f] = os.path.getmtime(f)
print("Executing", command, "\n")
exit_code = subprocess.call(command)
except SignalError as e:
if e.signum != signal.SIGCLD and e.signum != signal.SIGCHLD:
raise Exception("Executable returned non-zero exit code.")
for f in inv_file_map.iterkeys():
for cache_path, net_path in inv_file_map.iteritems():
if os.path.getmtime(cache_path) > old_mod_times[cache_path]:
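# The fragments above appear to come from execute(command, file_map, fragile): remap the
# command's file arguments via mapFiles (unless running "fragile"), prepend run/wrapper.sh,
# back-date the cached inputs so later modifications are detectable, run the command while
# ignoring child signals, fail on a non-zero exit code, and flag cached files whose modification
# time changed so they can be synced back to /net.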
def cacheRecurse(caches, file_map, command, fragile, min_free, no_delete):
execute(command, file_map, fragile)
cacheRecurse(caches[1:], file_map, command, fragile, min_free, no_delete)
if not os.path.exists(net_path):
with open(net_path, "a"):
cacheCopy(net_path, cache_path, min_free, file_map, no_delete)
file_map[net_path] = cache_path
cacheRecurse(caches[1:], file_map, command, fragile, min_free, no_delete)
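# cacheRun below appears to install signalHandler for a filtered set of signals, check for the
# local /scratch/babymaker cache area, derive the minimum free space to preserve from the absolute
# and relative limits, and start cacheRecurse with an empty file map. cacheRecurse itself seems to
# peel off one requested file at a time, executing the command once none remain.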
def cacheRun(caches, command, fragile, abs_limit, rel_limit, no_delete):
for s in [sig for sig in dir(signal) if sig.startswith("SIG")
          and not sig.startswith("SIG_")
signum = getattr(signal, s)
signal.signal(signum, signalHandler)
if not os.path.isdir("/scratch/babymaker"):
min_free = max(abs_limit, du.f_bsize*du.f_blocks*rel_limit)
cacheRecurse(caches, dict(), command, fragile, min_free, no_delete)
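# Command-line entry point.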
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Automatically creates and, if necessary, deletes local caches of files from /net/cmsX/cmsXr0/babymaker and remaps any files in the provided command to their cached version.",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-c", "--cache", nargs="+", default=[],
                    help="Files that should be copied to cache or, if not already existing, created on the cache and moved to /net/cmsX/cmsXr0/babymaker upon completion.")
parser.add_argument("-e", "--execute", nargs="+", default=[],
                    help="Command to execute. ./run/wrapper.sh is automatically prepended.")
parser.add_argument("--fragile", action="store_true",
                    help="By default, wildcards are expanded and cached paths replaced in the arguments to the provided executable. Setting this flag runs the command \"as is.\" Files will still be cached, but the executable will not automatically use the cached version.")
parser.add_argument("--abs_limit", default=10000000000, type=int,
                    help="Minimum number of bytes to leave available in cache.")
parser.add_argument("--rel_limit", default=0.5, type=float,
                    help="Minimum fraction of cache to leave available.")
parser.add_argument("--no_delete", action="store_false",
                    help="If cache is full, prevents deletion of old files to make room for new ones. Note that it is possible to delete a cached file currently being used by another script, so it is polite to use this flag if the cache is under heavy load.")
args = parser.parse_args()
cacheRun(args.cache, args.execute, args.fragile, args.abs_limit, args.rel_limit, args.no_delete)
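# Functions defined in the script: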
def cacheUpToDate(cache_path, net_path)
def cacheRecurse(caches, file_map, command, fragile, min_free, no_delete)
def execute(command, file_map, fragile)
def removeOldCache(file_map)
def cacheRun(caches, command, fragile, abs_limit, rel_limit, no_delete)
def mapFiles(command, file_map)
def mkdirPermissions(path, mode)
def cacheCopy(src, dst, min_free, file_map, no_delete)
def signalHandler(signum, frame)
def __init__(self, signum, frame)
def syncCache(net_path, cache_path)
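# A hypothetical invocation, assuming the script is saved as cache_run.py; the input file and the
# executable below are placeholders, not names taken from the source:
#   ./cache_run.py --rel_limit 0.25 \
#                  -c /net/cms2/cms2r0/babymaker/example_input.root \
#                  -e ./scripts/make_example.py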