最近碰到了一个需求：在不绑定固定IP的情况下动态地分配IP地址，并且只允许MAC白名单中的用户获取DHCP分配的IP地址，所以只能通过代理的方式对UDP数据进行处理。
安装DHCP包
注意: 安装该包时，Python环境必须是3.8及以上版本，否则无法安装使用
pip install dhcppython
代码如下:
# !/usr/bin/env python
# -*- coding:utf-8 -*-
"""
----------------------------------------
@所属项目 : Saixun
----------------------------------------
@作者 : French<1109527533@hoime.cn>
@软件 : PyCharm
@文件名 : dhcp_process.py
@创建时间 : 2021/7/19 - 14:04
@修改时间 : 2021/7/19 - 14:04
@文件说明 : dhcp请求处理
"""
import socket
import dhcppython
# SOCKET数据解析
# IP protocol numbers mapped to their protocol names.  Built once at import
# time instead of on every call.  Numbers that are absent here (for example
# 61, 63, 68, 99, 114 and everything above 133) are reported by
# socket_parsing() as "UNKNOWN(<number>)".
_IP_PROTOCOL_NAMES = {
    0: "HOPOPT", 1: "ICMP", 2: "IGMP", 3: "GGP", 4: "IP",
    5: "ST", 6: "TCP", 7: "CBT", 8: "EGP", 9: "IGP",
    10: "BBN-RCC-MON", 11: "NVP-II", 12: "PUP", 13: "ARGUS", 14: "EMCON",
    15: "XNET", 16: "CHAOS", 17: "UDP", 18: "MUX", 19: "DCN-MEAS",
    20: "HMP", 21: "PRM", 22: "XNS-IDP", 23: "TRUNK-1", 24: "TRUNK-2",
    25: "LEAF-1", 26: "LEAF-2", 27: "RDP", 28: "IRTP", 29: "ISO-TP4",
    30: "NETBLT", 31: "MFE-NSP", 32: "MERIT-INP", 33: "SEP", 34: "3PC",
    35: "IDPR", 36: "XTP", 37: "DDP", 38: "IDPR-CMTP", 39: "TP++",
    40: "IL", 41: "IPv6", 42: "SDRP", 43: "IPv6-Route", 44: "IPv6-Frag",
    45: "IDRP", 46: "RSVP", 47: "GRE", 48: "MHRP", 49: "BNA",
    50: "ESP", 51: "AH", 52: "I-NLSP", 53: "SWIPE", 54: "NARP",
    55: "MOBILE", 56: "TLSP", 57: "SKIP", 58: "IPv6-ICMP", 59: "IPv6-NoNxt",
    60: "IPv6-Opts", 62: "CFTP", 64: "SAT-EXPAK", 65: "KRYPTOLAN", 66: "RVD",
    67: "IPPC", 69: "SAT-MON", 70: "VISA", 71: "IPCV", 72: "CPNX",
    73: "CPHB", 74: "WSN", 75: "PVP", 76: "BR-SAT-MON", 77: "SUN-ND",
    78: "WB-MON", 79: "WB-EXPAK", 80: "ISO-IP", 81: "VMTP", 82: "SECURE-VMTP",
    83: "VINES", 84: "TTP", 85: "NSFNET-IGP", 86: "DGP", 87: "TCF",
    88: "EIGRP", 89: "OSPFIGP", 90: "Sprite-RPC", 91: "LARP", 92: "MTP",
    93: "AX.25", 94: "IPIP", 95: "MICP", 96: "SCC-SP", 97: "ETHERIP",
    98: "ENCAP", 100: "GMTP", 101: "IFMP", 102: "PNNI", 103: "PIM",
    104: "ARIS", 105: "SCPS", 106: "QNX", 107: "A/N", 108: "IPComp",
    109: "SNP", 110: "Compaq-Peer", 111: "IPX-in-IP", 112: "VRRP", 113: "PGM",
    115: "L2TP", 116: "DDX", 117: "IATP", 118: "STP", 119: "SRP",
    120: "UTI", 121: "SMP", 122: "SM", 123: "PTP", 124: "ISIS",
    125: "FIRE", 126: "CRTP", 127: "CRUDP", 128: "SSCOPMCE", 129: "IPLT",
    130: "SPS", 131: "PIPE", 132: "SCTP", 133: "FC",
}


def socket_parsing(data):
    """Parse the IPv4 header and the UDP header that follows it.

    The UDP fields are always decoded from the bytes directly after the IPv4
    header, regardless of the value of the protocol field; callers should
    check ``"protocol"`` before trusting the port/length/checksum entries.

    Args:
        data (bytes): Packet bytes that start with an IPv4 header.

    Returns:
        dict: The keys ``version``, ``ihl``, ``dscp``, ``ecn``, ``length``,
        ``identification``, ``flags``, ``offset``, ``ttl``, ``protocol``
        (a name from the protocol table, or ``"UNKNOWN(<number>)"``),
        ``checksum``, ``src_ip``, ``dest_ip``, ``src_port``, ``dest_port``,
        ``udp_length`` and ``udp_checksum``.

    Raises:
        IndexError: If ``data`` is too short to contain the IPv4 header
            (including any options announced by IHL) plus an 8-byte UDP
            header.
    """
    min_ip_header_len = 20
    udp_header_len = 8
    if len(data) < min_ip_header_len + udp_header_len:
        raise IndexError(
            "packet too short: need at least "
            f"{min_ip_header_len + udp_header_len} bytes, got {len(data)}"
        )

    version_ihl = data[0]
    ihl = version_ihl & 0x0F
    # IHL counts 32-bit words.  Values below 5 are invalid for IPv4; fall back
    # to the fixed 20-byte layout for them so such input is still decoded the
    # way it always was instead of reading the UDP header from inside the
    # IP header.
    ip_header_len = max(ihl * 4, min_ip_header_len)
    if len(data) < ip_header_len + udp_header_len:
        raise IndexError(
            f"packet too short: IHL announces a {ip_header_len}-byte header "
            f"but only {len(data)} bytes are available"
        )

    def read_u16(offset):
        # All multi-byte header fields are big-endian (network byte order).
        return int.from_bytes(data[offset:offset + 2], "big")

    def read_ipv4(offset):
        return ".".join(str(octet) for octet in data[offset:offset + 4])

    tos = data[1]
    protocol_number = data[9]
    udp_offset = ip_header_len

    return {
        "version": version_ihl >> 4,
        "ihl": ihl,
        "dscp": tos >> 2,
        "ecn": tos & 0x03,
        "length": read_u16(2),
        "identification": read_u16(4),
        # Top 3 bits of the 16-bit word are flags, the low 13 bits the
        # fragment offset.
        "flags": data[6] >> 5,
        "offset": read_u16(6) & 0x1FFF,
        "ttl": data[8],
        # Unknown numbers no longer raise KeyError.
        "protocol": _IP_PROTOCOL_NAMES.get(
            protocol_number, f"UNKNOWN({protocol_number})"
        ),
        "checksum": read_u16(10),
        "src_ip": read_ipv4(12),
        "dest_ip": read_ipv4(16),
        "src_port": read_u16(udp_offset),
        "dest_port": read_u16(udp_offset + 2),
        "udp_length": read_u16(udp_offset + 4),
        "udp_checksum": read_u16(udp_offset + 6),
    }
# 数据处理
def data_processing(data):
    """Inspect a received datagram and return the bytes to forward.

    The datagram is parsed with ``socket_parsing``; when the protocol field
    says UDP, the UDP payload is additionally decoded as a DHCP packet so
    that filtering rules can be applied.  The raw bytes are kept in ``data``
    throughout, so the function really returns bytes (the parsed header dict
    is held in a separate variable).

    NOTE(review): ``socket_parsing`` expects the buffer to start with an IPv4
    header, while a ``SOCK_DGRAM`` socket normally delivers only the UDP
    payload - confirm what the caller actually passes in.

    Args:
        data (bytes): Data received from the socket.

    Returns:
        bytes: The data to forward; currently always the input, unchanged.
    """
    try:
        header = socket_parsing(data)
    except (IndexError, KeyError):
        # Too short to contain the headers, or a protocol number the parser
        # does not know: nothing to inspect, forward untouched.
        return data

    if header["protocol"] == "UDP":
        # The UDP payload starts after the IPv4 header (IHL 32-bit words,
        # never less than 20 bytes) and the 8-byte UDP header.
        payload_offset = max(header["ihl"] * 4, 20) + 8
        try:
            # Decode the DHCP payload.
            dhcppython.packet.DHCPPacket.from_bytes(data[payload_offset:])
            # --- decision logic starts here ---
            # Put the filtering conditions (e.g. the MAC whitelist) here.
            # --- decision logic ends here ---
        except dhcppython.exceptions.MalformedPacketError:
            # Not a well-formed DHCP packet: deliberately forwarded as-is.
            pass
    return data
# UDP代理
def udp_proxy(scr_host, src_port, dst_host, dst_port):
    """Relay UDP datagrams between one client at a time and a fixed server.

    The first datagram from a non-server address marks that sender as the
    current client.  Its datagrams are passed through ``data_processing`` and
    sent to the server; the next datagram from the server is sent back to
    that client, after which the client slot is cleared.  Datagrams from any
    other address while a client is pending are ignored.

    This function loops forever; it only returns by raising.

    Args:
        scr_host (str): Local IP address to bind to.
        src_port (int): Local port to bind to.
        dst_host (str): Server IP address to forward to.
        dst_port (int): Server port to forward to.

    Returns:
        None
    """
    server_address = (dst_host, dst_port)
    # Address of the client whose request is awaiting a server reply.
    client_address = None

    # The context manager closes the socket if the loop exits via an exception.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as proxy_socket:
        proxy_socket.bind((scr_host, src_port))
        while True:
            data, address = proxy_socket.recvfrom(65535)

            # Server -> client.  Checked first so that a server datagram that
            # arrives while no client is pending is dropped instead of being
            # mistaken for a client and echoed back to the server.
            if address == server_address:
                if client_address is not None:
                    # socket.sendto() takes positional arguments only.
                    proxy_socket.sendto(data, client_address)
                    # Clear the client slot for the next request.
                    client_address = None
                continue

            if client_address is None:
                client_address = address

            # Client -> server.
            if address == client_address:
                proxy_socket.sendto(data_processing(data), server_address)
if __name__ == "__main__":
    # The literals below appear to be placeholders ("source IP", "source
    # port", "target IP", "target port"); replace them with real values
    # before running the script.
    udp_proxy(
        scr_host="源IP",
        src_port="源端口",
        dst_host="目标IP",
        dst_port="目标端口",
    )
文章评论