1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils.io;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.utils.ExtendedDataOutput;
23 import org.apache.hadoop.io.Writable;
24
25 import com.google.common.collect.Iterables;
26 import com.google.common.collect.Lists;
27
28 import java.io.DataInput;
29 import java.io.DataOutput;
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.List;
33
34
35
36
37
38
39
40
41 public class BigDataOutput implements DataOutput, Writable {
42
43 private static final int DEFAULT_INITIAL_SIZE = 16;
44
45 private static final int MAX_SIZE = 1 << 25;
46
47
48
49
50 private static final int SIZE_DELTA = 100;
51
52
53 protected ExtendedDataOutput currentDataOutput;
54
55 protected List<ExtendedDataOutput> dataOutputs;
56
57 protected final ImmutableClassesGiraphConfiguration conf;
58
59
60
61
62
63
64 public BigDataOutput(ImmutableClassesGiraphConfiguration conf) {
65 this(DEFAULT_INITIAL_SIZE, conf);
66 }
67
68
69
70
71
72
73
74 public BigDataOutput(int initialSize,
75 ImmutableClassesGiraphConfiguration conf) {
76 this.conf = conf;
77 dataOutputs = null;
78 currentDataOutput = createOutput(initialSize);
79 }
80
81
82
83
84
85
86 protected int getMaxSize() {
87 return MAX_SIZE;
88 }
89
90
91
92
93
94
95
96 protected ExtendedDataOutput createOutput(int size) {
97 return conf.createExtendedDataOutput(size);
98 }
99
100
101
102
103
104
105
106 private ExtendedDataOutput getDataOutputToWriteTo() {
107 return getDataOutputToWriteTo(SIZE_DELTA);
108 }
109
110
111
112
113
114
115
116
117 private ExtendedDataOutput getDataOutputToWriteTo(int additionalSize) {
118 if (currentDataOutput.getPos() + additionalSize > getMaxSize()) {
119 if (dataOutputs == null) {
120 dataOutputs = new ArrayList<>(1);
121 }
122 dataOutputs.add(currentDataOutput);
123 currentDataOutput = createOutput(getMaxSize());
124 }
125 return currentDataOutput;
126 }
127
128
129
130
131
132
133 public int getNumberOfDataOutputs() {
134 return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
135 }
136
137
138
139
140
141
142 public Iterable<ExtendedDataOutput> getDataOutputs() {
143 ArrayList<ExtendedDataOutput> currentList =
144 Lists.newArrayList(currentDataOutput);
145 if (dataOutputs == null) {
146 return currentList;
147 } else {
148 return Iterables.concat(dataOutputs, currentList);
149 }
150 }
151
152 public ImmutableClassesGiraphConfiguration getConf() {
153 return conf;
154 }
155
156
157
158
159
160
161 public long getSize() {
162 long size = currentDataOutput.getPos();
163 if (dataOutputs != null) {
164 for (ExtendedDataOutput dataOutput : dataOutputs) {
165 size += dataOutput.getPos();
166 }
167 }
168 return size;
169 }
170
171 @Override
172 public void write(int b) throws IOException {
173 getDataOutputToWriteTo().write(b);
174 }
175
176 @Override
177 public void write(byte[] b) throws IOException {
178 write(b, 0, b.length);
179 }
180
181 @Override
182 public void write(byte[] b, int off, int len) throws IOException {
183 if (len <= getMaxSize()) {
184 getDataOutputToWriteTo(len).write(b, off, len);
185 } else {
186
187
188 while (len > 0) {
189 int toWrite = Math.min(getMaxSize(), len);
190 write(b, off, toWrite);
191 len -= toWrite;
192 off += toWrite;
193 }
194 }
195 }
196
197 @Override
198 public void writeBoolean(boolean v) throws IOException {
199 getDataOutputToWriteTo().writeBoolean(v);
200 }
201
202 @Override
203 public void writeByte(int v) throws IOException {
204 getDataOutputToWriteTo().writeByte(v);
205 }
206
207 @Override
208 public void writeShort(int v) throws IOException {
209 getDataOutputToWriteTo().writeShort(v);
210 }
211
212 @Override
213 public void writeChar(int v) throws IOException {
214 getDataOutputToWriteTo().writeChar(v);
215 }
216
217 @Override
218 public void writeInt(int v) throws IOException {
219 getDataOutputToWriteTo().writeInt(v);
220 }
221
222 @Override
223 public void writeLong(long v) throws IOException {
224 getDataOutputToWriteTo().writeLong(v);
225 }
226
227 @Override
228 public void writeFloat(float v) throws IOException {
229 getDataOutputToWriteTo().writeFloat(v);
230 }
231
232 @Override
233 public void writeDouble(double v) throws IOException {
234 getDataOutputToWriteTo().writeDouble(v);
235 }
236
237 @Override
238 public void writeBytes(String s) throws IOException {
239 getDataOutputToWriteTo().writeBytes(s);
240 }
241
242 @Override
243 public void writeChars(String s) throws IOException {
244 getDataOutputToWriteTo().writeChars(s);
245 }
246
247 @Override
248 public void writeUTF(String s) throws IOException {
249 getDataOutputToWriteTo().writeUTF(s);
250 }
251
252
253
254
255
256
257
258 private void writeExtendedDataOutput(ExtendedDataOutput dataOutput,
259 DataOutput out) throws IOException {
260 out.writeInt(dataOutput.getPos());
261 out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
262 }
263
264
265
266
267
268
269
270 private ExtendedDataOutput readExtendedDataOutput(
271 DataInput in) throws IOException {
272 int length = in.readInt();
273 byte[] data = new byte[length];
274 in.readFully(data);
275 return conf.createExtendedDataOutput(data, data.length);
276 }
277
278 @Override
279 public void write(DataOutput out) throws IOException {
280 if (dataOutputs == null) {
281 out.writeInt(0);
282 } else {
283 out.writeInt(dataOutputs.size());
284 for (ExtendedDataOutput stream : dataOutputs) {
285 writeExtendedDataOutput(stream, out);
286 }
287 }
288 writeExtendedDataOutput(currentDataOutput, out);
289 }
290
291 @Override
292 public void readFields(DataInput in) throws IOException {
293 int size = in.readInt();
294 if (size == 0) {
295 dataOutputs = null;
296 } else {
297 dataOutputs = new ArrayList<ExtendedDataOutput>(size);
298 while (size-- > 0) {
299 dataOutputs.add(readExtendedDataOutput(in));
300 }
301 }
302 currentDataOutput = readExtendedDataOutput(in);
303 }
304 }