Commit 29f834aa authored by Eric Dumazet's avatar Eric Dumazet Committed by Paolo Abeni
Browse files

net_sched: sch_fq: add 3 bands and WRR scheduling



Before Google adopted FQ for its production servers,
we had to ensure AF4 packets would get a higher share
than BE1 ones.

As discussed this week in Netconf 2023 in Paris, it is time
to upstream this for public use.

After this patch FQ can replace pfifo_fast, with the following
differences :

- FQ uses WRR instead of strict prio, to avoid starvation of
  low priority packets.

- We make sure each band/prio tracks its own usage against sch->limit.
  This was done to make sure flood of low priority packets would not
  prevent AF4 packets to be queued. Contributed by Willem.

- priomap can be changed, if needed (default value are the ones
  coming from pfifo_fast).

In this patch, we set default band weights so that :

- high prio (band=0) packets get 90% of the bandwidth
  if they compete with low prio (band=2) packets.

- high prio packets get 75% of the bandwidth
  if they compete with medium prio (band=1) packets.

Following patch in this series adds the possibility to tune
the per-band weights.

As we added many fields in 'struct fq_sched_data', we had
to make sure to have the first cache line read-mostly, and
avoid wasting precious cache lines.

More optimizations are possible but will be sent separately.

Signed-off-by: default avatarEric Dumazet <edumazet@google.com>
Acked-by: default avatarDave Taht <dave.taht@gmail.com>
Reviewed-by: default avatarWillem de Bruijn <willemb@google.com>
Acked-by: default avatarSoheil Hassas Yeganeh <soheil@google.com>
Reviewed-by: default avatarToke Høiland-Jørgensen <toke@redhat.com>
Signed-off-by: default avatarPaolo Abeni <pabeni@redhat.com>
parent 5579ee46
Loading
Loading
Loading
Loading
+9 −2
Original line number Diff line number Diff line
@@ -941,15 +941,19 @@ enum {

	TCA_FQ_HORIZON_DROP,	/* drop packets beyond horizon, or cap their EDT */

	TCA_FQ_PRIOMAP,		/* prio2band */

	__TCA_FQ_MAX
};

#define TCA_FQ_MAX	(__TCA_FQ_MAX - 1)

#define FQ_BANDS 3

struct tc_fq_qd_stats {
	__u64	gc_flows;
	__u64	highprio_packets;
	__u64	tcp_retrans;
	__u64	highprio_packets;	/* obsolete */
	__u64	tcp_retrans;		/* obsolete */
	__u64	throttled;
	__u64	flows_plimit;
	__u64	pkts_too_long;
@@ -963,6 +967,9 @@ struct tc_fq_qd_stats {
	__u64	horizon_drops;
	__u64	horizon_caps;
	__u64	fastpath_packets;
	__u64	band_drops[FQ_BANDS];
	__u32	band_pkt_count[FQ_BANDS];
	__u32	pad;
};

/* Heavy-Hitter Filter */
+162 −42
Original line number Diff line number Diff line
@@ -52,6 +52,7 @@

struct fq_skb_cb {
	u64	time_to_send;
	u8	band;
};

static inline struct fq_skb_cb *fq_skb_cb(struct sk_buff *skb)
@@ -84,32 +85,28 @@ struct fq_flow {
	u32		socket_hash;	/* sk_hash */
	int		qlen;		/* number of packets in flow queue */

/* Second cache line, used in fq_dequeue() */
/* Second cache line */
	int		credit;
	/* 32bit hole on 64bit arches */

	int		band;
	struct fq_flow *next;		/* next pointer in RR lists */

	struct rb_node  rate_node;	/* anchor in q->delayed tree */
	u64		time_next_packet;
} ____cacheline_aligned_in_smp;
};

struct fq_flow_head {
	struct fq_flow *first;
	struct fq_flow *last;
};

struct fq_sched_data {
struct fq_perband_flows {
	struct fq_flow_head new_flows;

	struct fq_flow_head old_flows;
	int		    credit;
	int		    quantum; /* based on band nr : 576KB, 192KB, 64KB */
};

	struct rb_root	delayed;	/* for rate limited flows */
	u64		time_next_delayed_flow;
	unsigned long	unthrottle_latency_ns;

	struct fq_flow	internal;	/* for non classified or high prio packets */

struct fq_sched_data {
/* Read mostly cache line */

	u32		quantum;
@@ -125,10 +122,21 @@ struct fq_sched_data {
	u8		rate_enable;
	u8		fq_trees_log;
	u8		horizon_drop;
	u8		prio2band[(TC_PRIO_MAX + 1) >> 2];
	u32		timer_slack; /* hrtimer slack in ns */

/* Read/Write fields. */

	unsigned int band_nr; /* band being serviced in fq_dequeue() */

	struct fq_perband_flows band_flows[FQ_BANDS];

	struct fq_flow	internal;	/* fastpath queue. */
	struct rb_root	delayed;	/* for rate limited flows */
	u64		time_next_delayed_flow;
	unsigned long	unthrottle_latency_ns;

	u32		band_pkt_count[FQ_BANDS];
	u32		flows;
	u32		inactive_flows; /* Flows with no packet to send. */
	u32		throttled_flows;
@@ -139,7 +147,7 @@ struct fq_sched_data {

/* Seldom used fields. */

	u64		stat_internal_packets; /* aka highprio */
	u64		stat_band_drops[FQ_BANDS];
	u64		stat_ce_mark;
	u64		stat_horizon_drops;
	u64		stat_horizon_caps;
@@ -148,6 +156,12 @@ struct fq_sched_data {
	u64		stat_allocation_errors;
};

/* return the i-th 2-bit value ("crumb") */
static u8 fq_prio2band(const u8 *prio2band, unsigned int prio)
{
	return (prio2band[prio / 4] >> (2 * (prio & 0x3))) & 0x3;
}

/*
 * f->tail and f->age share the same location.
 * We can use the low order bit to differentiate if this location points
@@ -172,8 +186,19 @@ static bool fq_flow_is_throttled(const struct fq_flow *f)
	return f->next == &throttled;
}

static void fq_flow_add_tail(struct fq_flow_head *head, struct fq_flow *flow)
enum new_flow {
	NEW_FLOW,
	OLD_FLOW
};

static void fq_flow_add_tail(struct fq_sched_data *q, struct fq_flow *flow,
			     enum new_flow list_sel)
{
	struct fq_perband_flows *pband = &q->band_flows[flow->band];
	struct fq_flow_head *head = (list_sel == NEW_FLOW) ?
					&pband->new_flows :
					&pband->old_flows;

	if (head->first)
		head->last->next = flow;
	else
@@ -186,7 +211,7 @@ static void fq_flow_unset_throttled(struct fq_sched_data *q, struct fq_flow *f)
{
	rb_erase(&f->rate_node, &q->delayed);
	q->throttled_flows--;
	fq_flow_add_tail(&q->old_flows, f);
	fq_flow_add_tail(q, f, OLD_FLOW);
}

static void fq_flow_set_throttled(struct fq_sched_data *q, struct fq_flow *f)
@@ -326,11 +351,6 @@ static struct fq_flow *fq_classify(struct Qdisc *sch, struct sk_buff *skb,
	struct rb_root *root;
	struct fq_flow *f;

	/* warning: no starvation prevention... */
	if (unlikely((skb->priority & TC_PRIO_MAX) == TC_PRIO_CONTROL)) {
		q->stat_internal_packets++; /* highprio packet */
		return &q->internal;
	}
	/* SYNACK messages are attached to a TCP_NEW_SYN_RECV request socket
	 * or a listener (SYNCOOKIE mode)
	 * 1) request sockets are not full blown,
@@ -509,9 +529,13 @@ static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch,
	struct fq_sched_data *q = qdisc_priv(sch);
	struct fq_flow *f;
	u64 now;
	u8 band;

	if (unlikely(sch->q.qlen >= sch->limit))
	band = fq_prio2band(q->prio2band, skb->priority & TC_PRIO_MAX);
	if (unlikely(q->band_pkt_count[band] >= sch->limit)) {
		q->stat_band_drops[band]++;
		return qdisc_drop(skb, sch, to_free);
	}

	now = ktime_get_ns();
	if (!skb->tstamp) {
@@ -538,11 +562,14 @@ static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch,
		}

		if (fq_flow_is_detached(f)) {
			fq_flow_add_tail(&q->new_flows, f);
			fq_flow_add_tail(q, f, NEW_FLOW);
			if (time_after(jiffies, f->age + q->flow_refill_delay))
				f->credit = max_t(u32, f->credit, q->quantum);
		}

		f->band = band;
		q->band_pkt_count[band]++;
		fq_skb_cb(skb)->band = band;
		if (f->qlen == 0)
			q->inactive_flows--;
	}
@@ -584,13 +611,26 @@ static void fq_check_throttled(struct fq_sched_data *q, u64 now)
	}
}

static struct fq_flow_head *fq_pband_head_select(struct fq_perband_flows *pband)
{
	if (pband->credit <= 0)
		return NULL;

	if (pband->new_flows.first)
		return &pband->new_flows;

	return pband->old_flows.first ? &pband->old_flows : NULL;
}

static struct sk_buff *fq_dequeue(struct Qdisc *sch)
{
	struct fq_sched_data *q = qdisc_priv(sch);
	struct fq_perband_flows *pband;
	struct fq_flow_head *head;
	struct sk_buff *skb;
	struct fq_flow *f;
	unsigned long rate;
	int retry;
	u32 plen;
	u64 now;

@@ -606,24 +646,31 @@ static struct sk_buff *fq_dequeue(struct Qdisc *sch)

	now = ktime_get_ns();
	fq_check_throttled(q, now);
	retry = 0;
	pband = &q->band_flows[q->band_nr];
begin:
	head = &q->new_flows;
	if (!head->first) {
		head = &q->old_flows;
		if (!head->first) {
	head = fq_pband_head_select(pband);
	if (!head) {
		while (++retry < FQ_BANDS) {
			if (++q->band_nr == FQ_BANDS)
				q->band_nr = 0;
			pband = &q->band_flows[q->band_nr];
			pband->credit = min(pband->credit + pband->quantum,
					    pband->quantum);
			goto begin;
		}
		if (q->time_next_delayed_flow != ~0ULL)
			qdisc_watchdog_schedule_range_ns(&q->watchdog,
							q->time_next_delayed_flow,
							q->timer_slack);
		return NULL;
	}
	}
	f = head->first;

	retry = 0;
	if (f->credit <= 0) {
		f->credit += q->quantum;
		head->first = f->next;
		fq_flow_add_tail(&q->old_flows, f);
		fq_flow_add_tail(q, f, OLD_FLOW);
		goto begin;
	}

@@ -645,12 +692,13 @@ static struct sk_buff *fq_dequeue(struct Qdisc *sch)
		}
		if (--f->qlen == 0)
			q->inactive_flows++;
		q->band_pkt_count[fq_skb_cb(skb)->band]--;
		fq_dequeue_skb(sch, f, skb);
	} else {
		head->first = f->next;
		/* force a pass through old_flows to prevent starvation */
		if ((head == &q->new_flows) && q->old_flows.first) {
			fq_flow_add_tail(&q->old_flows, f);
		if (head == &pband->new_flows) {
			fq_flow_add_tail(q, f, OLD_FLOW);
		} else {
			fq_flow_set_detached(f);
		}
@@ -658,6 +706,7 @@ static struct sk_buff *fq_dequeue(struct Qdisc *sch)
	}
	plen = qdisc_pkt_len(skb);
	f->credit -= plen;
	pband->credit -= plen;

	if (!q->rate_enable)
		goto out;
@@ -749,8 +798,10 @@ static void fq_reset(struct Qdisc *sch)
			kmem_cache_free(fq_flow_cachep, f);
		}
	}
	q->new_flows.first	= NULL;
	q->old_flows.first	= NULL;
	for (idx = 0; idx < FQ_BANDS; idx++) {
		q->band_flows[idx].new_flows.first = NULL;
		q->band_flows[idx].old_flows.first = NULL;
	}
	q->delayed		= RB_ROOT;
	q->flows		= 0;
	q->inactive_flows	= 0;
@@ -864,8 +915,54 @@ static const struct nla_policy fq_policy[TCA_FQ_MAX + 1] = {
	[TCA_FQ_TIMER_SLACK]		= { .type = NLA_U32 },
	[TCA_FQ_HORIZON]		= { .type = NLA_U32 },
	[TCA_FQ_HORIZON_DROP]		= { .type = NLA_U8 },
	[TCA_FQ_PRIOMAP]		= {
			.type = NLA_BINARY,
			.len = sizeof(struct tc_prio_qopt),
		},
};

/* compress a u8 array with all elems <= 3 to an array of 2-bit fields */
static void fq_prio2band_compress_crumb(const u8 *in, u8 *out)
{
	const int num_elems = TC_PRIO_MAX + 1;
	int i;

	memset(out, 0, num_elems / 4);
	for (i = 0; i < num_elems; i++)
		out[i / 4] |= in[i] << (2 * (i & 0x3));
}

static void fq_prio2band_decompress_crumb(const u8 *in, u8 *out)
{
	const int num_elems = TC_PRIO_MAX + 1;
	int i;

	for (i = 0; i < num_elems; i++)
		out[i] = fq_prio2band(in, i);
}

static int fq_load_priomap(struct fq_sched_data *q,
			   const struct nlattr *attr,
			   struct netlink_ext_ack *extack)
{
	const struct tc_prio_qopt *map = nla_data(attr);
	int i;

	if (map->bands != FQ_BANDS) {
		NL_SET_ERR_MSG_MOD(extack, "FQ only supports 3 bands");
		return -EINVAL;
	}
	for (i = 0; i < TC_PRIO_MAX + 1; i++) {
		if (map->priomap[i] >= FQ_BANDS) {
			NL_SET_ERR_MSG_FMT_MOD(extack, "FQ priomap field %d maps to a too high band %d",
					       i, map->priomap[i]);
			return -EINVAL;
		}
	}
	fq_prio2band_compress_crumb(map->priomap, q->prio2band);
	return 0;
}

static int fq_change(struct Qdisc *sch, struct nlattr *opt,
		     struct netlink_ext_ack *extack)
{
@@ -940,6 +1037,9 @@ static int fq_change(struct Qdisc *sch, struct nlattr *opt,
		q->flow_refill_delay = usecs_to_jiffies(usecs_delay);
	}

	if (!err && tb[TCA_FQ_PRIOMAP])
		err = fq_load_priomap(q, tb[TCA_FQ_PRIOMAP], extack);

	if (tb[TCA_FQ_ORPHAN_MASK])
		q->orphan_mask = nla_get_u32(tb[TCA_FQ_ORPHAN_MASK]);

@@ -991,7 +1091,7 @@ static int fq_init(struct Qdisc *sch, struct nlattr *opt,
		   struct netlink_ext_ack *extack)
{
	struct fq_sched_data *q = qdisc_priv(sch);
	int err;
	int i, err;

	sch->limit		= 10000;
	q->flow_plimit		= 100;
@@ -1001,8 +1101,13 @@ static int fq_init(struct Qdisc *sch, struct nlattr *opt,
	q->flow_max_rate	= ~0UL;
	q->time_next_delayed_flow = ~0ULL;
	q->rate_enable		= 1;
	q->new_flows.first	= NULL;
	q->old_flows.first	= NULL;
	for (i = 0; i < FQ_BANDS; i++) {
		q->band_flows[i].new_flows.first = NULL;
		q->band_flows[i].old_flows.first = NULL;
	}
	q->band_flows[0].quantum = 9 << 16;
	q->band_flows[1].quantum = 3 << 16;
	q->band_flows[2].quantum = 1 << 16;
	q->delayed		= RB_ROOT;
	q->fq_root		= NULL;
	q->fq_trees_log		= ilog2(1024);
@@ -1017,6 +1122,7 @@ static int fq_init(struct Qdisc *sch, struct nlattr *opt,
	/* Default ce_threshold of 4294 seconds */
	q->ce_threshold		= (u64)NSEC_PER_USEC * ~0U;

	fq_prio2band_compress_crumb(sch_default_prio2band, q->prio2band);
	qdisc_watchdog_init_clockid(&q->watchdog, sch, CLOCK_MONOTONIC);

	if (opt)
@@ -1031,6 +1137,9 @@ static int fq_dump(struct Qdisc *sch, struct sk_buff *skb)
{
	struct fq_sched_data *q = qdisc_priv(sch);
	u64 ce_threshold = q->ce_threshold;
	struct tc_prio_qopt prio = {
		.bands = FQ_BANDS,
	};
	u64 horizon = q->horizon;
	struct nlattr *opts;

@@ -1062,6 +1171,10 @@ static int fq_dump(struct Qdisc *sch, struct sk_buff *skb)
	    nla_put_u8(skb, TCA_FQ_HORIZON_DROP, q->horizon_drop))
		goto nla_put_failure;

	fq_prio2band_decompress_crumb(q->prio2band, prio.priomap);
	if (nla_put(skb, TCA_FQ_PRIOMAP, sizeof(prio), &prio))
		goto nla_put_failure;

	return nla_nest_end(skb, opts);

nla_put_failure:
@@ -1072,11 +1185,14 @@ static int fq_dump_stats(struct Qdisc *sch, struct gnet_dump *d)
{
	struct fq_sched_data *q = qdisc_priv(sch);
	struct tc_fq_qd_stats st;
	int i;

	st.pad = 0;

	sch_tree_lock(sch);

	st.gc_flows		  = q->stat_gc_flows;
	st.highprio_packets	  = q->stat_internal_packets;
	st.highprio_packets	  = 0;
	st.fastpath_packets	  = q->internal.stat_fastpath_packets;
	st.tcp_retrans		  = 0;
	st.throttled		  = q->stat_throttled;
@@ -1093,6 +1209,10 @@ static int fq_dump_stats(struct Qdisc *sch, struct gnet_dump *d)
	st.ce_mark		  = q->stat_ce_mark;
	st.horizon_drops	  = q->stat_horizon_drops;
	st.horizon_caps		  = q->stat_horizon_caps;
	for (i = 0; i < FQ_BANDS; i++) {
		st.band_drops[i]  = q->stat_band_drops[i];
		st.band_pkt_count[i] = q->band_pkt_count[i];
	}
	sch_tree_unlock(sch);

	return gnet_stats_copy_app(d, &st, sizeof(st));
@@ -1120,7 +1240,7 @@ static int __init fq_module_init(void)

	fq_flow_cachep = kmem_cache_create("fq_flow_cache",
					   sizeof(struct fq_flow),
					   0, 0, NULL);
					   0, SLAB_HWCACHE_ALIGN, NULL);
	if (!fq_flow_cachep)
		return -ENOMEM;