Czy Parallel.ForEach ogranicza liczbę aktywnych wątków?


107

Biorąc pod uwagę ten kod:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Czy wszystkie 1000 wątków pojawią się prawie jednocześnie?

Odpowiedzi:


149

Nie, nie uruchomi 1000 wątków - tak, ograniczy liczbę używanych wątków. Rozszerzenia równoległe używają odpowiedniej liczby rdzeni, w zależności od tego, ile masz fizycznie i ile jest już zajętych. Przydziela pracę każdemu rdzeniu, a następnie wykorzystuje technikę zwaną kradzieżą pracy, aby umożliwić każdemu wątkowi wydajne przetwarzanie własnej kolejki i wystarczy wykonać kosztowny dostęp między wątkami tylko wtedy, gdy jest to naprawdę konieczne.

Zajrzyj na blog zespołu PFX, aby uzyskać mnóstwo informacji o tym, jak przydziela pracę i wszelkiego rodzaju inne tematy.

Zauważ, że w niektórych przypadkach możesz także określić żądany stopień równoległości.


2
Używałem Parallel.ForEach (FilePathArray, path => ... do przeczytania około 24 000 plików dzisiejszego wieczoru, tworząc jeden nowy plik dla każdego czytanego pliku. Bardzo prosty kod. Wygląda na to, że nawet 6 wątków wystarczyło, aby przytłoczyć dysk 7200 RPM Czytałem z wykorzystaniem 100%. W ciągu kilku godzin obserwowałem, jak biblioteka Parallel wydziela ponad 8000 wątków. Testowałem przy użyciu MaxDegreeOfParallelism i na pewno ponad 8000 wątków zniknęło. Testowałem ją wiele razy z tym samym wynik
Jake Drew

To mogło rozpocząć 1000 wątki na jakiś degenerat „doSomething”. (Tak jak w przypadku, gdy obecnie mam do czynienia z problemem w kodzie produkcyjnym, który nie ustawił limitu i spowodował powstanie 200+ wątków, powodując tym samym wyskakiwanie puli połączeń SQL .. Polecam ustawienie Max DOP dla każdej pracy, której nie można w trywialny sposób uzasadnić jako wyraźnie związane z procesorem.)
user2864740


28

Na komputerze z jednym rdzeniem ... Parallel.ForEach partycje (fragmenty) kolekcji, nad którymi pracuje, między wieloma wątkami, ale ta liczba jest obliczana na podstawie algorytmu, który bierze pod uwagę i wydaje się stale monitorować pracę wykonywaną przez wątków, które alokuje do ForEach. Więc jeśli część ciała ForEach wywołuje długo działające funkcje związane z IO / blokujące, które pozostawiałyby wątek czekający w pobliżu, algorytm odrodzi więcej wątków i ponownie podzieli kolekcję między nimi . Jeśli wątki kończą się szybko i nie blokują na przykład wątków we / wy, takich jak po prostu obliczanie niektórych liczb,algorytm zwiększy (lub faktycznie zmniejszy) liczbę wątków do punktu, w którym algorytm uzna za optymalny dla przepustowości (średni czas zakończenia każdej iteracji) .

Zasadniczo pula wątków za wszystkimi różnymi funkcjami biblioteki równoległej opracuje optymalną liczbę wątków do użycia. Liczba fizycznych rdzeni procesorów stanowi tylko część równania. NIE ma prostej zależności jeden do jednego między liczbą rdzeni a liczbą utworzonych wątków.

Dokumentacja dotycząca anulowania i obsługi synchronizacji wątków nie jest dla mnie zbyt pomocna. Miejmy nadzieję, że MS może dostarczyć lepsze przykłady w MSDN.

Nie zapominaj, że kod body musi być napisany tak, aby działał w wielu wątkach, wraz ze wszystkimi typowymi względami dotyczącymi bezpieczeństwa wątków, struktura nie wyodrębnia tego czynnika ... jeszcze.


1
"..jeśli część ciała ForEach wywołuje długo działające funkcje blokujące, które pozostawiałyby wątek czekający w pobliżu, algorytm utworzy więcej wątków .." - W zdegenerowanych przypadkach oznacza to, że może być utworzonych tyle wątków, ile jest dozwolone na ThreadPool.
user2864740

2
Masz rację, dla IO może przydzielić +100 wątków, ponieważ sam debugowałem
FindOutIslamNow

5

Wypracowuje optymalną liczbę wątków na podstawie liczby procesorów / rdzeni. Nie wszystkie odrodzą się na raz.



4

Świetne pytanie. W twoim przykładzie poziom zrównoleglenia jest dość niski nawet w przypadku czterordzeniowego procesora, ale po pewnym czasie poziom zrównoleglenia może być dość wysoki.

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Teraz spójrz, co się stanie, gdy zostanie dodana operacja oczekująca, aby zasymulować żądanie HTTP.

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Nie wprowadziłem jeszcze żadnych zmian, a poziom współbieżności / równoległości gwałtownie podskoczył. Limit współbieżności można zwiększyć za pomocą ParallelOptions.MaxDegreeOfParallelism.

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Polecam ustawienie ParallelOptions.MaxDegreeOfParallelism. Niekoniecznie zwiększy to liczbę używanych wątków, ale zapewni, że uruchomisz tylko rozsądną liczbę wątków, co wydaje się być twoim problemem.

Na koniec odpowiadając na pytanie: nie, nie wszystkie wątki zaczną się od razu. Użyj Parallel.Invoke, jeśli chcesz wywołać równolegle idealnie, np. Testowanie warunków wyścigu.

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.