相较原repo改动如下
- 增加了XFF
- 增加了代理功能
- 解决了HTTPS报错的问题
- 更改为class方便集成
- 增加了shiro组件检测
- 增加了假的Referer
- 增加了随机Agent
- 删除了PUT/DELETE测试,降低破坏目标站点的风险
- 我测试的目标站点中存在没有 `type` 字段的 field,因此使用 `Dict.get('type')` 而不是 `Dict['type']`,防止程序崩溃
个人比较喜欢用 loguru 做 logger,因此需要新增一个依赖:
pip install loguru
代码如下:
import json
import random
import requests
import csv
from loguru import logger
import urllib3
# Silence urllib3 warnings process-wide. Presumably this hides the insecure-request
# warning triggered by the verify=False requests made further down in this file.
urllib3.disable_warnings()
# Pool of browser User-Agent strings; HttpMixin draws one at random for each header set.
# NOTE(review): the Chrome/19 "Intel Mac OS X 10_7_3" entry is listed twice, so
# random.choice picks it twice as often as any other entry.
USER_AGENTS = [
"Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
"Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:81.0) Gecko/20100101 Firefox/81.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
"Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
"Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
"Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
]
class HttpMixin:
    """Mixin that produces HTTP headers with a randomised client fingerprint.

    Each header set receives a random ``User-Agent`` taken from
    ``USER_AGENTS`` and a random ``X-Forwarded-For`` address; in most cases a
    ``rememberMe=xx`` cookie is attached as well.
    """

    def _random_x_forwarded_for(self):
        """Return a dotted-quad string whose four octets are each drawn from 1..254."""
        return ".".join(str(random.randint(1, 254)) for _ in range(4))

    def get_or_confuse_headers(self, headers=None, testShiro=False, isRest=False):
        """Build a fresh header dict, or disguise the one passed in.

        Args:
            headers: Existing header mapping. It is updated IN PLACE and
                returned; when ``None`` a new dict is created instead.
            testShiro: Only consulted when ``headers`` is ``None``; the
                ``rememberMe=xx`` cookie is added when this flag is false.
            isRest: When true, ``Content-Type`` is set to ``Application/json``.

        Returns:
            The populated header mapping (the same object as ``headers`` when
            one was supplied).
        """
        remember_me = "rememberMe=xx"
        user_agent = random.choice(USER_AGENTS)
        forwarded_for = self._random_x_forwarded_for()

        if headers is None:
            headers = {
                "User-Agent": user_agent,
                "X-Forwarded-For": forwarded_for,
            }
            # NOTE(review): the cookie is attached when testShiro is *false*,
            # which reads as inverted relative to the flag's name -- confirm
            # the intent before relying on this parameter.
            if not testShiro:
                headers["Cookie"] = remember_me
        else:
            # Caller-supplied headers additionally get a fixed Referer.
            disguise = (
                ("User-Agent", user_agent),
                ("X-Forwarded-For", forwarded_for),
                ("Referer", "http://www.baidu.com"),
            )
            for key, value in disguise:
                headers[key] = value

            current_cookie = headers.get("Cookie")
            if not current_cookie:
                headers["Cookie"] = remember_me
            elif remember_me not in current_cookie:
                # Append the cookie without discarding the existing ones.
                headers["Cookie"] = current_cookie + ";" + remember_me

        if isRest:
            headers["Content-Type"] = "Application/json"
        return headers
class SwaggerHack(HttpMixin):
    """Probe the endpoints listed in a target's Swagger documents and log results to CSV.

    ``run`` downloads ``/swagger-resources``, then for every listed document
    walks its ``paths`` and sends one dummy request per GET/POST operation
    through the configured HTTP proxy. PUT and DELETE operations are only
    reported, never sent. One CSV row is written per operation.

    The instance owns the ``swagger.csv`` file handle; call ``close()`` (or use
    the instance as a context manager) when finished.

    NOTE(review): requests are sent to ``url + path``; a ``basePath`` declared
    in the document is not taken into account.
    """

    # Column titles of the CSV report, in the order rows are written.
    _CSV_HEADER = ["标准", "summary", "path", "method", "consumes", "url",
                   "num of params", "data", "status_code", "response"]

    def __init__(self, proxy_ip, proxy_port):
        """Open ``swagger.csv`` for writing and remember the proxy endpoint.

        Args:
            proxy_ip: Host of the HTTP proxy every request is routed through.
            proxy_port: Port of that proxy.
        """
        # Keep the file object itself so the report can be flushed and closed;
        # holding only the csv writer left the handle unreachable.
        self._report_file = open('swagger.csv', 'w', newline='', encoding='utf-8')
        self.writer = csv.writer(self._report_file)
        self.proxy_ip = proxy_ip
        self.proxy_port = proxy_port

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the CSV report when leaving a ``with`` block."""
        self.close()

    def close(self):
        """Close the CSV report file; safe to call more than once."""
        report = getattr(self, "_report_file", None)
        if report is not None and not report.closed:
            report.close()

    def _flush_report(self):
        """Push buffered CSV rows to disk without closing the report."""
        report = getattr(self, "_report_file", None)
        if report is not None and not report.closed:
            report.flush()

    def run(self, url: str):
        """Scan every Swagger document advertised by the target.

        Args:
            url: Base URL of the target; a trailing slash is tolerated.
        """
        # Paths in the documents start with "/", so drop a trailing slash to
        # avoid "//" in request URLs (some servers reject such URLs).
        base_url = url.rstrip('/')
        specs = self.get_specs(base_url)
        logger.info("[+] 共抓取到 %d 个标准" % (len(specs)))
        try:
            self.writer.writerow(self._CSV_HEADER)
        except Exception as e:
            logger.info(e)
        try:
            for spec in specs:
                spec_url = base_url + spec['url']
                logger.info("[+] : 开始测试 %s 标准" % spec_url)
                self.check_spec(spec_url, base_url)
        finally:
            # Make sure the rows gathered so far reach the disk even if the
            # scan aborts half-way.
            self._flush_report()

    def get_proxy_dict(self):
        """Return the ``proxies`` mapping for ``requests``; both schemes use the same HTTP proxy."""
        proxy = "http://{}:{}".format(self.proxy_ip, self.proxy_port)
        return {"http": proxy, "https": proxy}

    def get_specs(self, url):
        """Fetch and decode the list of Swagger documents from ``/swagger-resources``.

        Args:
            url: Base URL of the target.

        Returns:
            The decoded JSON body of the response.
        """
        specs_url = url.rstrip('/') + "/swagger-resources"
        res = requests.get(url=specs_url, proxies=self.get_proxy_dict(), verify=False)
        return json.loads(res.text)

    def check_spec(self, spec_url, url):
        """Probe every operation described by one Swagger document.

        Args:
            spec_url: URL of the API document that is downloaded and analysed.
            url: Base URL the documented paths are requested against.

        Returns:
            ``0`` when the document cannot be parsed, otherwise ``None``.
        """
        base_url = url.rstrip('/')
        res = requests.get(url=spec_url, proxies=self.get_proxy_dict(), verify=False)
        base_headers = self.get_or_confuse_headers(isRest=True)
        try:
            paths = json.loads(res.text)['paths']
            logger.info("[+] : 此标准下共有 %d 个接口" % (len(paths)))
        except Exception as e:
            logger.exception(e)
            return 0
        for path in paths:
            logger.info("[+] : 开始测试接口 %s " % (path))
            operations = paths[path]
            if not isinstance(operations, dict):
                continue
            for method, operation in operations.items():
                # A path item may carry non-operation keys (e.g. a shared
                # "parameters" list); only dict values describe an operation.
                if not isinstance(operation, dict):
                    continue
                row = self._probe_operation(spec_url, base_url, path, method,
                                            operation, base_headers)
                self.writer.writerow(row)

    @staticmethod
    def _placeholder(parameter, untyped="1"):
        """Return the dummy value for a parameter: ``true`` for booleans, ``1`` otherwise.

        ``untyped`` is returned when the parameter has no ``type`` field.
        """
        if 'type' not in parameter:
            return untyped
        return "true" if parameter['type'] == "boolean" else "1"

    def _send(self, method, target, **kwargs):
        """Send one probe through the proxy; return the response or ``None`` on a transport error."""
        try:
            return requests.request(method, target, verify=False,
                                    proxies=self.get_proxy_dict(), **kwargs)
        except requests.RequestException as e:
            # One unreachable endpoint must not abort the whole scan.
            logger.info("[!] {} 请求失败: {}".format(target, e))
            return None

    def _probe_operation(self, spec_url, base_url, path, method, operation, base_headers):
        """Send the dummy request for one operation (if allowed) and build its CSV row."""
        summary = operation.get('summary', '')
        parameters = operation.get('parameters') or []
        consumes_list = operation.get('consumes')
        consumes = consumes_list[0] if consumes_list else '0'
        # Stays None when no request is sent, so the row never repeats the
        # status/body of an earlier, unrelated response.
        res = None
        data = ""

        if consumes != '0':
            # The operation declares a request content type: send a JSON body.
            # The document does not say how to build it, so every parameter is
            # given a dummy value.
            logger.info("使用json格式传输")
            # Per-operation copy: header parameters must not leak into the
            # requests sent for other operations.
            headers = dict(base_headers)
            if 'parameters' in operation:
                body = {}
                logger.info("接口参数个数为 %d" % (len(parameters)))
                for parameter in parameters:
                    name = parameter.get('name')
                    if name is None:
                        continue
                    value = self._placeholder(parameter)
                    if parameter.get('in') == "header":
                        headers[name] = value
                    else:
                        body[name] = value
            else:
                body = ''
                logger.info("接口参数个数为 %d" % (0))
            logger.info("构造请求参数...")
            data = json.dumps(body)
            logger.info(data)
            target = base_url + path
            if method == "post":
                res = self._send("post", target, data=data, headers=headers)
            elif method == "put":
                logger.info("[!] {} 存有put方法,我不敢测".format(base_url))
            reported_url = target
        else:
            # No request body: GET probes carry the disguise headers too, minus
            # the JSON content type.
            headers = {k: v for k, v in base_headers.items() if k != "Content-Type"}
            if "{" in path:
                # NOTE: the path is cut at the first placeholder and the dummy
                # value appended; segments after the placeholder are dropped.
                first = parameters[0] if parameters else {}
                reported_url = base_url + path[:path.index('{')]
                target = reported_url + self._placeholder(first)
            else:
                pairs = []
                for parameter in parameters:
                    name = parameter.get('name')
                    if name is None:
                        continue
                    pairs.append("%s=%s" % (name, self._placeholder(parameter, untyped="{1}")))
                target = base_url + path + "?" + "&".join(pairs)
                reported_url = target
            if method == "get":
                res = self._send("get", target, headers=headers)
            elif method == "delete":
                logger.info("[!] {} 存有delete方法,我不敢测".format(base_url))

        status_code = res.status_code if res is not None else ""
        response_text = res.text if res is not None else ""
        return [spec_url, summary, path, method, consumes, reported_url,
                str(len(parameters)), data, status_code, response_text]
if __name__ == '__main__':
    # Example run: scan the placeholder target through a local proxy on port 8080.
    scanner = SwaggerHack("localhost", 8080)
    scanner.run("https://xxx.xxx.xxx/")