神刀安全网

中间件漏洞检测框架(F-MiddlewareScan)

纯python编写的轻量级中间件漏洞检测框架,实现针对中间件的自动化检测,端口探测->中间件识别->漏洞检测->获取webshell 

参数说明 

-h 必须输入的参数,支持ip(192.168.1.1),ip段(192.168.1),ip范围指定(192.168.1.1-192.168.1.254),最多限制一次可扫描65535个IP。 

-p 指定要扫描端口列表,多个端口使用,隔开 例如:7001,8080,9999。未指定即使用内置默认端口进行扫描(80,4848,7001,7002,8000,8001,8080,8081,8888,9999,9043,9080) 

-m 指定线程数量 默认100线程 

-t 指定HTTP请求超时时间,默认为10秒,端口扫描超时为值的1/2。 

漏洞检测脚本以插件形式存在,可以自定义添加修改漏洞插件,存放于plugins目录,插件标准非常简单,只需对传入的IP,端口,超时进行操作,成功返回“YES|要打印出来的信息”即可。 

新增插件需要在 plugin_config.ini配置文件中新增关联(多个漏洞插件以逗号隔开)。 

中间件识别在discern_config.ini文件中配置(支持文件内容和header识别) 

目前内置了19个漏洞插件,希望大家可以一起编写更多的插件,目前还缺少weblogic自动部署和反序列化探测以及中间件的反序列化自动获取webshell的插件等等。 

周末感冒无事,除了吃药意外就是发呆了。好友说想要修改一下,增加CMS识别以及同服查询的功能。动手开始做

def exploit(URL, Thread):     w = cms.WhatWeb(URL, Thread)     w.run()     if w.result:         return w.result  def whatcms(scan_type,task_host,task_port):     task_port = '80'     if task_host.find('http') == -1:         URL = 'http://'+str(task_host)     elif task_host.find('///') !=1 and task_host.find('~') == -1:         URL = str(task_host.replace('///','://'))     elif task_host.find('///') !=1 and task_host.find('~') != -1:         URL = task_host.replace('///','://').replace('~',':').rstrip('/')     log(scan_type,URL,task_port)     Thread = 40     try:         r = requests.get(URL, timeout=15, verify=False)         if r.status_code == 200:             return exploit(URL, Thread)     except Exception as e:         #print str(e)         return  def ip2host_get(scan_type,host,port):     ip2hosts = []     try:         req=requests.get('http://www.ip2hosts.com/search.php?ip='+str(host), timeout=45, verify=False)         src=req.content         if src.find('result') != -1:             result = json.loads(src)['result']             ip = json.loads(src)['ip']             if len(result)>0:                 for item in result:                     if len(item)>0:                         #log(scan_type,host,port,str(item))                         ip2hosts.append(item.replace('://','///').replace(':','~'))     except Exception, e:         print str(e)         pass     return ip2hosts

再次修改了其中的顺序,

def run(self):         while True:             queue_task = self.queue.get()             task_type,task_host,task_port = queue_task.split(":")             if task_type == 'portscan':                 port_status = scan_port(task_type,task_host,task_port)                 if port_status == True:                     #如果端口开发,推送到任务                     queue.put(":".join(['ip2host_get',task_host,task_port]))             elif task_type == 'ip2host_get':                 #针对存货IP发起旁站查询                 result = []                 urls = ip2host_get(task_type,task_host,task_port)                 #queue.put(":".join(['discern',task_host,task_port]))                 urls.insert(0,task_host)                 result.extend(urls)                 urls = list(set(result))                 if len(urls)>0:                     #list can not use find                     for url in urls:                         if len(url)>0:                             #print url                             #put url in queue,but some qestion in Threads and queue                             queue.put(":".join(['whatcms',str(url),task_port]))             elif task_type == 'whatcms':                 cms = whatcms(task_type,task_host,task_port)                 queue.put(":".join(['discern',task_host,task_port]))                 if cms == None:                     "go on 但是没什么乱用"                     #以后增加插件式扫描              elif task_type == 'discern':                 #针对中间件的识别                 discern_type = scan_discern(task_type,task_host,task_port)                 if discern_type:                     queue.put(":".join([discern_type,task_host,task_port]))             else:                 scan_vul(task_type,task_host,task_port)             self.queue.task_done()

中间件漏洞检测框架(F-MiddlewareScan)

但是问题来了,线程经常性的奔溃掉,然后就无奈了

中间件漏洞检测框架(F-MiddlewareScan)

然后发现了一个有意思的东西https://raw.githubusercontent.com/erevus-cn/pocscan/master/web/tasks.py

# coding:utf-8 import gevent from gevent.pool import Pool from web.lib.utils import * from pocscan.poc_launcher import Poc_Launcher from celery import Celery, platforms  app = Celery()  # 允许celery以root权限启动 platforms.C_FORCE_ROOT = True  # 修改celery的全局配置 app.conf.update(     CELERY_IMPORTS = ("tasks", ),     BROKER_URL = 'amqp://guest:guest@localhost:5672/',     CELERY_RESULT_BACKEND = 'db+mysql://root:123456@127.0.0.1:3306/pocscan',     CELERY_TASK_SERIALIZER='json',     CELERY_RESULT_SERIALIZER='json',     CELERY_TIMEZONE='Asia/Shanghai',     CELERY_ENABLE_UTC=True,     BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}, # 如果任务没有在 可见性超时 内确认接收,任务会被重新委派给另一个Worker并执行  默认1 hour.     CELERYD_CONCURRENCY = 50 ,     CELERY_TASK_RESULT_EXPIRES = 1200,  # celery任务执行结果的超时时间,我的任务都不需要返回结     # BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True},       # 设置一个传输选项来给消息加上前缀 )  # 失败任务重启休眠时间300秒,最大重试次数5次 #@app.task(bind=True, default_retry_delay=300, max_retries=5) @app.task(time_limit=3600) def run_task_in_gevent(url_list, poc_file_dict):     # url_list 每个进程分配到一定量的url     poc = Poc_Launcher()     pool = Pool(100)     for target in url_list:         for plugin_type,poc_files in poc_file_dict.iteritems():             for poc_file in poc_files:                 if target and poc_file:                     target = fix_target(target)                     pool.add(gevent.spawn(poc.poc_verify, target, plugin_type, poc_file))     pool.join()

搜了下Celery,专门用于解决任务队列用于分发工作给不同线程。回头研究下

参考文章:

http://docs.jinkan.org/docs/celery/

http://my.oschina.net/u/2306127/blog/417360

http://rfyiamcool.blog.51cto.com/1030776/1325062

http://www.tuicool.com/articles/qi6Nve

标签:Celery, ip2host , whatcms

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » 中间件漏洞检测框架(F-MiddlewareScan)

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
分享按钮