From e70cf0c7c6531e331a724eded83f602d86ddc838 Mon Sep 17 00:00:00 2001 From: adamdunkels Date: Wed, 29 Apr 2009 20:48:57 +0000 Subject: [PATCH] Rewrite of the collect module. The new version uses a packet queue for outgoing and forwarded packets so that more than one packet can be forwarded at the same time. The new version uses the packetqueue module to maintain the packet queue. --- core/net/rime/collect.c | 158 +++++++++++++++++++++++++++++++--------- core/net/rime/collect.h | 4 +- 2 files changed, 126 insertions(+), 36 deletions(-) diff --git a/core/net/rime/collect.c b/core/net/rime/collect.c index fea09ad1f..25ac273b0 100644 --- a/core/net/rime/collect.c +++ b/core/net/rime/collect.c @@ -36,7 +36,7 @@ * * This file is part of the Contiki operating system. * - * $Id: collect.c,v 1.24 2009/04/07 13:06:03 adamdunkels Exp $ + * $Id: collect.c,v 1.25 2009/04/29 20:48:57 adamdunkels Exp $ */ /** @@ -52,6 +52,8 @@ #include "net/rime/neighbor.h" #include "net/rime/collect.h" +#include "net/rime/packetqueue.h" + #include "dev/radio-sensor.h" #if CONTIKI_TARGET_NETSIM @@ -78,6 +80,10 @@ struct recent_packet { static struct recent_packet recent_packets[NUM_RECENT_PACKETS]; static uint8_t recent_packet_ptr; +#define FORWARD_PACKET_LIFETIME (CLOCK_SECOND * 16) +#define MAX_FORWARDING_QUEUE 4 +PACKETQUEUE(forwarding_queue, MAX_FORWARDING_QUEUE); + #define SINK 0 #define RTMETRIC_MAX COLLECT_MAX_DEPTH @@ -86,11 +92,69 @@ static uint8_t recent_packet_ptr; #define DEBUG 0 #if DEBUG #include -#define PRINTF(...) printf(__VA_ARGS__) +#define PRINTF(...) printf(__VA_ARGS__); fflush(NULL) #else #define PRINTF(...) #endif +/*---------------------------------------------------------------------------*/ +static void +send_queued_packet(void) +{ + struct queuebuf *q; + struct neighbor *n; + struct packetqueue_item *i; + struct collect_conn *c; + + i = packetqueue_first(&forwarding_queue); + if(i == NULL) { + PRINTF("%d.%d: nothing on queue\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + /* No packet on the queue, so there is nothing for us to send. */ + return; + } + c = packetqueue_ptr(i); + if(c == NULL) { + /* c should not be NULL, but we check it just to be sure. */ + PRINTF("%d.%d: queue, c == NULL!\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + return; + } + + if(c->forwarding) { + /* If we are currently forwarding a packet, we wait until the + packet is forwarded and try again then. */ + PRINTF("%d.%d: queue, c is forwarding\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + return; + } + + q = packetqueue_queuebuf(i); + if(q != NULL) { + PRINTF("%d.%d: queue, q is on queue\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + queuebuf_to_packetbuf(q); + + n = neighbor_best(); + + /* Don't send to the neighbor if it is the same neighbor that sent + us the packet. */ + if(n != NULL && !rimeaddr_cmp(&n->addr, packetbuf_addr(PACKETBUF_ADDR_SENDER))) { +#if CONTIKI_TARGET_NETSIM + ether_set_line(n->addr.u8[0], n->addr.u8[1]); +#endif /* CONTIKI_TARGET_NETSIM */ + PRINTF("%d.%d: sending packet to %d.%d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + n->addr.u8[0], n->addr.u8[1]); + + c->forwarding = 1; + runicast_send(&c->runicast_conn, &n->addr, packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT)); + } else { + PRINTF("%d.%d: did not find any neighbor to forward to\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + } + } +} /*---------------------------------------------------------------------------*/ static void update_rtmetric(struct collect_conn *tc) @@ -114,18 +178,25 @@ update_rtmetric(struct collect_conn *tc) rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); } tc->rtmetric = RTMETRIC_MAX; - announcement_set_value(&tc->announcement, tc->rtmetric); + announcement_set_value(&tc->announcement, tc->rtmetric); } else { /* We set our rtmetric to the rtmetric of our best neighbor plus the expected transmissions to reach that neighbor. */ if(n->rtmetric + neighbor_etx(n) != tc->rtmetric) { + uint16_t old_rtmetric = tc->rtmetric; + tc->rtmetric = n->rtmetric + neighbor_etx(n); /* neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric);*/ announcement_set_value(&tc->announcement, tc->rtmetric); PRINTF("%d.%d: new rtmetric %d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], tc->rtmetric); + + /* We got a new, working, route we send any queued packets we may have. */ + if(old_rtmetric == RTMETRIC_MAX) { + send_queued_packet(); + } } } } @@ -149,7 +220,6 @@ node_packet_received(struct runicast_conn *c, rimeaddr_t *from, uint8_t seqno) { struct collect_conn *tc = (struct collect_conn *) ((char *)c - offsetof(struct collect_conn, runicast_conn)); - struct neighbor *n; int i; /* To protect against forwarding duplicate packets, we keep a list @@ -158,9 +228,11 @@ node_packet_received(struct runicast_conn *c, rimeaddr_t *from, uint8_t seqno) for(i = 0; i < NUM_RECENT_PACKETS; i++) { if(recent_packets[i].seqno == packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID) && - rimeaddr_cmp(&recent_packets[i].originator, - packetbuf_addr(PACKETBUF_ADDR_ESENDER))) { - PRINTF("%d.%d: dropping duplicate packet with seqno %d\n", + rimeaddr_cmp(&recent_packets[i].originator, + packetbuf_addr(PACKETBUF_ADDR_ESENDER))) { + PRINTF("%d.%d: dropping duplicate packet from %d.%d with seqno %d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + recent_packets[i].originator.u8[0], recent_packets[i].originator.u8[1], packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID)); /* Drop the packet. */ return; @@ -202,49 +274,56 @@ node_packet_received(struct runicast_conn *c, rimeaddr_t *from, uint8_t seqno) packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1], from->u8[0], from->u8[1], tc->forwarding); - if(!tc->forwarding) { - n = neighbor_best(); - if(n != NULL && !rimeaddr_cmp(&n->addr, from)) { -#if CONTIKI_TARGET_NETSIM - ether_set_line(n->addr.u8[0], n->addr.u8[1]); -#endif /* CONTIKI_TARGET_NETSIM */ - tc->forwarding = 1; - runicast_send(c, &n->addr, packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT)); - } else { - PRINTF("%d.%d: did not find any neighbor to forward to\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); - } - return; - } else { - - PRINTF("%d.%d: still forwarding another packet\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); - return; + if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME, + tc)) { + send_queued_packet(); } } + return; } /*---------------------------------------------------------------------------*/ static void -node_packet_sent(struct runicast_conn *c, rimeaddr_t *to, uint8_t retransmissions) +node_packet_sent(struct runicast_conn *c, rimeaddr_t *to, uint8_t transmissions) { struct collect_conn *tc = (struct collect_conn *) ((char *)c - offsetof(struct collect_conn, runicast_conn)); + PRINTF("%d.%d: sent to %d.%d after %d retransmissions\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + to->u8[0], to->u8[1], + transmissions); + + tc->forwarding = 0; - neighbor_update_etx(neighbor_find(to), retransmissions); + neighbor_update_etx(neighbor_find(to), transmissions); update_rtmetric(tc); + + /* Remove the first packet on the queue, the packet that was just sent. */ + packetqueue_dequeue(&forwarding_queue); + + /* Send the next packet in the queue, if any. */ + send_queued_packet(); } /*---------------------------------------------------------------------------*/ static void -node_packet_timedout(struct runicast_conn *c, rimeaddr_t *to, uint8_t retransmissions) +node_packet_timedout(struct runicast_conn *c, rimeaddr_t *to, uint8_t transmissions) { struct collect_conn *tc = (struct collect_conn *) ((char *)c - offsetof(struct collect_conn, runicast_conn)); + PRINTF("%d.%d: timedout after %d retransmissions\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], transmissions); + tc->forwarding = 0; - neighbor_timedout_etx(neighbor_find(to), retransmissions); + neighbor_timedout_etx(neighbor_find(to), transmissions); update_rtmetric(tc); + + /* Remove the first packet on the queue, the packet that just timed out. */ + packetqueue_dequeue(&forwarding_queue); + + /* Send the next packet in the queue, if any. */ + send_queued_packet(); } /*---------------------------------------------------------------------------*/ /*static void @@ -279,6 +358,9 @@ received_announcement(struct announcement *a, rimeaddr_t *from, if(n == NULL) { neighbor_add(from, value, 1); + PRINTF("%d.%d: new neighbor %d.%d, etx %d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + from->u8[0], from->u8[1], value); } else { neighbor_update(n, value); PRINTF("%d.%d: updating neighbor %d.%d, etx %d\n", @@ -290,8 +372,8 @@ received_announcement(struct announcement *a, rimeaddr_t *from, } /*---------------------------------------------------------------------------*/ static const struct runicast_callbacks runicast_callbacks = {node_packet_received, - node_packet_sent, - node_packet_timedout}; + node_packet_sent, + node_packet_timedout}; /*static const struct neighbor_discovery_callbacks neighbor_discovery_callbacks = { adv_received, NULL};*/ /*---------------------------------------------------------------------------*/ @@ -328,6 +410,7 @@ collect_set_sink(struct collect_conn *tc, int should_be_sink) if(should_be_sink) { tc->rtmetric = SINK; /* neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric);*/ + announcement_set_value(&tc->announcement, tc->rtmetric); } else { tc->rtmetric = RTMETRIC_MAX; } @@ -346,7 +429,7 @@ collect_send(struct collect_conn *tc, int rexmits) packetbuf_set_attr(PACKETBUF_ATTR_TTL, MAX_HOPLIM); packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, rexmits); - if(tc->rtmetric == SINK) { + if(tc->rtmetric == 0) { packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 0); if(tc->cb->recv != NULL) { tc->cb->recv(packetbuf_addr(PACKETBUF_ADDR_ESENDER), @@ -363,13 +446,20 @@ collect_send(struct collect_conn *tc, int rexmits) PRINTF("%d.%d: sending to %d.%d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], n->addr.u8[0], n->addr.u8[1]); - tc->forwarding = 1; - return runicast_send(&tc->runicast_conn, &n->addr, rexmits); + if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME, + tc)) { + send_queued_packet(); + return 1; + } } else { /* printf("Didn't find any neighbor\n");*/ PRINTF("%d.%d: did not find any neighbor to send to\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); announcement_listen(1); + if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME, + tc)) { + return 1; + } } } return 0; diff --git a/core/net/rime/collect.h b/core/net/rime/collect.h index e6db7f42a..270f7df76 100644 --- a/core/net/rime/collect.h +++ b/core/net/rime/collect.h @@ -47,7 +47,7 @@ * * This file is part of the Contiki operating system. * - * $Id: collect.h,v 1.10 2009/03/12 21:58:21 adamdunkels Exp $ + * $Id: collect.h,v 1.11 2009/04/29 20:48:57 adamdunkels Exp $ */ /** @@ -64,7 +64,7 @@ #include "net/rime/runicast.h" #define COLLECT_ATTRIBUTES { PACKETBUF_ADDR_ESENDER, PACKETBUF_ADDRSIZE }, \ - { PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 2 }, \ + { PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 4 }, \ { PACKETBUF_ATTR_TTL, PACKETBUF_ATTR_BIT * 4 }, \ { PACKETBUF_ATTR_HOPS, PACKETBUF_ATTR_BIT * 4 }, \ { PACKETBUF_ATTR_MAX_REXMIT, PACKETBUF_ATTR_BIT * 3 }, \