1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.yarn;
19
20 import org.apache.giraph.conf.GiraphConfiguration;
21 import org.apache.giraph.conf.GiraphConstants;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.graph.GraphTaskManager;
24
25 import org.apache.giraph.io.VertexOutputFormat;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.MapContext;
30 import org.apache.hadoop.mapreduce.Mapper.Context;
31 import org.apache.hadoop.mapreduce.OutputCommitter;
32 import org.apache.hadoop.mapreduce.TaskID;
33 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
34 import org.apache.hadoop.mapreduce.task.MapContextImpl;
35 import org.apache.hadoop.mapreduce.TaskAttemptID;
36 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
37 import org.apache.log4j.Logger;
38
39 import java.io.IOException;
40
41
42
43
44
45
46
47
48
49
50
51
52 public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
53 E extends Writable> {
54 static {
55 Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
56 }
57
58 private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class);
59
60 private GraphTaskManager<I, V, E> graphTaskManager;
61
62 private final int bspTaskId;
63
64 private Context proxy;
65
66 private ImmutableClassesGiraphConfiguration conf;
67
68
69
70
71
72
73
74
75 public GiraphYarnTask(final TaskAttemptID taskAttemptId) {
76 conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
77 new GiraphConfiguration());
78 bspTaskId = taskAttemptId.getTaskID().getId();
79 conf.setInt("mapred.task.partition", bspTaskId);
80 proxy = buildProxyMapperContext(taskAttemptId);
81 graphTaskManager = new GraphTaskManager<I, V, E>(proxy);
82 }
83
84
85
86
87 public void run() {
88
89
90 try {
91 graphTaskManager.setup(null);
92 graphTaskManager.execute();
93 graphTaskManager.cleanup();
94 graphTaskManager.sendWorkerCountersAndFinishCleanup();
95 } catch (InterruptedException ie) {
96 LOG.error("run() caught an unrecoverable InterruptedException.", ie);
97 } catch (IOException ioe) {
98 throw new RuntimeException(
99 "run() caught an unrecoverable IOException.", ioe);
100
101 } catch (RuntimeException e) {
102
103 graphTaskManager.zooKeeperCleanup();
104 graphTaskManager.workerFailureCleanup();
105 throw new RuntimeException(
106 "run: Caught an unrecoverable exception " + e.getMessage(), e);
107 } finally {
108
109 finalizeYarnJob();
110 }
111 }
112
113
114
115
116
117
118 private void finalizeYarnJob() {
119 if (conf.isPureYarnJob() && graphTaskManager.isMaster() &&
120 conf.getVertexOutputFormatClass() != null) {
121 try {
122 LOG.info("Master is ready to commit final job output data.");
123 VertexOutputFormat vertexOutputFormat =
124 conf.createWrappedVertexOutputFormat();
125 OutputCommitter outputCommitter =
126 vertexOutputFormat.getOutputCommitter(proxy);
127
128 outputCommitter.commitJob(proxy);
129 LOG.info("Master has committed the final job output data.");
130 } catch (InterruptedException ie) {
131 LOG.error("Interrupted while attempting to obtain " +
132 "OutputCommitter.", ie);
133 } catch (IOException ioe) {
134 LOG.error("Master task's attempt to commit output has " +
135 "FAILED.", ioe);
136 }
137 }
138 }
139
140
141
142
143
144
145
146
147
148 private Context buildProxyMapperContext(final TaskAttemptID tid) {
149 MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
150 conf,
151 tid,
152 null,
153 null,
154 null,
155 new TaskAttemptContextImpl.DummyReporter() {
156 @Override
157 public void setStatus(String msg) {
158 LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
159 }
160 },
161 null);
162
163
164 WrappedMapper<Object, Object, Object, Object> wrappedMapper
165 = new WrappedMapper<Object, Object, Object, Object>();
166 return wrappedMapper.getMapContext(mc);
167 }
168
169
170
171
172
173
174
175 @SuppressWarnings("rawtypes")
176 public static void main(String[] args) {
177 if (args.length != 4) {
178 throw new IllegalStateException("GiraphYarnTask could not construct " +
179 "a TaskAttemptID for the Giraph job from args: " + printArgs(args));
180 }
181 try {
182 GiraphYarnTask<?, ?, ?> giraphYarnTask =
183 new GiraphYarnTask(getTaskAttemptID(args));
184 giraphYarnTask.run();
185
186 } catch (Throwable t) {
187
188 LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
189 System.exit(2);
190 }
191 System.exit(0);
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205 private static TaskAttemptID getTaskAttemptID(String[] args) {
206 return new TaskAttemptID(
207 args[0],
208 Integer.parseInt(args[1]),
209 TaskID.getTaskType('m'),
210 Integer.parseInt(args[2]) - 2,
211 Integer.parseInt(args[3]));
212 }
213
214
215
216
217
218
219 private static String printArgs(String[] args) {
220 int count = 0;
221 StringBuilder sb = new StringBuilder();
222 for (String arg : args) {
223 sb.append("arg[" + (count++) + "] == " + arg + ", ");
224 }
225 return sb.toString();
226 }
227 }