diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..af2f537 --- /dev/null +++ b/.gitignore @@ -0,0 +1,104 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +.static_storage/ +.media/ +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/README b/README new file mode 100644 index 0000000..fe02742 --- /dev/null +++ b/README @@ -0,0 +1,26 @@ +Configure Kafka topics (run on one kafka node) +doc/kafka_topics.sh + +Initialize elasticsearch: +curl -X PUT 'http://:9200/threatline' -d@doc/es_mapping.json + +Install service file (FreeBSD): +cp doc/threatline /usr/local/etc/rc.d/threatline + +Enable threatline: +sysrc threatline_enable=YES +sysrc threatline_agents="normalize enrich check archive" + +Start threatline: +service threatline start + +Monitor logs: +tail -f /tmp/tl_worker.log + + +Stages: +Normalize: Touch-up/rename fields, etc. +Enrich: Enrich and part of the message. +Check: Checks parts of message (now enriched) against known bad stuff. +Archive: Push document into elasticsearch. Can also log to file. + diff --git a/doc/APACHE_KAFKA.tgz b/doc/APACHE_KAFKA.tgz new file mode 100644 index 0000000..b43a0cd Binary files /dev/null and b/doc/APACHE_KAFKA.tgz differ diff --git a/doc/es_mapping.json b/doc/es_mapping.json new file mode 100644 index 0000000..53c89b9 --- /dev/null +++ b/doc/es_mapping.json @@ -0,0 +1,2117 @@ +{ + "template": "threatline*", + "settings" : { + "index" : { + "number_of_shards" : 3, + "number_of_replicas" : 2 + } + }, + "mappings": { + "capture_loss": { + "_all": { + "enabled": false + }, + "properties": { + "acks": { + "type": "long" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "gaps": { + "type": "long" + }, + "peer": { + "type": "keyword" + }, + "percent_lost": { + "type": "double" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "ts_delta": { + "type": "double" + } + } + }, + "communication": { + "_all": { + "enabled": false + }, + "properties": { + "connected_peer_addr": { + "type": "ip" + }, + "connected_peer_desc": { + "type": "keyword" + }, + "connected_peer_port": { + "type": "keyword" + }, + "level": { + "type": "keyword" + }, + "message": { + "type": "keyword" + }, + "peer": { + "type": "keyword" + }, + "src_name": { + "type": "keyword" + }, + "ts": { + "format": "epoch_second", + "type": "date" + } + } + }, + "conn": { + "_all": { + "enabled": false + }, + "properties": { + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "conn_state": { + "type": "keyword" + }, + "duration": { + "type": "double" + }, + "enrichment": { + "properties": { + "ip": { + "properties": { + "asn_country_code": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "asn_desc": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "asn_num": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "network": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "whois": { + "properties": { + "email": { + "properties": { + "value": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + } + } + }, + "kind": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "name": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "phone": { + "properties": { + "type": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "value": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + } + } + } + } + } + } + } + } + }, + "enrichmment": { + "type": "object" + }, + "history": { + "type": "keyword" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "local_orig": { + "type": "boolean" + }, + "local_resp": { + "type": "boolean" + }, + "missed_bytes": { + "type": "long" + }, + "orig_bytes": { + "type": "long" + }, + "orig_ip_bytes": { + "type": "long" + }, + "orig_l2_addr": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "orig_pkts": { + "type": "long" + }, + "proto": { + "type": "keyword" + }, + "resp_bytes": { + "type": "long" + }, + "resp_ip_bytes": { + "type": "long" + }, + "resp_l2_addr": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "resp_pkts": { + "type": "long" + }, + "service": { + "type": "keyword" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "tunnel_parents": { + "type": "keyword" + }, + "uid": { + "type": "keyword" + } + } + }, + "dns": { + "_all": { + "enabled": false + }, + "properties": { + "AA": { + "type": "boolean" + }, + "RA": { + "type": "boolean" + }, + "RD": { + "type": "boolean" + }, + "TC": { + "type": "boolean" + }, + "TTLs": { + "type": "double" + }, + "Z": { + "type": "long" + }, + "answers": { + "type": "keyword" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "proto": { + "type": "keyword" + }, + "qclass": { + "type": "long" + }, + "qclass_name": { + "type": "keyword" + }, + "qtype": { + "type": "long" + }, + "qtype_name": { + "type": "keyword" + }, + "query": { + "type": "keyword" + }, + "rcode": { + "type": "long" + }, + "rcode_name": { + "type": "keyword" + }, + "rejected": { + "type": "boolean" + }, + "rtt": { + "type": "double" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "trans_id": { + "type": "long" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + }, + "dpd": { + "_all": { + "enabled": false + }, + "properties": { + "analyzer": { + "type": "keyword" + }, + "failure_reason": { + "type": "keyword" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "proto": { + "type": "keyword" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + }, + "files": { + "_all": { + "enabled": false + }, + "properties": { + "analyzers": { + "type": "keyword" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "conn_uids": { + "type": "keyword" + }, + "depth": { + "type": "long" + }, + "duration": { + "type": "double" + }, + "enrichment": { + "type": "object" + }, + "extracted": { + "type": "keyword" + }, + "extracted_cutoff": { + "type": "boolean" + }, + "extracted_size": { + "type": "long" + }, + "filename": { + "type": "keyword" + }, + "fuid": { + "type": "keyword" + }, + "is_orig": { + "type": "boolean" + }, + "local_orig": { + "type": "boolean" + }, + "md5": { + "type": "keyword" + }, + "mime_type": { + "type": "keyword" + }, + "missing_bytes": { + "type": "long" + }, + "overflow_bytes": { + "type": "long" + }, + "parent_fuid": { + "type": "keyword" + }, + "rx_hosts": { + "type": "ip" + }, + "seen_bytes": { + "type": "long" + }, + "sha1": { + "type": "keyword" + }, + "sha256": { + "type": "keyword" + }, + "source": { + "type": "keyword" + }, + "timedout": { + "type": "boolean" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "total_bytes": { + "type": "long" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "tx_hosts": { + "type": "ip" + } + } + }, + "http": { + "_all": { + "enabled": false + }, + "properties": { + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "host": { + "type": "keyword" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "info_code": { + "type": "long" + }, + "info_msg": { + "type": "keyword" + }, + "method": { + "type": "keyword" + }, + "orig_filenames": { + "type": "keyword" + }, + "orig_fuids": { + "type": "keyword" + }, + "orig_mime_types": { + "type": "keyword" + }, + "password": { + "type": "keyword" + }, + "proxied": { + "type": "keyword" + }, + "referrer": { + "type": "keyword" + }, + "request_body_len": { + "type": "long" + }, + "resp_filenames": { + "type": "keyword" + }, + "resp_fuids": { + "type": "keyword" + }, + "resp_mime_types": { + "type": "keyword" + }, + "response_body_len": { + "type": "long" + }, + "status_code": { + "type": "long" + }, + "status_msg": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "trans_depth": { + "type": "long" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + }, + "uri": { + "type": "keyword" + }, + "user_agent": { + "type": "keyword" + }, + "username": { + "type": "keyword" + }, + "version": { + "type": "keyword" + } + } + }, + "loaded_scripts": { + "_all": { + "enabled": false + }, + "properties": { + "name": { + "type": "keyword" + }, + "ts": { + "format": "epoch_second", + "type": "date" + } + } + }, + "notice": { + "_all": { + "enabled": false + }, + "properties": { + "actions": { + "type": "keyword" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "dropped": { + "type": "boolean" + }, + "dst": { + "type": "ip" + }, + "enrichment": { + "type": "object" + }, + "file_desc": { + "type": "keyword" + }, + "file_mime_type": { + "type": "keyword" + }, + "fuid": { + "type": "keyword" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "msg": { + "type": "keyword" + }, + "n": { + "type": "long" + }, + "note": { + "type": "keyword" + }, + "p": { + "type": "keyword" + }, + "peer_descr": { + "type": "keyword" + }, + "proto": { + "type": "keyword" + }, + "remote_location": { + "properties": { + "city": { + "type": "keyword" + }, + "country_code": { + "type": "keyword" + }, + "latitude": { + "type": "double" + }, + "longitude": { + "type": "double" + }, + "region": { + "type": "keyword" + } + } + }, + "src": { + "type": "ip" + }, + "sub": { + "type": "keyword" + }, + "suppress_for": { + "type": "double" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + }, + "packet_filter": { + "_all": { + "enabled": false + }, + "properties": { + "filter": { + "type": "keyword" + }, + "init": { + "type": "boolean" + }, + "node": { + "type": "keyword" + }, + "success": { + "type": "boolean" + }, + "ts": { + "format": "epoch_second", + "type": "date" + } + } + }, + "pe": { + "properties": { + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "compile_ts": { + "type": "float" + }, + "enrichment": { + "type": "object" + }, + "has_cert_table": { + "type": "boolean" + }, + "has_debug_data": { + "type": "boolean" + }, + "has_export_table": { + "type": "boolean" + }, + "has_import_table": { + "type": "boolean" + }, + "is_64bit": { + "type": "boolean" + }, + "is_exe": { + "type": "boolean" + }, + "machine": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "os": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "section_names": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "subsystem": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + }, + "uses_aslr": { + "type": "boolean" + }, + "uses_code_integrity": { + "type": "boolean" + }, + "uses_dep": { + "type": "boolean" + }, + "uses_seh": { + "type": "boolean" + } + } + }, + "rdp": { + "properties": { + "cert_count": { + "type": "long" + }, + "cookie": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "result": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "security_protocol": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + }, + "reporter": { + "_all": { + "enabled": false + }, + "properties": { + "level": { + "type": "keyword" + }, + "location": { + "type": "keyword" + }, + "message": { + "type": "keyword" + }, + "ts": { + "format": "epoch_second", + "type": "date" + } + } + }, + "smb_files": { + "properties": { + "action": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "fuid": { + "type": "keyword" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "name": { + "type": "keyword" + }, + "path": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "prev_name": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "size": { + "type": "long" + }, + "times": { + "properties": { + "accessed": { + "type": "float" + }, + "changed": { + "type": "float" + }, + "created": { + "type": "float" + }, + "modified": { + "type": "float" + } + } + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + }, + "smb_mapping": { + "properties": { + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "path": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "share_type": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + }, + "smtp": { + "properties": { + "cc": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "date": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "first_received": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "from": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "fuids": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "helo": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "in_reply_to": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "is_webmail": { + "type": "boolean" + }, + "last_reply": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "mailfrom": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "msg_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "path": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "rcptto": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "reply_to": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "subject": { + "type": "keyword" + }, + "tls": { + "type": "boolean" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "to": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "trans_depth": { + "type": "long" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + }, + "user_agent": { + "type": "keyword" + } + } + }, + "snmp": { + "properties": { + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "community": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "duration": { + "type": "double" + }, + "enrichment": { + "type": "object" + }, + "get_bulk_requests": { + "type": "long" + }, + "get_requests": { + "type": "long" + }, + "get_responses": { + "type": "long" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "set_requests": { + "type": "long" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + }, + "version": { + "type": "keyword" + } + } + }, + "software": { + "properties": { + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "host": { + "type": "keyword" + }, + "host_p": { + "type": "long" + }, + "name": { + "type": "keyword" + }, + "software_type": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "unparsed_version": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "version_addl": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "version_major": { + "type": "long" + }, + "version_minor": { + "type": "long" + }, + "version_minor2": { + "type": "long" + }, + "version_minor3": { + "type": "long" + } + } + }, + "ssh": { + "properties": { + "auth_attempts": { + "type": "long" + }, + "auth_success": { + "type": "boolean" + }, + "cipher_alg": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "client": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "compression_alg": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "host_key": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "host_key_alg": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "kex_alg": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "mac_alg": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "server": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + }, + "version": { + "type": "keyword" + } + } + }, + "ssl": { + "_all": { + "enabled": false + }, + "properties": { + "cert_chain_fuids": { + "type": "keyword" + }, + "cipher": { + "type": "keyword" + }, + "client_cert_chain_fuids": { + "type": "keyword" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "client_issuer": { + "type": "keyword" + }, + "client_subject": { + "type": "keyword" + }, + "curve": { + "type": "keyword" + }, + "enrichment": { + "type": "object" + }, + "established": { + "type": "boolean" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "issuer": { + "type": "keyword" + }, + "last_alert": { + "type": "keyword" + }, + "next_protocol": { + "type": "keyword" + }, + "resumed": { + "type": "boolean" + }, + "server_name": { + "type": "keyword" + }, + "subject": { + "type": "keyword" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + }, + "validation_status": { + "type": "keyword" + }, + "version": { + "type": "keyword" + } + } + }, + "stats": { + "_all": { + "enabled": false + }, + "properties": { + "active_dns_requests": { + "type": "long" + }, + "active_files": { + "type": "long" + }, + "active_icmp_conns": { + "type": "long" + }, + "active_tcp_conns": { + "type": "long" + }, + "active_timers": { + "type": "long" + }, + "active_udp_conns": { + "type": "long" + }, + "bytes_recv": { + "type": "long" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "dns_requests": { + "type": "long" + }, + "enrichment": { + "type": "object" + }, + "events_proc": { + "type": "long" + }, + "events_queued": { + "type": "long" + }, + "files": { + "type": "long" + }, + "icmp_conns": { + "type": "long" + }, + "mem": { + "type": "long" + }, + "peer": { + "type": "keyword" + }, + "pkt_lag": { + "type": "double" + }, + "pkts_dropped": { + "type": "long" + }, + "pkts_link": { + "type": "long" + }, + "pkts_proc": { + "type": "long" + }, + "reassem_file_size": { + "type": "long" + }, + "reassem_frag_size": { + "type": "long" + }, + "reassem_tcp_size": { + "type": "long" + }, + "reassem_unknown_size": { + "type": "long" + }, + "tcp_conns": { + "type": "long" + }, + "timers": { + "type": "long" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "udp_conns": { + "type": "long" + } + } + }, + "tunnel": { + "properties": { + "action": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "tunnel_type": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "uid": { + "type": "keyword" + } + } + }, + "weird": { + "_all": { + "enabled": false + }, + "properties": { + "addl": { + "type": "keyword" + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "id": { + "properties": { + "orig_h": { + "type": "ip" + }, + "orig_p": { + "type": "keyword" + }, + "resp_h": { + "type": "ip" + }, + "resp_p": { + "type": "keyword" + } + } + }, + "name": { + "type": "keyword" + }, + "notice": { + "type": "boolean" + }, + "peer": { + "type": "keyword" + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + }, + "x509": { + "properties": { + "basic_constraints": { + "properties": { + "ca": { + "type": "boolean" + }, + "path_len": { + "type": "long" + } + } + }, + "certificate": { + "properties": { + "curve": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "exponent": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "issuer": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "key_alg": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "key_length": { + "type": "long" + }, + "key_type": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "not_valid_after": { + "type": "float" + }, + "not_valid_before": { + "type": "float" + }, + "serial": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "sig_alg": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "subject": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "version": { + "type": "long" + } + } + }, + "client_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "enrichment": { + "type": "object" + }, + "san": { + "properties": { + "dns": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "email": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + } + } + }, + "tltype": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + }, + "ts": { + "format": "epoch_second", + "type": "date" + }, + "uid": { + "type": "keyword" + } + } + } + } +} diff --git a/doc/kafka_topics.sh b/doc/kafka_topics.sh new file mode 100644 index 0000000..a592152 --- /dev/null +++ b/doc/kafka_topics.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +KBIN=/usr/local/share/java/kafka/bin +${KBIN}/kafka-topics.sh --zookeeper 10.15.0.38:2181 --create --topic TLINGEST-lodi --partitions=3 --replication-factor=1 +${KBIN}/kafka-topics.sh --zookeeper 10.15.0.38:2181 --create --topic TLINGEST-wyomic --partitions=3 --replication-factor=1 +${KBIN}/kafka-topics.sh --zookeeper 10.15.0.38:2181 --create --topic TLINGEST-qwf --partitions=3 --replication-factor=1 +${KBIN}/kafka-topics.sh --zookeeper 10.15.0.38:2181 --create --topic TLNORMALIZED --partitions=3 --replication-factor=1 +${KBIN}/kafka-topics.sh --zookeeper 10.15.0.38:2181 --create --topic TLENRICHED --partitions=3 --replication-factor=1 +${KBIN}/kafka-topics.sh --zookeeper 10.15.0.38:2181 --create --topic TLALERTS --partitions=3 --replication-factor=1 +${KBIN}/kafka-topics.sh --zookeeper 10.15.0.38:2181 --create --topic TLARCHIVE --partitions=3 --replication-factor=1 + diff --git a/doc/setup_pfsense.sh b/doc/setup_pfsense.sh new file mode 100644 index 0000000..22ffe22 --- /dev/null +++ b/doc/setup_pfsense.sh @@ -0,0 +1,101 @@ +# FreeBSD 11.1-RELEASE + +# Install dependencies +pkg install -y bash git flex bison cmake libpcap librdkafka python py27-sqlite3 caf swig + +# Compile Bro (no install) +# Needs compiled because build/src/bifcl is needed to compile plugins +mkdir /usr/local/src; cd /usr/local/src/ +git clone https://github.com/bro/bro +cd bro; ./configure && make -j2 + +# Compile kafka plugin (no install) +# This will generate APACHE_KAFKA.tar.gz +cd /usr/local/src/ +git clone https://github.com/apache/metron-bro-plugin-kafka.git +./configure --bro-dist=/usr/local/src/bro +make + +# Copy APACHE_KAFKA.tgz to pfsense +# Login into pfsense and enable FreeBSD repos (temporarily) +sed -i '' 's/FreeBSD: { enabled: no/FreeBSD: { enabled: yes/g' /usr/local/share/pfSense/pkg/repos/pfSense-repo.conf +sed -i '' 's/FreeBSD: { enabled: no/FreeBSD: { enabled: yes/g' /usr/local/etc/pkg/repos/FreeBSD.conf +pkg install -y bro librdkafka +sed -i '' 's/FreeBSD: { enabled: yes/FreeBSD: { enabled: no/g' /usr/local/share/pfSense/pkg/repos/pfSense-repo.conf +sed -i '' 's/FreeBSD: { enabled: yes/FreeBSD: { enabled: no/g' /usr/local/etc/pkg/repos/FreeBSD.conf +pkg update + +# Extract plugin and enable it +tar xzf APACHE_KAFKA.tgz -C /usr/local/lib/bro/plugins +cat > /usr/local/share/bro/site/local.bro < /usr/local/etc/node.cfg < /usr/local/etc/broctl.cfg </dev/null + fi +} + +threatline_restart() +{ + threatline_stop + threatline_start +} + +run_rc_command "$1" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..06ebce0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +elasticsearch +kafka-python +lz4 +ipwhois +ipaddress +requests +netaddr diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..7f1969f --- /dev/null +++ b/setup.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python + +from distutils.core import setup + +setup( + name='ThreatLine', + version='0.1', + packages=['threatline'], + license='Creative Commons Attribution-Noncommercial-Share Alike license', + long_description=open('README.txt').read(), +) diff --git a/threatline/handlers/__init__.py b/threatline/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/threatline/handlers/archive.py b/threatline/handlers/archive.py new file mode 100644 index 0000000..52e561a --- /dev/null +++ b/threatline/handlers/archive.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# Archiving handler + +import sys +from base import BaseHandler +from utils.archive_utils import ElasticLogger + + +class Archive(BaseHandler): + + def handle_conn(self, message): + self.archiver.send(message) + + def handle_dhcp(self, message): + self.archiver.send(message) + + def handle_dce_rpc(self, message): + self.archiver.send(message) + + def handle_known_devices(self, message): + self.archiver.send(message) + + def handle_dns(self, message): + self.archiver.send(message) + + def handle_files(self, message): + self.archiver.send(message) + + def handle_http(self, message): + self.archiver.send(message) + + def handle_notice(self, message): + self.archiver.send(message) + + def handle_smtp(self, message): + self.archiver.send(message) + + def handle_snmp(self, message): + self.archiver.send(message) + + def handle_software(self, message): + self.archiver.send(message) + + def handle_ssh(self, message): + self.archiver.send(message) + + def handle_ssl(self, message): + self.archiver.send(message) + + def handle_stats(self, message): + self.archiver.send(message) + + def handle_syslog(self, message): + self.archiver.send(message) + + def handle_weird(self, message): + self.archiver.send(message) + + def handle_x509(self, message): + self.archiver.send(message) + + def handle_intel(self, message): + self.archiver.send(message) + + def handle_capture_loss(self, message): + self.archiver.send(message) + + def handle_communication(self, message): + self.archiver.send(message) + + def handle_ntlm(self, message): + self.archiver.send(message) + + def handle_pe(self, message): + self.archiver.send(message) + + def handle_smb_files(self, message): + self.archiver.send(message) + + def handle_smb_mapping(self, message): + self.archiver.send(message) + + def handle_tunnel(self, message): + self.archiver.send(message) + + def handle_rdp(self, message): + self.archiver.send(message) + + def __init__(self): + super(Archive, self).__init__() + self.mymod = sys.modules[__name__].Archive + self.settings = { + 'consumer_topic': 'TLARCHIVE', + 'consumer_group': 'archive_group', + 'producer_topic': 'TLDONE', + 'producer_client_id': 'archiver-', + 'dispatchers': {} # will be generated at initialize() + } + # self.archiver = FileLogger('threatline.log') + self.archiver = ElasticLogger() diff --git a/threatline/handlers/base.py b/threatline/handlers/base.py new file mode 100644 index 0000000..f7b9822 --- /dev/null +++ b/threatline/handlers/base.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# Ingestion handler + +import sys +from functools import partial + + +class BaseHandler(object): + + def __init__(self): + self.settings = {} + self.dispatch = {} + self.mymod = None + + def handle_conn(self, message): + return message + + def handle_dce_rpc(self, message): + return message + + def handle_known_devices(self, message): + return message + + def handle_dhcp(self, message): + return message + + def handle_dns(self, message): + return message + + def handle_files(self, message): + return message + + def handle_http(self, message): + return message + + def handle_notice(self, message): + return message + + def handle_smtp(self, message): + return message + + def handle_snmp(self, message): + return message + + def handle_software(self, message): + return message + + def handle_ssh(self, message): + return message + + def handle_ssl(self, message): + return message + + def handle_stats(self, message): + return message + + def handle_syslog(self, message): + return message + + def handle_weird(self, message): + return message + + def handle_x509(self, message): + return message + + def handle_intel(self, message): + return message + + def handle_capture_loss(self, message): + return message + + def handle_communication(self, message): + return message + + def handle_ntlm(self, message): + return message + + def handle_pe(self, message): + return message + + def handle_smb_files(self, message): + return message + + def handle_smb_mapping(self, message): + return message + + def handle_tunnel(self, message): + return message + + def handle_rdp(self, message): + return message + + def initialize(self): + for lm in dir(self.mymod): + if lm.startswith('handle_'): + name = lm.replace('handle_', '') + # Bind each method found, to this instance (self) + self.dispatch[name] = partial(getattr(self.mymod, lm), self) + + self.settings['dispatchers'] = self.dispatch diff --git a/threatline/handlers/check.py b/threatline/handlers/check.py new file mode 100644 index 0000000..60a54fc --- /dev/null +++ b/threatline/handlers/check.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# Check handler + +import sys +from base import BaseHandler +from utils.check_utils import Checker + + +class Check(BaseHandler): + + def check_ioc(self, dtype, data): + if dtype == 'domain': + return self.checker.check_domain(data) + else: + return {} + + def handle_dns(self, message): + if len(message['query']) == 0: + return message + domain = message['query'] + message['alert'] = self.check_ioc('domain', domain) + return message + + def __init__(self): + super(Check, self).__init__() + self.mymod = sys.modules[__name__].Check + self.checker = Checker() + self.settings = { + 'consumer_topic': 'TLENRICHED', + 'consumer_group': 'check_group', + 'producer_topic': 'TLARCHIVE', + 'alert_topic': 'TLALERTS', + 'producer_client_id': 'checker-', + 'dispatchers': {} # will be generated at initialize() + } diff --git a/threatline/handlers/enrich.py b/threatline/handlers/enrich.py new file mode 100644 index 0000000..655d1d4 --- /dev/null +++ b/threatline/handlers/enrich.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# Enrichment handler + +import sys +import logging +from base import BaseHandler +from functools import partial +from utils.enrich_utils import IPClient, mac_lookup + + +class Enrich(BaseHandler): + + def handle_conn(self, message): + message['enrichment'] = {} + if message['local_orig']: # If True, means conn originated locally + e = self.ipcli.enrichip(message['id.resp_h']) + else: + e = self.ipcli.enrichip(message['id.orig_h']) + if e: + message['enrichment']['ip'] = e + return message + + def handle_known_devices(self, message): + message['enrichment'] = {} + ret = mac_lookup(message['mac']) + if not ret: + ret = {'note': 'Not Registered!'} + message['enrichment']['mac'] = ret + return message + + def __init__(self): + super(Enrich, self).__init__() + self.ipcli = IPClient() + self.mymod = sys.modules[__name__].Enrich + self.settings = { + 'consumer_topic': 'TLNORMALIZED', + 'consumer_group': 'enrich_group', + 'producer_topic': 'TLENRICHED', + 'producer_client_id': 'enricher-', + 'dispatchers': {} # will be generated at initialize() + } diff --git a/threatline/handlers/normalize.py b/threatline/handlers/normalize.py new file mode 100644 index 0000000..c7d30c5 --- /dev/null +++ b/threatline/handlers/normalize.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# Ingestion handler + +import sys +from ipaddress import ip_address +from base import BaseHandler + + +class Normalize(BaseHandler): + + def handle_x509(self, message): + # for some reason the uid for x509 log types + # are 'id' instead. Lets fix that. + message['uid'] = message['id'] + _ = message.pop('id') + return(message) + + def handle_pe(self, message): + # same issuse as x509. + message['uid'] = message['id'] + _ = message.pop('id') + return(message) + + def handle_software(self, message): + fields = ('version.major', 'version.minor', 'version.minor2', + 'version.minor3', 'version.addl') + for old in fields: + new = old.replace('.', '_') + try: + message[new] = message[old] + _ = message.pop(old) + except KeyError: + continue + return(message) + + def handle_conn(self, message): + q = ip_address(message['id.orig_h']) + if q.version == 6: + return # don't pass along IPv6 + return message + + def __init__(self): + super(Normalize, self).__init__() + self.mymod = sys.modules[__name__].Normalize + self.settings = { + 'consumer_topic': ['TLINGEST-qwf', + 'TLINGEST-wyomic', + 'TLINGEST-lodi'], + 'consumer_group': 'normalize_group', + 'producer_topic': 'TLNORMALIZED', + 'producer_client_id': 'normalizer-', + 'dispatchers': {} # will be generated at initialize() + } diff --git a/threatline/handlers/utils/__init__.py b/threatline/handlers/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/threatline/handlers/utils/archive_utils.py b/threatline/handlers/utils/archive_utils.py new file mode 100644 index 0000000..b3836f9 --- /dev/null +++ b/threatline/handlers/utils/archive_utils.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +import json +from elasticsearch import Elasticsearch + + +class FileLogger(object): + + def __init__(self, logfile): + self.logfile = logfile + self.flogger = open(self.logfile, 'a') + + def send(self, message): + """ this buffers writes, be patient if your + tailing the log file """ + self.flogger.write(json.dumps(message)) + self.flogger.write('\n') + + +class ElasticLogger(object): + + def __init__(self): + self.es = Elasticsearch(hosts=['10.15.0.45:9200', + '10.15.0.46:9200', + '10.15.0.47:9200']) + self.mapped_types = ('files', 'notice', 'http', 'reporter', + 'communication', 'packet_filter', 'ssl', 'dpd', + 'capture_loss', 'dns', 'loaded_scripts', 'stats', + 'weird', 'conn', 'x509', 'ssh', 'pe', 'smb_files', + 'smb_mapping', 'snmp', 'dce_rpc', 'smtp', 'rdp', + 'software', 'tunnel', 'known_certs', + 'known_devices', 'known_hosts', 'known_services', + 'dhcp') + + def send(self, message): + self.doctype = message['tltype'] + self.docindex = 'threatline' # + self.doctype + """ Bro creates timestamps as floats like + xxxxxxxxx.xxxxx ... don't want this. """ + message['ts'] = int(message['ts']) + try: + if self.doctype in self.mapped_types: + self.es.index(index=self.docindex, + doc_type=self.doctype, + body=message) + else: + return + except Exception as e: + print(e) + return diff --git a/threatline/handlers/utils/check_utils.py b/threatline/handlers/utils/check_utils.py new file mode 100644 index 0000000..7c4688c --- /dev/null +++ b/threatline/handlers/utils/check_utils.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python + +import os +import logging +from threat_intel import domain_intel + + +class Checker(object): + + def __init__(self): + self.dnsbl = domain_intel() + + def check_domain(self, domain): + if domain in self.dnsbl: + return {'alert_type': 'domain', + 'domain': domain, + 'info': self.dnsbl[domain]} + return None diff --git a/threatline/handlers/utils/enrich_utils.py b/threatline/handlers/utils/enrich_utils.py new file mode 100644 index 0000000..f2832da --- /dev/null +++ b/threatline/handlers/utils/enrich_utils.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +import logging +from ipwhois import IPWhois, utils +from ipaddress import ip_address +from netaddr import * + +def mac_lookup(mac_address): + ret = {} + mac = EUI(mac_address) + try: + oui = mac.oui + except Exception as e: + logging.error('MAC {} not registered'.format(mac)) + return None + ret['org'] = oui.registration().org + ret['address'] = ' '.join(oui.registration().address) + return ret + + +class IPClient(object): + + def __init__(self): + self.cache = {} # cache for faster whois lookups + + def is_defined(self, addr): + """ Checks if the IP is defined as loopback, multicast, etc. """ + queryip = ip_address(addr) + if queryip.version == 4: + defined, _, _ = utils.ipv4_is_defined(addr) + if queryip.version == 6: + defined, _, _ = utils.ipv6_is_defined(addr) + return defined + + def enrichip(self, addr): + """ Enriches the IP address if it's not reserved (defined) IP. """ + if addr in self.cache: + logging.info('Whois cache hit') + return self.cache[addr] + + if self.is_defined(addr): + return None + + self.queryip = IPWhois(addr) + try: + r = self.queryip.lookup_rdap() + except Exception as e: + print(e) + return None + + ent = r['entities'][0] + e = {'asn_num': r['asn'], + 'asn_desc': r['asn_description'], + 'asn_country_code': r['asn_country_code'], + 'network': r['network']['cidr'], + 'whois': r['objects'][ent]['contact']} + + _ = e['whois'].pop('address') + _ = e['whois'].pop('email') + + self.cache[addr] = e + return e diff --git a/threatline/handlers/utils/threat_intel.py b/threatline/handlers/utils/threat_intel.py new file mode 100644 index 0000000..c3ee859 --- /dev/null +++ b/threatline/handlers/utils/threat_intel.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +import os +import logging +import requests +import ipaddress + + +domain_intel_sources = [ + ('MALWAREDOMAIN', 'http://bld.scoutsec.com/?genres=malware&style=list') +] + + +def get_download(url): + try: + return requests.get(url) + except Exception as e: + print(e) + return None + + +def domain_intel(): + for source, url in domain_intel_sources: + d = get_download(url).text.split('\n') + d = {domain: source for domain in d} + return d diff --git a/threatline/threatline.py b/threatline/threatline.py new file mode 100755 index 0000000..e26d3a4 --- /dev/null +++ b/threatline/threatline.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python + +import sys +import json +import time +import signal +import logging +from kafka import KafkaConsumer, KafkaProducer +from kafka.errors import KafkaError +from multiprocessing import Process + +VALID_STAGES = ['normalize', 'enrich', 'check', 'archive'] +KAFKA_BOOTSTRAP = ['10.15.0.40:9092', '10.15.0.41:9092', '10.15.0.42:9092'] +LOGFORMAT = '%(asctime)-15s %(message)s' + + +def get_consumer(topic, consumer_group): + consumer = KafkaConsumer( + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + bootstrap_servers=KAFKA_BOOTSTRAP, + group_id=consumer_group, + enable_auto_commit=True + ) + consumer.subscribe(topic) + return consumer + + +def get_producer(client_id): + producer = KafkaProducer( + value_serializer=lambda m: json.dumps(m).encode('utf-8'), + bootstrap_servers=KAFKA_BOOTSTRAP, + client_id=client_id, + compression_type='lz4' + ) + return producer + + +def main(stage): + LOGFILE = '/var/log/' + stage + '_worker.log' + logging.basicConfig(level=logging.INFO, format=LOGFORMAT, + filename=LOGFILE) + logging.info('Launching {}'.format(stage)) + logging.info('Singals registered') + + normalize_flag = False + archive_flag = False + if stage == 'normalize': + from handlers.normalize import Normalize + handle = Normalize() + normalize_flag = True + elif stage == 'enrich': + from handlers.enrich import Enrich + handle = Enrich() + elif stage == 'check': + from handlers.check import Check + handle = Check() + elif stage == 'archive': + from handlers.archive import Archive + handle = Archive() + else: + print('Unknown stage \'{}\''.format(stage)) + print('Valid stages: {}'.format(VALID_STAGES)) + sys.exit(1) + + handle.initialize() # build the function dictionary + alert_topic = None + consumer_topic = handle.settings['consumer_topic'] + consumer_group = handle.settings['consumer_group'] + producer_topic = handle.settings['producer_topic'] + producer_client_id = handle.settings['producer_client_id'] + dispatch = handle.settings['dispatchers'] + + consumer = get_consumer(consumer_topic, consumer_group) + if stage == 'archive': # we don't produce during archiving.. + producer = None + else: + producer = get_producer(producer_client_id) + + if stage == 'check': + alert_topic = handle.settings['alert_topic'] + + def sig_handler(signal, fname): + logging.warn('SIGTERM received. Shutting down...') + try: + consumer.close() + if producer: + producer.close(5.0) + except Exception as e: + logging.info(e) + finally: + sys.exit(1) + signal.signal(signal.SIGTERM, sig_handler) + + for m in consumer: + message = m.value + if normalize_flag: + """ At the normalize stage, the log data is nested + with the key being the bro log-type.""" + client_id = m.topic.replace('TLINGEST-', '') + message_type = message.keys()[0] # get the root key (log type) + message = message[message_type] # un-nest the log data + message['tltype'] = message_type # used at next stage + message['client_id'] = client_id # scoutsec client_id + message['enrichment'] = {} # for later enrichment + else: + message_type = message['tltype'] + + try: + # handle the message + pubdata = dispatch[message_type](message) + except Exception as ex: + logging.error(ex) + continue + + if pubdata: + if 'alert' in pubdata and alert_topic: + if pubdata['alert']: + producer.send(alert_topic, pubdata) + if producer: + producer.send(producer_topic, pubdata) + + +if __name__ == '__main__': + if len(sys.argv) < 2: + print('usage: script.py {}'.format(VALID_STAGES)) + sys.exit(1) + + stages = sys.argv[1:] + if len(stages) < 8: + for s in stages: + p = Process(target=main, args=(s,)) + p.start() +else: + LOGFILE = '/dev/stdout' + logging.basicConfig(level=logging.INFO, format=LOGFORMAT, + filename=LOGFILE)