1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
use std::collections::{VecDeque, HashMap, HashSet};
use std::net::{UdpSocket, ToSocketAddrs, SocketAddr};
use std::io::Result as IoResult;
use super::msgqueue::*;
use super::UnrResult;
use bincode;
/// Number of bytes subtracted from a sender's `datagram_length` to obtain the
/// payload size of each chunk built by `Sender::enqueue`.
// NOTE(review): presumably reserves room for the encoded `MsgChunk` header
// (message id and piece numbers) — confirm against the bincode encoding.
static MSG_PADDING: u16 = 32;
/// Sending half: splits messages into chunks, queues them, and writes the
/// queued chunks to a UDP socket one datagram at a time.
pub struct Sender {
/// Chunks waiting to be sent, each paired with the addresses it is destined for.
out_queue: VecDeque<(MsgChunk, AddrsContainer)>,
/// Id given to the most recently enqueued message; `enqueue` increments it per call.
last_id: u64,
/// Socket that `send_one` writes the encoded chunks to.
socket: UdpSocket,
/// Datagram size budget: used as the size limit when encoding a chunk and,
/// minus `MSG_PADDING`, as the number of payload bytes per chunk.
pub datagram_length: u16,
/// Number of times `enqueue` queues the full chunk sequence of a message.
pub replication: u8
}
/// Source-address filter consulted by `Receiver::poll` for every datagram.
pub enum ReceiverFilter {
/// Only datagrams whose source address is in the set are let through.
Whitelist(HashSet<SocketAddr>),
/// Datagrams whose source address is in the set are rejected; all others are let through.
Blacklist(HashSet<SocketAddr>)
}
/// Receiving half: reads chunks from a UDP socket and hands out messages
/// once a per-sender queue reports one as complete.
pub struct Receiver {
/// Socket that `poll` receives datagrams from.
socket: UdpSocket,
/// One message queue per source address, created on the first accepted datagram.
queue: HashMap<SocketAddr, MsgQueue>,
/// Size in bytes of the receive buffer `poll` allocates.
pub datagram_length: u16,
/// Value passed to `MsgQueue::new` when a queue is created for a new address.
// NOTE(review): presumably a per-address size cap — confirm in `msgqueue`.
pub max_connection_size: Option<usize>,
/// Decides which source addresses `poll` accepts datagrams from.
pub filter: ReceiverFilter
}
/// An owned, cloneable list of socket addresses that can itself be used
/// wherever a `ToSocketAddrs` value is expected.
#[derive(Debug, Clone)]
pub struct AddrsContainer{
/// The stored addresses, in the order they were collected.
v: Vec<SocketAddr>
}
impl ReceiverFilter {
    /// Creates a filter that rejects nothing: a blacklist with no entries.
    pub fn empty_blacklist() -> ReceiverFilter {
        let blocked = HashSet::new();
        ReceiverFilter::Blacklist(blocked)
    }

    /// Reports whether a datagram coming from `addr` passes this filter.
    ///
    /// A whitelist passes only the addresses it lists; a blacklist passes
    /// every address except the ones it lists.
    fn allow_through(&self, addr: &SocketAddr) -> bool {
        // Reduce both variants to "the set" plus "does being listed mean allowed?".
        let (listed, listed_means_allowed) = match *self {
            ReceiverFilter::Whitelist(ref allowed) => (allowed, true),
            ReceiverFilter::Blacklist(ref denied) => (denied, false),
        };
        listed.contains(addr) == listed_means_allowed
    }
}
impl AddrsContainer {
    /// Resolves `socket_addrs` once and stores every address it yields.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `to_socket_addrs` when the value
    /// cannot be turned into socket addresses.
    pub fn from_to_sock<T: ToSocketAddrs>(socket_addrs: T) -> IoResult<AddrsContainer> {
        // `?` replaces the deprecated `try!` macro, which no longer parses in
        // edition 2018 and later; the error propagation is identical.
        let resolved = socket_addrs.to_socket_addrs()?;
        Ok(AddrsContainer { v: resolved.collect() })
    }
}
impl ToSocketAddrs for AddrsContainer {
type Iter = ::std::vec::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> IoResult<<AddrsContainer as ToSocketAddrs>::Iter> {
let slice: Vec<_> = self.v[..].iter().cloned().collect();
Ok(slice.into_iter())
}
}
impl Receiver {
pub fn from_socket(socket: UdpSocket, datagram_length: u16, max_connection_size: Option<usize>, filter: ReceiverFilter) -> Receiver {
Receiver {
socket: socket,
datagram_length: datagram_length,
queue: HashMap::new(),
max_connection_size: max_connection_size,
filter: filter,
}
}
pub fn poll(&mut self) -> UnrResult<(SocketAddr, CompleteMessage)> {
let mut buf: Vec<u8> = (0 .. self.datagram_length).map(|_| 0).collect();
loop {
let (amnt, from) = try!(self.socket.recv_from(&mut buf[..]));
if !self.filter.allow_through(&from) {
continue;
}
let data = &buf[0 .. amnt];
let chunk: MsgChunk = try!(bincode::rustc_serialize::decode(data));
let max_size = self.max_connection_size.clone();
let q = self.queue.entry(from.clone())
.or_insert_with(|| MsgQueue::new(max_size));
if let Some(completed) = q.insert_chunk(chunk) {
return Ok((from, completed));
}
}
}
pub fn clear_addr(&mut self, addr: &SocketAddr) {
self.queue.remove(addr);
}
}
impl Sender {
pub fn from_socket(socket: UdpSocket, datagram_length: u16, replication: u8) -> Sender {
Sender {
out_queue: VecDeque::new(),
last_id: 0,
socket: socket,
datagram_length: datagram_length,
replication: replication
}
}
pub fn enqueue<T: ToSocketAddrs>(&mut self, message: Vec<u8>, addrs: T) -> UnrResult<()> {
self.last_id += 1;
let id = self.last_id;
let addrs = try!(AddrsContainer::from_to_sock(addrs));
let num_chunks = message.len() / ((self.datagram_length - MSG_PADDING) as usize);
for _ in 0 .. self.replication {
let mut chunk_count = 0;
for chunk in message[..].chunks((self.datagram_length - MSG_PADDING) as usize) {
let mut v = Vec::new();
v.extend(chunk.iter().cloned());
let chunk = MsgChunk(
MsgId(id), PieceNum(chunk_count + 1, (num_chunks + 1) as u16), v);
self.out_queue.push_back((chunk, addrs.clone()));
chunk_count += 1;
}
}
Ok(())
}
pub fn send_one(&mut self) -> UnrResult<bool> {
let bound = bincode::SizeLimit::Bounded(self.datagram_length as u64);
if let Some((next, addrs)) = self.out_queue.pop_front() {
let bytes = try!(bincode::rustc_serialize::encode(&next, bound));
try!(self.socket.send_to(&bytes[..], addrs));
}
Ok(!self.out_queue.is_empty())
}
pub fn send_all(&mut self) -> UnrResult<()> {
while try!(self.send_one()) {}
Ok(())
}
pub fn is_queue_empty(&self) -> bool {
self.out_queue.is_empty()
}
pub fn queue_len(&self) -> usize {
self.out_queue.len()
}
}