findproxy的改进~

由本社区发起的开源项目
回复
头像
jjj137
帖子: 60
注册时间: 2007-06-16 0:56
联系:

findproxy的改进~

#1

帖子 jjj137 » 2008-08-13 13:21

手头没有Linux系统,在Windows下做的,关于编码不太了解,不知道会不会出乱码之类的……
8月15日:去掉了烦人的map语句。
8月14日:加入了命令行参数的支持,一些细微的改进。
8月13日:使用了多线程,结果输出到proxy.txt,验证方面改成了字符验证,速度更快。

代码: 全选

#!/usr/bin/python
# -*- coding: gbk -*-
# From: ubuntu.org.cn Copyright: GPLv2
import thread        
import urllib
import sys, re
from datetime import datetime

class findproxy(object):
    """Collect proxies from a listing page and keep the ones that work.

    Configuration lives in ``self.data``:

    * ``proxyurl``   -- page that is downloaded to obtain candidate proxies.
    * ``pattern``    -- regular expression with two groups (address, port)
      applied to that page.
    * ``desturl``    -- page fetched through every candidate proxy.
    * ``checkvalue`` -- text that must occur in the fetched page for the
      proxy to count as usable.
    * ``output``     -- file that ``savelist`` appends the results to.

    ``self.ipdict`` maps every candidate ``"address:port"`` to ``-1`` while
    it is still unchecked, and afterwards to either the measured connect
    time or a failure message. ``self.passeddict`` holds only the usable
    proxies, mapped to their connect time.
    """
    def __init__(self):
        object.__init__(self)
        self.data = {"proxyurl":"http://proxy.cemsg.com/",
            # Raw string so the regex escapes are not read as string escapes.
            "pattern":r"\D+(\d{2,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\D*.*\D+(\d{2,5})</td>",
            "desturl":"http://www.ubuntu.org.cn/",
            "checkvalue":"whatisubuntu",
            "output":"proxy.txt"}
        self.ipdict = {}
        self.passeddict = {}

    def getlist(self):
        """Download ``proxyurl`` and register every match of ``pattern``.

        Each match is stored in ``self.ipdict`` as ``"address:port"`` with
        the value ``-1``, which marks it as not yet checked.
        """
        sock = urllib.urlopen(self.data["proxyurl"])
        try:
            html = sock.read()
        finally:
            # Release the connection even when reading fails.
            sock.close()
        for match in re.compile(self.data["pattern"]).findall(html):
            self.ipdict[":".join(match)] = -1

    def hander(self, ip):
        """Check one proxy; meant to run as the body of a worker thread.

        Fetches ``desturl`` through the proxy ``ip`` ("address:port") and
        looks for ``checkvalue`` in the response. The verdict is printed and
        stored in ``self.ipdict[ip]``; usable proxies are also stored in
        ``self.passeddict``. The method name (a misspelling of "handler") is
        kept because it is part of the class interface.
        """
        # Default verdict, so the finally clause always has something to
        # publish no matter how the try block is left.
        result = "连接失败"
        try:
            start = datetime.now()
            sock = urllib.urlopen(self.data["desturl"], None, {"http": "http://" + ip})
            end = datetime.now()
            try:
                html = sock.read()
            finally:
                sock.close()
            if self.data["checkvalue"] in html:
                result = end - start
                # Record the success before ipdict is updated: callers poll
                # ipdict to detect completion and then read passeddict.
                self.passeddict[ip] = result
            else:
                result = "不符合条件"
        except Exception:
            result = "连接失败"
        finally:
            # One write call per line keeps output from concurrent threads
            # from interleaving between the text and its newline.
            sys.stdout.write("%-25s%s\n" % (ip, result))
            # Publish the verdict last; this is the "finished" signal.
            self.ipdict[ip] = result

    def check(self):
        """Start one worker thread running ``hander`` per known proxy.

        Returns immediately; a proxy is finished once its ``self.ipdict``
        value is no longer ``-1``.
        """
        # Snapshot the keys: workers update the dict while we iterate.
        for ip in list(self.ipdict):
            thread.start_new_thread(self.hander, (ip,))

    def savelist(self):
        """Append the usable proxies, fastest first, to the output file.

        Writes a timestamp line, one line per proxy and a separator line.
        Errors from opening or writing the file propagate to the caller.
        """
        sortedlist = sorted(self.passeddict.items(), key = lambda item: item[1])
        # Open outside the try block: if open() fails there is nothing to
        # close, and the real I/O error must not be masked by a NameError.
        logfile = open(self.data["output"], "a")
        try:
            logfile.write("%s\t\n" % datetime.now())
            logfile.write("".join(["%-25s\t%s\t\n" % (ip, t) for (ip, t) in sortedlist]))
            logfile.write("\t\n")
        finally:
            logfile.close()
            
def main(args = sys.argv[1:]):
    """Command-line entry point: fetch, check and save proxies.

    ``args`` is the list of command-line arguments without the program
    name. Recognised options override the matching entries of the
    ``findproxy`` configuration; ``-h``/``--help`` or an invalid option
    exits with the usage text. After all proxies are checked, the usable
    ones are appended to the output file.
    """
    import getopt
    import socket
    import time
    helpstr = """
    FindProxy 0.0.2
    Usage:
        -p(--proxy)     the url to get proxies from.
        -r(--re)        the regexp which the IP string must meet.
        -d(--dest)      the destination url to reach through proxies.
        -v(--value)     the value to check if the proxies return correct data.
        -o(--output)    the file to append the usable proxies to.
        -h(--help)      show this message.
    """
    # Option spelling -> configuration key, built once instead of per option.
    optmap = {"-p":"proxyurl", "--proxy":"proxyurl",
        "-r":"pattern", "--re":"pattern",
        "-d":"desturl", "--dest":"desturl",
        "-v":"checkvalue", "--value":"checkvalue",
        "-o":"output", "--output":"output"}
    try:
        opts = getopt.getopt(args, "p:r:d:v:o:h", ["proxy=", "re=", "dest=", "value=", "output=", "help"])[0]
    except getopt.GetoptError:
        sys.exit(helpstr)
    overrides = {}
    for o, a in opts:
        if o in ("-h", "--help"):
            sys.exit(helpstr)
        overrides[optmap[o]] = a
    # Build the checker only after the options are known to be valid.
    Proxy = findproxy()
    Proxy.data.update(overrides)
    # Applies to every socket created afterwards, so a dead proxy cannot
    # block its worker thread indefinitely.
    socket.setdefaulttimeout(5)
    Proxy.getlist()
    Proxy.check()
    # Wait until every worker has replaced its -1 placeholder. Sleeping
    # between polls avoids spinning a CPU core and lets the workers run.
    while -1 in Proxy.ipdict.values():
        time.sleep(0.1)
    sys.stdout.write('Succeed!!! All available proxies will be saved in "%s".\n' % Proxy.data["output"])
    Proxy.savelist()

# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
回复