00001
00028 #ifndef GALOIS_RUNTIME_PARALLELWORK_H
00029 #define GALOIS_RUNTIME_PARALLELWORK_H
00030
00031 #include "Galois/Mem.h"
00032 #include "Galois/Runtime/Config.h"
00033 #include "Galois/Runtime/Context.h"
00034 #include "Galois/Runtime/ForEachTraits.h"
00035 #include "Galois/Runtime/LoopHooks.h"
00036 #include "Galois/Runtime/Support.h"
00037 #include "Galois/Runtime/Range.h"
00038 #include "Galois/Runtime/Termination.h"
00039 #include "Galois/Runtime/ThreadPool.h"
00040 #include "Galois/Runtime/UserContextAccess.h"
00041 #include "Galois/Runtime/WorkList.h"
00042
00043 #include <algorithm>
00044
00045 namespace GaloisRuntime {
00046
/**
 * Nullary functor that seeds a work object with its initial range.
 *
 * The functor keeps its own copy of the range and a reference to the work
 * object. Invoking it forwards the stored range to work.AddInitialWork().
 */
template<typename RangeTy, typename WorkTy>
struct Initializer {
  RangeTy range;  // private copy of the range to hand over
  WorkTy& work;   // referenced, not owned; must outlive this functor

  Initializer(const RangeTy& initialRange, WorkTy& target)
    : range(initialRange),
      work(target)
  { }

  /** Passes the stored range to the work object. */
  void operator()() {
    work.AddInitialWork(range);
  }
};
00058
00059 template <bool Enabled>
00060 class LoopStatistics {
00061 unsigned long conflicts;
00062 unsigned long iterations;
00063 const char* loopname;
00064
00065 public:
00066 explicit LoopStatistics(const char* ln) :conflicts(0), iterations(0), loopname(ln) { }
00067 ~LoopStatistics() {
00068 reportStat(loopname, "Conflicts", conflicts);
00069 reportStat(loopname, "Iterations", iterations);
00070 }
00071 inline void inc_iterations(int amount = 1) {
00072 iterations += amount;
00073 }
00074 inline void inc_conflicts() {
00075 ++conflicts;
00076 }
00077 };
00078
00079
00080 template <>
00081 class LoopStatistics<false> {
00082 public:
00083 explicit LoopStatistics(const char* ln) {}
00084 inline void inc_iterations(int amount = 1) const { }
00085 inline void inc_conflicts() const { }
00086 };
00087
00088 template<class WorkListTy, class T, class FunctionTy>
00089 class ForEachWork {
00090 protected:
00091 typedef T value_type;
00092 typedef typename WorkListTy::template retype<value_type>::WL WLTy;
00093 typedef WorkList::GFIFO<value_type> AbortedList;
00094
00095 struct ThreadLocalData {
00096 FunctionTy function;
00097 UserContextAccess<value_type> facing;
00098 SimpleRuntimeContext cnx;
00099 LoopStatistics<ForEachTraits<FunctionTy>::NeedsStats> stat;
00100 ThreadLocalData(const FunctionTy& fn, const char* ln): function(fn), stat(ln) {}
00101 };
00102
00103 WLTy& wl;
00104 FunctionTy& origFunction;
00105 WLTy default_wl;
00106 const char* loopname;
00107
00108 TerminationDetection& term;
00109 PerPackageStorage<AbortedList> aborted;
00110 LL::CacheLineStorage<bool> broke;
00111
00112 inline void commitIteration(ThreadLocalData& tld) {
00113 if (ForEachTraits<FunctionTy>::NeedsPush) {
00114 wl.push(tld.facing.getPushBuffer().begin(),
00115 tld.facing.getPushBuffer().end());
00116 tld.facing.resetPushBuffer();
00117 }
00118 if (ForEachTraits<FunctionTy>::NeedsPIA)
00119 tld.facing.resetAlloc();
00120 if (ForEachTraits<FunctionTy>::NeedsAborts)
00121 tld.cnx.commit_iteration();
00122 }
00123
00124 GALOIS_ATTRIBUTE_NOINLINE
00125 void abortIteration(value_type val, ThreadLocalData& tld, bool recursiveAbort) {
00126 assert(ForEachTraits<FunctionTy>::NeedsAborts);
00127
00128 tld.cnx.cancel_iteration();
00129 tld.stat.inc_conflicts();
00130 if (recursiveAbort)
00131 aborted.getRemote(LL::getLeaderForPackage(LL::getPackageForSelf(LL::getTID()) / 2))->push(val);
00132 else
00133 aborted.getLocal()->push(val);
00134
00135 if (ForEachTraits<FunctionTy>::NeedsPush)
00136 tld.facing.resetPushBuffer();
00137
00138 if (ForEachTraits<FunctionTy>::NeedsPIA)
00139 tld.facing.resetAlloc();
00140 }
00141
00142 inline void doProcess(boost::optional<value_type>& p, ThreadLocalData& tld) {
00143 tld.stat.inc_iterations();
00144 if (ForEachTraits<FunctionTy>::NeedsAborts)
00145 tld.cnx.start_iteration();
00146 tld.function(*p, tld.facing.data());
00147 commitIteration(tld);
00148 }
00149
00150 GALOIS_ATTRIBUTE_NOINLINE
00151 void handleBreak(ThreadLocalData& tld) {
00152 commitIteration(tld);
00153 broke.data = true;
00154 }
00155
00156 bool runQueueSimple(ThreadLocalData& tld) {
00157 bool workHappened = false;
00158 boost::optional<value_type> p = wl.pop();
00159 if (p)
00160 workHappened = true;
00161 while (p) {
00162 doProcess(p, tld);
00163 p = wl.pop();
00164 }
00165 return workHappened;
00166 }
00167
00168 template<bool limit, typename WL>
00169 bool runQueue(ThreadLocalData& tld, WL& lwl, bool recursiveAbort) {
00170 bool workHappened = false;
00171 boost::optional<value_type> p = lwl.pop();
00172 unsigned num = 0;
00173 int result = 0;
00174 if (p)
00175 workHappened = true;
00176 #if GALOIS_USE_EXCEPTION_HANDLER
00177 try {
00178 #else
00179 if ((result = setjmp(hackjmp)) == 0) {
00180 #endif
00181 while (p) {
00182 doProcess(p, tld);
00183 if (limit) {
00184 ++num;
00185 if (num == 32)
00186 break;
00187 }
00188 p = lwl.pop();
00189 }
00190 #if GALOIS_USE_EXCEPTION_HANDLER
00191 } catch (ConflictFlag const& flag) {
00192 clearConflictLock();
00193 result = flag;
00194 }
00195 #else
00196 }
00197 #endif
00198 switch (result) {
00199 case 0:
00200 break;
00201 case GaloisRuntime::CONFLICT:
00202 abortIteration(*p, tld, recursiveAbort);
00203 break;
00204 case GaloisRuntime::BREAK:
00205 handleBreak(tld);
00206 return false;
00207 default:
00208 GALOIS_ERROR(true, "unknown conflict type");
00209 }
00210 return workHappened;
00211 }
00212
00213 GALOIS_ATTRIBUTE_NOINLINE
00214 bool handleAborts(ThreadLocalData& tld) {
00215 return runQueue<false>(tld, *aborted.getLocal(), true);
00216 }
00217
00218 template<bool checkAbort>
00219 void go() {
00220
00221
00222 ThreadLocalData tld(origFunction, loopname);
00223 if (ForEachTraits<FunctionTy>::NeedsAborts)
00224 setThreadContext(&tld.cnx);
00225 bool didAnyWork;
00226 do {
00227 didAnyWork = false;
00228 bool didWork;
00229 do {
00230 didWork = false;
00231
00232 if (ForEachTraits<FunctionTy>::NeedsBreak || ForEachTraits<FunctionTy>::NeedsAborts)
00233 didWork = runQueue<checkAbort || ForEachTraits<FunctionTy>::NeedsBreak>(tld, wl, false);
00234 else
00235 didWork = runQueueSimple(tld);
00236
00237 if (ForEachTraits<FunctionTy>::NeedsBreak && broke.data)
00238 break;
00239
00240 if (checkAbort)
00241 didWork |= handleAborts(tld);
00242 didAnyWork |= didWork;
00243 } while (didWork);
00244 if (ForEachTraits<FunctionTy>::NeedsBreak && broke.data)
00245 break;
00246
00247 term.localTermination(didAnyWork);
00248 } while ((ForEachTraits<FunctionTy>::NeedsPush
00249 ||ForEachTraits<FunctionTy>::NeedsBreak
00250 ||ForEachTraits<FunctionTy>::NeedsAborts)
00251 && !term.globalTermination());
00252
00253 if (ForEachTraits<FunctionTy>::NeedsAborts)
00254 setThreadContext(0);
00255 }
00256
00257 public:
00258 ForEachWork(FunctionTy& f, const char* l): wl(default_wl), origFunction(f), loopname(l), term(getSystemTermination()), broke(false) { }
00259
00260 template<typename W>
00261 ForEachWork(W& w, FunctionTy& f, const char* l): wl(w), origFunction(f), loopname(l), term(getSystemTermination()), broke(false) { }
00262
00263 template<typename RangeTy>
00264 void AddInitialWork(RangeTy range) {
00265 term.initializeThread();
00266 wl.push_initial(range);
00267 }
00268
00269 void operator()() {
00270 if (LL::isPackageLeaderForSelf(LL::getTID()) &&
00271 galoisActiveThreads > 1 &&
00272 ForEachTraits<FunctionTy>::NeedsAborts)
00273 go<true>();
00274 else
00275 go<false>();
00276 }
00277 };
00278
00279
00280 template<typename WLTy, typename RangeTy, typename FunctionTy>
00281 void for_each_impl(RangeTy range, FunctionTy f, const char* loopname) {
00282 assert(!inGaloisForEach);
00283
00284 inGaloisForEach = true;
00285
00286 typedef typename RangeTy::value_type T;
00287 typedef ForEachWork<WLTy, T, FunctionTy> WorkTy;
00288
00289 WorkTy W(f, loopname);
00290 Initializer<RangeTy, WorkTy> init(range, W);
00291 RunCommand w[4] = {Config::ref(init),
00292 Config::ref(getSystemBarrier()),
00293 Config::ref(W),
00294 Config::ref(getSystemBarrier())};
00295 getSystemThreadPool().run(&w[0], &w[4]);
00296 runAllLoopExitHandlers();
00297 inGaloisForEach = false;
00298 }
00299
00300 template<typename FunctionTy>
00301 struct WOnEach {
00302 FunctionTy& origFunction;
00303 WOnEach(FunctionTy& f): origFunction(f) { }
00304 void operator()(void) {
00305 FunctionTy fn(origFunction);
00306 fn(LL::getTID(), galoisActiveThreads);
00307 }
00308 };
00309
00310 template<typename FunctionTy>
00311 void on_each_impl(FunctionTy fn, const char* loopname = 0) {
00312 GaloisRuntime::RunCommand w[2] = {WOnEach<FunctionTy>(fn),
00313 Config::ref(getSystemBarrier())};
00314 GaloisRuntime::getSystemThreadPool().run(&w[0], &w[2]);
00315 }
00316
00317
// Declaration only; the definition is not in this header.
// NOTE(review): presumably pre-allocates `num` units of runtime memory --
// confirm the unit and semantics against the definition.
00318 void preAlloc_impl(int num);
00319
00320 }
00321
00322 #endif
00323