Logowanie asynchroniczne - jak to zrobić?


11

W wielu usługach, nad którymi pracuję, jest dużo rejestrowania. Usługi są usługami WCF (głównie), które korzystają z klasy .NET EventLogger.

Jestem w trakcie poprawiania wydajności tych usług i pomyślałem, że asynchroniczne logowanie poprawiłoby wydajność.

Nie jestem świadomy tego, co się dzieje, gdy wiele wątków prosi o zalogowanie się, a jeśli tworzy to wąskie gardło, ale nawet jeśli tak nie jest, nadal uważam, że nie powinno to zakłócać rzeczywistego wykonywanego procesu.

Myślę, że powinienem wywołać tę samą metodę dziennika, którą teraz wywołuję, ale robię to przy użyciu nowego wątku, kontynuując rzeczywisty proces.

Kilka pytań na ten temat:

Czy to jest w porządku?

Czy są jakieś wady?

Czy należy to zrobić w inny sposób?

Może jest tak szybki, że nawet nie jest wart wysiłku?


1
Czy profilowałeś środowisko wykonawcze, aby wiedzieć, że rejestrowanie ma mierzalny wpływ na wydajność? Komputery są zbyt skomplikowane, by myśleć, że coś może być wolne, zmierzyć dwa razy i raz wyciąć to dobra rada w każdym zawodzie =)
Patrick Hughes

@PatrickHughes - niektóre statystyki z moich testów na jedno konkretne żądanie: 61 (!!) wiadomości dziennika, 150 ms przed wykonaniem jakiegoś prostego wątku, 90 ms później. więc jest o 40% szybszy.
Mithir

Odpowiedzi:


14

Osobny wątek dla operacji we / wy brzmi rozsądnie.

Na przykład nie byłoby dobrze logować, które przyciski użytkownik nacisnął w tym samym wątku interfejsu użytkownika. Taki interfejs użytkownika zawiesza się losowo i ma powolną postrzeganą wydajność .

Rozwiązaniem jest oddzielenie zdarzenia od jego przetworzenia.

Oto wiele informacji o problemach producenta i konsumenta oraz kolejce zdarzeń ze świata tworzenia gier

Często jest taki kod

///Never do this!!!
public void WriteLog_Like_Bastard(string msg)
{
    lock (_lockBecauseILoveThreadContention)
    {
        File.WriteAllText("c:\\superApp.log", msg);
    }
}

Takie podejście doprowadzi do rywalizacji wątków. Wszystkie wątki przetwarzania będą walczyć o możliwość uzyskania blokady i zapisu do tego samego pliku na raz.

Niektórzy mogą próbować usunąć blokady.

public void Log_Like_Dumbass(string msg)
{
      try 
      {  File.Append("c:\\superApp.log", msg); }
        catch (Exception ex) 
        {
            MessageBox.Show("Log file may be locked by other process...")
        }
      }    
}

Nie można przewidzieć wyniku, jeśli 2 wątki wejdą do metody w tym samym czasie.

W końcu programiści w ogóle wyłączą rejestrowanie ...

Czy można to naprawić?

Tak.

Powiedzmy, że mamy interfejs:

 public interface ILogger
 {
    void Debug(string message);
    // ... etc
    void Fatal(string message);
 }

Zamiast czekać na blokadę i wykonywać operacje blokowania plików za każdym razem, gdy ILoggerzostanie wywołane, dodamy nowy LogMessage do kolejki komunikatów Penging i wrócimy do ważniejszych rzeczy:

public class AsyncLogger : ILogger
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly Type _loggerFor;
    private readonly IThreadAdapter _threadAdapter;

    public AsyncLogger(BlockingCollection<LogMessage> pendingMessages, Type loggerFor, IThreadAdapter threadAdapter)
    {
        _pendingMessages = pendingMessages;
        _loggerFor = loggerFor;
        _threadAdapter = threadAdapter;
    }

    public void Debug(string message)
    {
        Push(LoggingLevel.Debug, message);
    }

    public void Fatal(string message)
    {
        Push(LoggingLevel.Fatal, message);
    }

    private void Push(LoggingLevel importance, string message)
    {
        // since we do not know when our log entry will be written to disk, remember current time
        var timestamp = DateTime.Now;
        var threadId = _threadAdapter.GetCurrentThreadId();

        // adds message to the queue in lock-free manner and immediately returns control to caller
        _pendingMessages.Add(LogMessage.Create(timestamp, importance, message, _loggerFor, threadId));
    }
}

Zrobiliśmy to za pomocą tego prostego rejestratora asynchronicznego .

Następnym krokiem jest przetwarzanie wiadomości przychodzących.

Dla uproszczenia, zacznijmy nowy wątek i poczekaj wiecznie, aż aplikacja zakończy działanie, lub Asynchronous Logger doda nową wiadomość do kolejki oczekującej .

public class LoggingQueueDispatcher : IQueueDispatcher
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly IEnumerable<ILogListener> _listeners;
    private readonly IThreadAdapter _threadAdapter;
    private readonly ILogger _logger;
    private Thread _dispatcherThread;

    public LoggingQueueDispatcher(BlockingCollection<LogMessage> pendingMessages, IEnumerable<ILogListener> listeners, IThreadAdapter threadAdapter, ILogger logger)
    {
        _pendingMessages = pendingMessages;
        _listeners = listeners;
        _threadAdapter = threadAdapter;
        _logger = logger;
    }

    public void Start()
    {
        //  Here I use 'new' operator, only to simplify example. Should be using interface  '_threadAdapter.CreateBackgroundThread' to allow unit testing
        Thread thread = new Thread(MessageLoop);
        thread.Name = "LoggingQueueDispatcher Thread";
        thread.IsBackground = true;

        thread.Start();
        _logger.Debug("Asked to start log message Dispatcher ");

        _dispatcherThread = thread;
    }

    public bool WaitForCompletion(TimeSpan timeout)
    {
        return _dispatcherThread.Join(timeout);
    }

    private void MessageLoop()
    {
        _logger.Debug("Entering dispatcher message loop...");
        var cancellationToken = new CancellationTokenSource();
        LogMessage message;

        while (_pendingMessages.TryTake(out message, Timeout.Infinite, cancellationToken.Token))
        {
            // !!!!! Now it is safe to use File.AppendAllText("c:\\my.log") without ever using lock or forcing important threads to wait.
            // this is example, do not use in production
            foreach (var listener in _listeners)
            {
                listener.Log(message);
            }
        }

    }
}

Mijam łańcuch niestandardowych słuchaczy. Prawdopodobnie możesz po prostu wysłać ramę rejestrowania połączeń ( log4netitp.)

Oto reszta kodu:

public enum LoggingLevel
{
    Debug,
    // ... etc
    Fatal,
}


public class LogMessage
{
    public DateTime Timestamp { get; private set; }
    public LoggingLevel Importance { get; private set; }
    public string Message { get; private set; }
    public Type Source { get; private set; }
    public int ThreadId { get; private set; }

    private LogMessage(DateTime timestamp, LoggingLevel importance, string message, Type source, int threadId)
    {
        Timestamp = timestamp;
        Message = message;
        Source = source;
        ThreadId = threadId;
        Importance = importance;
    }

    public static LogMessage Create(DateTime timestamp, LoggingLevel importance, string message, Type source, int threadId)
    {
        return  new LogMessage(timestamp, importance, message, source, threadId);
    }

    public override string ToString()
    {
        return string.Format("{0}  [TID:{4}] {1:h:mm:ss} ({2})\t{3}", Importance, Timestamp, Source, Message, ThreadId);
    }
}

public class LoggerFactory : ILoggerFactory
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly IThreadAdapter _threadAdapter;

    private readonly ConcurrentDictionary<Type, ILogger> _loggersCache = new ConcurrentDictionary<Type, ILogger>();


    public LoggerFactory(BlockingCollection<LogMessage> pendingMessages, IThreadAdapter threadAdapter)
    {
        _pendingMessages = pendingMessages;
        _threadAdapter = threadAdapter;
    }

    public ILogger For(Type loggerFor)
    {
        return _loggersCache.GetOrAdd(loggerFor, new AsyncLogger(_pendingMessages, loggerFor, _threadAdapter));
    }
}

public class ThreadAdapter : IThreadAdapter
{
    public int GetCurrentThreadId()
    {
        return Thread.CurrentThread.ManagedThreadId;
    }
}

public class ConsoleLogListener : ILogListener
{
    public void Log(LogMessage message)
    {
        Console.WriteLine(message.ToString());
        Debug.WriteLine(message.ToString());
    }
}

public class SimpleTextFileLogger : ILogListener
{
    private readonly IFileSystem _fileSystem;
    private readonly string _userRoamingPath;
    private readonly string _logFileName;
    private FileStream _fileStream;

    public SimpleTextFileLogger(IFileSystem fileSystem, string userRoamingPath, string logFileName)
    {
        _fileSystem = fileSystem;
        _userRoamingPath = userRoamingPath;
        _logFileName = logFileName;
    }

    public void Start()
    {
        _fileStream = new FileStream(_fileSystem.Path.Combine(_userRoamingPath, _logFileName), FileMode.Append);
    }

    public void Stop()
    {
        if (_fileStream != null)
        {
            _fileStream.Dispose();
        }
    }

    public void Log(LogMessage message)
    {
        var bytes = Encoding.UTF8.GetBytes(message.ToString() + Environment.NewLine);
        _fileStream.Write(bytes, 0, bytes.Length);
    }
}

public interface ILoggerFactory
{
    ILogger For(Type loggerFor);
}

public interface ILogListener
{
    void Log(LogMessage message);
}

public interface IThreadAdapter
{
    int GetCurrentThreadId();
}

public interface IQueueDispatcher
{
    void Start();
}

Punkt wejścia:

public static class Program
{
    public static void Main()
    {
        Debug.WriteLine("[Program] Entering Main ...");

        var pendingLogQueue = new BlockingCollection<LogMessage>();


        var threadAdapter = new ThreadAdapter();
        var loggerFactory = new LoggerFactory(pendingLogQueue, threadAdapter);


        var fileSystem = new FileSystem();
        var userRoamingPath = GetUserDataDirectory(fileSystem);

        var simpleTextFileLogger = new SimpleTextFileLogger(fileSystem, userRoamingPath, "log.txt");
        simpleTextFileLogger.Start();
        ILogListener consoleListener = new ConsoleLogListener();
        ILogListener[] listeners = new [] { simpleTextFileLogger , consoleListener};

        var loggingQueueDispatcher = new LoggingQueueDispatcher(pendingLogQueue, listeners, threadAdapter, loggerFactory.For(typeof(LoggingQueueDispatcher)));
        loggingQueueDispatcher.Start();

        var logger = loggerFactory.For(typeof(Console));

        string line;
        while ((line = Console.ReadLine()) != "exit")
        {
            logger.Debug("you have entered: " + line);
        }

        logger.Fatal("Exiting...");

        Debug.WriteLine("[Program] pending LogQueue will be stopped now...");
        pendingLogQueue.CompleteAdding();
        var logQueueCompleted = loggingQueueDispatcher.WaitForCompletion(TimeSpan.FromSeconds(5));

        simpleTextFileLogger.Stop();
        Debug.WriteLine("[Program] Exiting... logQueueCompleted: " + logQueueCompleted);

    }



    private static string GetUserDataDirectory(FileSystem fileSystem)
    {
        var roamingDirectory = Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData);
        var userDataDirectory = fileSystem.Path.Combine(roamingDirectory, "Async Logging Sample");
        if (!fileSystem.Directory.Exists(userDataDirectory))
            fileSystem.Directory.CreateDirectory(userDataDirectory);
        return userDataDirectory;
    }
}

1

Kluczowymi czynnikami, które należy wziąć pod uwagę, jest potrzeba niezawodności plików dziennika i potrzeba wydajności. Patrz wady. Myślę, że to świetna strategia w sytuacjach wymagających dużej wydajności.

Czy to w porządku - tak

Czy są jakieś wady - tak - w zależności od krytyczności rejestrowania i implementacji może wystąpić dowolna z poniższych sytuacji - dzienniki zapisywane poza kolejnością, akcje wątku dziennika nie są zakończone przed zakończeniem akcji zdarzenia. (Wyobraź sobie scenariusz, w którym logujesz „zaczyna się łączyć z DB”, a następnie psujesz serwer, zdarzenie dziennika może nigdy nie zostać zapisane, nawet jeśli zdarzenie miało miejsce (!))

Powinno to zostać zrobione w inny sposób - możesz spojrzeć na model Disruptor, ponieważ jest on prawie idealny do tego scenariusza

Może jest tak szybki, że nawet nie jest wart wysiłku - nie zgadzam się. Jeśli twoja jest logiką „aplikacji”, a jedyne, co robisz, to zapisywanie dzienników aktywności - wtedy będziesz o rząd wielkości mniejszy opóźnienie poprzez odciążenie rejestrowania. Jeśli jednak powrócisz do wywołania SQL SQL trwającego 5 sekund przed zarejestrowaniem 1-2 instrukcji, korzyści będą mieszane.


1

Myślę, że rejestrowanie jest z natury operacją synchroniczną. Chcesz rejestrować rzeczy, jeśli się zdarzają lub nie zależą od twojej logiki, więc aby coś zalogować, należy to najpierw ocenić.

Powiedziawszy to, możesz poprawić wydajność aplikacji, buforując dzienniki, a następnie tworząc wątek i zapisując je w plikach, gdy masz operacje związane z procesorem.

Musisz sprytnie zidentyfikować punkty kontrolne, aby nie stracić ważnych informacji rejestracyjnych podczas tego okresu buforowania.

Jeśli chcesz zwiększyć wydajność wątków, musisz zrównoważyć operacje IO i operacje CPU.

Jeśli utworzysz 10 wątków, z których wszystkie wykonują operacje we / wy, nie zwiększysz wydajności.


Jak sugerujesz zapisywanie dzienników w pamięci podręcznej? w większości wiadomości dziennika znajdują się elementy specyficzne dla żądania, aby je zidentyfikować, w mojej usłudze rzadko pojawiają się dokładnie takie same żądania.
Mithir

0

Logowanie asynchroniczne to jedyny sposób, jeśli potrzebujesz wątków w wątkach rejestrowania. Aby osiągnąć maksymalną wydajność, należy zastosować wzorzec przerywacza dla komunikacji wątkowej bez blokowania i bez śmieci. Teraz, jeśli chcesz pozwolić na jednoczesne logowanie wielu wątków do tego samego pliku, musisz albo zsynchronizować wywołania dziennika i zapłacić cenę w rywalizacji o blokadę, albo użyć multipleksera bez blokady. Na przykład CoralQueue zapewnia prostą kolejkę multipleksowania, jak opisano poniżej:

wprowadź opis zdjęcia tutaj

Możesz spojrzeć na CoralLog, który wykorzystuje te strategie do rejestrowania asynchronicznego.

Oświadczenie: Jestem jednym z twórców CoralQueue i CoralLog.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.