Automatically crawl and test every endpoint listed in swagger-ui.html

Overview

swagger-hack

During tests you occasionally run into a Swagger leak; a typical one is shown in the screenshot. Some leaks expose so many endpoints that testing each one by hand is impossible, so I wrote a Python script that crawls every endpoint, fills in the parameters, and sends the requests automatically.

The idea: first fetch http://url/swagger-resources to learn which specs exist and where each spec's document lives, then parse the endpoint documentation under each spec, build a request packet for each endpoint, and record the response.
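As a rough sketch (with a made-up response body; the field names follow the Swagger 2.0 resource listing), the discovery step boils down to fetching /swagger-resources and collecting each spec's document URL:

```python
import json

# Hypothetical /swagger-resources response: one entry per spec,
# each pointing at that spec's document.
sample = """[
  {"name": "default", "url": "/v2/api-docs", "swaggerVersion": "2.0"},
  {"name": "admin", "url": "/v2/api-docs?group=admin", "swaggerVersion": "2.0"}
]"""

def spec_doc_urls(base_url, resources_json):
    # Each spec's document lives at base_url + its relative "url" field.
    return [base_url + spec["url"] for spec in json.loads(resources_json)]

print(spec_doc_urls("http://target", sample))
# ['http://target/v2/api-docs', 'http://target/v2/api-docs?group=admin']
```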

The script tries to account for every likely parameter format. In practice only a few endpoints respond with 500 or 400 and need a manual tweak; the rest return 401 or 200. A 200 means an unauthenticated endpoint, which can then be probed further (SQL injection and so on). A run looks like the screenshot, and all test results are stored in a CSV.
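The dummy-value strategy described above can be sketched roughly like this, assuming Swagger 2.0 parameter objects (`name`, `in`, `type` fields):

```python
def dummy_value(param):
    # Booleans become "true"; strings, numbers, and anything untyped become "1".
    # .get() keeps this safe when a parameter object has no "type" key.
    return "true" if param.get("type") == "boolean" else "1"

def build_query(parameters):
    # Join every parameter into an a=1&b=true style query string.
    return "&".join("%s=%s" % (p["name"], dummy_value(p)) for p in parameters)

params = [
    {"name": "id", "in": "query", "type": "integer"},
    {"name": "active", "in": "query", "type": "boolean"},
]
print(build_query(params))  # id=1&active=true
```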

Follow 稻草人安全团队 (Scarecrow Security Team) for more technical articles and tool releases.

Comments
  • I hacked it up a bit and it works really well against pentest targets. I won't bother opening a PR, but if the author is interested feel free to pull it into the repo.

    Changes compared to the original repo:

    • Added an X-Forwarded-For header
    • Added proxy support
    • Fixed the HTTPS error
    • Refactored into a class for easier integration
    • Added Shiro component detection
    • Added a fake Referer
    • Added a random User-Agent
    • Removed the PUT/DELETE tests to lower the risk of damaging the target site
    • A target site of mine had fields without a type key, so I use Dict.get('type') instead of Dict['type'] to keep the script from crashing
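    The .get('type') change in the last bullet matters because indexing a missing key raises, while .get just returns None:

    ```python
    field = {"name": "payload"}  # a field with no "type" key, as seen on some targets

    # field["type"] would raise KeyError and abort the whole scan;
    # field.get("type") returns None, so the type check simply falls through.
    try:
        field["type"]
        crashed = False
    except KeyError:
        crashed = True

    value = "true" if field.get("type") == "boolean" else "1"
    print(crashed, value)  # True 1
    ```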

    I like using loguru as my logger, so there is one extra dependency:

    pip install loguru
    

    The code:

    import json
    import random
    
    import requests
    import csv
    from loguru import logger
    import urllib3
    
    urllib3.disable_warnings()
    
    USER_AGENTS = [
        "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
        "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:81.0) Gecko/20100101 Firefox/81.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
        "Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
    ]
    
    class HttpMixin:
    
        def _random_x_forwarded_for(self):
            return "%d.%d.%d.%d" % (
                random.randint(1, 254),
                random.randint(1, 254),
                random.randint(1, 254),
                random.randint(1, 254),
            )
    
        def get_or_confuse_headers(self, headers=None, testShiro=False, isRest=False):
            if headers is None:
                headers = {
                    "User-Agent": random.choice(USER_AGENTS),
                    "X-Forwarded-For": self._random_x_forwarded_for(),
                    # "Referer": "http://www.baidu.com",
                }
                if not testShiro:
                    headers['Cookie'] = "rememberMe=xx"
            else:
                headers["User-Agent"] = random.choice(USER_AGENTS)
                headers["X-Forwarded-For"] = self._random_x_forwarded_for()
                headers["Referer"] = "http://www.baidu.com"
                if headers.get("Cookie"):
                    if 'rememberMe=xx' not in headers.get("Cookie"):
                        headers["Cookie"] += ";" + "rememberMe=xx"
                    else:
                        pass
                else:
                    headers["Cookie"] = "rememberMe=xx"
    
            if isRest:
                headers["Content-Type"] = "application/json"
    
            return headers
    
    
    class SwaggerHack(HttpMixin):
        def __init__(self, proxy_ip, proxy_port):
            f = open('swagger.csv', 'w', newline='', encoding='utf-8')  # results are written to this CSV
            self.writer = csv.writer(f)
            self.proxy_ip = proxy_ip
            self.proxy_port = proxy_port
    
        def run(self, url: str):
            specs = self.get_specs(url)
            logger.info("[+] fetched %d specs" % (len(specs)))
            try:
                self.writer.writerow(
                    ["spec", "summary", "path", "method", "consumes", "url", "num of params", "data", "status_code",
                     "response"])
            except Exception as e:
                logger.info(e)
    
            for spec in specs:
                spec_url = url + spec['url']
                pre = spec['url'].split('/')[1]
                logger.info("[+] : testing spec %s" % spec_url)
                self.check_spec(spec_url, url)
                # break
    
        def get_proxy_dict(self):
            proxies = {
                "http": "http://{}:{}".format(self.proxy_ip, self.proxy_port),
                "https": "http://{}:{}".format(self.proxy_ip, self.proxy_port),
            }
            return proxies
    
        def get_specs(self, url):  # fetch the list of specs
            specs_url = url + "/swagger-resources"
            res = requests.get(url=specs_url, proxies=self.get_proxy_dict(), verify=False)
            specs = json.loads(res.text)
            return specs
    
        def check_spec(self, spec_url, url):  # spec_url is the spec document to parse; url is the base address the actual requests go to
            res = requests.get(url=spec_url, proxies=self.get_proxy_dict(), verify=False)
            headers = self.get_or_confuse_headers(isRest=True)
            try:
                paths = json.loads(res.text)['paths']
                logger.info("[+] : this spec contains %d endpoints" % (len(paths)))
            except Exception as e:
                logger.exception(e)
                return 0
    
            for path in paths:
                logger.info("[+] : testing endpoint %s " % (path))
                methods = paths[path]
                for method in methods:
                    tags = paths[path][method]['tags'][0]
                    summary = paths[path][method]['summary']
                    operationId = paths[path][method]['operationId']
                    if 'consumes' in paths[path][method].keys():  # JSON body
                        consumes = paths[path][method]['consumes'][0]
                    else:
                        consumes = '0'
    
                    if consumes != '0':  # JSON body (post/put); the spec doesn't describe how to build the JSON, so just send flat dummy values
                        logger.info("sending a JSON body")
                        json_array = {}
                        if 'parameters' in paths[path][method]:
                            parameters = paths[path][method]['parameters']
                            logger.info("endpoint has %d parameters" % (len(parameters)))
                            for parameter in parameters:
                                position = parameter.get('in')
                                if position == "header":
                                    if parameter.get('type') == 'boolean':
                                        headers[parameter['name']] = 'true'
                                    else:
                                        headers[parameter['name']] = '1'
                                else:
                                    if parameter.get('type') == "boolean":  # booleans become true; strings and numbers become 1
                                        json_array[parameter['name']] = 'true'
                                    else:
                                        json_array[parameter['name']] = '1'
                        else:
                            json_array = ''
                            logger.info("endpoint has %d parameters" % (0))
    
                        logger.info("building request parameters...")
                        json_string = json.dumps(json_array)
                        logger.info(json_string)
    
                        if method == "post":
                            res = requests.post(url=url + path, data=json_string, headers=headers, verify=False,
                                                proxies=self.get_proxy_dict())
                        else:
                            # put (or anything else with a body) is skipped to avoid damaging the target,
                            # and we skip the CSV row too, since res would be stale here
                            logger.info("[!] {} has a {} method, not testing it".format(url, method))
                            continue
                        try:  # even a POST may have no parameters
                            row = [spec_url, summary, path, method, consumes, url + path,
                                   str(len(paths[path][method]['parameters'])), json_string, res.status_code, res.text]
                        except KeyError:
                            row = [spec_url, summary, path, method, consumes, url + path, '0', json_string, res.status_code,
                                   res.text]
                        self.writer.writerow(row)
                    else:  # no JSON body
                        if "{" in path:
                            parameter = paths[path][method]['parameters'][0]
                            print(parameter)
                            try:
                                if parameter.get('type') == "boolean":  # booleans become true; strings and numbers become 1
                                    tmp = "true"
                                else:
                                    tmp = "1"
                            except Exception as e:
                                tmp = "{1}"
                                logger.exception(e)
                            if method == 'get':
                                res = requests.get(url=url + path[:path.index('{')] + tmp, verify=False,
                                                   proxies=self.get_proxy_dict())
                            else:
                                # delete (or anything else) is skipped to avoid damaging the target,
                                # and we skip the CSV row too, since res would be stale here
                                logger.info("[!] {} has a {} method, not testing it".format(url, method))
                                continue
    
                            row = [spec_url, summary, path, method, consumes, url + path[:path.index('{')],
                                   str(len(paths[path][method]['parameters'])), "", res.status_code, res.text]
                            self.writer.writerow(row)
    
                        else:
                            query_string = ''
                            if 'parameters' in paths[path][method]:
                                parameters = paths[path][method]['parameters']
                                num_of_param = len(parameters)
                                for parameter in parameters:
                                    try:
                                        if parameter['type'] == "boolean":  # booleans become true; strings and numbers become 1
                                            query_string += "&%s=true" % (parameter['name'])
                                        else:
                                            query_string += "&%s=1" % (parameter['name'])
                                    except:
                                        query_string += "&%s={1}" % (parameter['name'])
                            else:
                                query_string = ''
                                num_of_param = 0
                            query_string = query_string[1:]
    
                            if method == "get":
                                res = requests.get(url=url + path + "?" + query_string, verify=False,
                                                   proxies=self.get_proxy_dict())
                            else:
                                # delete (or anything else) is skipped to avoid damaging the target,
                                # and we skip the CSV row too, since res would be stale here
                                logger.info("[!] {} has a {} method, not testing it".format(url, method))
                                continue
    
                            row = [spec_url, summary, path, method, consumes, url + path + "?" + query_string,
                                   str(num_of_param), "", res.status_code, res.text]
                            self.writer.writerow(row)
    
    
    if __name__ == '__main__':
        target_url = "https://xxx.xxx.xxx/"
        swaggerHack = SwaggerHack("localhost", 8080)
        swaggerHack.run(target_url)
    
    
    opened by Kai5174 3
Owner
jayus (web security dabbler)