1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use errors::{ParsingError, RedisError};
use results::RedisResult;
use std::io::BufReader;
use std::io::prelude::*;
use std::net::TcpStream;
pub struct Reader;
impl Reader {
pub fn read(buffer: &mut BufReader<TcpStream>) -> Result<RedisResult, RedisError> {
let mut head_line = String::new();
try!(buffer.read_line(&mut head_line));
let identifier = head_line.remove(0);
match identifier{
'$' => Reader::read_bulk_string(&head_line, buffer),
'*' => Reader::read_array(&head_line, buffer),
'+' => Reader::read_string(&head_line),
':' => Reader::read_integer(&head_line),
'-' => Reader::read_error(&head_line),
_ => Err(RedisError::Parse(ParsingError::BadIdentifier(identifier.to_string()))),
}
}
pub fn read_pipeline(buffer: &mut BufReader<TcpStream>, cmd_nb: usize) -> Result<Vec<RedisResult>, RedisError> {
let mut results: Vec<RedisResult> = Vec::with_capacity(cmd_nb);
let mut remaining_cmd = cmd_nb;
loop {
if remaining_cmd == 0 {
break;
}
remaining_cmd -= 1;
match Reader::read(buffer) {
Ok(value) => results.push(value),
Err(RedisError::Response(err)) => results.push(RedisResult::String(err)),
Err(err) => return Err(err),
};
}
Ok(results)
}
fn read_bulk_string(head_line: & String, buffer: &mut BufReader<TcpStream>) -> Result<RedisResult, RedisError> {
let read_byte_nb: i64 = try!(head_line.trim().parse());
if read_byte_nb < 0 {
Ok(RedisResult::Nil)
} else {
let mut result: Vec<u8> = Vec::with_capacity((read_byte_nb + 2) as usize);
loop {
let length = {
let buf = try!(buffer.fill_buf());
result.extend(buf.iter().cloned());
buf.len()
};
if result.len() >= (read_byte_nb + 2) as usize {
buffer.consume(length - (result.len() - (read_byte_nb + 2) as usize));
break;
} else {
buffer.consume(length);
}
}
result.truncate(read_byte_nb as usize);
Ok(RedisResult::Bytes(result))
}
}
fn read_string(simple_str: & String) -> Result<RedisResult, RedisError> {
Ok(RedisResult::String(simple_str.trim().to_string()))
}
fn read_integer(integer_str: & String) -> Result<RedisResult, RedisError> {
Ok(RedisResult::Int(try!(integer_str.trim().parse::<i64>())))
}
fn read_error(error_str: & String) -> Result<RedisResult, RedisError> {
Err(RedisError::Response(error_str.to_string()))
}
fn read_array(array_str: & String, buffer: &mut BufReader<TcpStream>) -> Result<RedisResult, RedisError> {
let mut read_elmt_nb: i64 = try!(array_str.trim().parse());
if read_elmt_nb < 0 {
Ok(RedisResult::Nil)
} else if read_elmt_nb == 0 {
Ok(RedisResult::Array(Vec::new()))
}else {
let mut result: Vec<RedisResult> = Vec::with_capacity(read_elmt_nb as usize);
loop {
match Reader::read(buffer) {
Ok(value) => result.push(value),
Err(RedisError::Response(err)) => result.push(RedisResult::String(err)),
Err(err) => return Err(err),
};
read_elmt_nb -= 1;
if read_elmt_nb == 0 {
break;
}
}
Ok(RedisResult::Array(result))
}
}
}