1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 | Demo/pdist/rcslib.py
"""RCS interface module. Defines the class RCS, which represents a directory with rcs version files and (possibly) corresponding work files. """ import fnmatch import os import re import string import tempfile class RCS: """RCS interface class (local filesystem version). An instance of this class represents a directory with rcs version files and (possible) corresponding work files. Methods provide access to most rcs operations such as checkin/checkout, access to the rcs metadata (revisions, logs, branches etc.) as well as some filesystem operations such as listing all rcs version files. XXX BUGS / PROBLEMS - The instance always represents the current directory so it's not very useful to have more than one instance around simultaneously """ # Characters allowed in work file names okchars = string.ascii_letters + string.digits + '-_=+' def __init__(self): """Constructor.""" pass def __del__(self): """Destructor.""" pass # --- Informational methods about a single file/revision --- def log(self, name_rev, otherflags = ''): """Return the full log text for NAME_REV as a string. Optional OTHERFLAGS are passed to rlog. """ f = self._open(name_rev, 'rlog ' + otherflags) data = f.read() status = self._closepipe(f) if status: data = data + "%s: %s" % status elif data[-1] == '\n': data = data[:-1] return data def head(self, name_rev): """Return the head revision for NAME_REV""" dict = self.info(name_rev) return dict['head'] def info(self, name_rev): """Return a dictionary of info (from rlog -h) for NAME_REV The dictionary's keys are the keywords that rlog prints (e.g. 'head' and its values are the corresponding data (e.g. '1.3'). XXX symbolic names and locks are not returned """ f = self._open(name_rev, 'rlog -h') dict = {} while 1: line = f.readline() if not line: break if line[0] == '\t': # XXX could be a lock or symbolic name # Anything else? continue i = string.find(line, ':') if i > 0: key, value = line[:i], string.strip(line[i+1:]) dict[key] = value status = self._closepipe(f) if status: raise IOError, status return dict # --- Methods that change files --- def lock(self, name_rev): """Set an rcs lock on NAME_REV.""" name, rev = self.checkfile(name_rev) cmd = "rcs -l%s %s" % (rev, name) return self._system(cmd) def unlock(self, name_rev): """Clear an rcs lock on NAME_REV.""" name, rev = self.checkfile(name_rev) cmd = "rcs -u%s %s" % (rev, name) return self._system(cmd) def checkout(self, name_rev, withlock=0, otherflags=""): """Check out NAME_REV to its work file. If optional WITHLOCK is set, check out locked, else unlocked. The optional OTHERFLAGS is passed to co without interpretation. Any output from co goes to directly to stdout. """ name, rev = self.checkfile(name_rev) if withlock: lockflag = "-l" else: lockflag = "-u" cmd = 'co %s%s %s %s' % (lockflag, rev, otherflags, name) return self._system(cmd) def checkin(self, name_rev, message=None, otherflags=""): """Check in NAME_REV from its work file. The optional MESSAGE argument becomes the checkin message (default "<none>" if None); or the file description if this is a new file. The optional OTHERFLAGS argument is passed to ci without interpretation. Any output from ci goes to directly to stdout. """ name, rev = self._unmangle(name_rev) new = not self.isvalid(name) if not message: message = "<none>" if message and message[-1] != '\n': message = message + '\n' lockflag = "-u" if new: f = tempfile.NamedTemporaryFile() f.write(message) f.flush() cmd = 'ci %s%s -t%s %s %s' % \ (lockflag, rev, f.name, otherflags, name) else: message = re.sub(r'([\"$`])', r'\\\1', message) cmd = 'ci %s%s -m"%s" %s %s' % \ (lockflag, rev, message, otherflags, name) return self._system(cmd) # --- Exported support methods --- def listfiles(self, pat = None): """Return a list of all version files matching optional PATTERN.""" files = os.listdir(os.curdir) files = filter(self._isrcs, files) if os.path.isdir('RCS'): files2 = os.listdir('RCS') files2 = filter(self._isrcs, files2) files = files + files2 files = map(self.realname, files) return self._filter(files, pat) def isvalid(self, name): """Test whether NAME has a version file associated.""" namev = self.rcsname(name) return (os.path.isfile(namev) or os.path.isfile(os.path.join('RCS', namev))) def rcsname(self, name): """Return the pathname of the version file for NAME. The argument can be a work file name or a version file name. If the version file does not exist, the name of the version file that would be created by "ci" is returned. """ if self._isrcs(name): namev = name else: namev = name + ',v' if os.path.isfile(namev): return namev namev = os.path.join('RCS', os.path.basename(namev)) if os.path.isfile(namev): return namev if os.path.isdir('RCS'): return os.path.join('RCS', namev) else: return namev def realname(self, namev): """Return the pathname of the work file for NAME. The argument can be a work file name or a version file name. If the work file does not exist, the name of the work file that would be created by "co" is returned. """ if self._isrcs(namev): name = namev[:-2] else: name = namev if os.path.isfile(name): return name name = os.path.basename(name) return name def islocked(self, name_rev): """Test whether FILE (which must have a version file) is locked. XXX This does not tell you which revision number is locked and ignores any revision you may pass in (by virtue of using rlog -L -R). """ f = self._open(name_rev, 'rlog -L -R') line = f.readline() status = self._closepipe(f) if status: raise IOError, status if not line: return None if line[-1] == '\n': line = line[:-1] return self.realname(name_rev) == self.realname(line) def checkfile(self, name_rev): """Normalize NAME_REV into a (NAME, REV) tuple. Raise an exception if there is no corresponding version file. """ name, rev = self._unmangle(name_rev) if not self.isvalid(name): raise os.error, 'not an rcs file %r' % (name,) return name, rev # --- Internal methods --- def _open(self, name_rev, cmd = 'co -p', rflag = '-r'): """INTERNAL: open a read pipe to NAME_REV using optional COMMAND. Optional FLAG is used to indicate the revision (default -r). Default COMMAND is "co -p". Return a file object connected by a pipe to the command's output. """ name, rev = self.checkfile(name_rev) namev = self.rcsname(name) if rev: cmd = cmd + ' ' + rflag + rev return os.popen("%s %r" % (cmd, namev)) def _unmangle(self, name_rev): """INTERNAL: Normalize NAME_REV argument to (NAME, REV) tuple. Raise an exception if NAME contains invalid characters. A NAME_REV argument is either NAME string (implying REV='') or a tuple of the form (NAME, REV). """ if type(name_rev) == type(''): name_rev = name, rev = name_rev, '' else: name, rev = name_rev for c in rev: if c not in self.okchars: raise ValueError, "bad char in rev" return name_rev def _closepipe(self, f): """INTERNAL: Close PIPE and print its exit status if nonzero.""" sts = f.close() if not sts: return None detail, reason = divmod(sts, 256) if reason == 0: return 'exit', detail # Exit status signal = reason&0x7F if signal == 0x7F: code = 'stopped' signal = detail else: code = 'killed' if reason&0x80: code = code + '(coredump)' return code, signal def _system(self, cmd): """INTERNAL: run COMMAND in a subshell. Standard input for the command is taken from /dev/null. Raise IOError when the exit status is not zero. Return whatever the calling method should return; normally None. A derived class may override this method and redefine it to capture stdout/stderr of the command and return it. """ cmd = cmd + " </dev/null" sts = os.system(cmd) if sts: raise IOError, "command exit status %d" % sts def _filter(self, files, pat = None): """INTERNAL: Return a sorted copy of the given list of FILES. If a second PATTERN argument is given, only files matching it are kept. No check for valid filenames is made. """ if pat: def keep(name, pat = pat): return fnmatch.fnmatch(name, pat) files = filter(keep, files) else: files = files[:] files.sort() return files def _remove(self, fn): """INTERNAL: remove FILE without complaints.""" try: os.unlink(fn) except os.error: pass def _isrcs(self, name): """INTERNAL: Test whether NAME ends in ',v'.""" return name[-2:] == ',v' |