2014-09-04 443 views
2

当我收到带scapy的ICMP Destination unreachable(需要分段碎片的ICMP TYPE = 3 CODE = 4)消息时,我正在进行TCP重传行为测试。如何使用scapy从数据包中读取整个ip层和tcp层?

测试流程是这样的:
1.建立TCP连接到服务器
2.发送HTTP GET请求到服务器时TCP建立
3.当HTTP响应回
4.发送一个ICMP类型3代码4消息到服务器,带有小的MTU集合

问题是,ICMP TYPE = 3 CODE = 4消息包含该HTTP的IP标头和部分TCP标头(srt,dst和seq编号)响应数据包。目前,我只是从HTTP响应数据包中读取每个参数(如IP标识,frag标记,ttl等)。现在的问题是:有没有什么办法,我可以读取该数据包的全IP和TCP报头:

ICMP(TYPE=3 CODE=4)/IP Header/TCP Header

回答

1

希望以下将帮助:

>>> pkt = ICMP()/IP()/TCP() 
>>> ipHeader = pkt.getlayer(IP) 
>>> ipHeader 
<IP frag=0 proto=tcp |<TCP |>> 
>>> 

只获得IP报头:

>>> pkt = Ether()/IP()/TCP() 
>>> ip = pkt.getlayer(IP) 
>>> ip 
<IP frag=0 proto=tcp |<TCP |>> 
>>> ip.remove_payload() 
>>> ip 
<IP |> 
>>> 
+1

谢谢,但这似乎得到全IP层,包括IP包头,TCP报头和TCP有效载荷,而我只需要IP和TCP报头。 – Johnson 2014-09-19 06:33:38

0

我将数据包对象转换为dict对象,使我的解析生活更轻松。 代码:

from scapy.all import * 
from cStringIO import StringIO 
import sys 
class Capturing(list): 
     """ 
     This class will capture sys.out. 
     More info: 
     http://stackoverflow.com/questions/16571150/how-to-capture-stdout-output-from-a-python-function-call 
     """ 
    def __enter__(self): 
     self._stdout = sys.stdout 
     sys.stdout = self._stringio = StringIO() 
     return self 
    def __exit__(self, *args): 
     self.extend(self._stringio.getvalue().splitlines()) 
     del self._stringio # free up some memory 
     sys.stdout = self._stdout 
class PacketDict(dict): 
     """ 
     This class will convert packet into a dict by using the result of packet.show2(). Furthermore the original 
     packet will be also saved as attribute '.packet'. 
     More class functions could be added, currently only support 'haslayer()'. 
     Scapy version: scapy-2.3.3 
     """ 
    def __init__(self, pkt): 
     self.packet = pkt 
     self.__packet_to_dict() 
    def __extract_key(self, line): 
     a = line.lstrip("###[ ").rstrip(" ]### ") 
     return a 
    def __extract_value_to_dict(self, line): 
     if line.find("=") > -1: 
      b = line.replace(" ","") 
      a = b.split("=") 
      return {a[0]: a[1]} 
     return {line.replace(" ",""): None} 
    def __packet_to_dict(self): 
     with Capturing() as packet_in_list: 
      self.packet.show2() 
     current_dict = self 
     for line in packet_in_list: 
      if line.strip() != "": 
       line = line.replace("|","") 
       if line.find('###[') > -1: 
        key = self.__extract_key(line) 
        current_dict[key] = {} 
        current_dict = current_dict[key] 
        continue 
       current_dict.update(self.__extract_value_to_dict(line)) 
    def haslayer(self, pkt_cls): 
     return self.packet.haslayer(pkt_cls) 

if __name__ == "__main__": 
    packet_list = rdpcap("/media/sf_ubshare/pcap/test.pcap") 
    for packet in packet_list: 
     a = PacketDict(packet) 
     print a['Ethernet']['IP']['ihl'] 
     print a.haslayer('ISAKMP') 

输出:

/usr/bin/python2.7 /home/yuanzhi/workspace/scaptest/scaptest.py 
5L 
1 

字典的样子:

{ 
    "Ethernet": { 
    "src": "5e:22:73:12:50:02", 
    "dst": "6e:30:96:e3:a0:6c", 
    "type": "0x800", 
    "IP": { 
     "frag": "0L", 
     "src": "1.0.3.0", 
     "UDP": { 
     "dport": "isakmp", 
     "ISAKMP": { 
      "resp_cookie": "'\\xb5A\\x06\\xef\\x126~\\x95'", 
      "exch_type": "identityprot.", 
      "length": "204", 
      "version": "0x10", 
      "flags": "", 
      "init_cookie": "'2\\x12\\xbda\\xee\\xa8\\xba\\xa6'", 
      "ISAKMP SA": { 
      "IKE proposal": { 
       "SPI": "''", 
       "length": "44", 
       "IKE Transform": { 
       "length": "36", 
       "num": "0", 
       "transforms": "[('Encryption','AES-CBC'),('KeyLength',256),('Hash','SHA'),('Authentication','PSK'),('GroupDesc','1024MODPgr'),('LifeType','Seconds'),('LifeDuration',43200)]", 
       "ISAKMP Vendor ID": {......