/* * Authored by Alex Hultman, 2018-2019. * Intellectual property of third-party. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef UWS_LOOP_H #define UWS_LOOP_H /* The loop is lazily created per-thread and run with uWS::run() */ #include "LoopData.h" #include "libusockets.h" namespace uWS { struct Loop { private: static void wakeupCb(us_loop_t *loop) { LoopData *loopData = (LoopData *) us_loop_ext(loop); /* Swap current deferQueue */ loopData->deferMutex.lock(); int oldDeferQueue = loopData->currentDeferQueue; loopData->currentDeferQueue = (loopData->currentDeferQueue + 1) % 2; loopData->deferMutex.unlock(); /* Drain the queue */ for (auto &x : loopData->deferQueues[oldDeferQueue]) { x(); } loopData->deferQueues[oldDeferQueue].clear(); } static void preCb(us_loop_t *loop) { LoopData *loopData = (LoopData *) us_loop_ext(loop); for (auto &p : loopData->preHandlers) { p.second((Loop *) loop); } } static void postCb(us_loop_t *loop) { LoopData *loopData = (LoopData *) us_loop_ext(loop); for (auto &p : loopData->postHandlers) { p.second((Loop *) loop); } } Loop() = delete; ~Loop() = default; Loop *init() { new (us_loop_ext((us_loop_t *) this)) LoopData; return this; } static Loop *create(void *hint) { return ((Loop *) us_create_loop(hint, wakeupCb, preCb, postCb, sizeof(LoopData)))->init(); } /* What to do with loops created with existingNativeLoop? */ struct LoopCleaner { ~LoopCleaner() { if(loop && cleanMe) { loop->free(); } } Loop *loop = nullptr; bool cleanMe = false; }; public: /* Lazily initializes a per-thread loop and returns it. * Will automatically free all initialized loops at exit. */ static Loop *get(void *existingNativeLoop = nullptr) { static thread_local LoopCleaner lazyLoop; if (!lazyLoop.loop) { /* If we are given a native loop pointer we pass that to uSockets and let it deal with it */ if (existingNativeLoop) { /* Todo: here we want to pass the pointer, not a boolean */ lazyLoop.loop = create(existingNativeLoop); /* We cannot register automatic free here, must be manually done */ } else { lazyLoop.loop = create(nullptr); lazyLoop.cleanMe = true; } } return lazyLoop.loop; } /* Freeing the default loop should be done once */ void free() { LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this); loopData->~LoopData(); /* uSockets will track whether this loop is owned by us or a borrowed alien loop */ us_loop_free((us_loop_t *) this); } void addPostHandler(void *key, fu2::unique_function &&handler) { LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this); loopData->postHandlers.emplace(key, std::move(handler)); } /* Bug: what if you remove a handler while iterating them? */ void removePostHandler(void *key) { LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this); loopData->postHandlers.erase(key); } void addPreHandler(void *key, fu2::unique_function &&handler) { LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this); loopData->preHandlers.emplace(key, std::move(handler)); } /* Bug: what if you remove a handler while iterating them? */ void removePreHandler(void *key) { LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this); loopData->preHandlers.erase(key); } /* Defer this callback on Loop's thread of execution */ void defer(fu2::unique_function &&cb) { LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this); //if (std::thread::get_id() == ) // todo: add fast path for same thread id loopData->deferMutex.lock(); loopData->deferQueues[loopData->currentDeferQueue].emplace_back(std::move(cb)); loopData->deferMutex.unlock(); us_wakeup_loop((us_loop_t *) this); } /* Actively block and run this loop */ void run() { us_loop_run((us_loop_t *) this); } /* Passively integrate with the underlying default loop */ /* Used to seamlessly integrate with third parties such as Node.js */ void integrate() { us_loop_integrate((us_loop_t *) this); } }; /* Can be called from any thread to run the thread local loop */ inline void run() { Loop::get()->run(); } } #endif // UWS_LOOP_H