babymaker  e95a6a9342d4604277fe7cc6149b6b5b24447d89
send_skim_ntuples.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 from __future__ import print_function
4 
5 import argparse
6 import glob
7 import os
8 import numpy
9 import itertools
10 import subprocess
11 import re
12 
13 import utilities
14 
def getSkimName(cut):
    """Turn a cut expression into a string usable in file and directory names.

    Operators and punctuation in *cut* are either spelled out (for example
    ">=" becomes "GE" and "!" becomes "NOT"), mapped to an underscore, or
    dropped entirely.

    Args:
        cut: The cut expression string.

    Returns:
        The sanitized name derived from *cut*.
    """
    # Applied strictly in this order: the two-character comparisons must be
    # handled before ">", "<" and "=" are rewritten on their own.
    substitutions = (
        (">=", "GE"),
        ("<=", "SE"),
        ("&", "_"),
        (">", "G"),
        ("<", "S"),
        ("=", ""),
        ("(", ""),
        (")", ""),
        ("+", ""),
        ("[", ""),
        ("]", ""),
        ("|", "_"),
        ("$", ""),
        (",", "_"),
        ("!", "NOT"),
        (" ", ""),
        ("@", ""),
    )
    name = cut
    for token, spelling in substitutions:
        name = name.replace(token, spelling)
    return name
34 
def splitJobs(files, num_jobs):
    """Divide *files* into at most *num_jobs* non-empty sub-lists.

    The split is delegated to numpy.array_split; sub-lists that come back
    empty (which happens when there are fewer files than jobs) are dropped.

    Args:
        files: Sequence of items to distribute.
        num_jobs: Number of sections requested from numpy.array_split.

    Returns:
        A list of plain Python lists, in the order array_split produced them.
    """
    chunks = []
    for section in numpy.array_split(numpy.array(files), num_jobs):
        members = section.tolist()
        if members:
            chunks.append(members)
    return chunks
37 
def sendSkimJob(in_files, out_files, cut, overwrite, cache, exe_name):
    """Write a driver script for one skim job and hand it to JobSubmit.csh.

    The driver script is created in a "run" directory next to the first
    output file. It contains one call per (input, output) pair that invokes
    skim_ntuple.py (taken from this file's directory) with the cut, the
    output path and the input path.

    Args:
        in_files: Input file paths for this job.
        out_files: Output file paths, paired one-to-one with *in_files*.
            Must be non-empty; the run directory is derived from its first
            entry.
        cut: Cut expression passed through to skim_ntuple.py.
        overwrite: If false, pairs whose output file already exists when
            this function runs are left out of the driver script.
        cache: If true, each call is routed through cache.cacheRun instead
            of subprocess.call.
        exe_name: File name of the driver script inside the run directory.
    """
    python_dir = utilities.fullPath(os.path.dirname(__file__))
    run_dir = os.path.join(os.path.dirname(out_files[0]), "run")
    utilities.ensureDir(run_dir)
    run_file = os.path.join(run_dir, exe_name)
    skim_script = os.path.join(python_dir, 'skim_ntuple.py')

    with open(run_file, "w") as f:
        f.write('#! /usr/bin/env python\n')
        f.write('import sys\n')
        # repr() yields a correctly escaped Python literal, so quotes or
        # backslashes in paths and cuts cannot corrupt the generated script.
        f.write('sys.path.append(' + repr(python_dir) + ')\n')
        f.write('import subprocess\n')
        f.write('import cache\n')
        # zip (rather than itertools.izip) works on both Python 2 and 3.
        for in_file, out_file in zip(in_files, out_files):
            if os.path.exists(out_file) and not overwrite:
                continue
            command = [skim_script, cut, out_file, in_file]
            if cache:
                # The trailing cacheRun arguments are written out verbatim;
                # their meaning is defined by the cache module.
                f.write('cache.cacheRun(' + repr([out_file, in_file]) + ','
                        + repr(command)
                        + ',False,10000000000,0.5,False)\n')
            else:
                f.write('subprocess.call(' + repr(command) + ')\n')

    # 0o755 is the octal spelling accepted by Python 2.6+ and Python 3.
    os.chmod(run_file, 0o755)

    subprocess.call(["JobSubmit.csh", "run/wrapper.sh", run_file])
64 
def sendSkims(in_dir, num_jobs, cut, out_parent, file_tag, overwrite, cache):
    """Split the ntuples in *in_dir* into jobs and submit one skim job each.

    Args:
        in_dir: Directory holding the input .root files.
        num_jobs: Number of jobs to divide the files over; fewer are
            submitted when there are fewer files than jobs.
        cut: Skim cut expression; also used to derive the skim name.
        out_parent: Directory in which the "skim_<name>" output directory is
            placed. If None, it is derived from *in_dir* by matching the
            .../cmsN/cmsNr0/babymaker/babies/YYYY_MM_DD/<subdir>/ layout.
        file_tag: Only files whose name contains this tag are skimmed; an
            empty string matches every .root file.
        overwrite: Passed to sendSkimJob; remake outputs that already exist.
        cache: Passed to sendSkimJob; route the skims through the cache
            module.

    Raises:
        ValueError: If *out_parent* is None and *in_dir* does not follow the
            expected directory layout.
    """
    in_dir = utilities.fullPath(in_dir)
    skim_name = getSkimName(cut)

    if out_parent is None:
        dir_pat = re.compile("(.*?/cms[0-9]+/cms[0-9]+r0/babymaker/babies/[0-9]{4}_[0-9]{2}_[0-9]{2}/.*?)/")
        match = dir_pat.search(in_dir+"/")
        if match is None:
            # Without this check the failure would surface as an opaque
            # AttributeError on None.
            raise ValueError("Cannot infer output directory from '"+in_dir
                             +"'; please specify out_dir explicitly.")
        out_parent = match.group(0)

    out_dir = os.path.join(out_parent, "skim_"+skim_name)

    in_files = glob.glob(utilities.fullPath(os.path.join(in_dir, "*"+file_tag+"*.root")))
    out_files = [f.replace(in_dir, out_dir).replace(".root", "_"+skim_name+".root") for f in in_files]

    # Both lists have the same length and are split with the same num_jobs,
    # so the resulting chunks line up pairwise.
    in_chunks = splitJobs(in_files, num_jobs)
    out_chunks = splitJobs(out_files, num_jobs)

    total_jobs = 0
    for ijob, (job_inputs, job_outputs) in enumerate(zip(in_chunks, out_chunks)):
        total_jobs += 1
        sendSkimJob(job_inputs, job_outputs, cut, overwrite, cache,
                    skim_name+"_"+file_tag+"_"+str(ijob)+"_of_"+str(num_jobs)+".py")

    print("Submitted "+str(total_jobs)+" jobs.")
    print("Output sent to {}".format(out_dir))
90 
if __name__ == "__main__":
    # Command-line entry point: parse the arguments and submit the skims.
    # The positional arguments are registered in the order they must appear
    # on the command line.
    cli = argparse.ArgumentParser(
        description="Submits jobs to skim non-SMS ntuples.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    cli.add_argument(
        "in_dir",
        help="Directory from which to read pre-skim ntuples. E.g. /net/cmsX/cmsXr0/babymaker/babies/YYYY_MM_DD/data/unskimmed/alldata",
    )
    cli.add_argument(
        "cut",
        help="Skim cut to apply.",
    )
    cli.add_argument(
        "out_dir",
        nargs="?",
        default=None,
        help="Parent directory in which to place skim_XYZ directory. If omitted, attempts to use the YYYY_MM_DD/data_or_mc directory corresponding to the input directory.",
    )
    cli.add_argument(
        "num_jobs",
        nargs="?",
        type=int,
        default=100,
        help="Number of jobs over which to divide skimming.",
    )
    cli.add_argument(
        "file_tag",
        nargs="?",
        metavar="file_tag",
        default="",
        help="Only skim files matching %(metavar)s. Matches all files if blank.",
    )
    cli.add_argument(
        "-o", "--overwrite",
        action="store_true",
        help="Remake skimmed output file even if it already exists.",
    )
    cli.add_argument(
        "--cache",
        action="store_true",
        help="Enable use of file caching system",
    )
    opts = cli.parse_args()

    sendSkims(
        in_dir=opts.in_dir,
        num_jobs=opts.num_jobs,
        cut=opts.cut,
        out_parent=opts.out_dir,
        file_tag=opts.file_tag,
        overwrite=opts.overwrite,
        cache=opts.cache,
    )
def ensureDir(path)
Definition: utilities.py:37
def sendSkimJob(in_files, out_files, cut, overwrite, cache, exe_name)
def sendSkims(in_dir, num_jobs, cut, out_parent, file_tag, overwrite, cache)
def fullPath(path)
Definition: utilities.py:34
def splitJobs(files, num_jobs)