P4Controller PacketIn. Packets not arriving

Hello,

I am currently working on P4. I am implementing the following architecture:

A swarm of switches that populate their routing tables by using a P4 controller.

I don’t understand how PacketIn works, as I am not receiving any packets. Here is my controller code:

def handle_packet_in(sw, p4info_helper, packet):
    """
    Callback invoked for every packet_in received from a switch.

    sw            = switch object that delivered the packet; its ``device_id``
                    is printed so the sending switch can be identified
    p4info_helper = P4Info helper (not used by the logging shown here)
    packet        = the P4Runtime packet object (not used by the logging
                    shown here)
    """
    switch_id = sw.device_id
    print("Received a packet on switch ", switch_id)
    # The logic that writes the new table entry goes here.


def start_p4_routers(
        p4_hop_by_hop_config: P4HopByHopControllerConfig
):
    """
    Main entry point for the P4Runtime controller
    """

    p4info_helper = p4runtime_lib.helper.P4InfoHelper(p4_hop_by_hop_config.p4info_file_path)
    
    try:

        response = initialize_multiple_switch_connections(
            p4info_helper=p4info_helper,
            bmv2_file_path=p4_hop_by_hop_config.bmv2_file_path,
            switch_entries_path= p4_hop_by_hop_config.switch_entries_path
        )
        



        def listen_packets(sw_arg, p4info_helper_arg):
            print("Listening on switch", sw_arg.device_id)
            try:
                for packet in sw_arg.PacketIn():
                    handle_packet_in(sw_arg, p4info_helper_arg, packet)
                    
                print("PacketIn generator ended for switch", sw_arg.device_id)
            except grpc.RpcError as e:
                print("gRPC error in PacketIn loop:", e)
            except Exception as e:
                print("Unexpected error in PacketIn loop:", e)

        for sw in response:
            threading.Thread(
                target=lambda sw_arg=sw: listen_packets(sw.switch_connection, p4info_helper),
                daemon=True
            ).start()

Switch initialization (called in start_p4_routers)


@dataclass
class InitializedSwitchConnectionResponse:
    """Result of initializing one switch: the open connection plus the switch it belongs to."""
    # P4Runtime connection created by initialize_switch_connection
    switch_connection: p4runtime_lib.bmv2.Bmv2SwitchConnection
    # Switch description the connection was created from
    satellite_switch: SatelliteSwitch


def initialize_switch_connection(
        p4info_helper, bmv2_file_path, switch: SatelliteSwitch
) -> InitializedSwitchConnectionResponse:
    """
    Connect to one switch over P4Runtime and install the P4 program on it.

    p4info_helper  = helper whose ``p4info`` is pushed to the switch
    bmv2_file_path = path of the compiled BMv2 JSON pushed to the switch
    switch         = description of the switch (device id, gRPC address,
                     optional proto dump file)

    Returns the open connection together with the switch description.
    """
    from time import sleep

    print(f"Initialize sw{switch.device_id} on {switch.grpc_addr}")

    connection_name = f"sw{switch.device_id}"
    # Fall back to a per-switch dump file when none is configured.
    dump_file = switch.proto_dump_file or f"logs/sw{switch.device_id}-p4runtime.txt"
    connection = p4runtime_lib.bmv2.Bmv2SwitchConnection(
        name=connection_name,
        address=switch.grpc_addr,
        device_id=switch.device_id,
        proto_dump_file=dump_file,
    )
    print(f"sw{switch.device_id} initialized")

    # Become master first: P4Runtime requires it before any other write
    # operation is performed.
    connection.MasterArbitrationUpdate()
    print(f"sw{switch.device_id} MasterArbitrationUpdate")

    # Short pause before pushing the pipeline (presumably to let the
    # arbitration settle -- TODO confirm it is needed).
    sleep(0.5)

    # Install the P4 program on the switch.
    connection.SetForwardingPipelineConfig(
        p4info=p4info_helper.p4info,
        bmv2_json_file_path=bmv2_file_path,
    )

    return InitializedSwitchConnectionResponse(
        switch_connection=connection,
        satellite_switch=switch,
    )


def initialize_multiple_switch_connections(
        p4info_helper, bmv2_file_path, switch_entries_path: str
) -> list[InitializedSwitchConnectionResponse]:
    """
    Initialize a connection to every switch described in an entries file.

    The switches are loaded from ``switch_entries_path`` and initialized one
    after another, in file order, with ``initialize_switch_connection``.
    Returns one response per switch, in the same order.
    """
    return [
        initialize_switch_connection(p4info_helper, bmv2_file_path, entry)
        for entry in load_switches_from_entries_file(switch_entries_path)
    ]

My ingress pipeline, which I configured to send packets to the CPU using the correct port:

// Ingress control: packets carrying a valid hop_by_hop header are looked up in
// hop_routing; otherwise packets with a valid IPv4 header are looked up in
// ipv4_lpm. Packets with neither header are left untouched by this control.
control MyIngress(inout headers hdr,
                  inout metadata meta,
                  inout standard_metadata_t standard_metadata) {

    // Mark the packet to be dropped.
    action drop() {
        mark_to_drop(standard_metadata);
    }

    // Forward a hop_by_hop packet out of the given port.
    action hop_forward(egressSpec_t port) {
        standard_metadata.egress_spec = port;
    }

    // Intended to hand the packet to the controller via the CPU port.
    // NOTE: the BMv2 log captured for this program shows "Egress port is 511"
    // followed by "Dropping packet at the end of ingress", i.e. with
    // egress_spec = 511 the packet is dropped instead of reaching the CPU port.
    // Use a different port number here and the same one for --cpu-port.
    action send_to_cpu() {
        standard_metadata.egress_spec = 511;
    }

    // Exact-match routing on (destination id, uuid); a miss goes to the
    // controller so that it can install the missing entry.
    table hop_routing {
        key = {
            hdr.hop_by_hop.id_destination: exact;
            hdr.hop_by_hop.uuid: exact;
        }
        actions = {
            hop_forward;
            send_to_cpu;
            drop;
            NoAction;
        }
        size = 2048;
        default_action = send_to_cpu();
    }

    // IPv4 forwarding: select the output port, use the old destination MAC
    // as the new source MAC, set the new destination MAC and decrement the TTL.
    action ipv4_forward(macAddr_t dstAddr, egressSpec_t port) {
        standard_metadata.egress_spec = port;
        hdr.ethernet.srcAddr = hdr.ethernet.dstAddr;
        hdr.ethernet.dstAddr = dstAddr;
        hdr.ipv4.ttl = hdr.ipv4.ttl - 1;
    }

    // Longest-prefix match on the IPv4 destination; a miss drops the packet.
    table ipv4_lpm {
        key = {
            hdr.ipv4.dstAddr: lpm;
        }
        actions = {
            ipv4_forward;
            drop;
            NoAction;
        }
        size = 1024;
        default_action = drop();
    }

    apply {
        // hop_by_hop takes precedence over IPv4 when both checks could apply.
        if (hdr.hop_by_hop.isValid()) {
            hop_routing.apply();
        } else if (hdr.ipv4.isValid()) {
            ipv4_lpm.apply();
        }
    }
}

My switch:

class P4SwitchGrpc(Switch):
    """P4 switch using simple_switch_grpc.

    Each instance gets its own device id and, unless given explicitly, its own
    gRPC and Thrift ports taken from the class-level counters below.

    ``cpu_port`` is the value passed to the target as ``--cpu-port``. It
    defaults to 511 for backward compatibility, but BMv2 drops packets whose
    egress_spec is 511 at the end of ingress, so a working PacketIn setup needs
    a different value (e.g. 510) that is also used by the P4 program.
    """
    next_device_id = 0
    next_grpc_port = 20001  # starting gRPC port
    next_thrift_port = 9091

    def __init__(self, name, sw_path=None, json_path=None,
                 log_file=None, grpc_port=None, thrift_port=None, pcap_dump=False,
                 log_console=False, verbose=False, cpu_port=511, **kwargs):
        """Validate the paths and assign ports and the device id.

        Raises AssertionError when ``sw_path`` or ``json_path`` is missing and
        exits the program (SystemExit) when ``json_path`` is not a file.
        """
        Switch.__init__(self, name, **kwargs)
        assert sw_path, "sw_path must be provided"
        assert json_path, "json_path must be provided"
        pathCheck(sw_path)
        if not os.path.isfile(json_path):
            # ``os.error(...)`` only built an OSError and threw it away, so the
            # message was never shown. SystemExit with a string prints it to
            # stderr and exits with status 1, as the old ``exit(1)`` did.
            raise SystemExit(f"Invalid JSON file: {json_path}")

        self.sw_path = sw_path
        self.json_path = json_path
        self.verbose = verbose
        self.log_file = log_file or f"/tmp/p4s.{self.name}.log"
        # NOTE(review): this handle is not used by start(), which reopens the
        # log file itself; kept because code outside this class may rely on it.
        self.output = open(self.log_file, 'w')
        self.pcap_dump = pcap_dump
        self.log_console = log_console
        self.cpu_port = cpu_port

        # gRPC port assignment
        if grpc_port is not None:
            self.grpc_port = grpc_port
        else:
            self.grpc_port = P4SwitchGrpc.next_grpc_port
            P4SwitchGrpc.next_grpc_port += 1

        # Thrift port assignment
        if thrift_port is not None:
            self.thrift_port = thrift_port
        else:
            self.thrift_port = P4SwitchGrpc.next_thrift_port
            P4SwitchGrpc.next_thrift_port += 1

        # device ID assignment
        self.device_id = P4SwitchGrpc.next_device_id
        P4SwitchGrpc.next_device_id += 1

        self.process = None

    def start(self, controllers):
        """Launch the switch process and wait until its gRPC port accepts connections.

        Raises RuntimeError when the process exits during start-up or when the
        gRPC port is not reachable within 10 seconds.
        """
        import time

        info(f"Starting P4 gRPC switch {self.name}.\n")
        args = [self.sw_path]

        # Attach every interface without an IP address as a data-plane port.
        for port, intf in list(self.intfs.items()):
            if not intf.IP():
                args.extend(['-i', f"{port}@{intf.name}"])

        # NOTE(review): --pcap and --log-console are always passed; the
        # pcap_dump / log_console constructor arguments are not consulted.
        args.append("--pcap")
        args.append("--log-console")
        args.extend(['--thrift-port', str(self.thrift_port)])

        args.append(self.json_path)

        args.extend(["--device-id", str(self.device_id)])

        args.append('--')
        # target-specific options (after '--')
        args.extend([
            "--cpu-port", str(self.cpu_port),
            "--grpc-server-addr", f"0.0.0.0:{self.grpc_port}",
        ])

        command_line = ' '.join(args)
        info(command_line + "\n")
        print(command_line)

        # Start process with Popen instead of cmd()
        with open(self.log_file, 'w') as log:
            self.process = subprocess.Popen(args, stdout=log, stderr=subprocess.STDOUT)

        # Wait for gRPC port to be ready (up to 10 seconds). A monotonic clock
        # keeps the timeout correct even if the wall clock is adjusted.
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            # Fail fast when the switch died instead of polling a dead port.
            if self.process.poll() is not None:
                raise RuntimeError(
                    f"Switch {self.name} exited with code {self.process.returncode} "
                    f"during start-up, see {self.log_file}"
                )
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(0.5)
                if sock.connect_ex(("localhost", self.grpc_port)) == 0:
                    break
            time.sleep(0.2)
        else:
            raise RuntimeError(f"Switch {self.name} gRPC port {self.grpc_port} not ready")

        info(f"P4 switch {self.name} started on gRPC port {self.grpc_port}.\n")

This is my log when I send a custom packet:

[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Processing packet received on port 4
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser': start
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser' entering state 'start'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Extracting header 'ethernet'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser state 'start': key is 0900
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Bytes parsed: 14
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser' entering state 'parse_hop_by_hop'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Extracting header 'hop_by_hop'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser state 'parse_hop_by_hop' has no switch, going to default next state
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Bytes parsed: 34
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Parser 'parser': end
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Pipeline 'ingress': start
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] hop_by_hop.p4(148) Condition "hdr.hop_by_hop.isValid()" (node_2) is true
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Applying table 'MyIngress.hop_routing'
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Looking up key:
* hdr.hop_by_hop.id_destination: 0032
* hdr.hop_by_hop.uuid          : 00112233445566778899aabbccddeeff

[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Table 'MyIngress.hop_routing': miss
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Action entry is MyIngress.send_to_cpu - 
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] Action MyIngress.send_to_cpu
[18:04:56.080] [bmv2] [T] [thread 241738] [2.0] [cxt 0] hop_by_hop.p4(109) Primitive standard_metadata.egress_spec = 511
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Pipeline 'ingress': end
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Egress port is 511
[18:04:56.080] [bmv2] [D] [thread 241738] [2.0] [cxt 0] Dropping packet at the end of ingress

It looks like the packet never reaches the CPU / controller.

I was wondering whether I have correctly understood how the communication between the switch and the controller works. I read multiple threads, especially the one that mentions the ARPCache system, but it looks like the code has been completely changed since then.

To be precise, I start my switch with the correct cpu-port argument.

Hi @louis.barbier41

Welcome to the P4 community!

For a working example of PacketIn/PacketOut, see the flowcache exercise in p4lang/tutorials.
Based on your code, it looks like you aren’t configuring the CPU port during switch initialization. You can find an example here: https://github.com/p4lang/tutorials/blob/master/exercises/flowcache/mycontroller.py#L499.

Hi !

Thanks for your help, I added the code to configure the clone session:

def initialize_switch_connection(
        p4info_helper, bmv2_file_path, switch: SatelliteSwitch
) -> InitializedSwitchConnectionResponse:
    """
    Connect to one switch, install the P4 program and write a clone session.

    p4info_helper  = helper whose ``p4info`` is pushed to the switch and which
                     builds the clone session entry
    bmv2_file_path = path of the compiled BMv2 JSON pushed to the switch
    switch         = description of the switch (device id, gRPC address,
                     optional proto dump file)

    Returns the open connection together with the switch description.
    """
    from time import sleep

    print(f"Initialize sw{switch.device_id} on {switch.grpc_addr}")

    connection_name = f"sw{switch.device_id}"
    # Fall back to a per-switch dump file when none is configured.
    dump_file = switch.proto_dump_file or f"logs/sw{switch.device_id}-p4runtime.txt"
    connection = p4runtime_lib.bmv2.Bmv2SwitchConnection(
        name=connection_name,
        address=switch.grpc_addr,
        device_id=switch.device_id,
        proto_dump_file=dump_file,
    )
    print(f"sw{switch.device_id} initialized")

    # Become master first: P4Runtime requires it before any other write
    # operation is performed.
    connection.MasterArbitrationUpdate()
    print(f"sw{switch.device_id} MasterArbitrationUpdate")

    # Short pause before pushing the pipeline (presumably to let the
    # arbitration settle -- TODO confirm it is needed).
    sleep(0.5)

    # Install the P4 program on the switch.
    connection.SetForwardingPipelineConfig(
        p4info=p4info_helper.p4info,
        bmv2_json_file_path=bmv2_file_path,
    )

    # Clone session with a single replica on port 511. The first argument is
    # presumably the clone session id and the last the packet length --
    # verify against buildCloneSessionEntry.
    cpu_replicas = [{"egress_port": 511, "instance": 1}]
    session_entry = p4info_helper.buildCloneSessionEntry(511, cpu_replicas, 0)
    connection.WritePREEntry(session_entry)

    return InitializedSwitchConnectionResponse(
        switch_connection=connection,
        satellite_switch=switch,
    )

It initializes the switch correctly, but I still don’t receive any packets.

I think my parameters are correct.

Hi !

Thanks for your help, I added the code to configure the clone session:

def initialize_switch_connection(
        p4info_helper, bmv2_file_path, switch: SatelliteSwitch
) -> InitializedSwitchConnectionResponse:
    """
    Connect to one switch, install the P4 program and write a clone session.

    p4info_helper  = helper whose ``p4info`` is pushed to the switch and which
                     builds the clone session entry
    bmv2_file_path = path of the compiled BMv2 JSON pushed to the switch
    switch         = description of the switch (device id, gRPC address,
                     optional proto dump file)

    Returns the open connection together with the switch description.
    """
    from time import sleep

    print(f"Initialize sw{switch.device_id} on {switch.grpc_addr}")

    connection_name = f"sw{switch.device_id}"
    # Fall back to a per-switch dump file when none is configured.
    dump_file = switch.proto_dump_file or f"logs/sw{switch.device_id}-p4runtime.txt"
    connection = p4runtime_lib.bmv2.Bmv2SwitchConnection(
        name=connection_name,
        address=switch.grpc_addr,
        device_id=switch.device_id,
        proto_dump_file=dump_file,
    )
    print(f"sw{switch.device_id} initialized")

    # Become master first: P4Runtime requires it before any other write
    # operation is performed.
    connection.MasterArbitrationUpdate()
    print(f"sw{switch.device_id} MasterArbitrationUpdate")

    # Short pause before pushing the pipeline (presumably to let the
    # arbitration settle -- TODO confirm it is needed).
    sleep(0.5)

    # Install the P4 program on the switch.
    connection.SetForwardingPipelineConfig(
        p4info=p4info_helper.p4info,
        bmv2_json_file_path=bmv2_file_path,
    )

    # Clone session with a single replica on port 511. The first argument is
    # presumably the clone session id and the last the packet length --
    # verify against buildCloneSessionEntry.
    cpu_replicas = [{"egress_port": 511, "instance": 1}]
    session_entry = p4info_helper.buildCloneSessionEntry(511, cpu_replicas, 0)
    connection.WritePREEntry(session_entry)

    return InitializedSwitchConnectionResponse(
        switch_connection=connection,
        satellite_switch=switch,
    )

It initializes the switch correctly, but I still don’t receive any packets.

I think my parameters are correct.

EDIT: Wait, what is global_data[‘CPU_PORT_CLONE_SESSION_ID’] = 57?

I used 511, but it seems I should use CPU_PORT_CLONE_SESSION_ID. What is this?

Okay I don’t have CPU_PORT_CLONE_SESSION_ID in my P4 file. I will try to configure it. Can I have more explanation about this ?

Why not use send_to_cpu instead of the clone function?

I believe that by default when you start simple_switch or simple_switch_grpc executables, assigning the value of 511 to the egress_spec field in standard_metadata during the ingress pipeline will cause the packet to be dropped at the end of the ingress pipeline. It will not cause it to be sent to the CPU port.

If you want it to be sent to the CPU port, you must specify the numeric id of the CPU port on the command line, e.g. using the --cpu-port command line option as shown in the following example command line:


# Start simple_switch_grpc with CPU port equal to 510
# (--cpu-port is a target-specific option, so it goes after the bare "--")
$ sudo simple_switch_grpc -i 0@veth0 -i 1@veth2 -i 2@veth4 -i 3@veth6 -i 4@veth8 -i 5@veth10 -i 6@veth12 -i 7@veth14 --no-p4 -- --cpu-port 510

Hi!

The clone function in P4 creates an EXACT COPY of the current packet. For example, using clone_i2e (you can also use clone_e2e, for instance) means you duplicate the packet inside the switch and send one copy to the egress pipeline while another is sent to the controller (or CPU port, like 510). So you do both at the same “time”. This happens in parallel, so the original packet continues through the normal pipeline, while the cloned one goes to the control plane for processing or monitoring.

Cloning is often used for debugging, telemetry, or packet sampling.

In short:

  • clone_i2e: keeps forwarding the original packet and also sends a copy to the controller.

  • send_to_controller: sends only one packet to the controller, without forwarding it in the datapath. Keeps waiting to see what the control plane wants.

As @andyfingerhut and @Dscano said, if you are using the P4 tutorials, you can use the flowcache’s topology.json file.

As far as I understand, when you type “make”, this is executed in the background by run_exercise.py. You don’t need to change it; this is just for your information.

image

Cheers,

This is already what I am doing: I pass port 511 as the CPU port.

    def start(self, controllers):
        """Build the command line for the switch executable and log it.

        Only the part of the method quoted here is documented; the process
        launch is not shown.
        """
        info(f"Starting P4 gRPC switch {self.name}.\n")
        # The command line starts with the switch executable itself.
        args = [self.sw_path]


        # Attach every interface without an IP address as "<port>@<name>".
        for port, intf in list(self.intfs.items()):
            if not intf.IP():
                args.extend(['-i', f"{port}@{intf.name}"])


        # "pcaps" is presumably the directory for the pcap files -- verify.
        args.extend(['--pcap', "pcaps"])

        args.append("--log-console")
        args.extend(['--thrift-port', str(self.thrift_port)])

        # Compiled P4 program loaded by the switch.
        args.append(self.json_path)

        args.extend(["--device-id", str(self.device_id)])

        args.append('--')
        
        # target-specific options (after '--')
        # NOTE: the CPU port is hard-coded to 511 here.
        args.extend([
            "--cpu-port", "511",
            "--grpc-server-addr", f"0.0.0.0:{self.grpc_port}",
        ])

        info(' '.join(args) + "\n")

In BMv2, assigning 511 to egress_spec during ingress processing, and leaving it at that value by the time ingress processing is complete, means that the packet will be dropped. It will not go anywhere.

You need to assign a different value for there to be any chance that the packet goes out of the device.

Solved. TL;DR: do not use port 511 as the CPU port.

Thank you all