1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use bchannel::{self, channel};
use std::net::{ToSocketAddrs, UdpSocket, SocketAddr};
use std::collections::HashSet;
use std::io::Result as IoResult;
use std::thread;
use std::marker::PhantomData;
use unreliable_message::network::AddrsContainer;
use unreliable_message::msgqueue::CompleteMessage;
use unreliable_message as unre;
use bincode;
use serialize;
use bincode::rustc_serialize::{
EncodingResult,
};
pub struct Sender<T> {
backing: bchannel::Sender<(Vec<u8>, AddrsContainer), unre::UnrError>,
_phantom: PhantomData<T>
}
impl <T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender {
backing: self.backing.clone(),
_phantom: PhantomData
}
}
}
impl <T: serialize::Encodable> Sender<T> {
fn new(channel: bchannel::Sender<(Vec<u8>, AddrsContainer), unre::UnrError>) -> Sender<T> {
Sender {
backing: channel,
_phantom: PhantomData
}
}
pub fn send<A: ToSocketAddrs>(&self, object: &T, addrs: A) -> EncodingResult<()> {
let encoded = try!(bincode::rustc_serialize::encode(object, bincode::SizeLimit::Infinite));
let _ = self.backing.send((encoded, AddrsContainer::from_to_sock(addrs).unwrap()));
Ok(())
}
pub fn close(self) { }
}
pub type Receiver<T> = bchannel::Receiver<T, unre::UnrError>;
pub fn bind<I, O, A: ToSocketAddrs>(addr: A) -> IoResult<(Sender<I>, Receiver<(SocketAddr, O)>)>
where A: ToSocketAddrs, I: serialize::Encodable, O: serialize::Decodable + Send + 'static {
let addrs_clonable = try!(AddrsContainer::from_to_sock(addr));
let mut whitelist = HashSet::new();
whitelist.extend(try!(addrs_clonable.to_socket_addrs()));
let message_size = 1024;
let sock_1 = try!(UdpSocket::bind(addrs_clonable.clone()));
let sock_2 = try!(sock_1.try_clone());
let back_send = unre::Sender::from_socket(sock_1, message_size, 1);
let back_recv = unre::Receiver::from_socket(sock_2, message_size, None,
unre::network::ReceiverFilter::Whitelist(whitelist));
let (in_s, in_r) = channel();
let (out_s, out_r) = channel();
thread::spawn(move || {
let in_r = in_r;
let mut back_send = back_send;
loop {
if back_send.is_queue_empty() && in_r.is_closed() {
break;
}
for (bytes, from) in in_r.iter() {
if back_send.enqueue(bytes, from).is_err() {
break;
}
}
if back_send.send_one().is_err() {
break;
}
thread::sleep(::std::time::Duration::from_millis(2));
}
});
thread::spawn(move || {
let out_s = out_s;
let mut back_recv = back_recv;
loop {
match back_recv.poll() {
Ok((from, CompleteMessage(_id, bytes))) => {
match bincode::rustc_serialize::decode(&bytes[..]) {
Ok(obj) => {
if out_s.send((from, obj)).is_err() {
break;
}
},
Err(e) => {
let _ = out_s.error(unre::UnrError::DecodingError(e));
break;
}
};
}
Err(e) => {
let _ = out_s.error(e);
break;
}
}
}
});
Ok((Sender::new(in_s), out_r))
}