Konkurs: najszybszy sposób na posortowanie dużej tablicy danych dystrybuowanych przez Gaussa


71

Biorąc pod uwagę zainteresowanie tym pytaniem , pomyślałem, że interesujące byłoby uczynienie odpowiedzi nieco bardziej obiektywnymi i ilościowymi poprzez zaproponowanie konkursu.

Pomysł jest prosty: wygenerowałem plik binarny zawierający 50 milionów podwójnych rozkładów gaussowskich (średnia: 0, stdev 1). Celem jest stworzenie programu, który posortuje je w pamięci tak szybko, jak to możliwe. Bardzo prosta implementacja referencji w Pythonie zajmuje 1m4s. Jak nisko możemy zejść?

Reguły są następujące: odpowiedz za pomocą programu, który otworzy plik „gaussian.dat” i posortuje liczby w pamięci (nie trzeba ich wysyłać), oraz instrukcje dotyczące budowania i uruchamiania programu. Program musi działać na moim komputerze Arch Linux (co oznacza, że ​​możesz używać dowolnego języka programowania lub biblioteki, które można łatwo zainstalować w tym systemie).

Program musi być wystarczająco czytelny, aby upewnić się, że można go bezpiecznie uruchomić (proszę, nie ma rozwiązania tylko dla asemblera!).

Odpowiedzi uruchomię na moim komputerze (czterordzeniowy, 4 gigabajty pamięci RAM). Najszybsze rozwiązanie otrzyma zaakceptowaną odpowiedź i nagrodę za 100 punktów :)

Program używany do generowania liczb:

#!/usr/bin/env python
import random
from array import array
from sys import argv
count=int(argv[1])
a=array('d',(random.gauss(0,1) for x in xrange(count)))
f=open("gaussian.dat","wb")
a.tofile(f)

Prosta implementacja referencyjna:

#!/usr/bin/env python
from array import array
from sys import argv
count=int(argv[1])
a=array('d')
a.fromfile(open("gaussian.dat"),count)
print "sorting..."
b=sorted(a)

EDYCJA: tylko 4 GB pamięci RAM, przepraszam

EDYCJA 2: Zauważ, że celem konkursu jest sprawdzenie, czy możemy wykorzystać wcześniejsze informacje o danych . to nie powinno być pasujące dopasowanie między różnymi implementacjami języka programowania!


1
Weź każdą wartość i przenieś ją bezpośrednio do jej „oczekiwanej” pozycji, powtórz dla przesuniętej wartości. Nie wiesz, jak rozwiązać z tym kilka problemów. Po zakończeniu sortuj bąbelki, aż do ukończenia (wystarczy kilka pasaży).

1
Jutro wieczorem opublikuję rozwiązanie sortowania kubełkowego, jeśli do tego czasu nie zostało zamknięte :)

1
@static_rtti - jako duży użytkownik CG jest to dokładnie coś, co „lubimy” hakować w CG.SE. Do wszystkich modów do czytania przenieś to do CG, nie zamykaj go.
arrdem

1
Witamy w CodeGolf.SE! Wyczyściłem wiele komentarzy z oryginału SO dotyczących tego, gdzie to jest lub nie należy, i ponownie otagowałem, aby być bliżej głównego nurtu CodeGolf.SE.
dmckee

2
Jedynym trudnym problemem jest to, że szukamy obiektywnych kryteriów wygrywania, a „najszybszy” wprowadza zależności platformy ... czy algorytm O (n ^ {1.2}) zaimplementowany na maszynie wirtualnej cpython pokonał O (n ^ {1.3} ) algorytm z podobną stałą zaimplementowaną w c? Ogólnie proponuję dyskusję na temat charakterystyki wydajności każdego rozwiązania, ponieważ może to pomóc ludziom ocenić, co się dzieje.
dmckee

Odpowiedzi:


13

Oto rozwiązanie w C ++, które najpierw dzieli liczby na segmenty z taką samą oczekiwaną liczbą elementów, a następnie sortuje każdy segment osobno. Wstępnie oblicza tabelę funkcji skumulowanego rozkładu w oparciu o niektóre formuły z Wikipedii, a następnie interpoluje wartości z tej tabeli, aby uzyskać szybkie przybliżenie.

Kilka kroków przebiega w wielu wątkach, aby wykorzystać cztery rdzenie.

#include <cstdlib>
#include <math.h>
#include <stdio.h>
#include <algorithm>

#include <tbb/parallel_for.h>

using namespace std;

typedef unsigned long long ull;

double signum(double x) {
    return (x<0) ? -1 : (x>0) ? 1 : 0;
}

const double fourOverPI = 4 / M_PI;

double erf(double x) {
    double a = 0.147;
    double x2 = x*x;
    double ax2 = a*x2;
    double f1 = -x2 * (fourOverPI + ax2) / (1 + ax2);
    double s1 = sqrt(1 - exp(f1));
    return signum(x) * s1;
}

const double sqrt2 = sqrt(2);

double cdf(double x) {
    return 0.5 + erf(x / sqrt2) / 2;
}

const int cdfTableSize = 200;
const double cdfTableLimit = 5;
double* computeCdfTable(int size) {
    double* res = new double[size];
    for (int i = 0; i < size; ++i) {
        res[i] = cdf(cdfTableLimit * i / (size - 1));
    }
    return res;
}
const double* const cdfTable = computeCdfTable(cdfTableSize);

double cdfApprox(double x) {
    bool negative = (x < 0);
    if (negative) x = -x;
    if (x > cdfTableLimit) return negative ? cdf(-x) : cdf(x);
    double p = (cdfTableSize - 1) * x / cdfTableLimit;
    int below = (int) p;
    if (p == below) return negative ? -cdfTable[below] : cdfTable[below];
    int above = below + 1;
    double ret = cdfTable[below] +
            (cdfTable[above] - cdfTable[below])*(p - below);
    return negative ? 1 - ret : ret;
}

void print(const double* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%e; ", arr[i]);
    }
    puts("");
}

void print(const int* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%d; ", arr[i]);
    }
    puts("");
}

void fillBuckets(int N, int bucketCount,
        double* data, int* partitions,
        double* buckets, int* offsets) {
    for (int i = 0; i < N; ++i) {
        ++offsets[partitions[i]];
    }

    int offset = 0;
    for (int i = 0; i < bucketCount; ++i) {
        int t = offsets[i];
        offsets[i] = offset;
        offset += t;
    }
    offsets[bucketCount] = N;

    int next[bucketCount];
    memset(next, 0, sizeof(next));
    for (int i = 0; i < N; ++i) {
        int p = partitions[i];
        int j = offsets[p] + next[p];
        ++next[p];
        buckets[j] = data[i];
    }
}

class Sorter {
public:
    Sorter(double* data, int* offsets) {
        this->data = data;
        this->offsets = offsets;
    }

    static void radixSort(double* arr, int len) {
        ull* encoded = (ull*)arr;
        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= allBits;
            } else {
                n ^= signBit;
            }
            encoded[i] = n;
        }

        const int step = 11;
        const ull mask = (1ull << step) - 1;
        int offsets[8][1ull << step];
        memset(offsets, 0, sizeof(offsets));

        for (int i = 0; i < len; ++i) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int p = (encoded[i] >> b) & mask;
                ++offsets[j][p];
            }
        }

        int sum[8] = {0};
        for (int i = 0; i <= mask; i++) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int t = sum[j] + offsets[j][i];
                offsets[j][i] = sum[j];
                sum[j] = t;
            }
        }

        ull* copy = new ull[len];
        ull* current = encoded;
        for (int b = 0, j = 0; b < 64; b += step, ++j) {
            for (int i = 0; i < len; ++i) {
                int p = (current[i] >> b) & mask;
                copy[offsets[j][p]] = current[i];
                ++offsets[j][p];
            }

            ull* t = copy;
            copy = current;
            current = t;
        }

        if (current != encoded) {
            for (int i = 0; i < len; ++i) {
                encoded[i] = current[i];
            }
        }

        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= signBit;
            } else {
                n ^= allBits;
            }
            encoded[i] = n;
        }
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double* begin = &data[offsets[i]];
            double* end = &data[offsets[i+1]];
            //std::sort(begin, end);
            radixSort(begin, end-begin);
        }
    }

private:
    double* data;
    int* offsets;
    static const ull signBit = 1ull << 63;
    static const ull allBits = ~0ull;
};

void sortBuckets(int bucketCount, double* data, int* offsets) {
    Sorter sorter(data, offsets);
    tbb::blocked_range<int> range(0, bucketCount);
    tbb::parallel_for(range, sorter);
    //sorter(range);
}

class Partitioner {
public:
    Partitioner(int bucketCount, double* data, int* partitions) {
        this->data = data;
        this->partitions = partitions;
        this->bucketCount = bucketCount;
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double d = data[i];
            int p = (int) (cdfApprox(d) * bucketCount);
            partitions[i] = p;
        }
    }

private:
    double* data;
    int* partitions;
    int bucketCount;
};

const int bucketCount = 512;
int offsets[bucketCount + 1];

int main(int argc, char** argv) {
    if (argc != 2) {
        printf("Usage: %s N\n N = the size of the input\n", argv[0]);
        return 1;
    }

    puts("initializing...");
    int N = atoi(argv[1]);
    double* data = new double[N];
    double* buckets = new double[N];
    memset(offsets, 0, sizeof(offsets));
    int* partitions = new int[N];

    puts("loading data...");
    FILE* fp = fopen("gaussian.dat", "rb");
    if (fp == 0 || fread(data, sizeof(*data), N, fp) != N) {
        puts("Error reading data");
        return 1;
    }
    //print(data, N);

    puts("assigning partitions...");
    tbb::parallel_for(tbb::blocked_range<int>(0, N),
            Partitioner(bucketCount, data, partitions));

    puts("filling buckets...");
    fillBuckets(N, bucketCount, data, partitions, buckets, offsets);
    data = buckets;

    puts("sorting buckets...");
    sortBuckets(bucketCount, data, offsets);

    puts("done.");

    /*
    for (int i = 0; i < N-1; ++i) {
        if (data[i] > data[i+1]) {
            printf("error at %d: %e > %e\n", i, data[i], data[i+1]);
        }
    }
    */

    //print(data, N);

    return 0;
}

Aby go skompilować i uruchomić, użyj tego polecenia:

g++ -O3 -ltbb -o gsort gsort.cpp && time ./gsort 50000000

EDYCJA: Wszystkie segmenty są teraz umieszczane w tej samej tablicy, aby wyeliminować potrzebę kopiowania segmentów z powrotem do tablicy. Zmniejszono również rozmiar tabeli z wartościami wstępnie obliczonymi, ponieważ wartości są wystarczająco dokładne. Jeśli jednak zmienię liczbę segmentów powyżej 256, uruchomienie programu potrwa dłużej niż przy tej liczbie segmentów.

EDYCJA: Ten sam algorytm, inny język programowania. Użyłem C ++ zamiast Javy, a czas działania skrócił się z ~ 3.2s do ~ 2.35s na moim komputerze. Optymalna liczba segmentów nadal wynosi około 256 (znowu na moim komputerze).

Nawiasem mówiąc, TBB jest naprawdę niesamowity.

EDYCJA: Zainspirowało mnie świetne rozwiązanie Alexandru i zastąpiłem std :: sort w ostatniej fazie zmodyfikowaną wersją jego sortowania radix. Użyłem innej metody, aby radzić sobie z liczbami dodatnimi / ujemnymi, mimo że potrzebuje ona więcej przejść przez tablicę. Postanowiłem również dokładnie posortować tablicę i usunąć sortowanie wstawiania. Później poświęcę trochę czasu na sprawdzenie, w jaki sposób te zmiany wpływają na wydajność i ewentualnie je cofają. Jednak zastosowanie sortowania radix skróciło czas z ~ 2,35 s do ~ 1,63 s.


Miły. Mam na swoim 3.055. Najniższy, jaki udało mi się zdobyć, to 6,3. Wybieram twoje, aby poprawić statystyki. Dlaczego wybrałeś 256 jako liczbę wiader? Próbowałem 128 i 512, ale 256 działało najlepiej.
Scott

Dlaczego wybrałem 256 jako liczbę segmentów? Próbowałem 128 i 512, ale 256 działało najlepiej. :) Znalazłem to empirycznie i nie jestem pewien, dlaczego zwiększenie liczby segmentów spowalnia algorytm - przydzielanie pamięci nie powinno trwać tak długo. Może coś związanego z rozmiarem pamięci podręcznej?
k21

2,725 s na moim komputerze. Całkiem niezłe jak na rozwiązanie Java, biorąc pod uwagę czas ładowania JVM.
static_rtti

2
Zmieniłem twój kod, aby korzystać z pakietów nio, zgodnie z rozwiązaniem mojego i Arjana (użyłem jego składni, ponieważ była czystsza niż moja) i byłem w stanie uzyskać go .3 sekundy szybciej. Mam SSD, zastanawiam się, jakie mogą być konsekwencje, jeśli nie. Pozbywa się również niektórych twiddlingu. Zmodyfikowane sekcje są tutaj.
Scott,

3
Jest to najszybsze równoległe rozwiązanie w moich testach (procesor 16core). 1,22s od drugiego miejsca 1,94s.
Alexandru

13

Nie będąc inteligentnym, aby zapewnić znacznie szybszy naiwny sorter, oto jeden w C, który powinien być prawie równoważny z twoim Pythonem:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int cmp(const void* av, const void* bv) {
    double a = *(const double*)av;
    double b = *(const double*)bv;
    return a < b ? -1 : a > b ? 1 : 0;
}
int main(int argc, char** argv) {
    if (argc <= 1)
        return puts("No argument!");
    unsigned count = atoi(argv[1]);

    double *a = malloc(count * sizeof *a);

    FILE *f = fopen("gaussian.dat", "rb");
    if (fread(a, sizeof *a, count, f) != count)
        return puts("fread failed!");
    fclose(f);

    puts("sorting...");
    double *b = malloc(count * sizeof *b);
    memcpy(b, a, count * sizeof *b);
    qsort(b, count, sizeof *b, cmp);
    return 0;
}

Skompilowane gcc -O3, na moim komputerze zajmuje to ponad minutę mniej niż Python: około 11 s w porównaniu do 87 s.


1
Wziąłem 10.086s na moją maszynę, co czyni cię aktualnym liderem! Ale jestem pewien, że możemy to zrobić lepiej :)

1
Czy możesz spróbować usunąć drugiego operatora trójskładnikowego i po prostu zwrócić 1 dla tego przypadku, ponieważ losowe liczby podwójne nie są sobie równe w tej ilości danych.
Codism

@Codism: Dodałbym, że nie dbamy o zamianę lokalizacji równoważnych danych, dlatego nawet gdybyśmy mogli uzyskać równoważne wartości, byłoby to odpowiednie uproszczenie.

10

Podzieliłem na segmenty w oparciu o odchylenie standardowe, które najlepiej podzielić na 4. Edycja: Przepisany na partycję na podstawie wartości x w http://en.wikipedia.org/wiki/Error_function#Table_of_values

http://www.wolframalpha.com/input/?i=percentages+by++normal+distribution

Próbowałem użyć mniejszych segmentów, ale wydawało się, że raz * 2 miało to niewielki wpływ na liczbę dostępnych rdzeni. Bez równoległych kolekcji zajęłoby 37 sekund na moim pudełku i 24 z równoległymi kolekcjami. Jeśli partycjonowanie odbywa się za pomocą dystrybucji, nie możesz po prostu użyć tablicy, więc jest więcej narzutu. Nie jestem pewien, kiedy wartość będzie pudełkowana / rozpakowywana w scala.

Używam scala 2.9 do zbierania równoległego. Możesz po prostu pobrać jego dystrybucję tar.gz.

Aby skompilować: scalac SortFile.scala (właśnie skopiowałem go bezpośrednio do folderu scala / bin.

Aby uruchomić: JAVA_OPTS = "- Xmx4096M" ./scala SortFile (uruchomiłem go z 2 koncertami pamięci RAM i dostałem mniej więcej w tym samym czasie)

Edycja: Usunięto przydzielanie bezpośrednie, wolniejsze niż tylko przydzielanie. Usunięto zalewanie początkowego rozmiaru buforów tablic. Właściwie odczytał wszystkie wartości 50000000. Przepisano, aby uniknąć problemów z autoboxingiem (wciąż wolniej niż naiwny c)

import java.io.FileInputStream;
import java.nio.ByteBuffer
import java.nio.ByteOrder
import scala.collection.mutable.ArrayBuilder


object SortFile {

//used partition numbers from Damascus' solution
val partList = List(0, 0.15731, 0.31864, 0.48878, 0.67449, 0.88715, 1.1503, 1.5341)

val listSize = partList.size * 2;
val posZero = partList.size;
val neg = partList.map( _ * -1).reverse.zipWithIndex
val pos = partList.map( _ * 1).zipWithIndex.reverse

def partition(dbl:Double): Int = { 

//for each partition, i am running through the vals in order
//could make this a binary search to be more performant... but our list size is 4 (per side)

  if(dbl < 0) { return neg.find( dbl < _._1).get._2  }
  if(dbl > 0) { return posZero  + pos.find( dbl > _._1).get._2  }
      return posZero; 

}

  def main(args: Array[String])
    { 

    var l = 0
    val dbls = new Array[Double](50000000)
    val partList = new Array[Int](50000000)
    val pa = Array.fill(listSize){Array.newBuilder[Double]}
    val channel = new FileInputStream("../../gaussian.dat").getChannel()
    val bb = ByteBuffer.allocate(50000000 * 8)
    bb.order(ByteOrder.LITTLE_ENDIAN)
    channel.read(bb)
    bb.rewind
    println("Loaded" + System.currentTimeMillis())
    var dbl = 0.0
    while(bb.hasRemaining)
    { 
      dbl = bb.getDouble
      dbls.update(l,dbl) 

      l+=1
    }
    println("Beyond first load" + System.currentTimeMillis());

    for( i <- (0 to 49999999).par) { partList.update(i, partition(dbls(i)))}

    println("Partition computed" + System.currentTimeMillis() )
    for(i <- (0 to 49999999)) { pa(partList(i)) += dbls(i) }
    println("Partition completed " + System.currentTimeMillis())
    val toSort = for( i <- pa) yield i.result()
    println("Arrays Built" + System.currentTimeMillis());
    toSort.par.foreach{i:Array[Double] =>scala.util.Sorting.quickSort(i)};

    println("Read\t" + System.currentTimeMillis());

  }
}

1
8,185s! Fajnie jak na rozwiązanie SCALA, chyba ... Brawo za dostarczenie pierwszego rozwiązania, które faktycznie wykorzystuje rozkład Gaussa w jakiś sposób!

1
Chciałem tylko konkurować z rozwiązaniem c #. Nie sądziłem, że pokonam c / c ++. Poza tym .. zachowuje się zupełnie inaczej dla ciebie niż dla mnie. Używam openJDK na moim końcu i jest o wiele wolniejszy. Zastanawiam się, czy dodanie większej liczby partycji pomogłoby w twojej env.
Scott

9

Po prostu włóż to do pliku cs i skompiluj teoretycznie z csc: (Wymaga mono)

using System;
using System.IO;
using System.Threading;

namespace Sort
{
    class Program
    {
        const int count = 50000000;
        static double[][] doubles;
        static WaitHandle[] waiting = new WaitHandle[4];
        static AutoResetEvent[] events = new AutoResetEvent[4];

        static double[] Merge(double[] left, double[] right)
        {
            double[] result = new double[left.Length + right.Length];
            int l = 0, r = 0, spot = 0;
            while (l < left.Length && r < right.Length)
            {
                if (right[r] < left[l])
                    result[spot++] = right[r++];
                else
                    result[spot++] = left[l++];
            }
            while (l < left.Length) result[spot++] = left[l++];
            while (r < right.Length) result[spot++] = right[r++];
            return result;
        }

        static void ThreadStart(object data)
        {
            int index = (int)data;
            Array.Sort(doubles[index]);
            events[index].Set();
        }

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
            watch.Start();
            byte[] bytes = File.ReadAllBytes(@"..\..\..\SortGuassian\Data.dat");
            doubles = new double[][] { new double[count / 4], new double[count / 4], new double[count / 4], new double[count / 4] };
            for (int i = 0; i < 4; i++)
            {
                for (int j = 0; j < count / 4; j++)
                {
                    doubles[i][j] = BitConverter.ToDouble(bytes, i * count/4 + j * 8);
                }
            }
            Thread[] threads = new Thread[4];
            for (int i = 0; i < 4; i++)
            {
                threads[i] = new Thread(ThreadStart);
                waiting[i] = events[i] = new AutoResetEvent(false);
                threads[i].Start(i);
            }
            WaitHandle.WaitAll(waiting);
            double[] left = Merge(doubles[0], doubles[1]);
            double[] right = Merge(doubles[2], doubles[3]);
            double[] result = Merge(left, right);
            watch.Stop();
            Console.WriteLine(watch.Elapsed.ToString());
            Console.ReadKey();
        }
    }
}

Czy mogę uruchomić twoje rozwiązania w Mono? Jak mam to zrobić?

Nie użyłem Mono, nie pomyślałem o tym, powinieneś być w stanie skompilować F #, a następnie uruchomić go.

1
Zaktualizowano, aby używać czterech wątków w celu poprawy wydajności. Teraz daje mi 6 sekund. Zauważ, że można to znacznie poprawić (prawdopodobnie 5 sekund), jeśli użyjesz tylko jednej wolnej tablicy i unikniesz inicjalizacji tony pamięci do zera, co robi CLR, ponieważ wszystko jest zapisywane przynajmniej raz.

1
9,598s na mojej maszynie! Jesteś aktualnym liderem :)

1
Moja mama powiedziała mi, abym trzymał się z dala od facetów z Mono!

8

Ponieważ wiesz, co to jest dystrybucja, możesz użyć sortowania bezpośredniego O (N). (Jeśli zastanawiasz się, co to jest, załóżmy, że masz talię 52 kart i chcesz ją posortować. Po prostu 52 pojemniki i wrzuć każdą kartę do osobnego pojemnika.)

Masz 5e7 podwójnych. Przydziel tablicę wyników R liczby 5e7 podwójnej. Weź każdą liczbę xi zdobądź i = phi(x) * 5e7. Zasadniczo tak R[i] = x. Mają sposób radzenia sobie z kolizjami, na przykład przenoszenie numeru, z którym może kolidować (jak w prostym kodowaniu mieszającym). Alternatywnie możesz zwiększyć R kilka razy, wypełniając go unikalną pustą wartością. Na koniec po prostu zamiatasz elementy R.

phijest po prostu funkcją rozkładu skumulowanego gaussa. Konwertuje liczbę rozproszoną gaussa między +/- nieskończonością na jednolitą liczbę rozproszoną między 0 a 1. Prostym sposobem jej obliczenia jest wyszukiwanie i interpolacja tabeli.


3
Uważaj: znasz rozkład przybliżony, a nie dokładny. Wiesz, że dane zostały wygenerowane przy użyciu prawa Gaussa, ale ponieważ są skończone, nie są dokładnie zgodne z Gaussa.

@static_rtti: W tym przypadku niezbędne zbliżenie phi spowodowałoby większy problem niż jakiekolwiek nieprawidłowości w zestawie danych IMO.

1
@static_rtti: nie musi być dokładny. Musi tylko rozłożyć dane, aby były w przybliżeniu jednolite, aby nie wiązały się zbytnio w niektórych miejscach.

Załóżmy, że masz 5e7 podwójnych. Dlaczego po prostu nie uczynić każdego wpisu w R wektorem, powiedzmy, 5e6 wektorów podwójnych. Następnie push_back każde podwójne w odpowiednim wektorze. Sortuj wektory i gotowe. Powinno to zająć liniowy czas w wielkości wejścia.
Neil G

W rzeczywistości widzę, że mdkess już wymyślił to rozwiązanie.
Neil G

8

Oto kolejne sekwencyjne rozwiązanie:

#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <ctime>

typedef unsigned long long ull;

int size;
double *dbuf, *copy;
int cnt[8][1 << 16];

void sort()
{
  const int step = 10;
  const int start = 24;
  ull mask = (1ULL << step) - 1;

  ull *ibuf = (ull *) dbuf;
  for (int i = 0; i < size; i++) {
    for (int w = start, v = 0; w < 64; w += step, v++) {
      int p = (~ibuf[i] >> w) & mask;
      cnt[v][p]++;
    }
  }

  int sum[8] = { 0 };
  for (int i = 0; i <= mask; i++) {
    for (int w = start, v = 0; w < 64; w += step, v++) {
      int tmp = sum[v] + cnt[v][i];
      cnt[v][i] = sum[v];
      sum[v] = tmp;
    }
  }

  for (int w = start, v = 0; w < 64; w += step, v++) {
    ull *ibuf = (ull *) dbuf;
    for (int i = 0; i < size; i++) {
      int p = (~ibuf[i] >> w) & mask;
      copy[cnt[v][p]++] = dbuf[i];
    }

    double *tmp = copy;
    copy = dbuf;
    dbuf = tmp;
  }

  for (int p = 0; p < size; p++)
    if (dbuf[p] >= 0.) {
      std::reverse(dbuf + p, dbuf + size);
      break;
    }

  // Insertion sort
  for (int i = 1; i < size; i++) {
    double value = dbuf[i];
    if (value < dbuf[i - 1]) {
      dbuf[i] = dbuf[i - 1];
      int p = i - 1;
      for (; p > 0 && value < dbuf[p - 1]; p--)
        dbuf[p] = dbuf[p - 1];
      dbuf[p] = value;
    }
  }
}

int main(int argc, char **argv) {
  size = atoi(argv[1]);
  dbuf = new double[size];
  copy = new double[size];

  FILE *f = fopen("gaussian.dat", "r");
  fread(dbuf, size, sizeof(double), f);
  fclose(f);

  clock_t c0 = clock();
  sort();
  printf("Finished after %.3f\n", (double) ((clock() - c0)) / CLOCKS_PER_SEC);
  return 0;
}

Wątpię, czy pobije to rozwiązanie wielowątkowe, ale czasy na moim laptopie i7 są (stdsort to rozwiązanie C ++ podane w innej odpowiedzi):

$ g++ -O3 mysort.cpp -o mysort && ./mysort 50000000
Finished after 2.10
$ g++ -O3 stdsort.cpp -o stdsort && ./stdsort
Finished after 7.12

Zauważ, że to rozwiązanie ma liniową złożoność czasową (ponieważ wykorzystuje specjalną reprezentację podwójnych).

EDYCJA : Naprawiono wzrost kolejności elementów.

EDYCJA : Poprawiona prędkość o prawie pół sekundy.

EDYCJA : Poprawiona prędkość o kolejne 0,7 sekundy. Sprawiono, że algorytm jest bardziej przyjazny dla pamięci podręcznej.

EDYCJA : Poprawiona prędkość o kolejne 1 sekundę. Ponieważ jest tam tylko 50 000 000 elementów, mogę częściowo posortować mantysę i użyć sortowania wstawek (co jest przyjazne dla pamięci podręcznej), aby naprawić elementy nie na miejscu. Ten pomysł usuwa około dwóch iteracji z ostatniej pętli sortowania podstawników.

EDYCJA : 0,16 sekundy mniej. Pierwszy std :: reverse można wyeliminować, jeśli kolejność sortowania zostanie odwrócona.


Teraz robi się ciekawie! Jaki to algorytm sortowania?
static_rtti

2
Sortowanie co najmniej cyfr znacznikowych . Możesz posortować mantysę, potem wykładnik, a następnie znak. Przedstawiony tutaj algorytm posuwa ten pomysł o krok dalej. Można to zrównoleglić za pomocą idei partycjonowania podanej w innej odpowiedzi.
Alexandru

Całkiem szybko jak na rozwiązanie z jednym gwintem: 2,552s! Czy uważasz, że mógłbyś zmienić swoje rozwiązanie, aby wykorzystać fakt, że dane są zwykle dystrybuowane? Prawdopodobnie możesz zrobić lepiej niż obecne najlepsze rozwiązania wielowątkowe.
static_rtti

1
@static_rtti: Widzę, że Damascus Steel opublikował już wielowątkową wersję tej implementacji. Poprawiłem zachowanie tego algorytmu w pamięci podręcznej, więc teraz powinieneś uzyskać lepsze czasy. Przetestuj tę nową wersję.
Alexandru

2
1,459 w moich najnowszych testach. Chociaż to rozwiązanie nie jest zwycięzcą według moich zasad, naprawdę zasługuje na duże uznanie. Gratulacje!
static_rtti

6

Biorąc rozwiązanie Christiana Ammera i równolegle go z gwintowanymi elementami konstrukcyjnymi Intela

#include <iostream>
#include <fstream>
#include <algorithm>
#include <vector>
#include <ctime>
#include <tbb/parallel_sort.h>

int main(void)
{
    std::ifstream ifs("gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<double> values;
    values.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
    values.push_back(d);
    clock_t c0 = clock();
    tbb::parallel_sort(values.begin(), values.end());
    std::cout << "Finished after "
              << static_cast<double>((clock() - c0)) / CLOCKS_PER_SEC
              << std::endl;
}

Jeśli masz dostęp do biblioteki Intel Performance Primitive (IPP), możesz użyć jej sortowania radix. Po prostu wymień

#include <tbb/parallel_sort.h>

z

#include "ipps.h"

i

tbb::parallel_sort(values.begin(), values.end());

z

std::vector<double> copy(values.size());
ippsSortRadixAscend_64f_I(&values[0], &copy[0], values.size());

Na moim dwurdzeniowym laptopie czasy są

C               16.4 s
C#              20 s
C++ std::sort   7.2 s
C++ tbb         5 s
C++ ipp         4.5 s
python          too long

1
2,958s! TBB wydaje się całkiem fajny i łatwy w użyciu!

2
TBB jest absurdalnie niesamowite. Jest to dokładnie odpowiedni poziom abstrakcji do pracy algorytmicznej.
drxzcl

5

Co powiesz na implementację równoległego szybkiego sortowania, który wybiera swoje wartości przestawne na podstawie statystyk rozkładu, zapewniając w ten sposób równe rozmiary partycji? Pierwszy element przestawny miałby wartość średnią (w tym przypadku zero), kolejna para byłaby na 25. i 75. percentylu (+/- -0.67449 odchylenia standardowe) i tak dalej, przy każdej partycji o połowę pozostały zestaw danych więcej lub mniej idealnie.


Tak właściwie zrobiłem na moim ... oczywiście, że dostałeś ten post, zanim skończyłem pisać.

5

Bardzo brzydkie (po co używać tablic, kiedy mogę używać zmiennych kończących się cyframi), ale szybki kod (moja pierwsza próba std :: Thread), cały czas (czas rzeczywisty) w moim systemie 1,8 s (w porównaniu do std :: sort () 4,8 s), skompiluj z g ++ -std = c ++ 0x -O3 -march = natywny -pthread Wystarczy przekazać dane przez stdin (działa tylko dla 50M).

#include <cstdio>
#include <iostream>
#include <algorithm>
#include <thread>
using namespace std;
const size_t size=50000000;

void pivot(double* start,double * end, double middle,size_t& koniec){
    double * beg=start;
    end--;
    while (start!=end){
        if (*start>middle) swap (*start,*end--);
        else start++;
    }
    if (*end<middle) start+=1;
    koniec= start-beg;
}
void s(double * a, double* b){
    sort(a,b);
}
int main(){
    double *data=new double[size];
    FILE *f = fopen("gaussian.dat", "rb");
    fread(data,8,size,f);
    size_t end1,end2,end3,temp;
    pivot(data, data+size,0,end2);
    pivot(data, data+end2,-0.6745,end1);
    pivot(data+end2,data+size,0.6745,end3);
    end3+=end2;
    thread ts1(s,data,data+end1);
    thread ts2(s,data+end1,data+end2);
    thread ts3(s,data+end2,data+end3);
    thread ts4(s,data+end3,data+size);
    ts1.join(),ts2.join(),ts3.join(),ts4.join();
    //for (int i=0; i<size-1; i++){
    //  if (data[i]>data[i+1]) cerr<<"BLAD\n";
    //}
    fclose(f);
    //fwrite(data,8,size,stdout);
}

// Edycja została zmieniona, aby odczytać plik gaussian.dat.


Czy możesz to zmienić na read gaussian.dat, tak jak powyższe rozwiązania C ++?

Spróbuję później, kiedy wrócę do domu.
static_rtti

Bardzo fajne rozwiązanie, jesteś obecnym liderem (1.949s)! I miłe użycie rozkładu gaussowskiego :)
static_rtti

4

Wykorzystanie rozwiązania C ++ std::sort(ostatecznie szybsze niż qsort, w odniesieniu do wydajności qsort vs std :: sort )

#include <iostream>
#include <fstream>
#include <algorithm>
#include <vector>
#include <ctime>

int main(void)
{
    std::ifstream ifs("C:\\Temp\\gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<double> values;
    values.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
        values.push_back(d);
    clock_t c0 = clock();
    std::sort(values.begin(), values.end());
    std::cout << "Finished after "
              << static_cast<double>((clock() - c0)) / CLOCKS_PER_SEC
              << std::endl;
}

Nie mogę rzetelnie powiedzieć, ile to trwa, ponieważ mam tylko 1 GB na moim komputerze, a dzięki podanemu kodowi Python mogłem stworzyć gaussian.datplik tylko z 25 mln kopii podwójnych (bez błędu pamięci). Ale jestem bardzo zainteresowany, jak długo działa algorytm std :: sort.


6,425 s! Zgodnie z oczekiwaniami, C ++ świeci :)

@static_rtti: Próbowałem algorytmu swimsons Timsort (jak sugeruje Matthieu M. w pierwszym pytaniu ). Musiałem wprowadzić pewne zmiany w sort.hpliku, aby skompilować go w C ++. Było około dwa razy wolniej niż std::sort. Nie wiem dlaczego, może z powodu optymalizacji kompilatora?
Christian Ammer,

4

Oto mieszanka radixu Alexandru z inteligentnym obrotowym gwintowaniem Zjareka. Skompiluj to z

g++ -std=c++0x -pthread -O3 -march=native sorter_gaussian_radix.cxx -o sorter_gaussian_radix

Możesz zmienić rozmiar podstawy określając STEP (np. Dodaj -DSTEP = 11). Uważam, że najlepszy dla mojego laptopa jest 8 (domyślnie).

Domyślnie dzieli problem na 4 części i uruchamia go na wielu wątkach. Możesz to zmienić, przekazując parametr głębokości do wiersza poleceń. Więc jeśli masz dwa rdzenie, uruchom go jako

sorter_gaussian_radix 50000000 1

a jeśli masz 16 rdzeni

sorter_gaussian_radix 50000000 4

Maksymalna głębokość w tej chwili wynosi 6 (64 wątków). Jeśli ustawisz zbyt wiele poziomów, po prostu spowolnisz kod.

Próbowałem też sortowania radix z biblioteki Intel Performance Primitive (IPP). Implementacja Alexandru mocno niepokoi IPP, przy czym IPP jest o około 30% wolniejszy. Ta odmiana jest tu również uwzględniona (skomentowana).

#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <ctime>
#include <iostream>
#include <thread>
#include <vector>
#include <boost/cstdint.hpp>
// #include "ipps.h"

#ifndef STEP
#define STEP 8
#endif

const int step = STEP;
const int start_step=24;
const int num_steps=(64-start_step+step-1)/step;
int size;
double *dbuf, *copy;

clock_t c1, c2, c3, c4, c5;

const double distrib[]={-2.15387,
                        -1.86273,
                        -1.67594,
                        -1.53412,
                        -1.4178,
                        -1.31801,
                        -1.22986,
                        -1.15035,
                        -1.07752,
                        -1.00999,
                        -0.946782,
                        -0.887147,
                        -0.830511,
                        -0.776422,
                        -0.724514,
                        -0.67449,
                        -0.626099,
                        -0.579132,
                        -0.53341,
                        -0.488776,
                        -0.445096,
                        -0.40225,
                        -0.36013,
                        -0.318639,
                        -0.27769,
                        -0.237202,
                        -0.197099,
                        -0.157311,
                        -0.11777,
                        -0.0784124,
                        -0.0391761,
                        0,
                        0.0391761,
                        0.0784124,
                        0.11777,
                        0.157311,
                        0.197099,
                        0.237202,
                        0.27769,
                        0.318639,
                        0.36013,
                        0.40225,
                        0.445097,
                        0.488776,
                        0.53341,
                        0.579132,
                        0.626099,
                        0.67449,
                        0.724514,
                        0.776422,
                        0.830511,
                        0.887147,
                        0.946782,
                        1.00999,
                        1.07752,
                        1.15035,
                        1.22986,
                        1.31801,
                        1.4178,
                        1.53412,
                        1.67594,
                        1.86273,
                        2.15387};


class Distrib
{
  const int value;
public:
  Distrib(const double &v): value(v) {}

  bool operator()(double a)
  {
    return a<value;
  }
};


void recursive_sort(const int start, const int end,
                    const int index, const int offset,
                    const int depth, const int max_depth)
{
  if(depth<max_depth)
    {
      Distrib dist(distrib[index]);
      const int middle=std::partition(dbuf+start,dbuf+end,dist) - dbuf;

      // const int middle=
      //   std::partition(dbuf+start,dbuf+end,[&](double a)
      //                  {return a<distrib[index];})
      //   - dbuf;

      std::thread lower(recursive_sort,start,middle,index-offset,offset/2,
                        depth+1,max_depth);
      std::thread upper(recursive_sort,middle,end,index+offset,offset/2,
                        depth+1,max_depth);
      lower.join(), upper.join();
    }
  else
    {
  // ippsSortRadixAscend_64f_I(dbuf+start,copy+start,end-start);

      c1=clock();

      double *dbuf_local(dbuf), *copy_local(copy);
      boost::uint64_t mask = (1 << step) - 1;
      int cnt[num_steps][mask+1];

      boost::uint64_t *ibuf = reinterpret_cast<boost::uint64_t *> (dbuf_local);

      for(int i=0;i<num_steps;++i)
        for(uint j=0;j<mask+1;++j)
          cnt[i][j]=0;

      for (int i = start; i < end; i++)
        {
          for (int w = start_step, v = 0; w < 64; w += step, v++)
            {
              int p = (~ibuf[i] >> w) & mask;
              (cnt[v][p])++;
            }
        }

      c2=clock();

      std::vector<int> sum(num_steps,0);
      for (uint i = 0; i <= mask; i++)
        {
          for (int w = start_step, v = 0; w < 64; w += step, v++)
            {
              int tmp = sum[v] + cnt[v][i];
              cnt[v][i] = sum[v];
              sum[v] = tmp;
            }
        }

      c3=clock();

      for (int w = start_step, v = 0; w < 64; w += step, v++)
        {
          ibuf = reinterpret_cast<boost::uint64_t *>(dbuf_local);

          for (int i = start; i < end; i++)
            {
              int p = (~ibuf[i] >> w) & mask;
              copy_local[start+((cnt[v][p])++)] = dbuf_local[i];
            }
          std::swap(copy_local,dbuf_local);
        }

      // Do the last set of reversals
      for (int p = start; p < end; p++)
        if (dbuf_local[p] >= 0.)
          {
            std::reverse(dbuf_local+p, dbuf_local + end);
            break;
          }

      c4=clock();

      // Insertion sort
      for (int i = start+1; i < end; i++) {
        double value = dbuf_local[i];
        if (value < dbuf_local[i - 1]) {
          dbuf_local[i] = dbuf_local[i - 1];
          int p = i - 1;
          for (; p > 0 && value < dbuf_local[p - 1]; p--)
            dbuf_local[p] = dbuf_local[p - 1];
          dbuf_local[p] = value;
        }
      }
      c5=clock();

    }
}


int main(int argc, char **argv) {
  size = atoi(argv[1]);
  copy = new double[size];

  dbuf = new double[size];
  FILE *f = fopen("gaussian.dat", "r");
  fread(dbuf, size, sizeof(double), f);
  fclose(f);

  clock_t c0 = clock();

  const int max_depth= (argc > 2) ? atoi(argv[2]) : 2;

  // ippsSortRadixAscend_64f_I(dbuf,copy,size);

  recursive_sort(0,size,31,16,0,max_depth);

  if(num_steps%2==1)
    std::swap(dbuf,copy);

  // for (int i=0; i<size-1; i++){
  //   if (dbuf[i]>dbuf[i+1])
  //     std::cout << "BAD "
  //               << i << " "
  //               << dbuf[i] << " "
  //               << dbuf[i+1] << " "
  //               << "\n";
  // }

  std::cout << "Finished after "
            << (double) (c1 - c0) / CLOCKS_PER_SEC << " "
            << (double) (c2 - c1) / CLOCKS_PER_SEC << " "
            << (double) (c3 - c2) / CLOCKS_PER_SEC << " "
            << (double) (c4 - c3) / CLOCKS_PER_SEC << " "
            << (double) (c5 - c4) / CLOCKS_PER_SEC << " "
            << "\n";

  // delete [] dbuf;
  // delete [] copy;
  return 0;
}

EDYCJA : Wdrożyłem ulepszenia pamięci podręcznej Alexandru, co skróciło około 30% czasu na moim komputerze.

EDYCJA : To implementuje sortowanie rekurencyjne, więc powinno dobrze działać na 16-rdzeniowej maszynie Alexandru. Wykorzystuje także ostatnie ulepszenie Alexandru i usuwa jeden z rewersów. Dla mnie to dało 20% poprawę.

EDYCJA : Naprawiono błąd znaku, który powodował nieefektywność, gdy jest więcej niż 2 rdzenie.

EDYCJA : Usunięto lambda, aby skompilowała się ze starszymi wersjami gcc. Obejmuje skomentowaną odmianę kodu IPP. Poprawiłem również dokumentację dotyczącą działania na 16 rdzeniach. O ile wiem, jest to najszybsza implementacja.

EDYCJA : Naprawiono błąd, gdy STEP nie jest 8. Zwiększono maksymalną liczbę wątków do 64. Dodano kilka informacji o taktowaniu.


Miły. Sortowanie Radix jest bardzo nieprzyjazne dla pamięci podręcznej. Sprawdź, czy możesz uzyskać lepsze wyniki, zmieniając step(11 było optymalne na moim laptopie).
Alexandru

Masz błąd: int cnt[mask]powinien być int cnt[mask + 1]. Aby uzyskać lepsze wyniki, użyj stałej wartości int cnt[1 << 16].
Alexandru

Wypróbuję wszystkie te rozwiązania dzisiaj, kiedy wrócę do domu.
static_rtti

1,534s !!! Myślę, że mamy lidera :-D
static_rtti

@static_rtti: Czy możesz spróbować ponownie? Stał się znacznie szybszy niż ostatnim razem, gdy go wypróbowałeś. Na moim komputerze jest znacznie szybszy niż jakiekolwiek inne rozwiązanie.
Damascus Steel

2

Myślę, że to naprawdę zależy od tego, co chcesz zrobić. Jeśli chcesz posortować grupę Gaussów, to ci to nie pomoże. Ale jeśli chcesz garść posortowanych Gaussów, tak będzie. Nawet jeśli to trochę pomija ten problem, myślę, że interesujące byłoby porównanie z faktycznymi procedurami sortowania.

Jeśli chcesz być szybki, zrób mniej.

Zamiast generować wiązkę losowych próbek z rozkładu normalnego, a następnie sortować, można wygenerować wiązkę próbek z rozkładu normalnego w uporządkowanej kolejności.

Możesz użyć rozwiązania tutaj, aby wygenerować n jednolitych liczb losowych w posortowanej kolejności. Następnie możesz użyć odwrotnego cdf (scipy.stats.norm.ppf) rozkładu normalnego, aby przekształcić jednolite liczby losowe w liczby z rozkładu normalnego za pomocą próbkowania z transformacją odwrotną .

import scipy.stats
import random

# slightly modified from linked stackoverflow post
def n_random_numbers_increasing(n):
  """Like sorted(random() for i in range(n))),                                
  but faster because we avoid sorting."""
  v = 1.0
  while n:
    v *= random.random() ** (1.0 / n)
    yield 1 - v
    n -= 1

def n_normal_samples_increasing(n):
  return map(scipy.stats.norm.ppf, n_random_numbers_increasing(n))

Jeśli chcesz zabrudzić sobie ręce, domyślam się, że możesz przyspieszyć wiele odwrotnych obliczeń cdf, stosując jakąś metodę iteracyjną i wykorzystując poprzedni wynik jako początkowe przypuszczenie. Ponieważ domysły będą bardzo bliskie, prawdopodobnie jedna iteracja zapewni dużą dokładność.


2
Dobra odpowiedź, ale to byłoby oszustwo :) Ideą mojego pytania jest to, że choć algorytmy sortowania poświęcono ogromną uwagę, prawie nie ma literatury na temat korzystania z wcześniejszej wiedzy na temat danych do sortowania, mimo że kilka artykułów zawiera rozwiązany problem zgłosił niezłe zyski. Zobaczmy więc, co jest możliwe!

2

Wypróbuj to zmieniające się rozwiązanie Guvante z tym Main (), zaczyna sortować, gdy tylko odczyt 1/4 IO zostanie zakończony, w moim teście jest szybszy:

    static void Main(string[] args)
    {
        FileStream filestream = new FileStream(@"..\..\..\gaussian.dat", FileMode.Open, FileAccess.Read);
        doubles = new double[][] { new double[count / 4], new double[count / 4], new double[count / 4], new double[count / 4] };
        Thread[] threads = new Thread[4];

        for (int i = 0; i < 4; i++)
        {
            byte[] bytes = new byte[count * 4];
            filestream.Read(bytes, 0, count * 4);

            for (int j = 0; j < count / 4; j++)
            {
                doubles[i][j] = BitConverter.ToDouble(bytes, i * count/4 + j * 8);
            }

            threads[i] = new Thread(ThreadStart);
            waiting[i] = events[i] = new AutoResetEvent(false);
            threads[i].Start(i);    
        }

        WaitHandle.WaitAll(waiting);
        double[] left = Merge(doubles[0], doubles[1]);
        double[] right = Merge(doubles[2], doubles[3]);
        double[] result = Merge(left, right);
        Console.ReadKey();
    }
}

8,933s. Nieco szybciej :)

2

Ponieważ znasz rozkład, moim pomysłem byłoby zrobienie k segmentów, każdy z taką samą oczekiwaną liczbą elementów (ponieważ znasz rozkład, możesz to obliczyć). Następnie w czasie O (n) zmieść tablicę i włożyć elementy do ich wiader.

Następnie równolegle posortuj wiadra. Załóżmy, że masz k wiader i n elementów. Sortowanie zajmie (n / k) lg (n / k) czas. Załóżmy teraz, że masz procesory p, których możesz użyć. Ponieważ wiadra można sortować niezależnie, masz do czynienia z mnożnikiem pułapu (k / p). Daje to końcowy czas działania n + ceil (k / p) * (n / k) lg (n / k), co powinno być o wiele szybsze niż n lg n, jeśli dobrze wybierzesz k.


Myślę, że to najlepsze rozwiązanie.
Neil G

Nie znasz dokładnie liczby elementów, które trafią do wiadra, więc matematyka jest w rzeczywistości błędna. Biorąc to pod uwagę, wydaje mi się, że to dobra odpowiedź.
poulejapon

@pouejapon: Masz rację.
Neil G

Ta odpowiedź brzmi naprawdę dobrze. Problem w tym, że to nie jest naprawdę szybkie. Zaimplementowałem to w C99 (patrz moja odpowiedź) iz pewnością łatwo bije std::sort(), ale jest znacznie wolniejsze niż rozwiązanie radixsort Alexandru.
Sven Marnach,

2

Jednym z pomysłów optymalizacji niskiego poziomu jest dopasowanie dwóch podwójnych danych do rejestru SSE, aby każdy wątek działał jednocześnie z dwoma elementami. W przypadku niektórych algorytmów może to być skomplikowane.

Inną rzeczą do zrobienia jest posortowanie tablicy w części przyjazne dla pamięci podręcznej, a następnie scalenie wyników. Należy zastosować dwa poziomy: na przykład pierwsze 4 KB dla L1, a następnie 64 KB dla L2.

Powinno to być bardzo przyjazne dla pamięci podręcznej, ponieważ sortowanie kubełkowe nie wyjdzie poza pamięć podręczną, a końcowe scalanie będzie przechodzić pamięć sekwencyjnie.

Obecnie obliczenia są znacznie tańsze niż dostęp do pamięci. Mamy jednak dużą liczbę elementów, więc trudno powiedzieć, jaki jest rozmiar tablicy, gdy głupie sortowanie z pamięcią podręczną jest wolniejsze niż wersja o niskiej złożoności, która nie obsługuje pamięci podręcznej.

Ale nie przedstawię implementacji powyższego, ponieważ zrobiłbym to w systemie Windows (VC ++).


2

Oto implementacja sortowania kubełkowego skanowania liniowego. Myślę, że jest szybszy niż wszystkie obecne implementacje jednowątkowe oprócz sortowania radix. Powinien mieć liniowy oczekiwany czas działania, jeśli odpowiednio oceniam plik cdf (używam interpolacji liniowej wartości znalezionych w Internecie) i nie popełniłem żadnych błędów, które spowodowałyby nadmierne skanowanie:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <ctime>

using std::fill;

const double q[] = {
  0.0,
  9.865E-10,
  2.8665150000000003E-7,
  3.167E-5,
  0.001349898,
  0.022750132,
  0.158655254,
  0.5,
  0.8413447460000001,
  0.9772498679999999,
  0.998650102,
  0.99996833,
  0.9999997133485,
  0.9999999990134999,
  1.0,
};
int main(int argc, char** argv) {
  if (argc <= 1)
    return puts("No argument!");
  unsigned count = atoi(argv[1]);
  unsigned count2 = 3 * count;

  bool *ba = new bool[count2 + 1000];
  fill(ba, ba + count2 + 1000, false);
  double *a = new double[count];
  double *c = new double[count2 + 1000];

  FILE *f = fopen("gaussian.dat", "rb");
  if (fread(a, 8, count, f) != count)
    return puts("fread failed!");
  fclose(f);

  int i;
  int j;
  bool s;
  int t;
  double z;
  double p;
  double d1;
  double d2;
  for (i = 0; i < count; i++) {
    s = a[i] < 0;
    t = a[i];
    if (s) t--;
    z = a[i] - t;
    t += 7;
    if (t < 0) {
      t = 0;
      z = 0;
    } else if (t >= 14) {
      t = 13;
      z = 1;
    }
    p = q[t] * (1 - z) + q[t + 1] * z;
    j = count2 * p;
    while (ba[j] && c[j] < a[i]) {
      j++;
    }
    if (!ba[j]) {
      ba[j] = true;
      c[j] = a[i];
    } else {
      d1 = c[j];
      c[j] = a[i];
      j++;
      while (ba[j]) {
        d2 = c[j];
        c[j] = d1;
        d1 = d2;
        j++;
      }
      c[j] = d1;
      ba[j] = true;
    }
  }
  i = 0;
  int max = count2 + 1000;
  for (j = 0; j < max; j++) {
    if (ba[j]) {
      a[i++] = c[j];
    }
  }
  // for (i = 0; i < count; i += 1) {
  //   printf("here %f\n", a[i]);
  // }
  return 0;
}

1
Spróbuję tego później, kiedy wrócę do domu. Czy w międzyczasie mogę powiedzieć, że twój kod jest bardzo brzydki? :-D
static_rtti

3.071s! Nieźle jak na rozwiązanie jednowątkowe!
static_rtti

2

Nie wiem, dlaczego nie mogę edytować mojego poprzedniego postu, więc oto nowa wersja, 0,2 sekundy szybciej (ale około 1,5 s szybciej w czasie procesora (użytkownika)). To rozwiązanie ma 2 programy, najpierw wstępnie oblicza kwantyle dla rozkładu normalnego do sortowania w segmentach i zapisuje je w tabeli, t [double * scale] = indeks segmentu, gdzie skala jest dowolną liczbą, która umożliwia rzutowanie do dwukrotności. Następnie program główny może wykorzystać te dane do umieszczenia podwójnej liczby w poprawnym segmencie. Ma jedną wadę, jeśli dane nie są gaussowskie, nie będzie działać poprawnie (a także istnieje prawie zerowa szansa na niepoprawną pracę dla normalnej dystrybucji), ale modyfikacja dla specjalnego przypadku jest łatwa i szybka (tylko liczba kontroli segmentów i spada do standardowej ::sortować()).

Kompilacja: g ++ => http://pastebin.com/WG7pZEzH program pomocniczy

g ++ -std = c ++ 0x -O3 -march = natywny -pthread => http://pastebin.com/T3yzViZP główny program sortujący


1.621s! Myślę, że jesteś liderem, ale szybko
tracę orientację

2

Oto kolejne rozwiązanie sekwencyjne. Ten wykorzystuje fakt, że elementy są normalnie rozmieszczone, i myślę, że pomysł ma ogólne zastosowanie, aby uzyskać sortowanie zbliżone do czasu liniowego.

Algorytm wygląda następująco:

  • Przybliżony CDF (patrz phi()funkcja w implementacji)
  • Dla wszystkich elementów oblicz przybliżoną pozycję w posortowanej tablicy: size * phi(x)
  • Umieść elementy w nowej tablicy blisko ich ostatecznej pozycji
    • W mojej implementacji tablica docelowa ma pewne luki, więc nie muszę przesuwać zbyt wielu elementów podczas wstawiania.
  • Użyj insertSort, aby posortować ostatnie elementy (insertSort jest liniowy, jeśli odległość do pozycji końcowej jest mniejsza niż stała).

Niestety, ukryta stała jest dość duża, a to rozwiązanie jest dwa razy wolniejsze niż algorytm sortowania radix.


1
2.470s! Bardzo fajne pomysły. Nie ma znaczenia, że ​​rozwiązanie nie jest najszybsze, jeśli pomysły są interesujące :)
static_rtti

1
To jest to samo co moje, ale grupowanie obliczeń phi razem i przesunięć razem dla lepszej wydajności pamięci podręcznej, prawda?
jonderry

@jonderry: Głosowałem za twoim rozwiązaniem, teraz, gdy rozumiem, co robi. Nie chciałem ukraść twojego pomysłu. Włączyłem twoją implementację do mojego (nieoficjalnego) zestawu testów
Alexandru

2

Mój osobisty faworyt wykorzystujący wątkowe bloki konstrukcyjne Intela został już opublikowany, ale oto prymitywne równoległe rozwiązanie z użyciem JDK 7 i jego nowego interfejsu API rozwidlenia / dołączania:

import java.io.FileInputStream;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.*;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
import static java.nio.ByteOrder.LITTLE_ENDIAN;


/**
 * 
 * Original Quicksort: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel
 *
 */
public class ForkJoinQuicksortTask extends RecursiveAction {

    public static void main(String[] args) throws Exception {

        double[] array = new double[Integer.valueOf(args[0])];

        FileChannel fileChannel = new FileInputStream("gaussian.dat").getChannel();
        fileChannel.map(READ_ONLY, 0, fileChannel.size()).order(LITTLE_ENDIAN).asDoubleBuffer().get(array);

        ForkJoinPool mainPool = new ForkJoinPool();

        System.out.println("Starting parallel computation");

        mainPool.invoke(new ForkJoinQuicksortTask(array));        
    }

    private static final long serialVersionUID = -642903763239072866L;
    private static final int SERIAL_THRESHOLD = 0x1000;

    private final double a[];
    private final int left, right;

    public ForkJoinQuicksortTask(double[] a) {this(a, 0, a.length - 1);}

    private ForkJoinQuicksortTask(double[] a, int left, int right) {
        this.a = a;
        this.left = left;
        this.right = right;
    }

    @Override
    protected void compute() {
        if (right - left < SERIAL_THRESHOLD) {
            Arrays.sort(a, left, right + 1);
        } else {
            int pivotIndex = partition(a, left, right);
            ForkJoinTask<Void> t1 = null;

            if (left < pivotIndex)
                t1 = new ForkJoinQuicksortTask(a, left, pivotIndex).fork();
            if (pivotIndex + 1 < right)
                new ForkJoinQuicksortTask(a, pivotIndex + 1, right).invoke();

            if (t1 != null)
                t1.join();
        }
    }

    public static int partition(double[] a, int left, int right) {
        // chose middle value of range for our pivot
        double pivotValue = a[left + (right - left) / 2];

        --left;
        ++right;

        while (true) {
            do
                ++left;
            while (a[left] < pivotValue);

            do
                --right;
            while (a[right] > pivotValue);

            if (left < right) {
                double tmp = a[left];
                a[left] = a[right];
                a[right] = tmp;
            } else {
                return right;
            }
        }
    }    
}

Ważna informacja : wziąłem adaptację szybkiego sortowania dla fork / join z: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel

Aby to uruchomić, potrzebujesz wersji beta JDK 7 (http://jdk7.java.net/download.html).

Na moim 2,93 GHz Quad Core i7 (OS X):

Odwołanie do Pythona

time python sort.py 50000000
sorting...

real    1m13.885s
user    1m11.942s
sys     0m1.935s

Widelec / łączenie Java JDK 7

time java ForkJoinQuicksortTask 50000000
Starting parallel computation

real    0m2.404s
user    0m10.195s
sys     0m0.347s

Próbowałem też trochę eksperymentować z równoległym czytaniem i konwertowaniem bajtów na podwójne, ale nie zauważyłem żadnej różnicy.

Aktualizacja:

Jeśli ktoś chce eksperymentować z równoległym ładowaniem danych, wersja ładowania równoległego jest poniżej. Teoretycznie może to jeszcze trochę przyspieszyć, jeśli twoje urządzenie IO ma wystarczającą pojemność równoległą (zwykle dyski SSD). Tworzenie dublów z bajtów wiąże się również z pewnym nakładem, więc potencjalnie może to również przyspieszyć równolegle. Na moich systemach (Ubuntu 10.10 / Nehalem Quad / Intel X25M SSD i OS X 10.6 / i7 Quad / Samsung SSD) nie widziałem żadnej prawdziwej różnicy.

import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;

import java.io.FileInputStream;
import java.nio.DoubleBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;


/**
 *
 * Original Quicksort: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel
 *
 */
public class ForkJoinQuicksortTask extends RecursiveAction {

   public static void main(String[] args) throws Exception {

       ForkJoinPool mainPool = new ForkJoinPool();

       double[] array = new double[Integer.valueOf(args[0])];
       FileChannel fileChannel = new FileInputStream("gaussian.dat").getChannel();
       DoubleBuffer buffer = fileChannel.map(READ_ONLY, 0, fileChannel.size()).order(LITTLE_ENDIAN).asDoubleBuffer();

       mainPool.invoke(new ReadAction(buffer, array, 0, array.length));
       mainPool.invoke(new ForkJoinQuicksortTask(array));
   }

   private static final long serialVersionUID = -642903763239072866L;
   private static final int SERIAL_THRESHOLD = 0x1000;

   private final double a[];
   private final int left, right;

   public ForkJoinQuicksortTask(double[] a) {this(a, 0, a.length - 1);}

   private ForkJoinQuicksortTask(double[] a, int left, int right) {
       this.a = a;
       this.left = left;
       this.right = right;
   }

   @Override
   protected void compute() {
       if (right - left < SERIAL_THRESHOLD) {
           Arrays.sort(a, left, right + 1);
       } else {
           int pivotIndex = partition(a, left, right);
           ForkJoinTask<Void> t1 = null;

           if (left < pivotIndex)
               t1 = new ForkJoinQuicksortTask(a, left, pivotIndex).fork();
           if (pivotIndex + 1 < right)
               new ForkJoinQuicksortTask(a, pivotIndex + 1, right).invoke();

           if (t1 != null)
               t1.join();
       }
   }

   public static int partition(double[] a, int left, int right) {
       // chose middle value of range for our pivot
       double pivotValue = a[left + (right - left) / 2];

       --left;
       ++right;

       while (true) {
           do
               ++left;
           while (a[left] < pivotValue);

           do
               --right;
           while (a[right] > pivotValue);

           if (left < right) {
               double tmp = a[left];
               a[left] = a[right];
               a[right] = tmp;
           } else {
               return right;
           }
       }
   }

}

class ReadAction extends RecursiveAction {

   private static final long serialVersionUID = -3498527500076085483L;

   private final DoubleBuffer buffer;
   private final double[] array;
   private final int low, high;

   public ReadAction(DoubleBuffer buffer, double[] array, int low, int high) {
       this.buffer = buffer;
       this.array = array;
       this.low = low;
       this.high = high;
   }

   @Override
   protected void compute() {
       if (high - low < 100000) {
           buffer.position(low);
           buffer.get(array, low, high-low);
       } else {
           int middle = (low + high) >>> 1;

           invokeAll(new ReadAction(buffer.slice(), array, low, middle),  new ReadAction(buffer.slice(), array, middle, high));
       }
   }
}

Aktualizacja 2:

Wykonałem kod na jednej z naszych 12 podstawowych maszyn programistycznych z niewielką modyfikacją, aby ustawić stałą liczbę rdzeni. To dało następujące wyniki:

Cores  Time
1      7.568s
2      3.903s
3      3.325s
4      2.388s
5      2.227s
6      1.956s
7      1.856s
8      1.827s
9      1.682s
10     1.698s
11     1.620s
12     1.503s

W tym systemie wypróbowałem także wersję Pythona, która zajęła 1m2.994s oraz wersję C ++ Zjareka, która zajęła 1.925s (z jakiegoś powodu wersja C ++ Zjareka wydaje się działać stosunkowo szybciej na komputerze static_rtti).

Próbowałem również, co się stanie, jeśli podwoję rozmiar pliku do 100 000 000 kopii dwukrotnie:

Cores  Time
1      15.056s
2      8.116s
3      5.925s
4      4.802s
5      4.430s
6      3.733s
7      3.540s
8      3.228s
9      3.103s
10     2.827s
11     2.784s
12     2.689s

W tym przypadku wersja C ++ Zjarek zajęła 3.968s. Python trwał tu zbyt długo.

150 000 000 podwójnych:

Cores  Time
1      23.295s
2      12.391s
3      8.944s
4      6.990s
5      6.216s
6      6.211s
7      5.446s
8      5.155s
9      4.840s
10     4.435s
11     4.248s
12     4.174s

W tym przypadku wersja C ++ w Zjarek miała 6.044s. Nawet nie próbowałem Pythona.

Wersja C ++ jest bardzo spójna z wynikami, w których Java trochę się waha. Najpierw staje się trochę bardziej wydajny, gdy problem staje się większy, ale potem znów mniej wydajny.


1
Ten kod nie analizuje poprawnie podwójnych wartości dla mnie. Czy Java 7 jest wymagane, aby poprawnie parsować wartości z pliku?
poniedziałek,

1
Ach, głupie ja. Zapomniałem ponownie ustawić endianness po tym, jak lokalnie zrefakturowałem kod IO z wielu wierszy do jednego. Java 7 byłaby normalnie potrzebna, chyba że dodałeś osobno fork / join do Java 6 oczywiście.
arjan

3,411 na moim komputerze. Nieźle, ale wolniej niż rozwiązanie Java
Koumes21

1
Spróbuję rozwiązania koumes21 tutaj też lokalnie, aby zobaczyć, jakie są względne różnice w moim systemie. Zresztą nie ma się czego wstydzić w „przegrywaniu” z koumes21, ponieważ jest to znacznie mądrzejsze rozwiązanie. To tylko prawie standardowe szybkie sortowanie wrzucone do rozwidlenia / dołączenia do puli;)
arjan

1

Wersja wykorzystująca tradycyjne wątki. Kod scalania skopiowany z odpowiedzi Guvante. Kompiluj z g++ -O3 -pthread.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <algorithm>

static unsigned int nthreads = 4;
static unsigned int size = 50000000;

typedef struct {
  double *array;
  int size;
} array_t;


void 
merge(double *left, int leftsize,
      double *right, int rightsize,
      double *result)
{
  int l = 0, r = 0, insertat = 0;
  while (l < leftsize && r < rightsize) {
    if (left[l] < right[r])
      result[insertat++] = left[l++];
    else
      result[insertat++] = right[r++];
  }

  while (l < leftsize) result[insertat++] = left[l++];
  while (r < rightsize) result[insertat++] = right[r++];
}


void *
run_thread(void *input)
{
  array_t numbers = *(array_t *)input;
  std::sort(numbers.array, numbers.array+numbers.size); 
  pthread_exit(NULL);
}

int 
main(int argc, char **argv) 
{
  double *numbers = (double *) malloc(size * sizeof(double));

  FILE *f = fopen("gaussian.dat", "rb");
  if (fread(numbers, sizeof(double), size, f) != size)
    return printf("Reading gaussian.dat failed");
  fclose(f);

  array_t worksets[nthreads];
  int worksetsize = size / nthreads;
  for (int i = 0; i < nthreads; i++) {
    worksets[i].array=numbers+(i*worksetsize);
    worksets[i].size=worksetsize;
  }

  pthread_attr_t attributes;
  pthread_attr_init(&attributes);
  pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE);

  pthread_t threads[nthreads];
  for (int i = 0; i < nthreads; i++) {
    pthread_create(&threads[i], &attributes, &run_thread, &worksets[i]);
  }

  for (int i = 0; i < nthreads; i++) {
    pthread_join(threads[i], NULL);
  }

  double *tmp = (double *) malloc(size * sizeof(double));
  merge(numbers, worksetsize, numbers+worksetsize, worksetsize, tmp);
  merge(numbers+(worksetsize*2), worksetsize, numbers+(worksetsize*3), worksetsize, tmp+(size/2));
  merge(tmp, worksetsize*2, tmp+(size/2), worksetsize*2, numbers);

  /*
  printf("Verifying result..\n");
  for (int i = 0; i < size - 1; i++) {
    if (numbers[i] > numbers[i+1])
      printf("Result is not correct\n");
  }
  */

  pthread_attr_destroy(&attributes);
  return 0;
}  

Na moim laptopie otrzymuję następujące wyniki:

real    0m6.660s
user    0m9.449s
sys     0m1.160s

1

Oto sekwencyjna implementacja C99, która próbuje naprawdę wykorzystać znaną dystrybucję. Zasadniczo wykonuje pojedynczą rundę sortowania segmentu z wykorzystaniem informacji o dystrybucji, a następnie kilka rund szybkiego sortowania w każdym segmencie, zakładając jednolity rozkład w granicach segmentu, a na koniec zmodyfikowany sortowanie selekcji w celu skopiowania danych z powrotem do pierwotnego bufora. Quicksort zapamiętuje punkty podziału, więc sortowanie selekcji musi działać tylko na małych porcjach. I pomimo (bo?) Całej tej złożoności, nie jest nawet tak naprawdę szybka.

Aby szybko ocenić,, wartości są próbkowane w kilku punktach, a później stosowana jest tylko interpolacja liniowa. W rzeczywistości nie ma znaczenia, czy Φ jest dokładnie oszacowane, o ile przybliżenie jest ściśle monotoniczne.

Rozmiary pojemników dobiera się w taki sposób, aby ryzyko przepełnienia pojemnika było znikome. Mówiąc dokładniej, przy obecnych parametrach prawdopodobieństwo, że zestaw danych 50000000 elementów spowoduje przepełnienie kosza, wynosi 3,65e-09. (Może to być obliczony przy użyciu funkcji przeżycia na rozkład Poissona ).

Aby skompilować, użyj

gcc -std=c99 -msse3 -O3 -ffinite-math-only

Ponieważ obliczenia są znacznie większe niż w innych rozwiązaniach, te flagi kompilatora są potrzebne, aby uczynić je przynajmniej względnie szybkim. Bez -msse3konwersji z doubleaby intstać się naprawdę powoli. Jeśli twoja architektura nie obsługuje SSE3, konwersji tych można również dokonać za pomocą lrint()funkcji.

Kod jest raczej brzydki - nie jestem pewien, czy spełnia to wymaganie „rozsądnej czytelności” ...

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <math.h>

#define N 50000000
#define BINSIZE 720
#define MAXBINSIZE 880
#define BINCOUNT (N / BINSIZE)
#define SPLITS 64
#define PHI_VALS 513

double phi_vals[PHI_VALS];

int bin_index(double x)
{
    double y = (x + 8.0) * ((PHI_VALS - 1) / 16.0);
    int interval = y;
    y -= interval;
    return (1.0 - y) * phi_vals[interval] + y * phi_vals[interval + 1];
}

double bin_value(int bin)
{
    int left = 0;
    int right = PHI_VALS - 1;
    do
    {
        int centre = (left + right) / 2;
        if (bin < phi_vals[centre])
            right = centre;
        else
            left = centre;
    } while (right - left > 1);
    double frac = (bin - phi_vals[left]) / (phi_vals[right] - phi_vals[left]);
    return (left + frac) * (16.0 / (PHI_VALS - 1)) - 8.0;
}

void gaussian_sort(double *restrict a)
{
    double *b = malloc(BINCOUNT * MAXBINSIZE * sizeof(double));
    double **pos = malloc(BINCOUNT * sizeof(double*));
    for (size_t i = 0; i < BINCOUNT; ++i)
        pos[i] = b + MAXBINSIZE * i;
    for (size_t i = 0; i < N; ++i)
        *pos[bin_index(a[i])]++ = a[i];
    double left_val, right_val = bin_value(0);
    for (size_t bin = 0, i = 0; bin < BINCOUNT; ++bin)
    {
        left_val = right_val;
        right_val = bin_value(bin + 1);
        double *splits[SPLITS + 1];
        splits[0] = b + bin * MAXBINSIZE;
        splits[SPLITS] = pos[bin];
        for (int step = SPLITS; step > 1; step >>= 1)
            for (int left_split = 0; left_split < SPLITS; left_split += step)
            {
                double *left = splits[left_split];
                double *right = splits[left_split + step] - 1;
                double frac = (double)(left_split + (step >> 1)) / SPLITS;
                double pivot = (1.0 - frac) * left_val + frac * right_val;
                while (1)
                {
                    while (*left < pivot && left <= right)
                        ++left;
                    while (*right >= pivot && left < right)
                        --right;
                    if (left >= right)
                        break;
                    double tmp = *left;
                    *left = *right;
                    *right = tmp;
                    ++left;
                    --right;
                }
                splits[left_split + (step >> 1)] = left;
            }
        for (int left_split = 0; left_split < SPLITS; ++left_split)
        {
            double *left = splits[left_split];
            double *right = splits[left_split + 1] - 1;
            while (left <= right)
            {
                double *min = left;
                for (double *tmp = left + 1; tmp <= right; ++tmp)
                    if (*tmp < *min)
                        min = tmp;
                a[i++] = *min;
                *min = *right--;
            }
        }
    }
    free(b);
    free(pos);
}

int main()
{
    double *a = malloc(N * sizeof(double));
    FILE *f = fopen("gaussian.dat", "rb");
    assert(fread(a, sizeof(double), N, f) == N);
    fclose(f);
    for (int i = 0; i < PHI_VALS; ++i)
    {
        double x = (i * (16.0 / PHI_VALS) - 8.0) / sqrt(2.0);
        phi_vals[i] =  (erf(x) + 1.0) * 0.5 * BINCOUNT;
    }
    gaussian_sort(a);
    free(a);
}

4,098s! Musiałem dodać -lm, aby go skompilować (dla erf).
static_rtti

1
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <memory.h>
#include <algorithm>

// maps [-inf,+inf] to (0,1)
double normcdf(double x) {
        return 0.5 * (1 + erf(x * M_SQRT1_2));
}

int calcbin(double x, int bins) {
        return (int)floor(normcdf(x) * bins);
}

int *docensus(int bins, int n, double *arr) {
        int *hist = calloc(bins, sizeof(int));
        int i;
        for(i = 0; i < n; i++) {
                hist[calcbin(arr[i], bins)]++;
        }
        return hist;
}

void partition(int bins, int *orig_counts, double *arr) {
        int *counts = malloc(bins * sizeof(int));
        memcpy(counts, orig_counts, bins*sizeof(int));
        int *starts = malloc(bins * sizeof(int));
        int b, i;
        starts[0] = 0;
        for(i = 1; i < bins; i++) {
                starts[i] = starts[i-1] + counts[i-1];
        }
        for(b = 0; b < bins; b++) {
                while (counts[b] > 0) {
                        double v = arr[starts[b]];
                        int correctbin;
                        do {
                                correctbin = calcbin(v, bins);
                                int swappos = starts[correctbin];
                                double tmp = arr[swappos];
                                arr[swappos] = v;
                                v = tmp;
                                starts[correctbin]++;
                                counts[correctbin]--;
                        } while (correctbin != b);
                }
        }
        free(counts);
        free(starts);
}


void sortbins(int bins, int *counts, double *arr) {
        int start = 0;
        int b;
        for(b = 0; b < bins; b++) {
                std::sort(arr + start, arr + start + counts[b]);
                start += counts[b];
        }
}


void checksorted(double *arr, int n) {
        int i;
        for(i = 1; i < n; i++) {
                if (arr[i-1] > arr[i]) {
                        printf("out of order at %d: %lf %lf\n", i, arr[i-1], arr[i]);
                        exit(1);
                }
        }
}


int main(int argc, char *argv[]) {
        if (argc == 1 || argv[1] == NULL) {
                printf("Expected data size as argument\n");
                exit(1);
        }
        int n = atoi(argv[1]);
        const int cachesize = 128 * 1024; // a guess
        int bins = (int) (1.1 * n * sizeof(double) / cachesize);
        if (argc > 2) {
                bins = atoi(argv[2]);
        }
        printf("Using %d bins\n", bins);
        FILE *f = fopen("gaussian.dat", "rb");
        if (f == NULL) {
                printf("Couldn't open gaussian.dat\n");
                exit(1);
        }
        double *arr = malloc(n * sizeof(double));
        fread(arr, sizeof(double), n, f);
        fclose(f);

        int *counts = docensus(bins, n, arr);
        partition(bins, counts, arr);
        sortbins(bins, counts, arr);
        checksorted(arr, n);

        return 0;
}

Używa erf (), aby odpowiednio umieścić każdy element w koszu, a następnie sortuje każdy bin. Utrzymuje tablicę całkowicie na miejscu.

Pierwszy przebieg: docensus () zlicza liczbę elementów w każdym bin.

Drugie przejście: partition () zezwala na tablicę, umieszczając każdy element w odpowiednim bin

Trzecie przejście: sortbins () wykonuje qsort na każdym bin.

Jest to naiwne i wywołuje kosztowną funkcję erf () dwukrotnie dla każdej wartości. Pierwsze i trzecie przejście są potencjalnie równoległe. Drugi jest wysoce seryjny i prawdopodobnie jest spowolniony przez bardzo losowe wzorce dostępu do pamięci. Warto również buforować numer bin każdego podwójnego, w zależności od stosunku mocy procesora do prędkości pamięci.

Ten program pozwala wybrać liczbę używanych pojemników. Wystarczy dodać drugą liczbę do wiersza poleceń. Skompilowałem go z gcc -O3, ale moja maszyna jest tak słaba, że ​​nie mogę powiedzieć ci żadnych dobrych wyników.

Edycja: Poof! Mój program C magicznie przekształcił się w program C ++ przy użyciu std :: sort!


Możesz użyć phi do szybszego stdnormal_cdf.
Alexandru

Ile pojemników powinienem umieścić w przybliżeniu?
static_rtti

@Alexandru: Dodałem częściowe przybliżenie liniowe do normcdf i uzyskałem tylko około 5% prędkości.
frud

@static_rtti: Nie musisz nic umieszczać. Domyślnie kod wybiera liczbę pojemników, więc średni rozmiar pojemnika to 10/11 z 128kb. Za mało pojemników, a partycjonowanie nie przynosi korzyści. Zbyt wiele i faza podziału zawiesza się z powodu przepełnienia pamięci podręcznej.
frud

10,6s! Próbowałem grać trochę z liczbą koszy i uzyskałem najlepsze wyniki z 5000 (nieco powyżej domyślnej wartości 3356). Muszę powiedzieć, że spodziewałem się znacznie lepszej wydajności twojego rozwiązania ... Może to dlatego, że używasz qsort zamiast potencjalnie szybszego std :: rodzaj rozwiązań C ++?
static_rtti

1

Zobacz implementację sortowania radix autorstwa Michaela Herfa ( Radix Tricks ). Na mojej maszynie sortowanie było 5 razy szybsze w porównaniu z std::sortalgorytmem z mojej pierwszej odpowiedzi. Nazwa funkcji sortowania to RadixSort11.

int main(void)
{
    std::ifstream ifs("C:\\Temp\\gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<float> v;
    v.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
        v.push_back(static_cast<float>(d));
    std::vector<float> vres(v.size(), 0.0);
    clock_t c0 = clock();
    RadixSort11(&v[0], &vres[0], v.size());
    std::cout << "Finished after: "
              << static_cast<double>(clock() - c0) / CLOCKS_PER_SEC << std::endl;
    return 0;
}
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.