babymaker  e95a6a9342d4604277fe7cc6149b6b5b24447d89
cache.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 from __future__ import print_function
4 
import argparse
import errno
import glob
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time

import utilities
16 
18  def __init__(self, signum, frame):
19  self.signum = signum
20  self.frame = frame
21  def __str__(self):
22  return "Caught signal number "+str(self.signum)
23 
def signalHandler(signum, frame):
    # Convert a caught signal into a SignalError exception so that ordinary
    # try/except cleanup paths (cache invalidation, re-copying) get to run.
    raise SignalError(signum, frame)
26 
27 def mkdirPermissions(path, mode):
28  if not path or path == "/":
29  return
30  (head, tail) = os.path.split(path)
31  mkdirPermissions(head, mode)
32  try:
33  os.mkdir(path)
34  os.chmod(path, mode)
35  except OSError:
36  pass
37 
38 def cacheUpToDate(cache_path, net_path):
39  cache = None
40  net = None
41  try: cache = os.stat(cache_path)
42  except OSError as e:
43  if e.errno == 2: return False
44  else: raise
45  try: net = os.stat(net_path)
46  except OSError as e:
47  if e.errno == 2: return True
48  else: raise
49  return cache.st_mtime >= net.st_mtime and cache.st_size == net.st_size
50 
def expand(files):
    """Expand shell wildcards and convert every entry to an absolute path."""
    expanded = []
    for pattern in files:
        matches = glob.glob(pattern)
        if matches:
            expanded.extend(utilities.fullPath(m) for m in matches)
        else:
            # No match: keep the literal (possibly not-yet-existing) path.
            expanded.append(utilities.fullPath(pattern))
    return expanded
61 
def isNetFile(path):
    """Return True if path lives on the /net network filesystem."""
    return path[:5] == "/net/"
64 
def cachePath(path):
    """Map a /net/... path to its location under the local scratch cache."""
    # Drop the leading "/net/" so the remainder nests under the cache root.
    return os.path.join(utilities.fullPath("/scratch/babymaker"), path[5:])
68 
69 def lastTime(path):
70  s = os.stat(path)
71  return max(s.st_ctime, s.st_mtime, s.st_atime)
72 
def mapFiles(command, file_map):
    """Replace executable arguments with their cached equivalents.

    Each argument that globs to real files is expanded to absolute paths.
    Arguments with an up-to-date cache (either newly generated via file_map
    or pre-existing under /scratch) are rewritten to the cached path.

    Returns (command, inv_file_map) where inv_file_map maps each used
    cache path back to its /net original.
    """
    # Expand wildcard arguments; non-matching arguments pass through as-is.
    expanded_args = []
    for arg in command:
        globbed = glob.glob(arg)
        if globbed:
            # Argument represents file(s)
            expanded_args.extend(utilities.fullPath(f) for f in globbed)
        else:
            expanded_args.append(arg)

    mapped = []
    # items() instead of Python-2-only iteritems() (works on 2 and 3).
    inv_file_map = dict((cached, net) for net, cached in file_map.items())
    for arg in expanded_args:
        if arg in file_map and cacheUpToDate(file_map[arg], arg):
            # Check if we generated a cache for this file during this run
            mapped.append(file_map[arg])
        elif isNetFile(arg):
            # Check for a pre-existing cache
            cache_path = cachePath(arg)
            if cacheUpToDate(cache_path, arg):
                mapped.append(cache_path)
                inv_file_map[cache_path] = arg
            else:
                mapped.append(arg)
        else:
            mapped.append(arg)

    return mapped, inv_file_map
104 
def netCopy(src, dst):
    """Copy cached file src back to its /net destination dst.

    After copying, bumps src's timestamps until cacheUpToDate(src, dst)
    holds, so later runs keep treating the cache as current.  On any
    failure both copies are removed before re-raising, since neither can
    be trusted.
    """
    print("Copying "+src+" to "+dst+"\n")
    try:
        shutil.copy(src, dst)
        while not cacheUpToDate(src, dst):
            #Want cache to be newer so it's considered up to date
            now = time.time()
            os.utime(src, (now, now))
    except:
        try:
            os.remove(dst)
        finally:
            # NOTE(review): if os.remove(dst) raises, that exception
            # propagates after this finally and the ePrint/raise below
            # are skipped -- confirm this is intended.
            os.remove(src)
        utilities.ePrint("Failed to copy "+src+" to "+dst+"\n")
        raise
120 
def removeOldCache(file_map):
    """Delete the least-recently-used cached file to free space.

    Files mapped for the current run and files used within the last 24
    hours are never deleted.  Returns True if a file was removed.
    """
    # values() instead of Python-2-only itervalues(); build the set once
    # instead of scanning it for every file in the walk (was O(n) per file).
    cached_in_use = set(file_map.values())
    found_file = False
    oldest_mod_time = 0
    oldest_path = ""
    for root, dirs, files in os.walk(utilities.fullPath("/scratch/babymaker")):
        for f in files:
            path = os.path.join(root, f)
            if path in cached_in_use:
                continue
            mod_time = lastTime(path)
            if not found_file or mod_time < oldest_mod_time:
                found_file = True
                oldest_mod_time = mod_time
                oldest_path = path

    if time.time() - oldest_mod_time <= 86400.:
        # Don't delete files used in the last 24 hours
        return False
    if not found_file:
        return False
    oldest_path = utilities.fullPath(oldest_path)
    print("Deleting "+oldest_path+" from cache\n")
    # Best-effort removal; also swallows SignalError raised mid-delete so
    # the caller just sees "nothing freed" rather than a crash.
    try:
        os.remove(oldest_path)
    except:
        return False
    # Prune now-empty parent directories back up toward the root.
    while oldest_path != "/" and oldest_path != "":
        try:
            os.rmdir(oldest_path)
        except OSError:
            pass  # directory not empty (or already gone)
        finally:
            oldest_path = os.path.dirname(oldest_path)
    return True
151 
def cacheCopy(src, dst, min_free, file_map, no_delete):
    """Cache a copy of src at dst, evicting old cache entries if needed.

    Returns without copying if the cache cannot be made large enough
    (either no_delete is set or eviction found nothing to remove).
    On a failed copy the partial dst is removed and the error re-raised.
    """
    # Safety factor of 2 to account for file growth if the cached copy is
    # modified later.
    src_size = os.stat(src).st_size * 2

    du = os.statvfs(utilities.fullPath("/scratch/babymaker"))
    avail = du.f_bsize * du.f_bavail
    while avail - src_size < min_free:
        # Keep deleting until there's room
        if no_delete:
            return
        removed_file = removeOldCache(file_map)
        if not removed_file:
            return
        du = os.statvfs(utilities.fullPath("/scratch/babymaker"))
        avail = du.f_bsize * du.f_bavail
    print("Caching "+src+" to "+dst+"\n")
    try:
        shutil.copy(src, dst)
        # 0o775 instead of 0775: the old literal is a SyntaxError on
        # Python 3; 0o775 is valid on 2.6+ as well.
        os.chmod(dst, 0o775)
        while not cacheUpToDate(dst, src):
            # Bump cache mtime until it is newer than the /net original.
            now = time.time()
            os.utime(dst, (now, now))
    except:
        os.remove(dst)
        utilities.ePrint("Failed to cache "+src+" to "+dst+"\n")
        raise
177 
def syncCache(net_path, cache_path):
    """Refresh the cache file's timestamps until it counts as up to date.

    On failure the cache file is removed (it can no longer be trusted)
    and the error re-raised.
    """
    try:
        stamp = time.time()
        os.utime(cache_path, (stamp, stamp))
        mtime = os.path.getmtime(cache_path)
        while not cacheUpToDate(cache_path, net_path):
            # Keep pushing the cache's mtime forward until it beats /net.
            mtime += 1.
            stamp = max(mtime, time.time())
            os.utime(cache_path, (stamp, stamp))
    except:
        os.remove(cache_path)
        utilities.ePrint("Failed to sync cache times")
        raise
192 
def execute(command, file_map, fragile):
    """Run command via run/wrapper.sh with cached file remapping.

    Unless fragile, arguments are remapped to cached paths via mapFiles.
    Cached files modified by the command are copied back to /net; the
    rest get their timestamps re-synced.  On any failure every cached
    file is removed (it may be corrupt) before re-raising.
    """
    inv_file_map = dict()
    if not fragile:
        command, inv_file_map = mapFiles(command, file_map)
    else:
        # items() instead of Python-2-only iteritems() (works on 2 and 3).
        inv_file_map = dict((cached, net) for net, cached in file_map.items())

    if len(command) <= 0:
        return

    args = ["run/wrapper.sh"]
    for a in command:
        args.append(a.lstrip())
    command = args

    try:
        old_mod_times = dict()
        # 2 second safety margin in case the executable modifies within the
        # access time resolution (typically 1 second).
        before_time = round(time.time() - 2.)
        for f in inv_file_map:
            os.utime(f, (before_time, before_time))
            old_mod_times[f] = os.path.getmtime(f)

        exit_code = 0
        print("Executing", command, "\n")
        try:
            exit_code = subprocess.call(command)
        except SignalError as e:
            # SIGCHLD from the finished child is expected; re-raise others.
            if e.signum != signal.SIGCLD and e.signum != signal.SIGCHLD:
                raise e

        if exit_code != 0:
            raise Exception("Executable returned non-zero exit code.")
    except:
        # The command may have left cached files in a corrupt state.
        for f in inv_file_map:
            os.remove(f)
        utilities.ePrint("Failed to execute", command, "\n")
        raise
    else:
        for cache_path, net_path in inv_file_map.items():
            if os.path.getmtime(cache_path) > old_mod_times[cache_path]:
                # Copy modified files back to /net
                netCopy(cache_path, net_path)
            else:
                syncCache(net_path, cache_path)
240 
def cacheRecurse(caches, file_map, command, fragile, min_free, no_delete):
    """Cache each file in caches, then run command via execute().

    Processes caches one at a time, recording successful net->cache
    mappings in file_map.  Non-/net paths are reported and skipped.
    """
    if len(caches) == 0:
        # Caching done, run executable
        execute(command, file_map, fragile)
        return

    net_path = caches[0]
    if not isNetFile(net_path):
        utilities.ePrint("Cannot cache "+net_path+"\n")
        cacheRecurse(caches[1:], file_map, command, fragile, min_free, no_delete)
        return

    # 0o775 instead of 0775: the old literal is a SyntaxError on Python 3;
    # 0o775 is valid on 2.6+ as well.
    mkdirPermissions(os.path.dirname(net_path), 0o775)
    if not os.path.exists(net_path):
        # If the /net file does not exist, create a new empty file
        with open(net_path, "a"):
            pass
    cache_path = cachePath(net_path)
    mkdirPermissions(os.path.dirname(cache_path), 0o775)

    if not cacheUpToDate(cache_path, net_path):
        # Cache doesn't exist or is outdated, so copy file from /net
        cacheCopy(net_path, cache_path, min_free, file_map, no_delete)

    if cacheUpToDate(cache_path, net_path):
        # Only use cached file if it was created and is up to date
        file_map[net_path] = cache_path

    cacheRecurse(caches[1:], file_map, command, fragile, min_free, no_delete)
270 
def cacheRun(caches, command, fragile, abs_limit, rel_limit, no_delete):
    """Install signal handlers, cache the requested files, and run command."""
    # Route every catchable signal through signalHandler so it surfaces as
    # a SignalError exception.
    for name in dir(signal):
        if not name.startswith("SIG") or name.startswith("SIG_"):
            continue
        if name == "SIGKILL" or name == "SIGSTOP":
            continue  # these cannot be caught
        signal.signal(getattr(signal, name), signalHandler)

    if not os.path.isdir("/scratch/babymaker"):
        # No local cache available: run the command untouched.
        cacheRecurse([], dict(), command, True, 0, True)
        return
    caches = expand(caches)
    du = os.statvfs(utilities.fullPath("/scratch/babymaker"))
    # Leave free whichever is larger: the absolute byte limit or the
    # requested fraction of the whole cache volume.
    min_free = max(abs_limit, du.f_bsize*du.f_blocks*rel_limit)
    cacheRecurse(caches, dict(), command, fragile, min_free, no_delete)
286 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatically creates and, if necessary, deletes local caches of files from /net/cmsX/cmsXr0/babymaker and remaps any files in the provided command to their cached version.",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-c", "--cache", nargs="+", default=[],
                        help="Files that should be copied to cache or, if not already existing, created on the cache and moved to /net/cmsX/cmsXr0/babymaker upon completion.")
    parser.add_argument("-e", "--execute", nargs="+", default=[],
                        help="Command to execute. ./run/wrapper.sh is automatically prepended.")
    parser.add_argument("--fragile", action="store_true",
                        help="By default, wildcards are expanded and cached paths replaced in the arguments to the provided executable. Setting this flag runs the command \"as is.\" Files will still be cached, but the executable will not automatically use the cached version.")
    parser.add_argument("--abs_limit", default=10000000000, type=int,
                        help="Minimum number of bytes to leave available in cache.")
    parser.add_argument("--rel_limit", default=0.5, type=float,
                        help="Minimum fraction of cache to leave available.")
    # Bug fix: this was action="store_false", which made no_delete default
    # to True (cache never evicted) and, when the flag WAS passed, set it
    # to False (deletion enabled) -- the opposite of the documented intent.
    parser.add_argument("--no_delete", action="store_true",
                        help="If cache is full, prevents deletion of old files to make room for new ones. Note that it is possible to delete a cached file currently being used by another script, so it is polite to use this flag if the cache is under heavy load.")
    args = parser.parse_args()

    cacheRun(args.cache, args.execute, args.fragile, args.abs_limit, args.rel_limit, args.no_delete)
def lastTime(path)
Definition: cache.py:69
def cacheUpToDate(cache_path, net_path)
Definition: cache.py:38
def cacheRecurse(caches, file_map, command, fragile, min_free, no_delete)
Definition: cache.py:241
def execute(command, file_map, fragile)
Definition: cache.py:193
def isNetFile(path)
Definition: cache.py:62
def expand(files)
Definition: cache.py:51
def removeOldCache(file_map)
Definition: cache.py:121
def cacheRun(caches, command, fragile, abs_limit, rel_limit, no_delete)
Definition: cache.py:271
def flush()
Definition: utilities.py:47
def mapFiles(command, file_map)
Definition: cache.py:73
def mkdirPermissions(path, mode)
Definition: cache.py:27
def cacheCopy(src, dst, min_free, file_map, no_delete)
Definition: cache.py:152
def __str__(self)
Definition: cache.py:21
def ePrint(args, kwargs)
Definition: utilities.py:44
def signalHandler(signum, frame)
Definition: cache.py:24
def __init__(self, signum, frame)
Definition: cache.py:18
def netCopy(src, dst)
Definition: cache.py:105
def cachePath(path)
Definition: cache.py:65
def fullPath(path)
Definition: utilities.py:34
def syncCache(net_path, cache_path)
Definition: cache.py:178