1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.edge.primitives;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.edge.AbstractEdgeStore;
24 import org.apache.giraph.edge.OutEdges;
25 import org.apache.giraph.utils.VertexIdEdgeIterator;
26 import org.apache.hadoop.io.LongWritable;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.util.Progressable;
29
30 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
31 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
32 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
33
34 import java.io.DataInput;
35 import java.io.DataOutput;
36 import java.io.IOException;
37 import java.util.Iterator;
38 import java.util.Map;
39
40
41
42
43
44
45
46
47
48 public class LongEdgeStore<V extends Writable, E extends Writable>
49 extends AbstractEdgeStore<LongWritable, V, E, Long,
50 Long2ObjectMap.Entry<OutEdges<LongWritable, E>>> {
51
52
53
54
55
56
57
58
59 public LongEdgeStore(
60 CentralizedServiceWorker<LongWritable, V, E> service,
61 ImmutableClassesGiraphConfiguration<LongWritable, V, E> configuration,
62 Progressable progressable) {
63 super(service, configuration, progressable);
64 }
65
66 @Override
67 protected LongWritable getVertexId(
68 Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry,
69 LongWritable representativeVertexId) {
70 representativeVertexId.set(entry.getLongKey());
71 return representativeVertexId;
72 }
73
74 @Override
75 protected LongWritable createVertexId(
76 Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry) {
77 return new LongWritable(entry.getLongKey());
78 }
79
80
81 @Override
82 protected OutEdges<LongWritable, E> getPartitionEdges(
83 Long2ObjectMap.Entry<OutEdges<LongWritable, E>> entry) {
84 return entry.getValue();
85 }
86
87 @Override
88 protected void writeVertexKey(Long key, DataOutput output)
89 throws IOException {
90 output.writeLong(key);
91 }
92
93 @Override
94 protected Long readVertexKey(DataInput input) throws IOException {
95 return input.readLong();
96 }
97
98 @Override
99 protected Iterator<Long2ObjectMap.Entry<OutEdges<LongWritable, E>>>
100 getPartitionEdgesIterator(
101 Map<Long, OutEdges<LongWritable, E>> partitionEdges) {
102 return ((Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdges)
103 .long2ObjectEntrySet()
104 .iterator();
105 }
106
107 @Override
108 protected Long2ObjectMap<OutEdges<LongWritable, E>> getPartitionEdges(
109 int partitionId) {
110 Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
111 (Long2ObjectMap<OutEdges<LongWritable, E>>)
112 transientEdges.get(partitionId);
113 if (partitionEdges == null) {
114 Long2ObjectMap<OutEdges<LongWritable, E>> newPartitionEdges =
115 Long2ObjectMaps.synchronize(
116 new Long2ObjectOpenHashMap<OutEdges<LongWritable, E>>());
117 partitionEdges = (Long2ObjectMap<OutEdges<LongWritable, E>>)
118 transientEdges.putIfAbsent(partitionId,
119 newPartitionEdges);
120 if (partitionEdges == null) {
121 partitionEdges = newPartitionEdges;
122 }
123 }
124 return partitionEdges;
125 }
126
127 @Override
128 protected OutEdges<LongWritable, E> getVertexOutEdges(
129 VertexIdEdgeIterator<LongWritable, E> vertexIdEdgeIterator,
130 Map<Long, OutEdges<LongWritable, E>> partitionEdgesIn) {
131 Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
132 (Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdgesIn;
133 LongWritable vertexId = vertexIdEdgeIterator.getCurrentVertexId();
134 OutEdges<LongWritable, E> outEdges = partitionEdges.get(vertexId.get());
135 if (outEdges == null) {
136 synchronized (partitionEdges) {
137 outEdges = partitionEdges.get(vertexId.get());
138 if (outEdges == null) {
139 outEdges = configuration.createAndInitializeInputOutEdges();
140 partitionEdges.put(vertexId.get(), outEdges);
141 }
142 }
143 }
144 return outEdges;
145 }
146 }