Prioritize BitTorrent traffic over other traffic

Hello everyone,

I am new to P4 and I am trying to implement priority queues in my P4 app so that I am able to prioritize BitTorrent traffic over other kinds of traffic.

Here is my p4 code:

/* -- P4_16 -- */
#include <core.p4>
#include <v1model.p4>

const bit<16> TYPE_IPV4 = 0x800;
const bit<8> TYPE_TCP = 6;

/*************************************************************************
*********************** H E A D E R S ***********************************
*************************************************************************/

typedef bit<9> egressSpec_t;
typedef bit<48> macAddr_t;
typedef bit<32> ip4Addr_t;
typedef bit<3> priority_t;

header ethernet_t {
macAddr_t dstAddr;
macAddr_t srcAddr;
bit<16> etherType;
}

header ipv4_t {
bit<4> version;
bit<4> ihl;
bit<8> tos;
bit<16> totalLen;
bit<16> identification;
bit<3> flags;
bit<13> fragOffset;
bit<8> ttl;
bit<8> protocol;
bit<16> hdrChecksum;
ip4Addr_t srcAddr;
ip4Addr_t dstAddr;
}

header tcp_t{
bit<16> srcPort;
bit<16> dstPort;
bit<32> seqNo;
bit<32> ackNo;
bit<4> dataOffset;
bit<4> res;
bit<1> cwr;
bit<1> ece;
bit<1> urg;
bit<1> ack;
bit<1> psh;
bit<1> rst;
bit<1> syn;
bit<1> fin;
bit<16> window;
bit<16> checksum;
bit<16> urgentPtr;
}
header cpu_t {
bit<16> srcPort;
bit<16> dstPort;
}

struct metadata {
@field_list(0)
bit<9> ingress_port;
/* empty */
}

struct headers {
ethernet_t ethernet;
ipv4_t ipv4;
tcp_t tcp;
cpu_t cpu;
}

/*************************************************************************
*********************** P A R S E R ***********************************
*************************************************************************/

parser MyParser(packet_in packet,
out headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {

state start {
    transition parse_ethernet;
}

state parse_ethernet {
  packet.extract(hdr.ethernet);
  transition select(hdr.ethernet.etherType){
    TYPE_IPV4: parse_ipv4;
    default: accept;
  }
}
state parse_ipv4 {
  packet.extract(hdr.ipv4);
  transition select(hdr.ipv4.protocol){
    TYPE_TCP: parce_tcp;
    default: accept;
  }
}
state parce_tcp {
  packet.extract(hdr.tcp);
  transition accept;
}

}

/*************************************************************************
************ C H E C K S U M V E R I F I C A T I O N *************
*************************************************************************/

control MyVerifyChecksum(inout headers hdr, inout metadata meta) {
apply { }
}

/*************************************************************************
************** I N G R E S S P R O C E S S I N G *******************
*************************************************************************/

control MyIngress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
action drop() {
mark_to_drop(standard_metadata);
}

action ipv4_forward(macAddr_t dstAddr, egressSpec_t port) {
    hdr.ethernet.srcAddr = hdr.ethernet.dstAddr;
    hdr.ethernet.dstAddr = dstAddr;
    standard_metadata.egress_spec = port;
    hdr.ipv4.ttl = hdr.ipv4.ttl -1;

}

action learn_tcp_port() {
    clone_preserving_field_list(CloneType.I2E, 100, 0);
}

action set_priority(priority_t priority) {
    standard_metadata.priority = priority;
}


table ipv4_lpm {
    key = {
        hdr.ipv4.dstAddr: lpm;
    }
    actions = {
        ipv4_forward;
        drop;
        NoAction;
    }
    size = 1024;
    default_action = NoAction();
}

table tcp_port_learn {
    key = {
        hdr.tcp.srcPort: exact;
    }
    actions = {
        learn_tcp_port;
        NoAction;
    }
    size = 1024;
    default_action = learn_tcp_port;
}

table priority_set {
    key = {
        hdr.tcp.srcPort: exact;
    }
    actions = {
        set_priority;
        NoAction;
    }
    default_action = NoAction();
}

apply {

     //  - ipv4_lpm should be applied only when IPv4 header is valid

  if (hdr.ipv4.isValid()) {

    ipv4_lpm.apply();
    if (hdr.ipv4.protocol == TYPE_TCP){
        tcp_port_learn.apply();
        if (priority_set.apply().hit){
         //
        }
        else {
            standard_metadata.priority = (bit<3>)0;
        }
    }
    else {
        standard_metadata.priority = (bit<3>)0;
    }


  }
}

}

/*************************************************************************
**************** E G R E S S P R O C E S S I N G *******************
*************************************************************************/

control MyEgress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
apply {
hdr.ipv4.tos = (bit<8>)standard_metadata.qid;
// Non-cloned packets have an instance_type of 0, so then we clone them
// using the mirror ID = 100. That, in combination with the control plane, will
// select to which port the packet has to be cloned to.

    if (standard_metadata.instance_type != 0){
        hdr.cpu.setValid();
        hdr.cpu.srcPort = hdr.tcp.srcPort;
        hdr.cpu.dstPort = hdr.tcp.dstPort;


        // Disable other headers
        hdr.ethernet.setInvalid();
        hdr.ipv4.setInvalid();

    }


}

}

/*************************************************************************
************* C H E C K S U M C O M P U T A T I O N **************
*************************************************************************/

control MyComputeChecksum(inout headers hdr, inout metadata meta) {
apply {
update_checksum(
hdr.ipv4.isValid(),
{ hdr.ipv4.version,
hdr.ipv4.ihl,
hdr.ipv4.tos,
hdr.ipv4.totalLen,
hdr.ipv4.identification,
hdr.ipv4.flags,
hdr.ipv4.fragOffset,
hdr.ipv4.ttl,
hdr.ipv4.protocol,
hdr.ipv4.srcAddr,
hdr.ipv4.dstAddr },
hdr.ipv4.hdrChecksum,
HashAlgorithm.csum16);
}
}

/*************************************************************************
*********************** D E P A R S E R *******************************
*************************************************************************/

control MyDeparser(packet_out packet, in headers hdr) {
apply {
packet.emit(hdr.cpu);
packet.emit(hdr.ethernet);
packet.emit(hdr.ipv4);
packet.emit(hdr.tcp);
}
}

/*************************************************************************
*********************** S W I T C H *******************************
*************************************************************************/

V1Switch(
MyParser(),
MyVerifyChecksum(),
MyIngress(),
MyEgress(),
MyComputeChecksum(),
MyDeparser()
) main;

and here is the code for the controller i am using:

#!/usr/bin/env python3
import sys
import struct
import os

from scapy.all import sniff, sendp, hexdump, get_if_list, get_if_hwaddr, bind_layers
from scapy.all import Packet, IPOption, Ether, IP, raw, TCP
from scapy.all import ShortField, IntField, LongField, BitField, FieldListField, FieldLenField
from p4utils.utils.helper import load_topo
from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI
from scapy.layers.inet import _IPOption_HDR
import binascii

class CpuHeader(Packet):
name = β€˜CpuPacket’
fields_desc = [BitField(β€˜srcPort’,0,16), BitField(β€˜dstPort’, 0, 16)]

bind_layers(CpuHeader, TCP)

class PrController(object):

tcp_ports_table = []

def __init__(self, sw_name):
    self.topo = load_topo('topology.json')
    self.sw_name = sw_name
    self.thrift_port = self.topo.get_thrift_port(sw_name)
    self.cpu_port =  self.topo.get_cpu_port_index(self.sw_name)
    self.controller = SimpleSwitchThriftAPI(self.thrift_port)
    self.init()

def init(self):
    self.controller.reset_state()
    self.add_mirror()
    self.ipv4_forward_fill()
    self.controller.set_queue_rate(1000,2)
    self.controller.set_queue_rate(1000,1)

def add_mirror(self):
    if self.cpu_port:
        self.controller.mirroring_add(100, self.cpu_port)

def ipv4_forward_fill(self):
    self.controller.table_add("ipv4_lpm", "ipv4_forward", ['10.0.0.1/32'], ['00:00:00:00:00:01','1'])
    self.controller.table_add("ipv4_lpm", "ipv4_forward", ['10.0.0.2/32'], ['00:00:00:00:00:02','2'])

def handle_pkt(self, pkt):
    print("Controller got a packet")
    cpu_packet = CpuHeader(bytes(pkt))
    tcp_data = bytes(cpu_packet.payload.payload)
    print(cpu_packet.srcPort, cpu_packet.dstPort)
    #print(tcp_data)
    self.handshake_check(cpu_packet, tcp_data)
    #print(self.tcp_ports_table)
    sys.stdout.flush()

def handshake_check(self, cpu_header, data):
    try:
        hex_data = binascii.hexlify(data)
    except:
        return
    if  hex_data.startswith(b'13426974546f7272656e742070726f746f636f6c'):
        print("\n\n ---- HANDSHAKE ---- \n")
        pr = '7'
        self.learn_priority([cpu_header.srcPort, cpu_header.dstPort],pr)
    #else:
        #self.learn_priority([cpu_header.srcPort, cpu_header.dstPort],0)

def learn_priority(self, learning_data, priority):
    for tcp_port in learning_data:
        s_tcp_port = str(tcp_port)
        if s_tcp_port not in self.tcp_ports_table:
            self.tcp_ports_table.append(s_tcp_port)
            self.controller.table_add("tcp_port_learn", "NoAction", [str(tcp_port)])
            self.controller.table_add("priority_set", "set_priority", [str(tcp_port)], [priority])

def run_cpu_port(self):
    cpu_port_intf = str(self.topo.get_cpu_port_intf(self.sw_name).replace("eth0", "eth1"))
    sniff(iface=cpu_port_intf, prn=self.handle_pkt)

if name == β€˜main’:
sw_name = sys.argv[1]
controller = PrController(sw_name).run_cpu_port()

Does this code work? How can I check it? If not, what do I have to do to make it work?

I forgot to mention that this code does work for the part that recognizes BitTorrent traffic, but in terms of prioritizing it, I think it doesn't.

@MikVakis ,

Thank you for providing the full source code of your program!

Generally, it looks OK, so my main question would be how exactly did you test packet prioritization and what made you conclude that it does not work?

Happy hacking,
Vladimir

If you are using a simple_switch or simple_switch_grpc executable for running your P4 program — one built from source code since mid-November, when a minor issue was fixed — then in order to enable multiple priority queues per output port, you need to ensure you are using the command line option --priority-queues <number> when starting the program. If you do not, then the default behavior is to have only 1 queue per port, and I believe that if you use a priority value that is too large, the packet will be dropped.

With recent versions of these programs, queues with higher numeric priority values have strictly higher priority to be scheduled vs. packets in queues with lower numeric priority, i.e. priority queue 1 packets are strictly higher priority than priority queue 0 packets.

Several ways to install a recent version of the P4 development tools can be found here: GitHub - p4lang/tutorials: P4 language tutorials including downloading an x86_64 VirtualBox VM image with the open source P4 development tools already installed on it, if you have an x86_64 system that can run VirtualBox.

1 Like

Hello Vladimir,

Thanks for your quick response. First of all, i will add the code i run to create the network

from p4utils.mininetlib.network_API import NetworkAPI
from mininet.net import Mininet

net = NetworkAPI()
net.setLogLevel(β€˜info’)
net.enableCli()

natIP=β€˜10.0.0.5’
net.addP4Switch(β€˜s1’)
net.setPriorityQueueNum(β€˜s1’, 8)
net.addHost(β€˜h1’)
net.addHost(β€˜h2’)

net.setP4Source(β€˜s1’,β€˜switch.p4’)
net.addLink(β€˜h1’, β€˜s1’)
net.addLink(β€˜h2’, β€˜s1’)

net.setIntfIp(β€˜h1’,β€˜s1’,β€˜10.0.0.1/24’)
net.setIntfIp(β€˜h2’,β€˜s1’,β€˜10.0.0.2/24’)

net.setIntfMac(β€˜h1’,β€˜s1’,β€˜00:00:00:00:00:01’)
net.setIntfMac(β€˜h2’,β€˜s1’,β€˜00:00:00:00:00:02’)

Nodes general options

net.enableCpuPortAll()
net.enablePcapDumpAll()
net.enableLogAll()

net.startNetwork()

Now, in order to test, I have two different torrent clients (qBittorrent, KTorrent), one for each of the hosts. For h1 I use qBittorrent to make it work as a private tracker and then I create and upload the torrent file, and for h2 I use KTorrent to download that torrent. To test the priority aspect I created additional traffic using iperf, but it doesn't seem to work. I updated the P4 development tools but still nothing. Is there anything missing? Thanks again for your time.

Best regards,
Michalis

Hello there,

Thanks a lot for your quick response. Here is my code for the test network:

from p4utils.mininetlib.network_API import NetworkAPI
from mininet.net import Mininet

net = NetworkAPI()
net.setLogLevel(β€˜info’)
net.enableCli()

natIP=β€˜10.0.0.5’
net.addP4Switch(β€˜s1’)
net.setPriorityQueueNum(β€˜s1’, 8)
net.addHost(β€˜h1’)
net.addHost(β€˜h2’)
#net.addHost(β€˜h3’)

net.setP4Source(β€˜s1’,β€˜switch.p4’)
net.addLink(β€˜h1’, β€˜s1’)
net.addLink(β€˜h2’, β€˜s1’)

net.setIntfIp(β€˜h1’,β€˜s1’,β€˜10.0.0.1/24’)
net.setIntfIp(β€˜h2’,β€˜s1’,β€˜10.0.0.2/24’)

net.setIntfMac(β€˜h1’,β€˜s1’,β€˜00:00:00:00:00:01’)
net.setIntfMac(β€˜h2’,β€˜s1’,β€˜00:00:00:00:00:02’)

Nodes general options

net.enableCpuPortAll()
net.enablePcapDumpAll()
net.enableLogAll()

net.startNetwork()

I am using p4-utils for this, and so I have enabled the option --priority-queues through net.setPriorityQueueNum('s1', 8); my network starts correctly and I can see that my switch was created with that option, but it still doesn't work. Am I missing something? Thanks again for your time!

Best regards,
Michalis

@MikVakis ,

The reason I asked you how did you test is because correct testing of prioritization is not trivial.

First, you need to create a congestion situation, because if there is no congestion, prioritization has no effect. Generally, this is done by sending the traffic from two or more ports into one. Even then, you need to ascertain that the congestion is there and is observable. You can certainly create a congestion on a real device by sending line rate traffic from two ports into one. It is also possible to use ports with different speeds (e.g. the port receiving the traffic can run at 100Gbps, while the port sending the traffic will run at 25Gbps).

After that you typically inject a mix of packets that are supposed to be sent using different priorities and observe the results. Let’s assume that you injected N high-priority and N low-priority packets. In case of congestion, you will probably receive H packets of high priority and L packets of low priority on the other side.

If H + L < N * 2 (i.e. some packets were dropped), you should also see H > L (ideally H should be significantly greater than L)

If H + L == N * 2 that means that the congestion was absorbed by the buffering and in that case you should see more high-priority packets in the beginning of the output stream and fewer at the end (or if you measure the latency, then the average latency of higher-priority packets should be lower than of the lower-priority ones).

Most other experiments are not going to show you the reliable results (if any).

Happy hacking,
Vladimir