Hello,
I am currently working with P4. I am implementing the following architecture:
a swarm of switches that populate their routing tables through a P4Runtime controller.
I don’t understand how PacketIn works, as my controller is not receiving any packets. Here is my controller code:
def handle_packet_in(sw, p4info_helper, packet):
    """Handle one packet-in message delivered by a switch.

    Args:
        sw: Switch object that sent the packet; only its ``device_id``
            attribute is read here.
        p4info_helper: P4Info helper handed through by the listener
            (not used by the code shown).
        packet: The P4Runtime packet object that was received.
    """
    # The original two-argument print() call put a separator space after the
    # literal's trailing space, so the double space below is intentional.
    print(f"Received a packet on switch  {sw.device_id}")
    # The logic that writes the new table entry goes here.
def start_p4_routers(
    p4_hop_by_hop_config: P4HopByHopControllerConfig
):
    """Main entry point for the P4Runtime controller.

    Connects to every switch described by the configuration, then starts one
    daemon thread per switch that waits for packet-in messages and passes
    each of them to ``handle_packet_in``.

    Args:
        p4_hop_by_hop_config: Supplies ``p4info_file_path``,
            ``bmv2_file_path`` and ``switch_entries_path``.
    """
    p4info_helper = p4runtime_lib.helper.P4InfoHelper(
        p4_hop_by_hop_config.p4info_file_path
    )

    # NOTE(review): the posted snippet opened a ``try:`` at this point, but
    # its ``except``/``finally`` clause was not included. No handler is
    # invented here; wrap this call again if errors must be handled locally.
    response = initialize_multiple_switch_connections(
        p4info_helper=p4info_helper,
        bmv2_file_path=p4_hop_by_hop_config.bmv2_file_path,
        switch_entries_path=p4_hop_by_hop_config.switch_entries_path,
    )

    def listen_packets(sw_arg, p4info_helper_arg):
        """Forward every packet-in received on ``sw_arg`` to the handler."""
        print("Listening on switch", sw_arg.device_id)
        try:
            for packet in sw_arg.PacketIn():
                handle_packet_in(sw_arg, p4info_helper_arg, packet)
            print("PacketIn generator ended for switch", sw_arg.device_id)
        except grpc.RpcError as e:
            print("gRPC error in PacketIn loop:", e)
        except Exception as e:
            # Last-resort boundary of the listener thread: report the error
            # instead of letting the thread die silently.
            print("Unexpected error in PacketIn loop:", e)

    for sw in response:
        # Pass the connection through ``args`` so it is bound when the thread
        # is created. The previous lambda declared ``sw_arg=sw`` but read the
        # loop variable ``sw`` in its body, so a thread could end up
        # listening on a later switch than the one it was created for.
        threading.Thread(
            target=listen_packets,
            args=(sw.switch_connection, p4info_helper),
            daemon=True,
        ).start()
    # Daemon threads are stopped as soon as the main thread exits, so the
    # caller has to keep the process alive for packets to be received.
Switch initialization (called from start_p4_routers):
@dataclass
class InitializedSwitchConnectionResponse:
    """Pair a switch description with the connection opened for it."""

    # Connection object created for the switch.
    switch_connection: p4runtime_lib.bmv2.Bmv2SwitchConnection
    # The switch description the connection was built from.
    satellite_switch: SatelliteSwitch
def initialize_switch_connection(
    p4info_helper, bmv2_file_path, switch: SatelliteSwitch
) -> InitializedSwitchConnectionResponse:
    """Connect to one switch, claim mastership and install the pipeline.

    Args:
        p4info_helper: Helper whose ``p4info`` is pushed to the switch.
        bmv2_file_path: Path of the BMv2 JSON file to install.
        switch: Switch description; ``device_id``, ``grpc_addr`` and
            ``proto_dump_file`` are read from it.

    Returns:
        The new connection together with the switch it belongs to.
    """
    import time

    label = f"sw{switch.device_id}"
    print(f"Initialize {label} on {switch.grpc_addr}")

    # Use a per-switch dump file when none is configured.
    dump_path = switch.proto_dump_file
    if not dump_path:
        dump_path = f"logs/{label}-p4runtime.txt"

    connection = p4runtime_lib.bmv2.Bmv2SwitchConnection(
        name=label,
        address=switch.grpc_addr,
        device_id=switch.device_id,
        proto_dump_file=dump_path,
    )
    print(f"{label} initialized")

    # Send the master arbitration update to establish this controller as
    # master (required by P4Runtime before any other write operation).
    connection.MasterArbitrationUpdate()
    print(f"{label} MasterArbitrationUpdate")
    time.sleep(0.5)

    # Install the P4 program on the switch.
    connection.SetForwardingPipelineConfig(
        p4info=p4info_helper.p4info,
        bmv2_json_file_path=bmv2_file_path,
    )
    return InitializedSwitchConnectionResponse(
        switch_connection=connection,
        satellite_switch=switch,
    )
def initialize_multiple_switch_connections(
    p4info_helper, bmv2_file_path, switch_entries_path: str
) -> list[InitializedSwitchConnectionResponse]:
    """Initialize a connection for every switch in the entries file.

    Args:
        p4info_helper: Helper passed on to each switch initialization.
        bmv2_file_path: Path of the BMv2 JSON file installed on each switch.
        switch_entries_path: File the switch descriptions are loaded from.

    Returns:
        One response per loaded switch, in the order they were loaded.
    """
    # Switches are initialized one after another, in file order.
    return [
        initialize_switch_connection(p4info_helper, bmv2_file_path, entry)
        for entry in load_switches_from_entries_file(switch_entries_path)
    ]
My ingress pipeline, where table misses are sent to the CPU port (511, the same value I pass to the switch as --cpu-port):
control MyIngress(inout headers hdr,
                  inout metadata meta,
                  inout standard_metadata_t standard_metadata) {

    // Egress port used to hand a packet to the controller.
    // NOTE(review): the posted switch log shows "Egress port is 511" followed
    // by "Dropping packet at the end of ingress". 511 appears to also be
    // simple_switch's default drop port. Confirm, then either choose another
    // CPU port (here and in --cpu-port) or start the switch with a different
    // --drop-port.
    const egressSpec_t CPU_PORT = 511;

    // Mark the packet to be dropped.
    action drop() {
        mark_to_drop(standard_metadata);
    }

    // Forward a hop-by-hop packet out of the given port.
    action hop_forward(egressSpec_t port) {
        standard_metadata.egress_spec = port;
    }

    // Send the packet to the CPU port.
    action send_to_cpu() {
        standard_metadata.egress_spec = CPU_PORT;
    }

    // Exact match on (destination id, uuid); a miss sends the packet to the
    // CPU port.
    table hop_routing {
        key = {
            hdr.hop_by_hop.id_destination: exact;
            hdr.hop_by_hop.uuid: exact;
        }
        actions = {
            hop_forward;
            send_to_cpu;
            drop;
            NoAction;
        }
        size = 2048;
        default_action = send_to_cpu();
    }

    // IPv4 forwarding: pick the output port, rewrite the MAC addresses and
    // decrement the TTL.
    action ipv4_forward(macAddr_t dstAddr, egressSpec_t port) {
        standard_metadata.egress_spec = port;
        // The old destination becomes the source before it is overwritten.
        hdr.ethernet.srcAddr = hdr.ethernet.dstAddr;
        hdr.ethernet.dstAddr = dstAddr;
        hdr.ipv4.ttl = hdr.ipv4.ttl - 1;
    }

    // Longest-prefix match on the IPv4 destination; a miss drops the packet.
    table ipv4_lpm {
        key = {
            hdr.ipv4.dstAddr: lpm;
        }
        actions = {
            ipv4_forward;
            drop;
            NoAction;
        }
        size = 1024;
        default_action = drop();
    }

    apply {
        // Hop-by-hop packets take precedence over plain IPv4 packets.
        if (hdr.hop_by_hop.isValid()) {
            hop_routing.apply();
        } else if (hdr.ipv4.isValid()) {
            ipv4_lpm.apply();
        }
    }
}
My switch:
class P4SwitchGrpc(Switch):
    """P4 switch using simple_switch_grpc.

    Each instance launches one ``simple_switch_grpc`` process. The device
    ID, gRPC port and Thrift port are taken from class-level counters unless
    the ports are passed explicitly.
    """
    next_device_id = 0
    next_grpc_port = 20001  # starting gRPC port
    next_thrift_port = 9091
    # Value passed as ``--cpu-port``; the P4 program sends controller-bound
    # packets to this egress port.
    cpu_port = 511
    # Value passed as ``--drop-port``. simple_switch's default drop port is
    # 511, which collides with the CPU port above: a packet whose egress port
    # equals the drop port is dropped at the end of ingress and never reaches
    # the controller. The drop port is therefore moved off the CPU port.
    drop_port = 510

    def __init__(self, name, sw_path=None, json_path=None,
                 log_file=None, grpc_port=None, thrift_port=None, pcap_dump=False,
                 log_console=False, verbose=False, **kwargs):
        """Validate the paths and reserve the ports and the device ID.

        Args:
            name: Switch name, forwarded to ``Switch.__init__``.
            sw_path: Path of the switch executable (required).
            json_path: Path of the compiled JSON file (required).
            log_file: Log file path; defaults to ``/tmp/p4s.<name>.log``.
            grpc_port: gRPC port; the next free class-level port if ``None``.
            thrift_port: Thrift port; the next free class-level port if
                ``None``.
            pcap_dump: Stored on the instance; ``start`` does not read it.
            log_console: Stored on the instance; ``start`` does not read it.
            verbose: Stored on the instance.
            **kwargs: Forwarded to ``Switch.__init__``.
        """
        Switch.__init__(self, name, **kwargs)
        assert sw_path, "sw_path must be provided"
        assert json_path, "json_path must be provided"
        pathCheck(sw_path)
        if not os.path.isfile(json_path):
            # ``os.error`` is only an alias of OSError, so calling it built an
            # exception object and threw it away without showing anything.
            # ``sys.exit`` with a message prints it to stderr and still exits
            # with status 1.
            import sys
            sys.exit(f"Invalid JSON file: {json_path}")
        self.sw_path = sw_path
        self.json_path = json_path
        self.verbose = verbose
        self.log_file = log_file or f"/tmp/p4s.{self.name}.log"
        # Kept for compatibility with code that may read ``output``; this
        # class itself writes the log through the handle opened in ``start``.
        self.output = open(self.log_file, 'w')
        self.pcap_dump = pcap_dump
        self.log_console = log_console
        # gRPC port assignment
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = P4SwitchGrpc.next_grpc_port
            P4SwitchGrpc.next_grpc_port += 1
        # Thrift port assignment
        if thrift_port is not None:
            self.thrift_port = thrift_port
        else:
            self.thrift_port = P4SwitchGrpc.next_thrift_port
            P4SwitchGrpc.next_thrift_port += 1
        # device ID assignment
        self.device_id = P4SwitchGrpc.next_device_id
        P4SwitchGrpc.next_device_id += 1
        self.process = None

    def start(self, controllers):
        """Launch the switch process and wait until its gRPC port accepts
        connections.

        Args:
            controllers: Not used by this method.

        Raises:
            RuntimeError: If the process exits during start-up, or the gRPC
                port is not reachable within 10 seconds.
        """
        import time

        info(f"Starting P4 gRPC switch {self.name}.\n")
        args = [self.sw_path]
        # Attach every interface that has no IP address as a switch port.
        for port, intf in list(self.intfs.items()):
            if not intf.IP():
                args.extend(['-i', f"{port}@{intf.name}"])
        # NOTE: both flags are always passed, regardless of ``self.pcap_dump``
        # and ``self.log_console``.
        args.append("--pcap")
        args.append("--log-console")
        args.extend(['--thrift-port', str(self.thrift_port)])
        args.append(self.json_path)
        args.extend(["--device-id", str(self.device_id)])
        args.append('--')
        # target-specific options (after '--')
        args.extend([
            "--cpu-port", str(self.cpu_port),
            "--drop-port", str(self.drop_port),
            "--grpc-server-addr", f"0.0.0.0:{self.grpc_port}",
        ])
        command_line = ' '.join(args)
        info(command_line + "\n")
        print(command_line)
        # Start process with Popen instead of cmd()
        with open(self.log_file, 'w') as log:
            self.process = subprocess.Popen(args, stdout=log, stderr=subprocess.STDOUT)
        # Wait for gRPC port to be ready (up to 10 seconds).
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            # Fail right away when the switch died (e.g. rejected option)
            # instead of polling a port that will never open.
            if self.process.poll() is not None:
                raise RuntimeError(
                    f"Switch {self.name} exited with code "
                    f"{self.process.returncode}; see {self.log_file}"
                )
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(0.5)
                if sock.connect_ex(("localhost", self.grpc_port)) == 0:
                    break
            time.sleep(0.2)
        else:
            # Reached only when the loop timed out without ``break``.
            raise RuntimeError(f"Switch {self.name} gRPC port {self.grpc_port} not ready")
        info(f"P4 switch {self.name} started on gRPC port {self.grpc_port}.\n")
This is the switch log when I send a custom packet:
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Processing packet received on port 4
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser': start
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser' entering state 'start'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Extracting header 'ethernet'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser state 'start': key is 0900
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Bytes parsed: 14
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser' entering state 'parse_hop_by_hop'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Extracting header 'hop_by_hop'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser state 'parse_hop_by_hop' has no switch, going to default next state
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Bytes parsed: 34
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser': end
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Pipeline 'ingress': start
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] hop_by_hop.p4(148) Condition "hdr.hop_by_hop.isValid()" (node_2) is true
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Applying table 'MyIngress.hop_routing'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Looking up key:
* hdr.hop_by_hop.id_destination: 0032
* hdr.hop_by_hop.uuid : 00112233445566778899aabbccddeeff
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Table 'MyIngress.hop_routing': miss
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Action entry is MyIngress.send_to_cpu -
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Action MyIngress.send_to_cpu
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] hop_by_hop.p4(109) Primitive standard_metadata.egress_spec = 511
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Pipeline 'ingress': end
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Egress port is 511
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Dropping packet at the end of ingress
It looks like the packet never reaches the CPU / controller.
I was wondering whether I have understood correctly how communication between the switch and the controller works. I have read several threads, especially the one that mentions the ARPCache system, but it looks like that code has since been completely changed.
To be precise, I start my switch with the matching --cpu-port argument.
