|
@@ -53,16 +53,32 @@ public class MapProcessorDemo implements MapProcessor {
|
|
|
subTask.itemIds = Lists.newLinkedList();
|
|
|
subTasks.add(subTask);
|
|
|
for (int i = 0; i < BATCH_SIZE; i++) {
|
|
|
- subTask.itemIds.add(i);
|
|
|
+ subTask.itemIds.add(i + j * 100);
|
|
|
}
|
|
|
}
|
|
|
map(subTasks, "MAP_TEST_TASK");
|
|
|
return new ProcessResult(true, "map successfully");
|
|
|
- }else {
|
|
|
- // 测试在 Map 任务中追加上下文
|
|
|
- context.getWorkflowContext().appendData2WfContext("Yasuo","A sword's poor company for a long road.");
|
|
|
+ } else {
|
|
|
+
|
|
|
System.out.println("==== PROCESS ====");
|
|
|
- System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask()));
|
|
|
+ SubTask subTask = (SubTask) context.getSubTask();
|
|
|
+ for (Integer itemId : subTask.getItemIds()) {
|
|
|
+ if (Thread.interrupted()) {
|
|
|
+ // 任务被中断
|
|
|
+ System.out.println("job has been stop! so stop to process subTask:" + subTask.getSiteId() + "=>" + itemId);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ System.out.println("processing subTask: " + subTask.getSiteId() + "=>" + itemId);
|
|
|
+ int max = Integer.MAX_VALUE >> 4;
|
|
|
+ for (int i = 0; ; i++) {
|
|
|
+ // 模拟耗时操作
|
|
|
+ if (i > max) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 测试在 Map 任务中追加上下文
|
|
|
+ context.getWorkflowContext().appendData2WfContext("Yasuo", "A sword's poor company for a long road.");
|
|
|
boolean b = ThreadLocalRandom.current().nextBoolean();
|
|
|
return new ProcessResult(b, "RESULT:" + b);
|
|
|
}
|