Clone to CPU: First packet received by the controller, other not received

Hello everyone,

I’m currently debugging an issue with the PacketIn mechanism in my P4Runtime controller (Python, asyncio). I implemented the same architecture as the flow-cache example: a processNotif task, an asyncio Queue, and two asynchronous handlers (packetInHandler and idleTimeHandler).

Here is the problem:

  • When I send the first packet, it is cloned to the CPU port and successfully received by the controller through PacketIn.

  • When I send a second packet, nothing is received on the controller side.

  • It looks like the switch transmits the cloned packet correctly, but it is lost or blocked somewhere during the path to the controller.

Additional details:

  • The controller throws no exceptions — everything appears normal.

  • gRPC remains connected (verified using ss -tnp | grep 20000).

  • Switch logs show that the packet is transmitted (see logs in section (1)).

  • tcpdump on the switch CPU port shows the PacketIn being emitted (see section (2)).

  • I am using nearly the same code structure as the official example (async queue, packetInHandler, processNotif loop, etc.).

So far, everything suggests that:

  • The switch does clone and send all PacketIns.

  • The controller only receives the very first PacketIn, and nothing after that.

  • No visible errors occur on either side.

Has anyone experienced a situation where only the first PacketIn is delivered to the controller, while subsequent ones silently disappear?
Any advice on further things to check — queue handling, gRPC stream issues, controller event loop, or BMv2 behavior — would be greatly appreciated.

Thank you!

Switch log (1):

[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Table 'tbl_prepend_packet_in_hdr': miss
[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Action entry is MyEgress.prepend_packet_in_hdr - 
[00:43:52.633] [bmv2] [T] [thread 65316] [18.8] [cxt 0] Action MyEgress.prepend_packet_in_hdr
[00:43:52.633] [bmv2] [T] [thread 65316] [18.8] [cxt 0] hop_by_hop.p4(291) Primitive hdr.packet_in.setValid()
[00:43:52.633] [bmv2] [T] [thread 65316] [18.8] [cxt 0] hop_by_hop.p4(292) Primitive hdr.packet_in.input_port = (PortIdToController_t) ingress_port
[00:43:52.633] [bmv2] [T] [thread 65316] [18.8] [cxt 0] hop_by_hop.p4(293) Primitive hdr.packet_in.punt_reason = punt_reason
[00:43:52.633] [bmv2] [T] [thread 65316] [18.8] [cxt 0] hop_by_hop.p4(54) Primitive 0, ...
[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Pipeline 'egress': end
[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Deparser 'deparser': start
[00:43:52.633] [bmv2] [T] [thread 65316] [18.8] [cxt 0] Skipping checksum 'cksum' update because condition not met
[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Deparsing header 'packet_in'
[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Deparsing header 'ethernet'
[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Deparsing header 'hop_by_hop'
[00:43:52.633] [bmv2] [D] [thread 65316] [18.8] [cxt 0] Deparser 'deparser': end
[00:43:52.633] [bmv2] [D] [thread 65318] [18.8] [cxt 0] Transmitting packet of size 49 out of port 510
[00:43:52.633] [bmv2] [D] [thread 65318] Transmitting packet-in
tcpdump: listening on lo, link-type EN10MB (Ethernet), snapshot length 262144 bytes
00:50:22.204840 IP (tos 0x0, ttl 64, id 3841, offset 0, flags [DF], proto TCP (6), length 136)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe7c (incorrect -> 0x5ecf), seq 2231419937:2231420021, ack 1933452642, win 162, options [nop,nop,TS val 1117720369 ecr 1117331415], length 84
00:50:22.204850 IP (tos 0x0, ttl 64, id 42219, offset 0, flags [DF], proto TCP (6), length 52)
    localhost.45598 > localhost.20000: Flags [.], cksum 0xfe28 (incorrect -> 0xf26e), seq 1, ack 84, win 86, options [nop,nop,TS val 1117720369 ecr 1117720369], length 0
00:50:22.204902 IP (tos 0x0, ttl 64, id 42220, offset 0, flags [DF], proto TCP (6), length 95)
    localhost.45598 > localhost.20000: Flags [P.], cksum 0xfe53 (incorrect -> 0xcb52), seq 1:44, ack 84, win 86, options [nop,nop,TS val 1117720369 ecr 1117720369], length 43
00:50:22.204903 IP (tos 0x0, ttl 64, id 3842, offset 0, flags [DF], proto TCP (6), length 136)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe7c (incorrect -> 0x6f1b), seq 84:168, ack 1, win 162, options [nop,nop,TS val 1117720369 ecr 1117720369], length 84
00:50:22.204961 IP (tos 0x0, ttl 64, id 3843, offset 0, flags [DF], proto TCP (6), length 153)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe8d (incorrect -> 0x711d), seq 168:269, ack 44, win 162, options [nop,nop,TS val 1117720369 ecr 1117720369], length 101
00:50:22.204971 IP (tos 0x0, ttl 64, id 42221, offset 0, flags [DF], proto TCP (6), length 52)
    localhost.45598 > localhost.20000: Flags [.], cksum 0xfe28 (incorrect -> 0xf18a), seq 44, ack 269, win 86, options [nop,nop,TS val 1117720369 ecr 1117720369], length 0
00:50:22.205040 IP (tos 0x0, ttl 64, id 3844, offset 0, flags [DF], proto TCP (6), length 136)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe7c (incorrect -> 0x6e37), seq 269:353, ack 44, win 162, options [nop,nop,TS val 1117720369 ecr 1117720369], length 84
00:50:22.205138 IP (tos 0x0, ttl 64, id 3845, offset 0, flags [DF], proto TCP (6), length 136)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe7c (incorrect -> 0x6de3), seq 353:437, ack 44, win 162, options [nop,nop,TS val 1117720369 ecr 1117720369], length 84
00:50:22.205143 IP (tos 0x0, ttl 64, id 42222, offset 0, flags [DF], proto TCP (6), length 52)
    localhost.45598 > localhost.20000: Flags [.], cksum 0xfe28 (incorrect -> 0xf0e2), seq 44, ack 437, win 86, options [nop,nop,TS val 1117720369 ecr 1117720369], length 0
00:50:22.205238 IP (tos 0x0, ttl 64, id 3846, offset 0, flags [DF], proto TCP (6), length 136)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe7c (incorrect -> 0x6d8f), seq 437:521, ack 44, win 162, options [nop,nop,TS val 1117720369 ecr 1117720369], length 84
00:50:22.205332 IP (tos 0x0, ttl 64, id 3847, offset 0, flags [DF], proto TCP (6), length 136)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe7c (incorrect -> 0x6d3b), seq 521:605, ack 44, win 162, options [nop,nop,TS val 1117720369 ecr 1117720369], length 84
00:50:22.205338 IP (tos 0x0, ttl 64, id 42223, offset 0, flags [DF], proto TCP (6), length 52)
    localhost.45598 > localhost.20000: Flags [.], cksum 0xfe28 (incorrect -> 0xf03a), seq 44, ack 605, win 86, options [nop,nop,TS val 1117720369 ecr 1117720369], length 0
00:50:23.146315 IP (tos 0x0, ttl 64, id 42224, offset 0, flags [DF], proto TCP (6), length 95)
    localhost.45598 > localhost.20000: Flags [P.], cksum 0xfe53 (incorrect -> 0x5826), seq 44:87, ack 605, win 86, options [nop,nop,TS val 1117721310 ecr 1117720369], length 43
00:50:23.146453 IP (tos 0x0, ttl 64, id 3848, offset 0, flags [DF], proto TCP (6), length 69)
    localhost.20000 > localhost.45598: Flags [P.], cksum 0xfe39 (incorrect -> 0x28f1), seq 605:622, ack 87, win 162, options [nop,nop,TS val 1117721310 ecr 1117721310], length 17
00:50:23.186817 IP (tos 0x0, ttl 64, id 42225, offset 0, flags [DF], proto TCP (6), length 52)
    localhost.45598 > localhost.20000: Flags [.], cksum 0xfe28 (incorrect -> 0xe87b), seq 87, ack 622, win 86, options [nop,nop,TS val 1117721351 ecr 1117721310], length 0

Here is my python code:


hop_by_hop_routers: dict[Bmv2SwitchConnection, P4HopByHop] = {}

bind_layers(Ether, HopByHop, type=0x900)

global_data = {}

global_data['CPU_PORT'] = 510
global_data['CPU_PORT_CLONE_SESSION_ID'] = 57



def decodePacketInMetadata(pktin_info, packet):
    pktin_field_to_val = {}
    for md in packet.metadata:
        md_id_int = md.metadata_id
        md_val_int = int.from_bytes(md.value, byteorder='big')
        assert md_id_int in pktin_info
        md_field_info = pktin_info[md_id_int]
        pktin_field_to_val[md_field_info['name']] = md_val_int
    ret = {'metadata': pktin_field_to_val,
           'payload': packet.payload}
    print("decodePacketInMetadata: ret=%s" % (ret))
    return ret


def serializableEnumDict(p4info_data, name):
    type_info = p4info_data.type_info
    name_to_int = {}
    int_to_name = {}
    for member in type_info.serializable_enums[name].members:
        name = member.name
        int_val = int.from_bytes(member.value, byteorder='big')
        name_to_int[name] = int_val
        int_to_name[int_val] = name
    print("serializableEnumDict: name='%s' name_to_int=%s int_to_name=%s"
          "" % (name, name_to_int, int_to_name))
    return name_to_int, int_to_name



def getObj(p4info_obj_map, obj_type, name):
    key = (obj_type, name)
    return p4info_obj_map.get(key, None)


def controllerPacketMetadataDictKeyId(p4info_obj_map, name):
    cpm_info = getObj(p4info_obj_map, "controller_packet_metadata", name)
    assert cpm_info != None
    ret = {}
    for md in cpm_info.metadata:
        id = md.id
        ret[md.id] = {'id': md.id, 'name': md.name, 'bitwidth': md.bitwidth}
    return ret


def makeP4infoObjMap(p4info_data):
    p4info_obj_map = {}
    suffix_count = Counter()
    for obj_type in ["tables", "action_profiles", "actions", "counters",
                     "direct_counters", "controller_packet_metadata"]:
        for obj in getattr(p4info_data, obj_type):
            pre = obj.preamble
            suffix = None
            for s in reversed(pre.name.split(".")):
                suffix = s if suffix is None else s + "." + suffix
                key = (obj_type, suffix)
                p4info_obj_map[key] = obj
                suffix_count[key] += 1
    for key, c in list(suffix_count.items()):
        if c > 1:
            del p4info_obj_map[key]
    return p4info_obj_map


def sendPacketOut(sw, payload, metadatas):
    # TODO: Implement the function logic to send a packet-out message
    sw.PacketOut(payload, metadatas)


def packetOutMetadataList(opcode, reserved1, operand0):
    # This function does not use the generated contents of the P4Info
    # file to map PacketOut metadata fields to indices.  If you change
    # the PacketOut metadata format in the P4 program, this code must
    # be manually updated to match.
    return [{"value": opcode, "bitwidth": 8},
            {"value": reserved1, "bitwidth": 8},
            {"value": operand0, "bitwidth": 32}]


def processPacket(message):
    payload = message["packet-in"].payload
    packet = message["packet-in"]
    pktinfo = decodePacketInMetadata(global_data['cpm_packetin_id2data'], packet)

    print("Received PacketIn message of length %d bytes from switch %s"
          "" % (len(payload), message["sw"].name))

    if len(payload) > 0:
        try:
            i = 0
            pkt = Ether(payload)
            metadatas = packetOutMetadataList(
                global_data['controller_opcode_name2int']['ROUTE_NOT_FOUND1'],
                0, 1)

            if HopByHop in pkt:
                hop = pkt[HopByHop]
                print("id_destination:", hop.id_destination)
                print("uuid:", hop.uuid.hex())



            print("Sending PacketOut message")
            sendPacketOut(message["sw"], payload, metadatas)
            print("PacketOut message sent")


        except Exception as e:
            print("Error processing PacketIn message: ", e)
            traceback.print_exc()
    else:
        print("Empty payload in PacketIn message")


async def processNotif(notif_queue):
    while True:
        try:
            notif = await notif_queue.get()
            debug_notif = False
            if debug_notif:
                print(notif)
            if notif["type"] == "packet-in":
                processPacket(notif)


            notif_queue.task_done()

        except Exception as e:
            print("Error in processNotif: ", e)
            traceback.print_exc()


async def packetInHandler(notif_queue, sw):
    while True:
        if sw.name == "sw0":
            print("Waiting for PacketIn from switch", sw.name)

        try:
            packet_in = await asyncio.to_thread(sw.PacketIn)
            print(f"Packet handler received packet: {packet_in}")
            message = {"type": "packet-in", "sw": sw, "packet-in": packet_in}
            await notif_queue.put(message)
            print("I am blocked here after putting packet-in into notif_queue")

        except grpc.RpcError as e:
            print(f"[gRPC Error in packetInHandler for {sw.name}]")
            printGrpcError(e)

            if e.code() == grpc.StatusCode.UNKNOWN:
                print(f"Unknown gRPC error from {sw.name}. Retrying...")
            await asyncio.sleep(2)

        except Exception as e:
            print(f"[Unexpected Error in packetInHandler for {sw.name}]: {e}")
            traceback.print_exc()
            await asyncio.sleep(2)


def writeTableEntries(
        p4info_helper, sw, id_destination: int, flow_id: str, dst_port: int
):
    """
    Programs the routing table to forward packets with a given destination_id and flow_id to a port.

    :param p4info_helper: the P4Info helper
    :param sw: the switch connection
    :param id_destination: The destination id as defined in hop_by_hop_t
    :param flow_id: The flow id (use uuid if that's your identifier, or adapt accordingly)
    :param dst_port: The destination port to forward to
    """

    flow_id_bytes = bytes.fromhex(flow_id)
    table_entry = p4info_helper.buildTableEntry(
        table_name="MyIngress.routing_table",
        match_fields={
            "hdr.hop_by_hop.id_destination": id_destination,
            "hdr.hop_by_hop.uuid": flow_id_bytes,
        },
        action_name="MyIngress.route_forward",
        action_params={
            "port": dst_port,
        },
    )
    sw.WriteTableEntry(table_entry)


def writeCloneSession(sw, p4info_helper, clone_session_id, replicas):
    clone_entry = p4info_helper.buildCloneSessionEntry(clone_session_id, replicas, 0)
    sw.WritePREEntry(clone_entry)


async def monitorQueueSize(notif_queue: asyncio.Queue):
    """
    Periodically prints the size of the notif_queue.

    :param notif_queue: asyncio.Queue to monitor
    :param interval: time in seconds between checks
    """
    while True:
        try:
            size = notif_queue.qsize()
            if size > 0:
                print(f"[Queue Monitor] Current notif_queue size: {size}")
        except Exception as e:
            print(f"[Queue Monitor] Error monitoring queue size: {e}")


async def start_p4_routers(
        p4_hop_by_hop_config: P4HopByHopControllerConfig
):
    """
    Main entry point for the P4Runtime controller
    """

    p4info_helper = p4runtime_lib.helper.P4InfoHelper(p4_hop_by_hop_config.p4info_file_path)

    try:
        response = initialize_multiple_switch_connections(
            p4info_helper=p4info_helper,
            bmv2_file_path=p4_hop_by_hop_config.bmv2_file_path,
            switch_entries_path=p4_hop_by_hop_config.switch_entries_path
        )
        print("Installed P4 Program using SetForwardingPipelineConfig on sw")


        for sw in response:
            p4_hop_by_hop = P4HopByHop(
                egress_interfaces=sw.satellite_switch.interfaces,
                constellation_context=p4_hop_by_hop_config.constellation_config,
                current_satellite_id=sw.satellite_switch.device_id,
            )

            try:
                replicas = [{"egress_port": 510, "instance": 1}]
                writeCloneSession(sw.switch_connection, p4info_helper, 57, replicas)

                hop_by_hop_routers.update({sw.switch_connection: p4_hop_by_hop})

            except Exception as e:
                print("Got exception trying to configure clone session %d."
                      "  Assuming it was initialized already in an earlier"
                      " run of the controller: ", e)

        global_data['p4info_obj_map'] = makeP4infoObjMap(p4info_helper.p4info)

        global_data['cpm_packetin_id2data'] = \
            controllerPacketMetadataDictKeyId(global_data['p4info_obj_map'],
                                              "packet_in")

        global_data['punt_reason_name2int'], global_data['punt_reason_int2name'] = \
                serializableEnumDict(p4info_helper.p4info, 'PuntReason_t')

        global_data['controller_opcode_name2int'], global_data['controller_opcode_int2name'] = \
                serializableEnumDict(p4info_helper.p4info, 'ControllerOpcode_t')

        notif_queue = asyncio.Queue()

        packetInHandler_sw_list = []

        for sw in response:
            sw_task = asyncio.create_task(packetInHandler(notif_queue, sw.switch_connection))
            packetInHandler_sw_list.append(sw_task)

        proc_notif = asyncio.create_task(processNotif(notif_queue))

        # Start monitor
        gatherable_objects = packetInHandler_sw_list  + [proc_notif]

        exceptions = await asyncio.gather(*gatherable_objects, return_exceptions=True)

        print("Stopped program with the following exceptions: ", exceptions)

    except KeyboardInterrupt:
        print(" Shutting down.")
    except grpc.RpcError as e:
        printGrpcError(e)

    ShutdownAllSwitchConnections()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="P4Runtime Controller")
    parser.add_argument(
        "--p4info",
        help="p4info proto in text format from p4c",
        type=str,
        action="store",
        required=False,
        default="./data/basic.p4info.txtpb",
    )
    parser.add_argument(
        "--bmv2-json",
        help="BMv2 JSON file from p4c",
        type=str,
        action="store",
        required=False,
        default="./data/basic.json",
    )
    parser.add_argument(
        "--ip",
        help="IP address ",
        dest="ip_address",
        type=str,
        action="store",
        required=False,
        default="100.0.0.1",
    )

    parser.add_argument(
        "--switch-entries-json", help="Switch entries JSON file from p4c"
    )

    parser.add_argument(
        "--constellation-config",
        help="Constellation configuration file path",
        type=str,
        required=False,
        default="./data/constellation_config.json",
    )
    args = parser.parse_args()

    if not os.path.exists(args.p4info):
        parser.print_help()
        print("\np4info file not found: %s\nHave you run 'make'?" % args.p4info)
        parser.exit(1)

    if not os.path.exists(args.bmv2_json):
        parser.print_help()
        print("\nBMv2 JSON file not found: %s\nHave you run 'make'?" % args.bmv2_json)
        parser.exit(1)

    if not os.path.exists(args.constellation_config):
        parser.print_help()
        print(
            "\nConstellation configuration file not found: %s\n"
            % args.constellation_config
        )
        parser.exit(1)

    if not os.path.exists(args.switch_entries_json):
        parser.print_help()
        print(
            "\nSwitch entries file not found: %s\n"
            % args.switch_entries_json
        )
        parser.exit(1)

    # Load constellation configuration from a json file. Load the json
    with open(args.constellation_config, "r") as f:
        constellation_config_data = json.load(f)
        constellation_context = ConstellationContext(**constellation_config_data)

    p4_hop_by_hop_config = P4HopByHopControllerConfig(
        p4info_file_path=args.p4info,
        bmv2_file_path=args.bmv2_json,
        switch_entries_path=args.switch_entries_json,
        constellation_config=constellation_context,
    )
    asyncio.run(start_p4_routers(
        p4_hop_by_hop_config
    ))

Here is my P4 code:

// SPDX-License-Identifier: Apache-2.0
/* -*- P4_16 -*- */
#include <core.p4>
#include <v1model.p4>

const bit<16> TYPE_IPV4 = 0x800;
const bit<16> TYPE_HOP_BY_HOP = 0x900;

const int FL_PACKET_IN = 1;

const int CPU_PORT_CLONE_SESSION_ID = 57;

#define CPU_PORT 510

/*************************************************************************
*********************** H E A D E R S  ***********************************
*************************************************************************/

typedef bit<9>  egressSpec_t;
typedef bit<48> macAddr_t;
typedef bit<32> ip4Addr_t;
typedef bit<9>  PortId_t;
typedef bit<16> PortIdToController_t;

header hop_by_hop_t {
    bit<16> id_destination;
    bit<16> id_source;
    bit<128> uuid;
}

header ethernet_t {
    macAddr_t dstAddr;
    macAddr_t srcAddr;
    bit<16>   etherType;
}

header ipv4_t {
    bit<4>    version;
    bit<4>    ihl;
    bit<8>    diffserv;
    bit<16>   totalLen;
    bit<16>   identification;
    bit<3>    flags;
    bit<13>   fragOffset;
    bit<8>    ttl;
    bit<8>    protocol;
    bit<16>   hdrChecksum;
    ip4Addr_t srcAddr;
    ip4Addr_t dstAddr;
}


enum bit<8> ControllerOpcode_t {
    NO_OP                    = 0,
    SEND_TO_PORT_IN_OPERAND0 = 1,
    ROUTE_NOT_FOUND1         = 2
}

enum bit<8> PuntReason_t {
    FLOW_UNKNOWN        = 1,
    UNRECOGNIZED_OPCODE = 2
}

@controller_header("packet_out")
header packet_out_header_h {
    ControllerOpcode_t   opcode;
    bit<8>  reserved1;
    bit<32> operand0;
}

@controller_header("packet_in")
header packet_in_header_h {
    PortIdToController_t input_port;
    PuntReason_t         punt_reason;
    ControllerOpcode_t   opcode;
}

struct metadata_t {
    @field_list(FL_PACKET_IN)
    PortId_t             ingress_port;
    @field_list(FL_PACKET_IN)
    PuntReason_t         punt_reason;
    @field_list(FL_PACKET_IN)
    ControllerOpcode_t   opcode;
}

struct headers_t {
    packet_in_header_h packet_in;
    packet_out_header_h packet_out;
    hop_by_hop_t hop_by_hop;
    ethernet_t ethernet;
    ipv4_t ipv4;
}

/*************************************************************************
*********************** P A R S E R  ***********************************
*************************************************************************/

parser MyParser(packet_in packet,
                out headers_t hdr,
                inout metadata_t meta,
                inout standard_metadata_t standard_metadata) {

     // Start by checking if the packet is from CPU port
     state start {
        transition check_for_cpu_port;
    }

     state check_for_cpu_port {
        transition select (standard_metadata.ingress_port) {
            CPU_PORT: parse_controller_packet_out_header;
            // If not from CPU port, continue normal parsing
            default: parse_ethernet;
        }
     }
     state parse_controller_packet_out_header {
        packet.extract(hdr.packet_out);
        transition accept;
    }

    state parse_hop_by_hop {
        packet.extract(hdr.hop_by_hop);
        transition accept;
    }

    /**
        Not sure if I must keep as never called
    **/
    state parse_ethernet {
        packet.extract(hdr.ethernet);
        transition select(hdr.ethernet.etherType) {
            TYPE_HOP_BY_HOP: parse_hop_by_hop;
            TYPE_IPV4: parse_ipv4;
            default: accept;
        }
    }

    state parse_ipv4 {
        packet.extract(hdr.ipv4);
        transition accept;
    }
}

/*************************************************************************
************   C H E C K S U M    V E R I F I C A T I O N   *************
*************************************************************************/

control MyVerifyChecksum(inout headers_t hdr, inout metadata_t meta) {
    apply { }
}

/*************************************************************************
**************  I N G R E S S   P R O C E S S I N G   *******************
*************************************************************************/

control MyIngress(inout headers_t hdr,
                  inout metadata_t meta,
                  inout standard_metadata_t standard_metadata) {

    action drop_packet() {
        mark_to_drop(standard_metadata);
    }

    action hop_forward(egressSpec_t port) {
        standard_metadata.egress_spec = port;
    }

    // This action clones the packet to CPU and preserves metadata
    action send_copy_to_controller(
        PuntReason_t       punt_reason,
        ControllerOpcode_t opcode
        ) {

            clone_preserving_field_list(CloneType.I2E, CPU_PORT_CLONE_SESSION_ID, FL_PACKET_IN);
            meta.ingress_port = (bit<9>) standard_metadata.ingress_port;
            meta.punt_reason = punt_reason;
            meta.opcode = opcode;
    }

    action flow_unknown() {
        send_copy_to_controller(PuntReason_t.FLOW_UNKNOWN, ControllerOpcode_t.NO_OP);
        drop_packet();
    }

    /**

    Hop By Hop Routing Table
    **/

    table hop_routing {
        key = {
            hdr.hop_by_hop.id_destination: exact;
            hdr.hop_by_hop.uuid: exact;
        }
        actions = {
            hop_forward;
            flow_unknown;
            drop_packet();
            NoAction;
        }
        size = 65536;
        default_action = flow_unknown();
    }

    /**
    Packet out "routing table"
    **/
     table dbgPacketOutHdr {
        key = {
            hdr.packet_out.opcode : exact;
            hdr.packet_out.reserved1 : exact;
        }
        actions = { NoAction; }
        const default_action = NoAction;
    }

    apply {
            if (hdr.packet_out.isValid()) {
                // Process packet from controller
                dbgPacketOutHdr.apply();
                switch (hdr.packet_out.opcode) {

                    ControllerOpcode_t.SEND_TO_PORT_IN_OPERAND0: {
                        standard_metadata.egress_spec = (PortId_t) hdr.packet_out.operand0;
                        hdr.packet_out.setInvalid();
                    }

                    ControllerOpcode_t.ROUTE_NOT_FOUND1: {
                        drop_packet();
                        hdr.packet_out.setInvalid();
                    }

                    default: {
                        send_copy_to_controller(
                            PuntReason_t.UNRECOGNIZED_OPCODE,
                            hdr.packet_out.opcode);
                        hdr.packet_out.setInvalid();
                    }
                }
            } else if (hdr.hop_by_hop.isValid()) {
                hop_routing.apply();
            } else {
                drop_packet();
            }
    }
}

/*************************************************************************
****************  E G R E S S   P R O C E S S I N G   *******************
*************************************************************************/

control MyEgress(inout headers_t hdr,
                 inout metadata_t meta,
                 inout standard_metadata_t standard_metadata) {


      table debug_egress_standard_metadata {
        key = {
            standard_metadata.ingress_port : exact;
            standard_metadata.egress_spec : exact;
            standard_metadata.egress_port : exact;
            standard_metadata.instance_type : exact;
            standard_metadata.packet_length : exact;
            //standard_metadata.enq_timestamp : exact;
            //standard_metadata.enq_qdepth : exact;
            //standard_metadata.deq_timedelta : exact;
            //standard_metadata.deq_qdepth : exact;
            //standard_metadata.ingress_global_timestamp : exact;
            //standard_metadata.egress_global_timestamp : exact;
            //standard_metadata.mcast_grp : exact;
            //standard_metadata.egress_rid : exact;
            //standard_metadata.checksum_error : exact;
            //standard_metadata.parser_error : exact;
            //standard_metadata.priority : exact;
        }
        actions = { NoAction; }
        const default_action = NoAction();
        size = 0;
    }

    table debug_egress_hdr {
        key = {
            hdr.hop_by_hop.uuid: exact;
        }
        actions = { NoAction; }
        const default_action = NoAction();
        size = 0;
    }

    action prepend_packet_in_hdr(PuntReason_t punt_reason, PortId_t ingress_port) {
        hdr.packet_in.setValid();
        hdr.packet_in.input_port = (PortIdToController_t) ingress_port;
        hdr.packet_in.punt_reason = punt_reason;
        hdr.packet_in.opcode = ControllerOpcode_t.NO_OP;
    }

    apply {
        debug_egress_standard_metadata.apply();
        debug_egress_hdr.apply();
        if (standard_metadata.egress_port == CPU_PORT) {
            prepend_packet_in_hdr(meta.punt_reason, meta.ingress_port);
        } else {

        }

    }
}

/*************************************************************************
*************   C H E C K S U M    C O M P U T A T I O N   **************
*************************************************************************/

control MyComputeChecksum(inout headers_t  hdr, inout metadata_t meta) {
     apply {
        update_checksum(
        hdr.ipv4.isValid(),
            { hdr.ipv4.version,
              hdr.ipv4.ihl,
              hdr.ipv4.diffserv,
              hdr.ipv4.totalLen,
              hdr.ipv4.identification,
              hdr.ipv4.flags,
              hdr.ipv4.fragOffset,
              hdr.ipv4.ttl,
              hdr.ipv4.protocol,
              hdr.ipv4.srcAddr,
              hdr.ipv4.dstAddr },
            hdr.ipv4.hdrChecksum,
            HashAlgorithm.csum16);
    }
}

/*************************************************************************
***********************  D E P A R S E R  *******************************
*************************************************************************/

control MyDeparser(packet_out packet, in headers_t hdr) {
    apply {
        packet.emit(hdr.packet_in);
        packet.emit(hdr.ethernet);
        packet.emit(hdr.hop_by_hop);
        packet.emit(hdr.ipv4);
    }
}

/*************************************************************************
***********************  S W I T C H  *******************************
*************************************************************************/

V1Switch(
    MyParser(),
    MyVerifyChecksum(),
    MyIngress(),
    MyEgress(),
    MyComputeChecksum(),
    MyDeparser()
) main;

Hello everyone.

I fixed the problem. I am running one hundred switch. Only 25 were able to run, I had too many thread.

To fix that, we have top specify the number of workers:

async def start_p4_routers(
        p4_hop_by_hop_config: P4HopByHopControllerConfig
):

    """
    Main entry point for the P4Runtime controller
    """

    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=100) # Replace by the number of switch
    loop.set_default_executor(executor)


    p4info_helper = p4runtime_lib.helper.P4InfoHelper(p4_hop_by_hop_config.p4info_file_path)

    try:
        response = initialize_multiple_switch_connections(
            p4info_helper=p4info_helper,
            bmv2_file_path=p4_hop_by_hop_config.bmv2_file_path,
            switch_entries_path=p4_hop_by_hop_config.switch_entries_path
        )
        print("Installed P4 Program using SetForwardingPipelineConfig on sw")


        for sw in response: