use thread pooling and remove text thread cap

This commit is contained in:
Akash Mozumdar 2018-12-02 15:30:35 -05:00
parent de109d0840
commit dca006b28c
5 changed files with 22 additions and 35 deletions

View File

@ -102,12 +102,7 @@ namespace
default:
{
auto tp = *(ThreadParam*)buffer;
if (textThreadsByParams->count(tp) == 0)
{
auto textThread = textThreadsByParams->insert({ tp, std::make_shared<TextThread>(tp, Host::GetHookParam(tp)) }).first->second;
if (textThreadsByParams->size() < MAX_THREAD_COUNT) textThread->Start();
else Host::AddConsoleOutput(TOO_MANY_THREADS);
}
if (textThreadsByParams->count(tp) == 0) textThreadsByParams->insert({ tp, std::make_shared<TextThread>(tp, Host::GetHookParam(tp)) });
textThreadsByParams->at(tp)->Push(buffer + sizeof(tp), bytesRead - sizeof(tp));
}
break;

View File

@ -8,32 +8,25 @@ TextThread::TextThread(ThreadParam tp, HookParam hp, std::optional<std::wstring>
handle(threadCounter++),
name(name.value_or(Util::StringToWideString(hp.name).value())),
tp(tp),
hp(hp)
hp(hp),
timer(NULL)
{
HANDLE newTimer;
CreateTimerQueueTimer(&newTimer, NULL, Flush, this, 25, 25, WT_EXECUTELONGFUNCTION);
timer = newTimer;
OnCreate(this);
}
TextThread::~TextThread()
{
if (flushThread.joinable())
{
SetEvent(deletionEvent);
flushThread.join();
OnDestroy(this);
}
}
std::wstring TextThread::GetStorage()
{
return storage->c_str();
}
void TextThread::Start()
{
deletionEvent = CreateEventW(nullptr, FALSE, FALSE, NULL);
flushThread = std::thread([&] { while (WaitForSingleObject(deletionEvent, 10) == WAIT_TIMEOUT) Flush(); });
}
void TextThread::AddSentence(std::wstring sentence)
{
if (Output(this, sentence)) storage->append(sentence);
@ -41,7 +34,7 @@ void TextThread::AddSentence(std::wstring sentence)
void TextThread::Push(const BYTE* data, int len)
{
if (!flushThread.joinable() || len < 0) return;
if (len < 0) return;
LOCK(bufferMutex);
if (hp.type & USING_UNICODE) buffer += std::wstring((wchar_t*)data, len / 2);
else if (auto converted = Util::StringToWideString(std::string((char*)data, len), hp.codepage ? hp.codepage : defaultCodepage)) buffer += converted.value();
@ -50,18 +43,19 @@ void TextThread::Push(const BYTE* data, int len)
lastPushTime = GetTickCount();
}
void TextThread::Flush()
void CALLBACK Flush(void* thread, BOOLEAN)
{
auto This = (TextThread*)thread;
std::wstring sentence;
{
LOCK(bufferMutex);
if (buffer.empty()) return;
if (buffer.size() < maxBufferSize && GetTickCount() - lastPushTime < flushDelay) return;
sentence = buffer;
buffer.clear();
LOCK(This->bufferMutex);
if (This->buffer.empty()) return;
if (This->buffer.size() < This->maxBufferSize && GetTickCount() - This->lastPushTime < This->flushDelay) return;
sentence = This->buffer;
This->buffer.clear();
if (Util::RemoveRepetition(sentence)) repeatingChars = std::unordered_set(sentence.begin(), sentence.end());
else repeatingChars.clear();
if (Util::RemoveRepetition(sentence)) This->repeatingChars = std::unordered_set(sentence.begin(), sentence.end());
else This->repeatingChars.clear();
}
AddSentence(sentence);
This->AddSentence(sentence);
}

View File

@ -20,9 +20,9 @@ public:
~TextThread();
std::wstring GetStorage();
void Start();
void AddSentence(std::wstring sentence);
void Push(const BYTE* data, int len);
friend void CALLBACK Flush(void* thread, BOOLEAN);
const int64_t handle;
const std::wstring name;
@ -30,13 +30,12 @@ public:
const HookParam hp;
private:
void Flush();
struct TimerDeleter { void operator()(void* h) { DeleteTimerQueueTimer(NULL, h, INVALID_HANDLE_VALUE); } };
ThreadSafePtr<std::wstring> storage;
std::wstring buffer;
std::unordered_set<wchar_t> repeatingChars;
std::mutex bufferMutex;
AutoHandle<> deletionEvent = NULL;
std::thread flushThread;
AutoHandle<TimerDeleter> timer;
DWORD lastPushTime;
};

View File

@ -4,7 +4,7 @@
// 8/23/2013 jichi
// Branch: ITH/common.h, rev 128
enum { MESSAGE_SIZE = 500, PIPE_BUFFER_SIZE = 2000, SHIFT_JIS = 932, MAX_THREAD_COUNT = 250, MAX_MODULE_SIZE = 120, HOOK_NAME_SIZE = 30 };
enum { MESSAGE_SIZE = 500, PIPE_BUFFER_SIZE = 2000, SHIFT_JIS = 932, MAX_MODULE_SIZE = 120, HOOK_NAME_SIZE = 30 };
// jichi 375/2014: Add offset of pusha/pushad
// http://faydoc.tripod.com/cpu/pushad.htm

View File

@ -25,7 +25,6 @@ constexpr auto ABOUT = L"Textractor beta v3.5.0 (project homepage: https://githu
"Please contact Artikash with any problems, feature requests, or questions relating to Textractor\r\n"
"You can do so via the project homepage (issues section) or via email\r\n"
"Source code available under GPLv3 at project homepage";
constexpr auto TOO_MANY_THREADS = L"Textractor: too many text threads: can't create more";
constexpr auto ALREADY_INJECTED = L"Textractor: already injected";
constexpr auto ARCHITECTURE_MISMATCH = L"Textractor: architecture mismatch: try 32 bit Textractor instead";
constexpr auto INJECT_FAILED = L"Textractor: couldn't inject";