Move bug-tool into the Python Museum

This commit is contained in:
Robin Gareus 2019-02-28 18:21:05 +01:00
parent c83ba53399
commit 06911bd7e2
No known key found for this signature in database
GPG key ID: A090BCE02CF57F04
10 changed files with 0 additions and 6883 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,9 +0,0 @@
import ClientCookie
def debug(text):
if ClientCookie.CLIENTCOOKIE_DEBUG: _debug(text)
def _debug(text, *args):
if args:
text = text % args
ClientCookie.DEBUG_STREAM.write(text+"\n")

View file

@ -1,224 +0,0 @@
"""HTTP header value parsing utility functions.
from ClientCookie._HeadersUtil import split_header_words
values = split_header_words(h.headers["Content-Type"])
This module provides a few functions that help parsing and construction of
valid HTTP header values.
Copyright 1997-1998, Gisle Aas
Copyright 2002-2003, John J. Lee
This code is free software; you can redistribute it and/or modify it under
the terms of the BSD License (see the file COPYING included with the
distribution).
"""
import re, string
from types import StringType
try:
from types import UnicodeType
STRING_TYPES = StringType, UnicodeType
except:
STRING_TYPES = StringType,
from _Util import startswith, endswith, http2time
try: True
except NameError:
True = 1
False = 0
def unmatched(match):
"""Return unmatched part of re.Match object."""
start, end = match.span(0)
return match.string[:start]+match.string[end:]
# XXX I really can't see what this =* was for (came from LWP, I guess)
#token_re = re.compile(r"^\s*(=*[^\s=;,]+)")
token_re = re.compile(r"^\s*([^=\s;,]+)")
quoted_value_re = re.compile(r"^\s*=\s*\"([^\"\\]*(?:\\.[^\"\\]*)*)\"")
value_re = re.compile(r"^\s*=\s*([^\s;,]*)")
escape_re = re.compile(r"\\(.)")
def split_header_words(header_values):
r"""Parse header values into a list of lists containing key,value pairs.
The function knows how to deal with ",", ";" and "=" as well as quoted
values after "=". A list of space separated tokens are parsed as if they
were separated by ";".
If the header_values passed as argument contains multiple values, then they
are treated as if they were a single value separated by comma ",".
This means that this function is useful for parsing header fields that
follow this syntax (BNF as from the HTTP/1.1 specification, but we relax
the requirement for tokens).
headers = #header
header = (token | parameter) *( [";"] (token | parameter))
token = 1*<any CHAR except CTLs or separators>
separators = "(" | ")" | "<" | ">" | "@"
| "," | ";" | ":" | "\" | <">
| "/" | "[" | "]" | "?" | "="
| "{" | "}" | SP | HT
quoted-string = ( <"> *(qdtext | quoted-pair ) <"> )
qdtext = <any TEXT except <">>
quoted-pair = "\" CHAR
parameter = attribute "=" value
attribute = token
value = token | quoted-string
Each header is represented by a list of key/value pairs. The value for a
simple token (not part of a parameter) is None. Syntactically incorrect
headers will not necessarily be parsed as you would want.
This is easier to describe with some examples:
>>> split_header_words(['foo="bar"; port="80,81"; discard, bar=baz'])
[[('foo', 'bar'), ('port', '80,81'), ('discard', None)], [('bar', 'baz')]]
>>> split_header_words(['text/html; charset="iso-8859-1"'])
[[('text/html', None), ('charset', 'iso-8859-1')]]
>>> split_header_words([r'Basic realm="\"foo\bar\""'])
[[('Basic', None), ('realm', '"foobar"')]]
"""
assert type(header_values) not in STRING_TYPES
result = []
for text in header_values:
orig_text = text
pairs = []
while text:
m = token_re.search(text)
if m:
text = unmatched(m)
name = m.group(1)
m = quoted_value_re.search(text)
if m: # quoted value
text = unmatched(m)
value = m.group(1)
value = escape_re.sub(r"\1", value)
else:
m = value_re.search(text)
if m: # unquoted value
text = unmatched(m)
value = m.group(1)
value = string.rstrip(value)
else:
# no value, a lone token
value = None
pairs.append((name, value))
elif startswith(string.lstrip(text), ","):
# concatenated headers, as per RFC 2616 section 4.2
text = string.lstrip(text)[1:]
if pairs: result.append(pairs)
pairs = []
else:
# skip junk
non_junk, nr_junk_chars = re.subn("^[=\s;]*", "", text)
assert nr_junk_chars > 0, (
"split_header_words bug: '%s', '%s', %s" %
(orig_text, text, pairs))
text = non_junk
if pairs: result.append(pairs)
return result
join_escape_re = re.compile(r"([\"\\])")
def join_header_words(lists):
"""Do the inverse of the conversion done by split_header_words.
Takes a list of lists of (key, value) pairs and produces a single header
value. Attribute values are quoted if needed.
>>> join_header_words([[("text/plain", None), ("charset", "iso-8859/1")]])
'text/plain; charset="iso-8859/1"'
>>> join_header_words([[("text/plain", None)], [("charset", "iso-8859/1")]])
'text/plain, charset="iso-8859/1"'
"""
headers = []
for pairs in lists:
attr = []
for k, v in pairs:
if v is not None:
if not re.search(r"^\w+$", v):
v = join_escape_re.sub(r"\\\1", v) # escape " and \
v = '"%s"' % v
if k is None: # Netscape cookies may have no name
k = v
else:
k = "%s=%s" % (k, v)
attr.append(k)
if attr: headers.append(string.join(attr, "; "))
return string.join(headers, ", ")
def parse_ns_headers(ns_headers):
"""Ad-hoc parser for Netscape protocol cookie-attributes.
The old Netscape cookie format for Set-Cookie can for instance contain
an unquoted "," in the expires field, so we have to use this ad-hoc
parser instead of split_header_words.
XXX This may not make the best possible effort to parse all the crap
that Netscape Cookie headers contain. Ronald Tschalar's HTTPClient
parser is probably better, so could do worse than following that if
this ever gives any trouble.
Currently, this is also used for parsing RFC 2109 cookies.
"""
known_attrs = ("expires", "domain", "path", "secure",
# RFC 2109 attrs (may turn up in Netscape cookies, too)
"port", "max-age")
result = []
for ns_header in ns_headers:
pairs = []
version_set = False
for param in re.split(r";\s*", ns_header):
param = string.rstrip(param)
if param == "": continue
if "=" not in param:
if string.lower(param) in known_attrs:
k, v = param, None
else:
# cookie with missing name
k, v = None, param
else:
k, v = re.split(r"\s*=\s*", param, 1)
k = string.lstrip(k)
if k is not None:
lc = string.lower(k)
if lc in known_attrs:
k = lc
if k == "version":
# This is an RFC 2109 cookie. Will be treated as RFC 2965
# cookie in rest of code.
# Probably it should be parsed with split_header_words, but
# that's too much hassle.
version_set = True
if k == "expires":
# convert expires date to seconds since epoch
if startswith(v, '"'): v = v[1:]
if endswith(v, '"'): v = v[:-1]
v = http2time(v) # None if invalid
pairs.append((k, v))
if pairs:
if not version_set:
pairs.append(("version", "0"))
result.append(pairs)
return result
def _test():
import doctest, _HeadersUtil
return doctest.testmod(_HeadersUtil)
if __name__ == "__main__":
_test()

View file

@ -1,377 +0,0 @@
"""Mozilla / Netscape cookie loading / saving.
Copyright 1997-1999 Gisle Aas (libwww-perl)
Copyright 2002-2003 Johnny Lee <typo_pl@hotmail.com> (MSIE Perl code)
Copyright 2002-2003 John J Lee <jjl@pobox.com> (The Python port)
This code is free software; you can redistribute it and/or modify it under
the terms of the BSD License (see the file COPYING included with the
distribution).
"""
import os, re, string, time, struct
if os.name == "nt":
import _winreg
from _ClientCookie import CookieJar, Cookie, MISSING_FILENAME_TEXT
from _Util import startswith
from _Debug import debug
try: True
except NameError:
True = 1
False = 0
def regload(path, leaf):
key = _winreg.OpenKey(_winreg.HKEY_CURRENT_USER, path, 0, _winreg.KEY_ALL_ACCESS)
try:
value = _winreg.QueryValueEx(key, leaf)[0]
except WindowsError:
value = None
return value
WIN32_EPOCH = 0x019db1ded53e8000L # 1970 Jan 01 00:00:00 in Win32 FILETIME
def epoch_time_offset_from_win32_filetime(filetime):
"""Convert from win32 filetime to seconds-since-epoch value.
MSIE stores create and expire times as Win32 FILETIME, which is 64
bits of 100 nanosecond intervals since Jan 01 1601.
Cookies code expects time in 32-bit value expressed in seconds since
the epoch (Jan 01 1970).
"""
if filetime < WIN32_EPOCH:
raise ValueError("filetime (%d) is before epoch (%d)" %
(filetime, WIN32_EPOCH))
return divmod((filetime - WIN32_EPOCH), 10000000L)[0]
def binary_to_char(c): return "%02X" % ord(c)
def binary_to_str(d): return string.join(map(binary_to_char, list(d)), "")
class MSIECookieJar(CookieJar):
"""
This class differs from CookieJar only in the format it uses to load cookies
from a file.
MSIECookieJar can read the cookie files of Microsoft Internet Explorer
(MSIE) for Windows, versions 5 and 6, on Windows NT and XP respectively.
Other configurations may also work, but are untested. Saving cookies in
MSIE format is NOT supported. If you save cookies, they'll be in the usual
Set-Cookie3 format, which you can read back in using an instance of the
plain old CookieJar class. Don't save using the same filename that you
loaded cookies from, because you may succeed in clobbering your MSIE
cookies index file!
You should be able to have LWP share Internet Explorer's cookies like
this (note you need to supply a username to load_from_registry if you're on
Windows 9x):
cookies = MSIECookieJar(delayload=1)
# find cookies index file in registry and load cookies from it
cookies.load_from_registry()
opener = ClientCookie.build_opener(ClientCookie.HTTPHandler(cookies))
response = opener.open("http://foo.bar.com/")
Iterating over a delayloaded MSIECookieJar instance will not cause any
cookies to be read from disk. To force reading of all cookies from disk,
call read_all_cookies. Note that the following methods iterate over self:
clear_temporary_cookies, clear_expired_cookies, __len__, __repr__, __str__
and as_string.
Additional methods:
load_from_registry(ignore_discard=False, ignore_expires=False,
username=None)
load_cookie_data(filename, ignore_discard=False, ignore_expires=False)
read_all_cookies()
"""
magic_re = re.compile(r"Client UrlCache MMF Ver \d\.\d.*")
padding = "\x0d\xf0\xad\x0b"
msie_domain_re = re.compile(r"^([^/]+)(/.*)$")
cookie_re = re.compile("Cookie\:.+\@([\x21-\xFF]+).*?"
"(.+\@[\x21-\xFF]+\.txt)")
# path under HKEY_CURRENT_USER from which to get location of index.dat
reg_path = r"software\microsoft\windows" \
r"\currentversion\explorer\shell folders"
reg_key = "Cookies"
def __init__(self, *args, **kwargs):
apply(CookieJar.__init__, (self, args, kwargs))
self._delayload_domains = {}
def set_cookie(self, cookie):
if self.delayload:
self._delayload_domain(cookie.domain)
CookieJar.set_cookie(self, cookie)
def _cookies_for_domain(self, domain, request, unverifiable):
debug("Checking %s for cookies to return" % domain)
if not self.policy.domain_return_ok(domain, request, unverifiable):
return []
if self.delayload:
self._delayload_domain(domain)
return CookieJar._cookies_for_domain(
self, domain, request, unverifiable)
def read_all_cookies(self):
"""Eagerly read in all cookies."""
if self.delayload:
for domain in self._delayload_domains.keys():
self._delayload_domain(domain)
def _delayload_domain(self, domain):
# if necessary, lazily load cookies for this domain
delayload_info = self._delayload_domains.get(domain)
if delayload_info is not None:
cookie_file, ignore_discard, ignore_expires = delayload_info
try:
self.load_cookie_data(cookie_file,
ignore_discard, ignore_expires)
except IOError:
debug("error reading cookie file, skipping: %s" % cookie_file)
else:
del self._delayload_domains[domain]
def _load_cookies_from_file(self, filename):
cookies = []
cookies_fh = open(filename)
try:
while 1:
key = cookies_fh.readline()
if key == "": break
rl = cookies_fh.readline
def getlong(rl=rl): return long(rl().rstrip())
def getstr(rl=rl): return rl().rstrip()
key = key.rstrip()
value = getstr()
domain_path = getstr()
flags = getlong() # 0x2000 bit is for secure I think
lo_expire = getlong()
hi_expire = getlong()
lo_create = getlong()
hi_create = getlong()
sep = getstr()
if "" in (key, value, domain_path, flags, hi_expire, lo_expire,
hi_create, lo_create, sep) or (sep != "*"):
break
m = self.msie_domain_re.search(domain_path)
if m:
domain = m.group(1)
path = m.group(2)
cookies.append({"KEY": key, "VALUE": value, "DOMAIN": domain,
"PATH": path, "FLAGS": flags, "HIXP": hi_expire,
"LOXP": lo_expire, "HICREATE": hi_create,
"LOCREATE": lo_create})
finally:
cookies_fh.close()
return cookies
def load_cookie_data(self, filename,
ignore_discard=False, ignore_expires=False):
"""Load cookies from file containing actual cookie data.
Old cookies are kept unless overwritten by newly loaded ones.
You should not call this method if the delayload attribute is set.
I think each of these files contain all cookies for one user, domain,
and path.
filename: file containing cookies -- usually found in a file like
C:\WINNT\Profiles\joe\Cookies\joe@blah[1].txt
"""
now = int(time.time())
cookie_data = self._load_cookies_from_file(filename)
for cookie in cookie_data:
flags = cookie["FLAGS"]
secure = ((flags & 0x2000) != 0)
filetime = (cookie["HIXP"] << 32) + cookie["LOXP"]
expires = epoch_time_offset_from_win32_filetime(filetime)
if expires < now:
discard = True
else:
discard = False
domain = cookie["DOMAIN"]
initial_dot = startswith(domain, ".")
if initial_dot:
domain_specified = True
else:
# MSIE 5 does not record whether the domain cookie-attribute
# was specified.
# Assuming it wasn't is conservative, because with strict
# domain matching this will match less frequently; with regular
# Netscape tail-matching, this will match at exactly the same
# times that domain_specified = True would. It also means we
# don't have to prepend a dot to achieve consistency with our
# own & Mozilla's domain-munging scheme.
domain_specified = False
# assume path_specified is false
# XXX is there other stuff in here? -- eg. comment, commentURL?
c = Cookie(0,
cookie["KEY"], cookie["VALUE"],
None, False,
domain, domain_specified, initial_dot,
cookie["PATH"], False,
secure,
expires,
discard,
None,
None,
{"flags": flags})
if not ignore_discard and c.discard:
continue
if not ignore_expires and c.is_expired(now):
continue
self.set_cookie(c)
def load_from_registry(self, ignore_discard=False, ignore_expires=False,
username=None):
"""
username: only required on win9x
"""
cookies_dir = regload(self.reg_path, self.reg_key)
filename = os.path.normpath(os.path.join(cookies_dir, "INDEX.DAT"))
self.load(filename, ignore_discard, ignore_expires, username)
def load(self, filename, ignore_discard=False, ignore_expires=False,
username=None):
"""Load cookies from an MSIE 'index.dat' cookies index file.
filename: full path to cookie index file
username: only required on win9x
"""
if filename is None:
if self.filename is not None: filename = self.filename
else: raise ValueError(MISSING_FILENAME_TEXT)
index = open(filename, "rb")
try:
self._really_load(index, filename, ignore_discard, ignore_expires,
username)
finally:
index.close()
def _really_load(self, index, filename, ignore_discard, ignore_expires,
username):
now = int(time.time())
if username is None:
username = string.lower(os.environ['USERNAME'])
cookie_dir = os.path.dirname(filename)
data = index.read(256)
if len(data) != 256:
raise IOError("%s file is too short" % filename)
# Cookies' index.dat file starts with 32 bytes of signature
# followed by an offset to the first record, stored as a little-
# endian DWORD.
sig, size, data = data[:32], data[32:36], data[36:]
size = struct.unpack("<L", size)[0]
# check that sig is valid
if not self.magic_re.match(sig) or size != 0x4000:
raise IOError("%s ['%s' %s] does not seem to contain cookies" %
(str(filename), sig, size))
# skip to start of first record
index.seek(size, 0)
sector = 128 # size of sector in bytes
while 1:
data = ""
# Cookies are usually in two contiguous sectors, so read in two
# sectors and adjust if not a Cookie.
to_read = 2 * sector
d = index.read(to_read)
if len(d) != to_read:
break
data = data + d
# Each record starts with a 4-byte signature and a count
# (little-endian DWORD) of sectors for the record.
sig, size, data = data[:4], data[4:8], data[8:]
size = struct.unpack("<L", size)[0]
to_read = (size - 2) * sector
## from urllib import quote
## print "data", quote(data)
## print "sig", quote(sig)
## print "size in sectors", size
## print "size in bytes", size*sector
## print "size in units of 16 bytes", (size*sector) / 16
## print "size to read in bytes", to_read
## print
if sig != "URL ":
assert (sig in ("HASH", "LEAK",
self.padding, "\x00\x00\x00\x00"),
"unrecognized MSIE index.dat record: %s" %
binary_to_str(sig))
if sig == "\x00\x00\x00\x00":
# assume we've got all the cookies, and stop
break
if sig == self.padding:
continue
# skip the rest of this record
assert to_read >= 0
if size != 2:
assert to_read != 0
index.seek(to_read, 1)
continue
# read in rest of record if necessary
if size > 2:
more_data = index.read(to_read)
if len(more_data) != to_read: break
data = data + more_data
cookie_re = ("Cookie\:%s\@([\x21-\xFF]+).*?" % username +
"(%s\@[\x21-\xFF]+\.txt)" % username)
m = re.search(cookie_re, data, re.I)
if m:
cookie_file = os.path.join(cookie_dir, m.group(2))
if not self.delayload:
try:
self.load_cookie_data(cookie_file,
ignore_discard, ignore_expires)
except IOError:
debug("error reading cookie file, skipping: %s" %
cookie_file)
else:
domain = m.group(1)
i = domain.find("/")
if i != -1:
domain = domain[:i]
self._delayload_domains[domain] = (
cookie_file, ignore_discard, ignore_expires)

View file

@ -1,171 +0,0 @@
"""Mozilla / Netscape cookie loading / saving.
Copyright 1997-1999 Gisle Aas (libwww-perl)
Copyright 2002-2003 John J Lee <jjl@pobox.com> (The Python port)
This code is free software; you can redistribute it and/or modify it under
the terms of the BSD License (see the file COPYING included with the
distribution).
"""
import sys, re, string, time
import ClientCookie
from _ClientCookie import CookieJar, Cookie, MISSING_FILENAME_TEXT
from _Util import startswith, endswith
from _Debug import debug
try: True
except NameError:
True = 1
False = 0
try: issubclass(Exception(), (Exception,))
except TypeError:
real_issubclass = issubclass
from _Util import compat_issubclass
issubclass = compat_issubclass
del compat_issubclass
class MozillaCookieJar(CookieJar):
"""
WARNING: you may want to backup your browser's cookies file if you use
this class to save cookies. I *think* it works, but there have been
bugs in the past!
This class differs from CookieJar only in the format it uses to save and
load cookies to and from a file. This class uses the Netscape/Mozilla
`cookies.txt' format.
Don't expect cookies saved while the browser is running to be noticed by
the browser (in fact, Mozilla on unix will overwrite your saved cookies if
you change them on disk while it's running; on Windows, you probably can't
save at all while the browser is running).
Note that the Netscape/Mozilla format will downgrade RFC2965 cookies to
Netscape cookies on saving.
In particular, the cookie version and port number information is lost,
together with information about whether or not Path, Port and Discard were
specified by the Set-Cookie2 (or Set-Cookie) header, and whether or not the
domain as set in the HTTP header started with a dot (yes, I'm aware some
domains in Netscape files start with a dot and some don't -- trust me, you
really don't want to know any more about this).
Note that though Mozilla and Netscape use the same format, they use
slightly different headers. The class saves cookies using the Netscape
header by default (Mozilla can cope with that).
"""
magic_re = "#( Netscape)? HTTP Cookie File"
header = """\
# Netscape HTTP Cookie File
# http://www.netscape.com/newsref/std/cookie_spec.html
# This is a generated file! Do not edit.
"""
def _really_load(self, f, filename, ignore_discard, ignore_expires):
now = time.time()
magic = f.readline()
if not re.search(self.magic_re, magic):
f.close()
raise IOError(
"%s does not look like a Netscape format cookies file" %
filename)
try:
while 1:
line = f.readline()
if line == "": break
# last field may be absent, so keep any trailing tab
if endswith(line, "\n"): line = line[:-1]
# skip comments and blank lines XXX what is $ for?
if (startswith(string.strip(line), "#") or
startswith(string.strip(line), "$") or
string.strip(line) == ""):
continue
domain, domain_specified, path, secure, expires, name, value = \
string.split(line, "\t")
secure = (secure == "TRUE")
domain_specified = (domain_specified == "TRUE")
if name == "": name = None
initial_dot = startswith(domain, ".")
assert domain_specified == initial_dot
discard = False
if expires == "":
expires = None
discard = True
# assume path_specified is false
c = Cookie(0, name, value,
None, False,
domain, domain_specified, initial_dot,
path, False,
secure,
expires,
discard,
None,
None,
{})
if not ignore_discard and c.discard:
continue
if not ignore_expires and c.is_expired(now):
continue
self.set_cookie(c)
except:
unmasked = (KeyboardInterrupt, SystemExit)
if ClientCookie.CLIENTCOOKIE_DEBUG:
unmasked = (Exception,)
etype = sys.exc_info()[0]
if issubclass(etype, IOError) or \
issubclass(etype, unmasked):
raise
raise IOError("invalid Netscape format file %s: %s" %
(filename, line))
def save(self, filename=None, ignore_discard=False, ignore_expires=False):
if filename is None:
if self.filename is not None: filename = self.filename
else: raise ValueError(MISSING_FILENAME_TEXT)
f = open(filename, "w")
try:
f.write(self.header)
now = time.time()
debug("Saving Netscape cookies.txt file")
for cookie in self:
if not ignore_discard and cookie.discard:
debug(" Not saving %s: marked for discard" % cookie.name)
continue
if not ignore_expires and cookie.is_expired(now):
debug(" Not saving %s: expired" % cookie.name)
continue
if cookie.secure: secure = "TRUE"
else: secure = "FALSE"
if startswith(cookie.domain, "."): initial_dot = "TRUE"
else: initial_dot = "FALSE"
if cookie.expires is not None:
expires = str(cookie.expires)
else:
expires = ""
if cookie.name is not None:
name = cookie.name
else:
name = ""
f.write(
string.join([cookie.domain, initial_dot, cookie.path,
secure, expires, name, cookie.value], "\t")+
"\n")
finally:
f.close()

View file

@ -1,459 +0,0 @@
"""Python backwards-compat., date/time routines, seekable file object wrapper.
Copyright 2002-2003 John J Lee <jjl@pobox.com>
This code is free software; you can redistribute it and/or modify it under
the terms of the BSD License (see the file COPYING included with the
distribution).
"""
try: True
except NameError:
True = 1
False = 0
import re, string, time
from types import TupleType
from StringIO import StringIO
try:
from exceptions import StopIteration
except ImportError:
from ClientCookie._ClientCookie import StopIteration
def startswith(string, initial):
if len(initial) > len(string): return False
return string[:len(initial)] == initial
def endswith(string, final):
if len(final) > len(string): return False
return string[-len(final):] == final
def compat_issubclass(obj, tuple_or_class):
# for 2.1 and below
if type(tuple_or_class) == TupleType:
for klass in tuple_or_class:
if issubclass(obj, klass):
return True
return False
return issubclass(obj, tuple_or_class)
def isstringlike(x):
try: x+""
except: return False
else: return True
try:
from calendar import timegm
timegm((2045, 1, 1, 22, 23, 32)) # overflows in 2.1
except:
# Number of days per month (except for February in leap years)
mdays = [0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
# Return 1 for leap years, 0 for non-leap years
def isleap(year):
return year % 4 == 0 and (year % 100 <> 0 or year % 400 == 0)
# Return number of leap years in range [y1, y2)
# Assume y1 <= y2 and no funny (non-leap century) years
def leapdays(y1, y2):
return (y2+3)/4 - (y1+3)/4
EPOCH = 1970
def timegm(tuple):
"""Unrelated but handy function to calculate Unix timestamp from GMT."""
year, month, day, hour, minute, second = tuple[:6]
assert year >= EPOCH
assert 1 <= month <= 12
days = 365*(year-EPOCH) + leapdays(EPOCH, year)
for i in range(1, month):
days = days + mdays[i]
if month > 2 and isleap(year):
days = days + 1
days = days + day - 1
hours = days*24 + hour
minutes = hours*60 + minute
seconds = minutes*60L + second
return seconds
# Date/time conversion routines for formats used by the HTTP protocol.
EPOCH = 1970
def my_timegm(tt):
year, month, mday, hour, min, sec = tt[:6]
if ((year >= EPOCH) and (1 <= month <= 12) and (1 <= mday <= 31) and
(0 <= hour <= 24) and (0 <= min <= 59) and (0 <= sec <= 61)):
return timegm(tt)
else:
return None
days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
months_lower = []
for month in months: months_lower.append(string.lower(month))
def time2isoz(t=None):
"""Return a string representing time in seconds since epoch, t.
If the function is called without an argument, it will use the current
time.
The format of the returned string is like "YYYY-MM-DD hh:mm:ssZ",
representing Universal Time (UTC, aka GMT). An example of this format is:
1994-11-24 08:49:37Z
"""
if t is None: t = time.time()
year, mon, mday, hour, min, sec = time.gmtime(t)[:6]
return "%04d-%02d-%02d %02d:%02d:%02dZ" % (
year, mon, mday, hour, min, sec)
def time2netscape(t=None):
"""Return a string representing time in seconds since epoch, t.
If the function is called without an argument, it will use the current
time.
The format of the returned string is like this:
Wdy, DD-Mon-YYYY HH:MM:SS GMT
"""
if t is None: t = time.time()
year, mon, mday, hour, min, sec, wday = time.gmtime(t)[:7]
return "%s %02d-%s-%04d %02d:%02d:%02d GMT" % (
days[wday], mday, months[mon-1], year, hour, min, sec)
UTC_ZONES = {"GMT": None, "UTC": None, "UT": None, "Z": None}
timezone_re = re.compile(r"^([-+])?(\d\d?):?(\d\d)?$")
def offset_from_tz_string(tz):
offset = None
if UTC_ZONES.has_key(tz):
offset = 0
else:
m = timezone_re.search(tz)
if m:
offset = 3600 * int(m.group(2))
if m.group(3):
offset = offset + 60 * int(m.group(3))
if m.group(1) == '-':
offset = -offset
return offset
def _str2time(day, mon, yr, hr, min, sec, tz):
# translate month name to number
# month numbers start with 1 (January)
try:
mon = months_lower.index(string.lower(mon))+1
except ValueError:
# maybe it's already a number
try:
imon = int(mon)
except ValueError:
return None
if 1 <= imon <= 12:
mon = imon
else:
return None
# make sure clock elements are defined
if hr is None: hr = 0
if min is None: min = 0
if sec is None: sec = 0
yr = int(yr)
day = int(day)
hr = int(hr)
min = int(min)
sec = int(sec)
if yr < 1000:
# find "obvious" year
cur_yr = time.localtime(time.time())[0]
m = cur_yr % 100
tmp = yr
yr = yr + cur_yr - m
m = m - tmp
if abs(m) > 50:
if m > 0: yr = yr + 100
else: yr = yr - 100
# convert UTC time tuple to seconds since epoch (not timezone-adjusted)
t = my_timegm((yr, mon, day, hr, min, sec, tz))
if t is not None:
# adjust time using timezone string, to get absolute time since epoch
if tz is None:
tz = "UTC"
tz = string.upper(tz)
offset = offset_from_tz_string(tz)
if offset is None:
return None
t = t - offset
return t
strict_re = re.compile(r"^[SMTWF][a-z][a-z], (\d\d) ([JFMASOND][a-z][a-z]) (\d\d\d\d) (\d\d):(\d\d):(\d\d) GMT$")
wkday_re = re.compile(
r"^(?:Sun|Mon|Tue|Wed|Thu|Fri|Sat)[a-z]*,?\s*", re.I)
loose_http_re = re.compile(
r"""^
(\d\d?) # day
(?:\s+|[-\/])
(\w+) # month
(?:\s+|[-\/])
(\d+) # year
(?:
(?:\s+|:) # separator before clock
(\d\d?):(\d\d) # hour:min
(?::(\d\d))? # optional seconds
)? # optional clock
\s*
([-+]?\d{2,4}|(?![APap][Mm]\b)[A-Za-z]+)? # timezone
\s*
(?:\(\w+\))? # ASCII representation of timezone in parens.
\s*$""", re.X)
def http2time(text):
"""Returns time in seconds since epoch of time represented by a string.
Return value is an integer.
None is returned if the format of str is unrecognized, the time is outside
the representable range, or the timezone string is not recognized. The
time formats recognized are the same as for parse_date. If the string
contains no timezone, UTC is assumed.
The timezone in the string may be numerical (like "-0800" or "+0100") or a
string timezone (like "UTC", "GMT", "BST" or "EST"). Currently, only the
timezone strings equivalent to UTC (zero offset) are known to the function.
The function loosely parses the following formats:
Wed, 09 Feb 1994 22:23:32 GMT -- HTTP format
Tuesday, 08-Feb-94 14:15:29 GMT -- old rfc850 HTTP format
Tuesday, 08-Feb-1994 14:15:29 GMT -- broken rfc850 HTTP format
09 Feb 1994 22:23:32 GMT -- HTTP format (no weekday)
08-Feb-94 14:15:29 GMT -- rfc850 format (no weekday)
08-Feb-1994 14:15:29 GMT -- broken rfc850 format (no weekday)
The parser ignores leading and trailing whitespace. The time may be
absent.
If the year is given with only 2 digits, then parse_date will select the
century that makes the year closest to the current date.
"""
# fast exit for strictly conforming string
m = strict_re.search(text)
if m:
g = m.groups()
mon = months_lower.index(string.lower(g[1])) + 1
tt = (int(g[2]), mon, int(g[0]),
int(g[3]), int(g[4]), float(g[5]))
return my_timegm(tt)
# No, we need some messy parsing...
# clean up
text = string.lstrip(text)
text = wkday_re.sub("", text, 1) # Useless weekday
# tz is time zone specifier string
day, mon, yr, hr, min, sec, tz = [None]*7
# loose regexp parse
m = loose_http_re.search(text)
if m is not None:
day, mon, yr, hr, min, sec, tz = m.groups()
else:
return None # bad format
return _str2time(day, mon, yr, hr, min, sec, tz)
iso_re = re.compile(
"""^
(\d{4}) # year
[-\/]?
(\d\d?) # numerical month
[-\/]?
(\d\d?) # day
(?:
(?:\s+|[-:Tt]) # separator before clock
(\d\d?):?(\d\d) # hour:min
(?::?(\d\d(?:\.\d*)?))? # optional seconds (and fractional)
)? # optional clock
\s*
([-+]?\d\d?:?(:?\d\d)?
|Z|z)? # timezone (Z is "zero meridian", i.e. GMT)
\s*$""", re.X)
def iso2time(text):
"""
As for httpstr2time, but parses the ISO 8601 formats:
1994-02-03 14:15:29 -0100 -- ISO 8601 format
1994-02-03 14:15:29 -- zone is optional
1994-02-03 -- only date
1994-02-03T14:15:29 -- Use T as separator
19940203T141529Z -- ISO 8601 compact format
19940203 -- only date
"""
# clean up
text = string.lstrip(text)
# tz is time zone specifier string
day, mon, yr, hr, min, sec, tz = [None]*7
# loose regexp parse
m = iso_re.search(text)
if m is not None:
# XXX there's an extra bit of the timezone I'm ignoring here: is
# this the right thing to do?
yr, mon, day, hr, min, sec, tz, _ = m.groups()
else:
return None # bad format
return _str2time(day, mon, yr, hr, min, sec, tz)
# XXX Andrew Dalke kindly sent me a similar class in response to my request on
# comp.lang.python, which I then proceeded to lose. I wrote this class
# instead, but I think he's released his code publicly since, could pinch the
# tests from it, at least...
class seek_wrapper:
"""Adds a seek method to a file object.
This is only designed for seeking on readonly file-like objects.
Wrapped file-like object must have a read method. The readline method is
only supported if that method is present on the wrapped object. The
readlines method is always supported. xreadlines and iteration are
supported only for Python 2.2 and above.
Public attribute: wrapped (the wrapped file object).
WARNING: All other attributes of the wrapped object (ie. those that are not
one of wrapped, read, readline, readlines, xreadlines, __iter__ and next)
are passed through unaltered, which may or may not make sense for your
particular file object.
"""
# General strategy is to check that cache is full enough, then delegate
# everything to the cache (self._cache, which is a StringIO.StringIO
# instance. Seems to be some cStringIO.StringIO problem on 1.5.2 -- I
# get a StringOobject, with no readlines method.
# Invariant: the end of the cache is always at the same place as the
# end of the wrapped file:
# self.wrapped.tell() == len(self._cache.getvalue())
def __init__(self, wrapped):
self.wrapped = wrapped
self.__have_readline = hasattr(self.wrapped, "readline")
self.__cache = StringIO()
def __getattr__(self, name): return getattr(self.wrapped, name)
def seek(self, offset, whence=0):
# make sure we have read all data up to the point we are seeking to
pos = self.__cache.tell()
if whence == 0: # absolute
to_read = offset - pos
elif whence == 1: # relative to current position
to_read = offset
elif whence == 2: # relative to end of *wrapped* file
# since we don't know yet where the end of that file is, we must
# read everything
to_read = None
if to_read >= 0 or to_read is None:
if to_read is None:
self.__cache.write(self.wrapped.read())
else:
self.__cache.write(self.wrapped.read(to_read))
self.__cache.seek(pos)
return self.__cache.seek(offset, whence)
def read(self, size=-1):
pos = self.__cache.tell()
self.__cache.seek(pos)
end = len(self.__cache.getvalue())
available = end - pos
# enough data already cached?
if size <= available and size != -1:
return self.__cache.read(size)
# no, so read sufficient data from wrapped file and cache it
to_read = size - available
assert to_read > 0 or size == -1
self.__cache.seek(0, 2)
if size == -1:
self.__cache.write(self.wrapped.read())
else:
self.__cache.write(self.wrapped.read(to_read))
self.__cache.seek(pos)
return self.__cache.read(size)
def readline(self, size=-1):
if not self.__have_readline:
raise NotImplementedError("no readline method on wrapped object")
# line we're about to read might not be complete in the cache, so
# read another line first
pos = self.__cache.tell()
self.__cache.seek(0, 2)
self.__cache.write(self.wrapped.readline())
self.__cache.seek(pos)
data = self.__cache.readline()
if size != -1:
r = data[:size]
self.__cache.seek(pos+size)
else:
r = data
return r
def readlines(self, sizehint=-1):
pos = self.__cache.tell()
self.__cache.seek(0, 2)
self.__cache.write(self.wrapped.read())
self.__cache.seek(pos)
try:
return self.__cache.readlines(sizehint)
except TypeError: # 1.5.2 hack
return self.__cache.readlines()
def __iter__(self): return self
def next(self):
line = self.readline()
if line == "": raise StopIteration
return line
xreadlines = __iter__
def __repr__(self):
return ("<%s at %s whose wrapped object = %s>" %
(self.__class__.__name__, `id(self)`, `self.wrapped`))
def close(self):
self.read = None
self.readline = None
self.readlines = None
self.seek = None
if self.wrapped: self.wrapped.close()
self.wrapped = None

View file

@ -1,49 +0,0 @@
# Import names so that they can be imported directly from the package, like
# this:
#from ClientCookie import <whatever>
try: True
except NameError:
True = 1
False = 0
import sys
# don't edit these here: do eg.
# import ClientCookie; ClientCookie.HTTP_DEBUG = 1
DEBUG_STREAM = sys.stderr
CLIENTCOOKIE_DEBUG = False
REDIRECT_DEBUG = False
HTTP_DEBUG = False
from _ClientCookie import VERSION, __doc__, \
CookieJar, Cookie, \
CookiePolicy, DefaultCookiePolicy, \
lwp_cookie_str
from _MozillaCookieJar import MozillaCookieJar
from _MSIECookieJar import MSIECookieJar
try:
from urllib2 import AbstractHTTPHandler
except ImportError:
pass
else:
from ClientCookie._urllib2_support import \
HTTPHandler, build_opener, install_opener, urlopen, \
HTTPRedirectHandler
from ClientCookie._urllib2_support import \
OpenerDirector, BaseProcessor, \
HTTPRequestUpgradeProcessor, \
HTTPEquivProcessor, SeekableProcessor, HTTPCookieProcessor, \
HTTPRefererProcessor, HTTPStandardHeadersProcessor, \
HTTPRefreshProcessor, HTTPErrorProcessor, \
HTTPResponseDebugProcessor
import httplib
if hasattr(httplib, 'HTTPS'):
from ClientCookie._urllib2_support import HTTPSHandler
del AbstractHTTPHandler, httplib
from _Util import http2time
str2time = http2time
del http2time
del sys

View file

@ -1,713 +0,0 @@
"""Integration with Python standard library module urllib2.
Also includes a redirection bugfix, support for parsing HTML HEAD blocks for
the META HTTP-EQUIV tag contents, and following Refresh header redirects.
Copyright 2002-2003 John J Lee <jjl@pobox.com>
This code is free software; you can redistribute it and/or modify it under
the terms of the BSD License (see the file COPYING included with the
distribution).
"""
import copy, time
import ClientCookie
from _ClientCookie import CookieJar, request_host
from _Util import isstringlike
from _Debug import _debug
try: True
except NameError:
True = 1
False = 0
CHUNK = 1024 # size of chunks fed to HTML HEAD parser, in bytes
try:
from urllib2 import AbstractHTTPHandler
except ImportError:
pass
else:
import urlparse, urllib2, urllib, httplib, htmllib, formatter, string
from urllib2 import URLError, HTTPError
import types, string, socket
from cStringIO import StringIO
from _Util import seek_wrapper
try:
import threading
_threading = threading; del threading
except ImportError:
import dummy_threading
_threading = dummy_threading; del dummy_threading
# This fixes a bug in urllib2 as of Python 2.1.3 and 2.2.2
# (http://www.python.org/sf/549151)
# 2.2.3 is broken here (my fault!), 2.3 is fixed.
class HTTPRedirectHandler(urllib2.BaseHandler):
# maximum number of redirections before assuming we're in a loop
max_redirections = 10
# Implementation notes:
# To avoid the server sending us into an infinite loop, the request
# object needs to track what URLs we have already seen. Do this by
# adding a handler-specific attribute to the Request object. The value
# of the dict is used to count the number of times the same url has
# been visited. This is needed because this isn't necessarily a loop:
# there is more than one way to redirect (Refresh, 302, 303, 307).
# Another handler-specific Request attribute, original_url, is used to
# remember the URL of the original request so that it is possible to
# decide whether or not RFC 2965 cookies should be turned on during
# redirect.
# Always unhandled redirection codes:
# 300 Multiple Choices: should not handle this here.
# 304 Not Modified: no need to handle here: only of interest to caches
# that do conditional GETs
# 305 Use Proxy: probably not worth dealing with here
# 306 Unused: what was this for in the previous versions of protocol??
def redirect_request(self, newurl, req, fp, code, msg, headers):
"""Return a Request or None in response to a redirect.
This is called by the http_error_30x methods when a redirection
response is received. If a redirection should take place, return a
new Request to allow http_error_30x to perform the redirect;
otherwise, return None to indicate that an HTTPError should be
raised.
"""
if code in (301, 302, 303) or (code == 307 and not req.has_data()):
# Strictly (according to RFC 2616), 301 or 302 in response to
# a POST MUST NOT cause a redirection without confirmation
# from the user (of urllib2, in this case). In practice,
# essentially all clients do redirect in this case, so we do
# the same.
return Request(newurl, headers=req.headers)
else:
raise HTTPError(req.get_full_url(), code, msg, headers, fp)
def http_error_302(self, req, fp, code, msg, headers):
if headers.has_key('location'):
newurl = headers['location']
elif headers.has_key('uri'):
newurl = headers['uri']
else:
return
newurl = urlparse.urljoin(req.get_full_url(), newurl)
# XXX Probably want to forget about the state of the current
# request, although that might interact poorly with other
# handlers that also use handler-specific request attributes
new = self.redirect_request(newurl, req, fp, code, msg, headers)
if new is None:
return
# remember where we started from
if hasattr(req, "original_url"):
new.original_url = req.original_url
else:
new.original_url = req.get_full_url()
# loop detection
# .error_302_dict[(url, code)] is number of times url
# previously visited as a result of a redirection with this
# code (error_30x_dict would be a better name).
new.origin_req_host = req.origin_req_host
if not hasattr(req, 'error_302_dict'):
new.error_302_dict = req.error_302_dict = {(newurl, code): 1}
else:
ed = new.error_302_dict = req.error_302_dict
nr_visits = ed.get((newurl, code), 0)
# Refreshes generate fake 302s, so we can hit the same URL as
# a result of the same redirection code twice without
# necessarily being in a loop! So, allow two visits to each
# URL as a result of each redirection code.
if len(ed) < self.max_redirections and nr_visits < 2:
ed[(newurl, code)] = nr_visits + 1
else:
raise HTTPError(req.get_full_url(), code,
self.inf_msg + msg, headers, fp)
if ClientCookie.REDIRECT_DEBUG:
_debug("redirecting to %s", newurl)
# Don't close the fp until we are sure that we won't use it
# with HTTPError.
fp.read()
fp.close()
return self.parent.open(new)
http_error_301 = http_error_303 = http_error_307 = http_error_302
inf_msg = "The HTTP server returned a redirect error that would " \
"lead to an infinite loop.\n" \
"The last 30x error message was:\n"
class Request(urllib2.Request):
def __init__(self, url, data=None, headers={}):
urllib2.Request.__init__(self, url, data, headers)
self.unredirected_hdrs = {}
def add_unredirected_header(self, key, val):
# these headers do not persist from one request to the next in a chain
# of requests
self.unredirected_hdrs[string.capitalize(key)] = val
def has_key(self, header_name):
if (self.headers.has_key(header_name) or
self.unredirected_hdrs.has_key(header_name)):
return True
return False
def get(self, header_name, failobj=None):
if self.headers.has_key(header_name):
return self.headers[header_name]
if self.unredirected_headers.has_key(header_name):
return self.unredirected_headers[header_name]
return failobj
class BaseProcessor:
processor_order = 500
def add_parent(self, parent):
self.parent = parent
def close(self):
self.parent = None
def __lt__(self, other):
if not hasattr(other, "processor_order"):
return True
return self.processor_order < other.processor_order
class HTTPRequestUpgradeProcessor(BaseProcessor):
# upgrade Request to class with support for headers that don't get
# redirected
processor_order = 0 # before anything else
def http_request(self, request):
if not hasattr(request, "add_unredirected_header"):
request = Request(request._Request__original, request.data,
request.headers)
return request
https_request = http_request
class HTTPEquivProcessor(BaseProcessor):
"""Append META HTTP-EQUIV headers to regular HTTP headers."""
def http_response(self, request, response):
if not hasattr(response, "seek"):
response = seek_wrapper(response)
# grab HTTP-EQUIV headers and add them to the true HTTP headers
headers = response.info()
for hdr, val in parse_head(response):
headers[hdr] = val
response.seek(0)
return response
https_response = http_response
# XXX ATM this only takes notice of http responses -- probably
# should be independent of protocol scheme (http, ftp, etc.)
class SeekableProcessor(BaseProcessor):
"""Make responses seekable."""
def http_response(self, request, response):
if not hasattr(response, "seek"):
return seek_wrapper(response)
return response
https_response = http_response
# XXX if this gets added to urllib2, unverifiable would end up as an
# attribute on Request.
class HTTPCookieProcessor(BaseProcessor):
"""Handle HTTP cookies."""
def __init__(self, cookies=None):
if cookies is None:
cookies = CookieJar()
self.cookies = cookies
def _unverifiable(self, request):
if hasattr(request, "error_302_dict") and request.error_302_dict:
redirect = True
else:
redirect = False
if (redirect or
(hasattr(request, "unverifiable") and request.unverifiable)):
unverifiable = True
else:
unverifiable = False
return unverifiable
def http_request(self, request):
unverifiable = self._unverifiable(request)
if not unverifiable:
# Stuff request-host of this origin transaction into Request
# object, because we need to know it to know whether cookies
# should be in operation during derived requests (redirects,
# specifically -- including refreshes).
request.origin_req_host = request_host(request)
self.cookies.add_cookie_header(request, unverifiable)
return request
def http_response(self, request, response):
unverifiable = self._unverifiable(request)
self.cookies.extract_cookies(response, request, unverifiable)
return response
https_request = http_request
https_response = http_response
class HTTPRefererProcessor(BaseProcessor):
"""Add Referer header to requests.
This only makes sense if you use each RefererProcessor for a single
chain of requests only (so, for example, if you use a single
HTTPRefererProcessor to fetch a series of URLs extracted from a single
page, this will break).
"""
def __init__(self):
self.referer = None
def http_request(self, request):
if ((self.referer is not None) and
not request.has_key("Referer")):
request.add_unredirected_header("Referer", self.referer)
return request
def http_response(self, request, response):
self.referer = response.geturl()
return response
https_request = http_request
https_response = http_response
class HTTPStandardHeadersProcessor(BaseProcessor):
def http_request(self, request):
host = request.get_host()
if not host:
raise URLError('no host given')
if request.has_data(): # POST
data = request.get_data()
if not request.has_key('Content-type'):
request.add_unredirected_header(
'Content-type',
'application/x-www-form-urlencoded')
if not request.has_key('Content-length'):
request.add_unredirected_header(
'Content-length', '%d' % len(data))
scheme, sel = urllib.splittype(request.get_selector())
sel_host, sel_path = urllib.splithost(sel)
if not request.has_key('Host'):
request.add_unredirected_header('Host', sel_host or host)
for name, value in self.parent.addheaders:
name = string.capitalize(name)
if not request.has_key(name):
request.add_unredirected_header(name, value)
return request
https_request = http_request
class HTTPResponseDebugProcessor(BaseProcessor):
processor_order = 900 # before redirections, after everything else
def http_response(self, request, response):
if not hasattr(response, "seek"):
response = seek_wrapper(response)
_debug(response.read())
_debug("*****************************************************")
response.seek(0)
return response
https_response = http_response
class HTTPRefreshProcessor(BaseProcessor):
"""Perform HTTP Refresh redirections.
Note that if a non-200 HTTP code has occurred (for example, a 30x
redirect), this processor will do nothing.
By default, only zero-time Refresh headers are redirected. Use the
max_time constructor argument to allow Refresh with longer pauses.
Use the honor_time argument to control whether the requested pause
is honoured (with a time.sleep()) or skipped in favour of immediate
redirection.
"""
processor_order = 1000
def __init__(self, max_time=0, honor_time=True):
self.max_time = max_time
self.honor_time = honor_time
def http_response(self, request, response):
code, msg, hdrs = response.code, response.msg, response.info()
if code == 200 and hdrs.has_key("refresh"):
refresh = hdrs["refresh"]
i = string.find(refresh, ";")
if i != -1:
pause, newurl_spec = refresh[:i], refresh[i+1:]
i = string.find(newurl_spec, "=")
if i != -1:
pause = int(pause)
if pause <= self.max_time:
if pause != 0 and self.honor_time:
time.sleep(pause)
newurl = newurl_spec[i+1:]
# fake a 302 response
hdrs["location"] = newurl
response = self.parent.error(
'http', request, response, 302, msg, hdrs)
return response
https_response = http_response
class HTTPErrorProcessor(BaseProcessor):
"""Process non-200 HTTP error responses.
This just passes the job on to the Handler.<proto>_error_<code>
methods, via the OpenerDirector.error method.
"""
processor_order = 1000
def http_response(self, request, response):
code, msg, hdrs = response.code, response.msg, response.info()
if code != 200:
response = self.parent.error(
'http', request, response, code, msg, hdrs)
return response
https_response = http_response
class OpenerDirector(urllib2.OpenerDirector):
# XXX might be useful to have remove_processor, too (say you want to
# set a new RefererProcessor, but keep the old CookieProcessor --
# could always just create everything anew, though (using old
# CookieJar object to create CookieProcessor)
def __init__(self):
urllib2.OpenerDirector.__init__(self)
#self.processors = []
self.process_response = {}
self.process_request = {}
def add_handler(self, handler):
# XXX
# tidy me
# the same handler could be added twice without detection
added = 0
for meth in dir(handler.__class__):
if meth[-5:] == '_open':
protocol = meth[:-5]
if self.handle_open.has_key(protocol):
self.handle_open[protocol].append(handler)
self.handle_open[protocol].sort()
else:
self.handle_open[protocol] = [handler]
added = 1
continue
i = string.find(meth, '_')
j = string.find(meth[i+1:], '_') + i + 1
if j != -1 and meth[i+1:j] == 'error':
proto = meth[:i]
kind = meth[j+1:]
try:
kind = int(kind)
except ValueError:
pass
dict = self.handle_error.get(proto, {})
if dict.has_key(kind):
dict[kind].append(handler)
dict[kind].sort()
else:
dict[kind] = [handler]
self.handle_error[proto] = dict
added = 1
continue
if meth[-9:] == "_response":
protocol = meth[:-9]
if self.process_response.has_key(protocol):
self.process_response[protocol].append(handler)
self.process_response[protocol].sort()
else:
self.process_response[protocol] = [handler]
added = True
continue
elif meth[-8:] == "_request":
protocol = meth[:-8]
if self.process_request.has_key(protocol):
self.process_request[protocol].append(handler)
self.process_request[protocol].sort()
else:
self.process_request[protocol] = [handler]
added = True
continue
if added:
self.handlers.append(handler)
self.handlers.sort()
handler.add_parent(self)
## def add_processor(self, processor):
## added = False
## for meth in dir(processor):
## if meth[-9:] == "_response":
## protocol = meth[:-9]
## if self.process_response.has_key(protocol):
## self.process_response[protocol].append(processor)
## self.process_response[protocol].sort()
## else:
## self.process_response[protocol] = [processor]
## added = True
## continue
## elif meth[-8:] == "_request":
## protocol = meth[:-8]
## if self.process_request.has_key(protocol):
## self.process_request[protocol].append(processor)
## self.process_request[protocol].sort()
## else:
## self.process_request[protocol] = [processor]
## added = True
## continue
## if added:
## self.processors.append(processor)
## # XXX base class sorts .handlers, but I have no idea why
## #self.processors.sort()
## processor.add_parent(self)
def _request(self, url_or_req, data):
if isstringlike(url_or_req):
req = Request(url_or_req, data)
else:
# already a urllib2.Request instance
req = url_or_req
if data is not None:
req.add_data(data)
return req
def open(self, fullurl, data=None):
req = self._request(fullurl, data)
type = req.get_type()
# pre-process request
# XXX should we allow a Processor to change the type (URL
# scheme) of the request?
meth_name = type+"_request"
for processor in self.process_request.get(type, []):
meth = getattr(processor, meth_name)
req = meth(req)
response = urllib2.OpenerDirector.open(self, req, data)
# post-process response
meth_name = type+"_response"
for processor in self.process_response.get(type, []):
meth = getattr(processor, meth_name)
response = meth(req, response)
return response
## def close(self):
## urllib2.OpenerDirector.close(self)
## for processor in self.processors:
## processor.close()
## self.processors = []
# Note the absence of redirect and header-adding code here
# (AbstractHTTPHandler), and the lack of other clutter that would be
# here without Processors.
class AbstractHTTPHandler(urllib2.BaseHandler):
def do_open(self, http_class, req):
host = req.get_host()
if not host:
raise URLError('no host given')
h = http_class(host) # will parse host:port
if ClientCookie.HTTP_DEBUG:
h.set_debuglevel(1)
if req.has_data():
h.putrequest('POST', req.get_selector())
else:
h.putrequest('GET', req.get_selector())
for k, v in req.headers.items():
h.putheader(k, v)
for k, v in req.unredirected_hdrs.items():
h.putheader(k, v)
# httplib will attempt to connect() here. be prepared
# to convert a socket error to a URLError.
try:
h.endheaders()
except socket.error, err:
raise URLError(err)
if req.has_data():
h.send(req.get_data())
code, msg, hdrs = h.getreply()
fp = h.getfile()
response = urllib.addinfourl(fp, hdrs, req.get_full_url())
response.code = code
response.msg = msg
return response
# XXX would self.reset() work, instead of raising this exception?
class EndOfHeadError(Exception): pass
class HeadParser(htmllib.HTMLParser):
# only these elements are allowed in or before HEAD of document
head_elems = ("html", "head",
"title", "base",
"script", "style", "meta", "link", "object")
def __init__(self):
htmllib.HTMLParser.__init__(self, formatter.NullFormatter())
self.http_equiv = []
def start_meta(self, attrs):
http_equiv = content = None
for key, value in attrs:
if key == "http-equiv":
http_equiv = value
elif key == "content":
content = value
if http_equiv is not None:
self.http_equiv.append((http_equiv, content))
def handle_starttag(self, tag, method, attrs):
if tag in self.head_elems:
method(attrs)
else:
raise EndOfHeadError()
def handle_endtag(self, tag, method):
if tag in self.head_elems:
method()
else:
raise EndOfHeadError()
def end_head(self):
raise EndOfHeadError()
def parse_head(file):
"""Return a list of key, value pairs."""
hp = HeadParser()
while 1:
data = file.read(CHUNK)
try:
hp.feed(data)
except EndOfHeadError:
break
if len(data) != CHUNK:
# this should only happen if there is no HTML body, or if
# CHUNK is big
break
return hp.http_equiv
class HTTPHandler(AbstractHTTPHandler):
def http_open(self, req):
return self.do_open(httplib.HTTP, req)
if hasattr(httplib, 'HTTPS'):
class HTTPSHandler(AbstractHTTPHandler):
def https_open(self, req):
return self.do_open(httplib.HTTPS, req)
def build_opener(*handlers):
"""Create an opener object from a list of handlers and processors.
The opener will use several default handlers and processors, including
support for HTTP and FTP. If there is a ProxyHandler, it must be at the
front of the list of handlers. (Yuck. This is fixed in 2.3.)
If any of the handlers passed as arguments are subclasses of the
default handlers, the default handlers will not be used.
"""
opener = OpenerDirector()
default_classes = [
# handlers
urllib2.ProxyHandler,
urllib2.UnknownHandler,
HTTPHandler, # from this module (derived from new AbstractHTTPHandler)
urllib2.HTTPDefaultErrorHandler,
HTTPRedirectHandler, # from this module (bugfixed)
urllib2.FTPHandler,
urllib2.FileHandler,
# processors
HTTPRequestUpgradeProcessor,
#HTTPEquivProcessor,
#SeekableProcessor,
HTTPCookieProcessor,
#HTTPRefererProcessor,
HTTPStandardHeadersProcessor,
#HTTPRefreshProcessor,
HTTPErrorProcessor
]
if hasattr(httplib, 'HTTPS'):
default_classes.append(HTTPSHandler)
skip = []
for klass in default_classes:
for check in handlers:
if type(check) == types.ClassType:
if issubclass(check, klass):
skip.append(klass)
elif type(check) == types.InstanceType:
if isinstance(check, klass):
skip.append(klass)
for klass in skip:
default_classes.remove(klass)
to_add = []
for klass in default_classes:
to_add.append(klass())
for h in handlers:
if type(h) == types.ClassType:
h = h()
to_add.append(h)
for instance in to_add:
opener.add_handler(instance)
## # yuck
## if hasattr(instance, "processor_order"):
## opener.add_processor(instance)
## else:
## opener.add_handler(instance)
return opener
_opener = None
urlopen_lock = _threading.Lock()
def urlopen(url, data=None):
global _opener
if _opener is None:
urlopen_lock.acquire()
try:
if _opener is None:
_opener = build_opener()
finally:
urlopen_lock.release()
return _opener.open(url, data)
def install_opener(opener):
global _opener
_opener = opener

File diff suppressed because it is too large Load diff

View file

@ -1,349 +0,0 @@
#! /usr/bin/python
# By Taybin Rutkin
#
# TODO
# look along $PATH to find binary.
# Use ardour binary to get version info
# hide file information from reporters
# standard
import os
import Queue
import re
import shutil
import string
import sys
import tempfile
import threading
import warnings
# probably installed
import pygtk
pygtk.require("2.0")
import gtk, gnome.ui
# we provide ClientForm and ClientCookie in $prefix/share/ardour/
sys.path.append('/usr/local/share/ardour/')
sys.path.append('/opt/local/share/ardour/')
sys.path.append('/opt/share/ardour/')
sys.path.append('/usr/share/ardour/')
# probably not installed
import ClientForm
import ClientCookie
warnings.filterwarnings('ignore', message="tempnam is a potential security risk to your program")
g_name = os.tempnam('/tmp', 'bugrp')
os.mkdir(g_name)
class NoBlock(threading.Thread):
def __init__(self, data, queue):
threading.Thread.__init__(self)
self.data = data
self.queue = queue
def zip(self):
self.queue.put('zipping')
os.system('tar cvfz ' + g_name + '.tar.gz ' + g_name + ' > /dev/null')
def login(self):
self.queue.put('logging in')
response = ClientCookie.urlopen('http://ardour.org/mantis/login.php?username=bug_tool&password=bug_tool')
print response
response.close()
def get_form(self):
self.queue.put('forming data')
response = ClientCookie.urlopen('http://ardour.org/mantis/bug_report_page.php')
print response
forms = ClientForm.ParseResponse(response)
self.form = forms[2]
def upload(self):
self.queue.put('uploading')
self.form.add_file(open(g_name+'.tar.gz'), 'application/x-gzip', 'system-info.tar.gz')
self.form['description'] = self.data['long']
self.form['summary'] = self.data['short']
self.form['custom_field_3'] = self.data['email']
self.form['custom_field_4'] = self.data['name']
request = self.form.click()
response2 = ClientCookie.urlopen(request)
response2.close()
def run(self):
print "1"
self.zip()
print "2"
self.login()
print "3"
self.get_form()
print "4"
self.upload()
print "5"
self.queue.put('done')
class ProgressWin(object):
def __init__(self, parent_window, no_block, queue):
self.no_block = no_block
self.queue = queue
self.win = gtk.Window()
self.win.set_type_hint('dialog')
self.win.set_title('Progress')
self.win.set_resizable(False)
self.win.set_transient_for(parent_window)
vbox = gtk.VBox()
self.text = gtk.Label('')
self.progress = gtk.ProgressBar()
self.progress.set_pulse_step(0.25)
vbox.pack_start(self.text)
vbox.pack_start(self.progress)
self.win.add(vbox)
self.win.show_all()
gtk.timeout_add(100, self.check)
def check(self):
try:
text = self.queue.get_nowait()
print text
if text == 'done':
gtk.main_quit()
self.text.set_text(text)
except Queue.Empty:
pass
self.progress.pulse()
return True
class ReportWin(object):
def start_page(self):
start = gnome.ui.DruidPageEdge(gnome.ui.EDGE_START)
start.set_text(
"""So, you want to report a bug in ardour. Excellent.
This program will help you to submit a bug report that will be useful to the programmers.
We are collecting this information so that we don't have to ask you to research very detailed aspects of your system configuration. The information this tool collects is stored in the Ardour bug tracking system, and is not used for any other purpose. We will not intentionally sell or disclose the information to any parties besides those authorized to view it on the Ardour bug tracking system.
""")
start.connect('cancel', lambda w, d: gtk.main_quit())
self.druid.append_page(start)
def end_page_finish(self, page, data):
print "page_finish"
if self.first_time:
self.first_time = False
self.druid.set_buttons_sensitive(False, False, False, False)
print "build queue"
self.queue = Queue.Queue(0)
print "build no_block"
self.no_block = NoBlock(self.data, self.queue)
print "build progress window"
self.progress = ProgressWin(self.win, self.no_block, self.queue)
print "start no block"
self.no_block.start()
print "exit end_page_finish"
def end_page(self):
end = gnome.ui.DruidPageEdge(gnome.ui.EDGE_FINISH)
end.set_text(
"""Thank you for helping Ardour.
When you click the "Apply" button, we will connect to the web and upload the information.
Please let the Bug Tool finish. It will exit on its own.""")
end.connect('cancel', lambda w, d: gtk.main_quit())
end.connect('finish', self.end_page_finish)
self.druid.append_page(end)
def build_tools_page_next(self, page, data):
if self.tools_radio.get_active():
os.system("g++ --version >> " +g_name+"/build-tools")
os.system("pkg-config --version >> " +g_name+"/build-tools")
os.system("autoconf --version >> " +g_name+"/build-tools")
os.system("automake --version >> " +g_name+"/build-tools")
os.system("aclocal --version >> " +g_name+"/build-tools")
os.system("libtool --version >> " +g_name+"/build-tools")
os.system("gettext --version >> " +g_name+"/build-tools")
os.system("autopoint --version >> " +g_name+"/build-tools")
def build_tools_page(self):
tools = gnome.ui.DruidPageStandard()
self.tools_radio = gtk.RadioButton(None, "Yes")
radio_btn2 = gtk.RadioButton(self.tools_radio, "No")
radio_btn2.set_active(True)
tools.append_item("Are you using a version of Ardour that you compiled yourself?", self.tools_radio, "")
tools.append_item("", radio_btn2, "")
tools.connect('cancel', lambda w, d: gtk.main_quit())
tools.connect('next', self.build_tools_page_next)
self.druid.append_page(tools)
def binary_page_next(self, page, data):
path = self.binary_path.get_text()
if len(path) > 0 and os.path.exists(path):
os.system("ldd "+path+" > "+g_name+"/linker-info")
def binary_page(self):
binary = gnome.ui.DruidPageStandard()
self.binary_path = gtk.Entry()
binary.append_item("Where is Ardour's binary located?", self.binary_path, "")
binary.connect('cancel', lambda w, d: gtk.main_quit())
binary.connect('next', self.binary_page_next)
self.druid.append_page(binary)
def versions_page_next(self, page, data):
os.system('echo "gtk-ardour version: '+self.gtkardour_version.get_text()+'" >>'+g_name+'/ardour-version')
os.system('echo "libardour version: '+self.libardour_version.get_text()+'" >>'+g_name+'/ardour-version')
def versions_page(self):
versions = gnome.ui.DruidPageStandard()
self.gtkardour_version = gtk.Entry()
self.libardour_version = gtk.Entry()
versions.append_item("What is gtk-ardour's version?", self.gtkardour_version, "")
versions.append_item("What is libardour's version?", self.libardour_version, "")
versions.connect('cancel', lambda w, d: gtk.main_quit())
versions.connect('next', self.versions_page_next)
self.druid.append_page(versions)
def session_check_toggled(self, data):
self.session_path.set_sensitive(self.session_check.get_active())
def sessions_page_next(self, page, data):
session = self.session_path.get_text()
if self.session_check.get_active() and session > 0:
if os.path.exists(session) and os.path.isfile(session):
shutil.copy(session, g_name)
def sessions_page(self):
sessions = gnome.ui.DruidPageStandard()
self.session_check = gtk.CheckButton("Yes")
self.session_check.set_active(True)
self.session_path = gtk.Entry()
sessions.append_item("Is the problem one you've noticed while trying to run Ardour?", self.session_check, "")
sessions.append_item("What is the session file you've been using?", self.session_path, "")
self.session_check.connect('toggled', self.session_check_toggled)
sessions.connect('cancel', lambda w, d:gtk.main_quit())
sessions.connect('next', self.sessions_page_next)
self.druid.append_page(sessions)
def description_page_next(self, page, data):
self.data['short'] = self.short_description.get_text()
buffer = self.long_description.get_buffer()
self.data['long'] = buffer.get_text(buffer.get_start_iter(), buffer.get_end_iter())
def description_page(self):
description = gnome.ui.DruidPageStandard()
self.long_description = gtk.TextView()
self.short_description = gtk.Entry()
self.long_description.set_size_request(-1, 70)
description.append_item(
"""OK, we've collected the system information. Now its time for you
to explain the problem.
Please note: you do not need to include any information about versions
of any software - that has been taken care of.
If you are reporting an operational problem, please carefully describe
the actions you took, any messages that you noticed, and in as much
detail as possible describe what went wrong.""", self.long_description, "")
description.append_item("Please give a one line summary of your problem", self.short_description, "")
description.connect('cancel', lambda w, d:gtk.main_quit())
description.connect('next', self.description_page_next)
self.druid.append_page(description)
def info_page_next(self, page, data):
self.data['name'] = self.name.get_text()
self.data['email'] = self.email.get_text()
def info_page(self):
info = gnome.ui.DruidPageStandard()
self.name = gtk.Entry()
self.email = gtk.Entry()
info.append_item("Name", self.name, "Optional")
info.append_item("Email", self.email, "")
info.connect('cancel', lambda w, d:gtk.main_quit())
info.connect('next', self.info_page_next)
self.druid.append_page(info)
def __init__(self):
self.first_time = True
self.win = gtk.Window()
self.win.set_title("Ardour Bug Tool")
self.win.connect('destroy', lambda w: gtk.main_quit())
self.druid = gnome.ui.Druid()
self.start_page()
self.build_tools_page()
self.binary_page()
self.versions_page()
self.sessions_page()
self.description_page()
self.info_page()
self.end_page()
self.win.add(self.druid)
self.win.show_all()
self.data = {}
def main(*args):
os.mkdir(g_name+"/proc/")
proclist = ['asound', 'cpuinfo', 'devices', 'dma', 'filesystems', 'irq', 'isapnp', 'meminfo', 'modules', 'mounts', 'partition', 'pci', 'slabinfo', 'sysvipc/shm', 'version']
for item in proclist:
if os.path.exists('/proc/'+item):
try:
if os.path.isdir('/proc/'+item):
shutil.copytree('/proc/'+item, g_name+'/proc/'+item)
else:
shutil.copy('/proc/'+item, g_name+'/proc/')
except shutil.Error:
pass #should this be reported?
else:
f = open(g_name+'/proc/'+item, 'w')
f.write(item+' missing in /proc')
f.close
liblist = ['asound', 'c', 'gdbm', 'gdk', 'gmodule', 'gtk', 'intl', 'jack', 'm', 'pthreads', 'sndfile', 'X11', 'Xext']
for lib in liblist:
for libdir in ['/lib/', '/usr/lib/', '/usr/X11R6/lib/', '/usr/local/lib/', '/opt/lib/']:
if os.path.exists(libdir+"lib"+lib+".so"):
os.system('echo "'+lib+ ' is `ls -l '+libdir+'lib'+lib+'.so`" >> '+g_name+'/libraries')
if os.path.exists('/proc/sys/kernel/lowlatency'):
shutil.copy('/proc/sys/kernel/lowlatency', g_name+'/lowlatency-status')
else:
f = open(g_name+'/lowlatency-status', 'w')
f.write('-1')
f.close()
scsi = re.compile(r'sd[a-z][0-9]')
if scsi.search(open('/proc/mounts').read()):
shutil.copytree('/proc/scsi', g_name+'/scsi')
ide = re.compile(r'hd[a-z][0-9]')
if ide.search(open('/proc/mounts').read()):
pass
os.system("xmodmap >" +g_name+"/xmodmap")
os.system("/sbin/lspci -vv >" +g_name+"/lspci.out")
if os.path.exists(os.path.expandvars('$HOME')+'/.ardour/ardour.rc'):
shutil.copy(os.path.expandvars('$HOME')+'/.ardour/ardour.rc', g_name+'/ardour.rc')
bug_win = ReportWin()
gtk.main()
if __name__ == '__main__':
main()
shutil.rmtree(g_name)
if os.path.exists(g_name+'.tar.gz'):
os.remove(g_name+'.tar.gz')