X7ROOT File Manager
Current Path:
/opt/alt/python33/lib64/python3.3
opt
/
alt
/
python33
/
lib64
/
python3.3
/
ðŸ“
..
📄
__future__.py
(4.48 KB)
📄
__phello__.foo.py
(64 B)
ðŸ“
__pycache__
📄
_compat_pickle.py
(4.24 KB)
📄
_dummy_thread.py
(4.66 KB)
📄
_markupbase.py
(14.26 KB)
📄
_osx_support.py
(18.41 KB)
📄
_pyio.py
(71.2 KB)
📄
_strptime.py
(21.17 KB)
📄
_sysconfigdata.py
(22.31 KB)
📄
_threading_local.py
(7.24 KB)
📄
_weakrefset.py
(5.57 KB)
📄
abc.py
(7.87 KB)
📄
aifc.py
(30.33 KB)
📄
antigravity.py
(475 B)
📄
argparse.py
(86.98 KB)
📄
ast.py
(11.86 KB)
📄
asynchat.py
(11.32 KB)
📄
asyncore.py
(20.27 KB)
📄
base64.py
(13.66 KB)
📄
bdb.py
(21.38 KB)
📄
binhex.py
(13.39 KB)
📄
bisect.py
(2.53 KB)
📄
bz2.py
(18.04 KB)
📄
cProfile.py
(6.21 KB)
📄
calendar.py
(22.4 KB)
📄
cgi.py
(34.72 KB)
📄
cgitb.py
(11.76 KB)
📄
chunk.py
(5.25 KB)
📄
cmd.py
(14.51 KB)
📄
code.py
(9.79 KB)
📄
codecs.py
(35.11 KB)
📄
codeop.py
(5.85 KB)
ðŸ“
collections
📄
colorsys.py
(3.6 KB)
📄
compileall.py
(9.51 KB)
ðŸ“
concurrent
ðŸ“
config-3.3m
📄
configparser.py
(48.28 KB)
📄
contextlib.py
(8.91 KB)
📄
copy.py
(8.78 KB)
📄
copyreg.py
(6.46 KB)
📄
crypt.py
(1.83 KB)
📄
csv.py
(15.81 KB)
ðŸ“
ctypes
ðŸ“
curses
📄
datetime.py
(73.2 KB)
ðŸ“
dbm
📄
decimal.py
(223.2 KB)
📄
difflib.py
(80.58 KB)
📄
dis.py
(9.9 KB)
ðŸ“
distutils
📄
doctest.py
(100.52 KB)
📄
dummy_threading.py
(2.75 KB)
ðŸ“
email
ðŸ“
encodings
📄
filecmp.py
(9.37 KB)
📄
fileinput.py
(13.92 KB)
📄
fnmatch.py
(3.09 KB)
📄
formatter.py
(14.58 KB)
📄
fractions.py
(22.49 KB)
📄
ftplib.py
(39.31 KB)
📄
functools.py
(13.28 KB)
📄
genericpath.py
(3.02 KB)
📄
getopt.py
(7.31 KB)
📄
getpass.py
(5.66 KB)
📄
gettext.py
(20.15 KB)
📄
glob.py
(2.77 KB)
📄
gzip.py
(23.83 KB)
📄
hashlib.py
(6.05 KB)
📄
heapq.py
(17.58 KB)
📄
hmac.py
(4.34 KB)
ðŸ“
html
ðŸ“
http
ðŸ“
idlelib
📄
imaplib.py
(48.94 KB)
📄
imghdr.py
(3.45 KB)
📄
imp.py
(9.5 KB)
ðŸ“
importlib
📄
inspect.py
(77.11 KB)
📄
io.py
(3.2 KB)
📄
ipaddress.py
(68.66 KB)
ðŸ“
json
📄
keyword.py
(2.01 KB)
ðŸ“
lib-dynload
ðŸ“
lib2to3
📄
linecache.py
(3.77 KB)
📄
locale.py
(91.03 KB)
ðŸ“
logging
📄
lzma.py
(17.04 KB)
📄
macpath.py
(5.49 KB)
📄
macurl2path.py
(2.67 KB)
📄
mailbox.py
(77.24 KB)
📄
mailcap.py
(7.26 KB)
📄
mimetypes.py
(20.25 KB)
📄
modulefinder.py
(22.65 KB)
ðŸ“
multiprocessing
📄
netrc.py
(5.61 KB)
📄
nntplib.py
(41.78 KB)
📄
ntpath.py
(19.96 KB)
📄
nturl2path.py
(2.34 KB)
📄
numbers.py
(10.15 KB)
📄
opcode.py
(4.98 KB)
📄
optparse.py
(58.93 KB)
📄
os.py
(33.96 KB)
📄
os2emxpath.py
(4.55 KB)
📄
pdb.py
(59.23 KB)
📄
pickle.py
(46.74 KB)
📄
pickletools.py
(79.44 KB)
📄
pipes.py
(8.71 KB)
📄
pkgutil.py
(21.03 KB)
ðŸ“
plat-linux
📄
platform.py
(49.55 KB)
📄
plistlib.py
(14.43 KB)
📄
poplib.py
(11.11 KB)
📄
posixpath.py
(13.92 KB)
📄
pprint.py
(12.4 KB)
📄
profile.py
(20.95 KB)
📄
pstats.py
(25.75 KB)
📄
pty.py
(4.94 KB)
📄
py_compile.py
(6.56 KB)
📄
pyclbr.py
(13.12 KB)
📄
pydoc.py
(99.26 KB)
ðŸ“
pydoc_data
📄
queue.py
(8.63 KB)
📄
quopri.py
(7.14 KB)
📄
random.py
(25.06 KB)
📄
re.py
(14.62 KB)
📄
reprlib.py
(4.99 KB)
📄
rlcompleter.py
(5.4 KB)
📄
runpy.py
(10.17 KB)
📄
sched.py
(6.25 KB)
📄
shelve.py
(8.05 KB)
📄
shlex.py
(11.23 KB)
📄
shutil.py
(38.23 KB)
ðŸ“
site-packages
📄
site.py
(21.46 KB)
📄
smtpd.py
(29.5 KB)
📄
smtplib.py
(37.13 KB)
📄
sndhdr.py
(6.07 KB)
📄
socket.py
(14.56 KB)
📄
socketserver.py
(23.63 KB)
ðŸ“
sqlite3
📄
sre_compile.py
(15.96 KB)
📄
sre_constants.py
(7.06 KB)
📄
sre_parse.py
(29.5 KB)
📄
ssl.py
(23.9 KB)
📄
stat.py
(4.2 KB)
📄
string.py
(9.19 KB)
📄
stringprep.py
(12.61 KB)
📄
struct.py
(238 B)
📄
subprocess.py
(65.99 KB)
📄
sunau.py
(17.11 KB)
📄
symbol.py
(2 KB)
📄
symtable.py
(7.21 KB)
📄
sysconfig.py
(24.58 KB)
📄
tabnanny.py
(11.14 KB)
📄
tarfile.py
(86.78 KB)
📄
telnetlib.py
(26.71 KB)
📄
tempfile.py
(22.47 KB)
ðŸ“
test
📄
textwrap.py
(16.1 KB)
📄
this.py
(1003 B)
📄
threading.py
(44.57 KB)
📄
timeit.py
(12.1 KB)
📄
token.py
(2.96 KB)
📄
tokenize.py
(24.29 KB)
📄
trace.py
(30.75 KB)
📄
traceback.py
(11.7 KB)
📄
tty.py
(879 B)
📄
types.py
(3.09 KB)
ðŸ“
unittest
ðŸ“
urllib
📄
uu.py
(6.61 KB)
📄
uuid.py
(21.83 KB)
ðŸ“
venv
📄
warnings.py
(13.5 KB)
📄
wave.py
(18.14 KB)
📄
weakref.py
(11.23 KB)
📄
webbrowser.py
(22.38 KB)
ðŸ“
wsgiref
📄
xdrlib.py
(5.25 KB)
ðŸ“
xml
ðŸ“
xmlrpc
📄
zipfile.py
(64.87 KB)
Editing: ssl.py
# Wrapper module for _ssl, providing some additional facilities # implemented in Python. Written by Bill Janssen. """This module provides some more Pythonic support for SSL. Object types: SSLSocket -- subtype of socket.socket which does SSL over the socket Exceptions: SSLError -- exception raised for I/O errors Functions: cert_time_to_seconds -- convert time string used for certificate notBefore and notAfter functions to integer seconds past the Epoch (the time values returned from time.time()) fetch_server_certificate (HOST, PORT) -- fetch the certificate provided by the server running on HOST at port PORT. No validation of the certificate is performed. Integer constants: SSL_ERROR_ZERO_RETURN SSL_ERROR_WANT_READ SSL_ERROR_WANT_WRITE SSL_ERROR_WANT_X509_LOOKUP SSL_ERROR_SYSCALL SSL_ERROR_SSL SSL_ERROR_WANT_CONNECT SSL_ERROR_EOF SSL_ERROR_INVALID_ERROR_CODE The following group define certificate requirements that one side is allowing/requiring from the other side: CERT_NONE - no certificates from the other side are required (or will be looked at if provided) CERT_OPTIONAL - certificates are not required, but if provided will be validated, and if validation fails, the connection will also fail CERT_REQUIRED - certificates are required, and will be validated, and if validation fails, the connection will also fail The following constants identify various SSL protocol variants: PROTOCOL_SSLv2 PROTOCOL_SSLv3 PROTOCOL_SSLv23 PROTOCOL_TLSv1 """ import textwrap import re import _ssl # if we can't import it, let the error propagate from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION from _ssl import _SSLContext from _ssl import ( SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError, SSLSyscallError, SSLEOFError, ) from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED from _ssl import ( OP_ALL, OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_TLSv1, OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE ) try: from _ssl import OP_NO_COMPRESSION except ImportError: pass try: from _ssl import OP_SINGLE_ECDH_USE except ImportError: pass from _ssl import RAND_status, RAND_add, RAND_bytes, RAND_pseudo_bytes try: from _ssl import RAND_egd except ImportError: pass from _ssl import ( SSL_ERROR_ZERO_RETURN, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_X509_LOOKUP, SSL_ERROR_SYSCALL, SSL_ERROR_SSL, SSL_ERROR_WANT_CONNECT, SSL_ERROR_EOF, SSL_ERROR_INVALID_ERROR_CODE, ) from _ssl import HAS_SNI, HAS_ECDH, HAS_NPN from _ssl import (PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1, PROTOCOL_TLS) from _ssl import _OPENSSL_API_VERSION globals()['PROTOCOL_SSLv23'] = getattr(_ssl, 'PROTOCOL_TLS') _PROTOCOL_NAMES = { PROTOCOL_TLSv1: "TLSv1", PROTOCOL_SSLv23: "SSLv23", PROTOCOL_SSLv3: "SSLv3", PROTOCOL_TLS: "SSLv23" } try: from _ssl import PROTOCOL_SSLv2 _SSLv2_IF_EXISTS = PROTOCOL_SSLv2 except ImportError: _SSLv2_IF_EXISTS = None else: _PROTOCOL_NAMES[PROTOCOL_SSLv2] = "SSLv2" from socket import getnameinfo as _getnameinfo from socket import error as socket_error from socket import socket, AF_INET, SOCK_STREAM, create_connection from socket import SOL_SOCKET, SO_TYPE import base64 # for DER-to-PEM translation import traceback import errno if _ssl.HAS_TLS_UNIQUE: CHANNEL_BINDING_TYPES = ['tls-unique'] else: CHANNEL_BINDING_TYPES = [] # Disable weak or insecure ciphers by default # (OpenSSL's default setting is 'DEFAULT:!aNULL:!eNULL') _DEFAULT_CIPHERS = 'DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2' class CertificateError(ValueError): pass def _dnsname_match(dn, hostname, max_wildcards=1): """Matching according to RFC 6125, section 6.4.3 http://tools.ietf.org/html/rfc6125#section-6.4.3 """ pats = [] if not dn: return False leftmost, *remainder = dn.split(r'.') wildcards = leftmost.count('*') if wildcards > max_wildcards: # Issue #17980: avoid denials of service by refusing more # than one wildcard per fragment. A survery of established # policy among SSL implementations showed it to be a # reasonable choice. raise CertificateError( "too many wildcards in certificate DNS name: " + repr(dn)) # speed up common case w/o wildcards if not wildcards: return dn.lower() == hostname.lower() # RFC 6125, section 6.4.3, subitem 1. # The client SHOULD NOT attempt to match a presented identifier in which # the wildcard character comprises a label other than the left-most label. if leftmost == '*': # When '*' is a fragment by itself, it matches a non-empty dotless # fragment. pats.append('[^.]+') elif leftmost.startswith('xn--') or hostname.startswith('xn--'): # RFC 6125, section 6.4.3, subitem 3. # The client SHOULD NOT attempt to match a presented identifier # where the wildcard character is embedded within an A-label or # U-label of an internationalized domain name. pats.append(re.escape(leftmost)) else: # Otherwise, '*' matches any dotless string, e.g. www* pats.append(re.escape(leftmost).replace(r'\*', '[^.]*')) # add the remaining fragments, ignore any wildcards for frag in remainder: pats.append(re.escape(frag)) pat = re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE) return pat.match(hostname) def match_hostname(cert, hostname): """Verify that *cert* (in decoded format as returned by SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125 rules are followed, but IP addresses are not accepted for *hostname*. CertificateError is raised on failure. On success, the function returns nothing. """ if not cert: raise ValueError("empty or no certificate") dnsnames = [] san = cert.get('subjectAltName', ()) for key, value in san: if key == 'DNS': if _dnsname_match(value, hostname): return dnsnames.append(value) if not dnsnames: # The subject is only checked when there is no dNSName entry # in subjectAltName for sub in cert.get('subject', ()): for key, value in sub: # XXX according to RFC 2818, the most specific Common Name # must be used. if key == 'commonName': if _dnsname_match(value, hostname): return dnsnames.append(value) if len(dnsnames) > 1: raise CertificateError("hostname %r " "doesn't match either of %s" % (hostname, ', '.join(map(repr, dnsnames)))) elif len(dnsnames) == 1: raise CertificateError("hostname %r " "doesn't match %r" % (hostname, dnsnames[0])) else: raise CertificateError("no appropriate commonName or " "subjectAltName fields were found") class SSLContext(_SSLContext): """An SSLContext holds various SSL-related configuration options and data, such as certificates and possibly a private key.""" __slots__ = ('protocol',) def __new__(cls, protocol, *args, **kwargs): self = _SSLContext.__new__(cls, protocol) if protocol != _SSLv2_IF_EXISTS: self.set_ciphers(_DEFAULT_CIPHERS) return self def __init__(self, protocol): self.protocol = protocol def wrap_socket(self, sock, server_side=False, do_handshake_on_connect=True, suppress_ragged_eofs=True, server_hostname=None): return SSLSocket(sock=sock, server_side=server_side, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs, server_hostname=server_hostname, _context=self) def set_npn_protocols(self, npn_protocols): protos = bytearray() for protocol in npn_protocols: b = bytes(protocol, 'ascii') if len(b) == 0 or len(b) > 255: raise SSLError('NPN protocols must be 1 to 255 in length') protos.append(len(b)) protos.extend(b) self._set_npn_protocols(protos) class SSLSocket(socket): """This class implements a subtype of socket.socket that wraps the underlying OS socket in an SSL context when necessary, and provides read and write methods over that channel.""" def __init__(self, sock=None, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_TLS, ca_certs=None, do_handshake_on_connect=True, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, suppress_ragged_eofs=True, npn_protocols=None, ciphers=None, server_hostname=None, _context=None): if _context: self.context = _context else: if server_side and not certfile: raise ValueError("certfile must be specified for server-side " "operations") if keyfile and not certfile: raise ValueError("certfile must be specified") if certfile and not keyfile: keyfile = certfile self.context = SSLContext(ssl_version) self.context.verify_mode = cert_reqs if ca_certs: self.context.load_verify_locations(ca_certs) if certfile: self.context.load_cert_chain(certfile, keyfile) if npn_protocols: self.context.set_npn_protocols(npn_protocols) if ciphers: self.context.set_ciphers(ciphers) self.keyfile = keyfile self.certfile = certfile self.cert_reqs = cert_reqs self.ssl_version = ssl_version self.ca_certs = ca_certs self.ciphers = ciphers # Can't use sock.type as other flags (such as SOCK_NONBLOCK) get # mixed in. if sock.getsockopt(SOL_SOCKET, SO_TYPE) != SOCK_STREAM: raise NotImplementedError("only stream sockets are supported") if server_side and server_hostname: raise ValueError("server_hostname can only be specified " "in client mode") self.server_side = server_side self.server_hostname = server_hostname self.do_handshake_on_connect = do_handshake_on_connect self.suppress_ragged_eofs = suppress_ragged_eofs connected = False if sock is not None: socket.__init__(self, family=sock.family, type=sock.type, proto=sock.proto, fileno=sock.fileno()) self.settimeout(sock.gettimeout()) # see if it's connected try: sock.getpeername() except socket_error as e: if e.errno != errno.ENOTCONN: raise else: connected = True sock.detach() elif fileno is not None: socket.__init__(self, fileno=fileno) else: socket.__init__(self, family=family, type=type, proto=proto) self._closed = False self._sslobj = None self._connected = connected if connected: # create the SSL object try: self._sslobj = self.context._wrap_socket(self, server_side, server_hostname) if do_handshake_on_connect: timeout = self.gettimeout() if timeout == 0.0: # non-blocking raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets") self.do_handshake() except socket_error as x: self.close() raise x def dup(self): raise NotImplemented("Can't dup() %s instances" % self.__class__.__name__) def _checkClosed(self, msg=None): # raise an exception here if you wish to check for spurious closes pass def read(self, len=0, buffer=None): """Read up to LEN bytes and return them. Return zero-length string on EOF.""" self._checkClosed() try: if buffer is not None: v = self._sslobj.read(len, buffer) else: v = self._sslobj.read(len or 1024) return v except SSLError as x: if x.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs: if buffer is not None: return 0 else: return b'' else: raise def write(self, data): """Write DATA to the underlying SSL channel. Returns number of bytes of DATA actually transmitted.""" self._checkClosed() return self._sslobj.write(data) def getpeercert(self, binary_form=False): """Returns a formatted version of the data in the certificate provided by the other end of the SSL channel. Return None if no certificate was provided, {} if a certificate was provided, but not validated.""" self._checkClosed() return self._sslobj.peer_certificate(binary_form) def selected_npn_protocol(self): self._checkClosed() if not self._sslobj or not _ssl.HAS_NPN: return None else: return self._sslobj.selected_npn_protocol() def cipher(self): self._checkClosed() if not self._sslobj: return None else: return self._sslobj.cipher() def compression(self): self._checkClosed() if not self._sslobj: return None else: return self._sslobj.compression() def send(self, data, flags=0): self._checkClosed() if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to send() on %s" % self.__class__) while True: try: v = self._sslobj.write(data) except SSLError as x: if x.args[0] == SSL_ERROR_WANT_READ: return 0 elif x.args[0] == SSL_ERROR_WANT_WRITE: return 0 else: raise else: return v else: return socket.send(self, data, flags) def sendto(self, data, flags_or_addr, addr=None): self._checkClosed() if self._sslobj: raise ValueError("sendto not allowed on instances of %s" % self.__class__) elif addr is None: return socket.sendto(self, data, flags_or_addr) else: return socket.sendto(self, data, flags_or_addr, addr) def sendmsg(self, *args, **kwargs): # Ensure programs don't send data unencrypted if they try to # use this method. raise NotImplementedError("sendmsg not allowed on instances of %s" % self.__class__) def sendall(self, data, flags=0): self._checkClosed() if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to sendall() on %s" % self.__class__) amount = len(data) count = 0 while (count < amount): v = self.send(data[count:]) count += v return amount else: return socket.sendall(self, data, flags) def recv(self, buflen=1024, flags=0): self._checkClosed() if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to recv() on %s" % self.__class__) return self.read(buflen) else: return socket.recv(self, buflen, flags) def recv_into(self, buffer, nbytes=None, flags=0): self._checkClosed() if buffer and (nbytes is None): nbytes = len(buffer) elif nbytes is None: nbytes = 1024 if self._sslobj: if flags != 0: raise ValueError( "non-zero flags not allowed in calls to recv_into() on %s" % self.__class__) return self.read(nbytes, buffer) else: return socket.recv_into(self, buffer, nbytes, flags) def recvfrom(self, buflen=1024, flags=0): self._checkClosed() if self._sslobj: raise ValueError("recvfrom not allowed on instances of %s" % self.__class__) else: return socket.recvfrom(self, buflen, flags) def recvfrom_into(self, buffer, nbytes=None, flags=0): self._checkClosed() if self._sslobj: raise ValueError("recvfrom_into not allowed on instances of %s" % self.__class__) else: return socket.recvfrom_into(self, buffer, nbytes, flags) def recvmsg(self, *args, **kwargs): raise NotImplementedError("recvmsg not allowed on instances of %s" % self.__class__) def recvmsg_into(self, *args, **kwargs): raise NotImplementedError("recvmsg_into not allowed on instances of " "%s" % self.__class__) def pending(self): self._checkClosed() if self._sslobj: return self._sslobj.pending() else: return 0 def shutdown(self, how): self._checkClosed() self._sslobj = None socket.shutdown(self, how) def unwrap(self): if self._sslobj: s = self._sslobj.shutdown() self._sslobj = None return s else: raise ValueError("No SSL wrapper around " + str(self)) def _real_close(self): self._sslobj = None # self._closed = True socket._real_close(self) def do_handshake(self, block=False): """Perform a TLS/SSL handshake.""" timeout = self.gettimeout() try: if timeout == 0.0 and block: self.settimeout(None) self._sslobj.do_handshake() finally: self.settimeout(timeout) def _real_connect(self, addr, connect_ex): if self.server_side: raise ValueError("can't connect in server-side mode") # Here we assume that the socket is client-side, and not # connected at the time of the call. We connect it, then wrap it. if self._connected: raise ValueError("attempt to connect already-connected SSLSocket!") self._sslobj = self.context._wrap_socket(self, False, self.server_hostname) try: if connect_ex: rc = socket.connect_ex(self, addr) else: rc = None socket.connect(self, addr) if not rc: if self.do_handshake_on_connect: self.do_handshake() self._connected = True return rc except socket_error: self._sslobj = None raise def connect(self, addr): """Connects to remote ADDR, and then wraps the connection in an SSL channel.""" self._real_connect(addr, False) def connect_ex(self, addr): """Connects to remote ADDR, and then wraps the connection in an SSL channel.""" return self._real_connect(addr, True) def accept(self): """Accepts a new connection from a remote client, and returns a tuple containing that new connection wrapped with a server-side SSL channel, and the address of the remote client.""" newsock, addr = socket.accept(self) newsock = self.context.wrap_socket(newsock, do_handshake_on_connect=self.do_handshake_on_connect, suppress_ragged_eofs=self.suppress_ragged_eofs, server_side=True) return newsock, addr def get_channel_binding(self, cb_type="tls-unique"): """Get channel binding data for current connection. Raise ValueError if the requested `cb_type` is not supported. Return bytes of the data or None if the data is not available (e.g. before the handshake). """ if cb_type not in CHANNEL_BINDING_TYPES: raise ValueError("Unsupported channel binding type") if cb_type != "tls-unique": raise NotImplementedError( "{0} channel binding type not implemented" .format(cb_type)) if self._sslobj is None: return None return self._sslobj.tls_unique_cb() def wrap_socket(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_TLS, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True, ciphers=None): return SSLSocket(sock=sock, keyfile=keyfile, certfile=certfile, server_side=server_side, cert_reqs=cert_reqs, ssl_version=ssl_version, ca_certs=ca_certs, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs, ciphers=ciphers) # some utility functions def cert_time_to_seconds(cert_time): """Takes a date-time string in standard ASN1_print form ("MON DAY 24HOUR:MINUTE:SEC YEAR TIMEZONE") and return a Python time value in seconds past the epoch.""" import time return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT")) PEM_HEADER = "-----BEGIN CERTIFICATE-----" PEM_FOOTER = "-----END CERTIFICATE-----" def DER_cert_to_PEM_cert(der_cert_bytes): """Takes a certificate in binary DER format and returns the PEM version of it as a string.""" f = str(base64.standard_b64encode(der_cert_bytes), 'ASCII', 'strict') return (PEM_HEADER + '\n' + textwrap.fill(f, 64) + '\n' + PEM_FOOTER + '\n') def PEM_cert_to_DER_cert(pem_cert_string): """Takes a certificate in ASCII PEM format and returns the DER-encoded version of it as a byte sequence""" if not pem_cert_string.startswith(PEM_HEADER): raise ValueError("Invalid PEM encoding; must start with %s" % PEM_HEADER) if not pem_cert_string.strip().endswith(PEM_FOOTER): raise ValueError("Invalid PEM encoding; must end with %s" % PEM_FOOTER) d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)] return base64.decodebytes(d.encode('ASCII', 'strict')) def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv3, ca_certs=None): """Retrieve the certificate from the server at the specified address, and return it as a PEM-encoded string. If 'ca_certs' is specified, validate the server cert against it. If 'ssl_version' is specified, use it in the connection attempt.""" host, port = addr if (ca_certs is not None): cert_reqs = CERT_REQUIRED else: cert_reqs = CERT_NONE s = create_connection(addr) s = wrap_socket(s, ssl_version=ssl_version, cert_reqs=cert_reqs, ca_certs=ca_certs) dercert = s.getpeercert(True) s.close() return DER_cert_to_PEM_cert(dercert) def get_protocol_name(protocol_code): return _PROTOCOL_NAMES.get(protocol_code, '<unknown>')
Upload File
Create Folder