[C# Akka] Akka 기초 1-6 | 액터 라이프사이클(The Actor Lifecycle)

참조

소개

  • 액터 라이프 사이클에 대해서 학습합니다.

Key Concepts / background

액터 라이프사이클(actor life cycle)이란?

  • 액터에는 잘 정의된 라이프사이클이 있습니다.
  • 액터가 생성되고 시작 후 대부분의 삶을 메시지를 받는데 보냅니다.
  • 액터가 더 이상 필요하지 않은 경우 액터를 종료하거나 중지 할 수 있습니다.

액터 라이프사이클의 단계는 무엇인가요?

  • Akka.NET의 액터 라이프 사이클의 5단계가 있습니다.
    1. Starting
    2. Receiving
    3. Stopping
    4. Terminated
    5. Restarting

Starting

  • 액터가 ActorSystem 에 의해 초기화 될 때의 초기 상태입니다.

Receiving

  • 이제 액터는 메시지를 처리할 수 있습니다.
  • MailBox는 처리를 위해 액터의 OnReceive 메서드로 메시지를 전달하기 시작합니다.

Stopping

  • 액터는 상태를 정리합니다. 이 단계에서 일어나는 일은 액터를 종료할 지, 다시 시작할지 여부에 따라 다릅니다.
  • 액터가 다시 시작될 예정이라면, 액터가 다시 시작된 후, Receiving 상태로 돌아오면 처리할 상태 또는 메시지를 이 단계 동안 저장하는 것이 일반적입니다.
  • 액터가 종료될 예정이라면, Mailbox의 모든 메시지가 ActorSystem의 DeadLetters 메일함으로 전송됩니다. DeadLetters 는 보통 액터가 소멸되었기 때문에 배달 할 수 없는 메시지의 저장소입니다.

Terminated

  • 액터가 소멸되었습니다.
  • IActorRef를 통해 전송 된 모든 메시지는 이제 DeadLetters로 이동합니다.
  • 액터는 다시 시작할 수 없지만 같은 주소를 가진 새로운 액터를 생성할 수 있습니다.

Restarting

  • 액터가 다시 시작되고 Starting 상태로 돌아갑니다.

라이프사이클(Life cycle) 후크 방법(hook methods)

  • 액터 라이프사이클에 연결할 수 있는 4개의 메서드가 있습니다.

PreStart

  • 액터가 메시지 수신을 시작하기 전에 PreStart 로직이 실행됩니다.
  • 초기화 로직을 배치하는 것이 좋습니다.
  • 다시 시작하는 동안에도 호출됩니다.

PreRestart

  • 액터가 실패하면 부모 액터가 액터를 다시 시작합니다.
  • PreRestart는 액터가 다시 시작되기 전에 정리를 수행하거나 나중에 재 처리하기 위해 현재 메시지를 저장하기 위해 연결할 수 있는 메소드입니다.

PostStop

  • PostStop은 액터가 중지되고 더 이상 메시지를 수신하지 않으면 호출됩니다.
  • 이것은 정리 로직을 포함하기에 좋은 곳입니다.
  • PostStop 은 PreRestart 중에도 호출되지만, 재시작 중에 이 동작을 피하려면 PreRestart를 재정의해서 base.PreRestart를 호출하지 않아도 됩니다.
  • DeathWatch는 액터가 구독이 종료되면 알림을 받도록 구독 한 다른 액터에게 알릴 때도 사용됩니다.
  • DeathWatch는 모든 액터가 다른 액터의 종료에 대해 알림을 받을 수 있도록 프레임 워크에 내장 된 게시 / 구독 시스템입니다.

PostRestart

  • PostRestart는 PreRestart 후 재시작 중 PreStart 이전에 호출됩니다.
  • Akka.NET이 이미 수행한 작업을 넘어서 액터 충돌을 유발한 오류에 대한 추가보고 또는 진단을 수행할 수 있는 좋은 메서드입니다.
  • 다음은 후크 메서드가 라이프 사이클 단계에 적합한 위치입니다.

라이프사이클을 어떻게 후킹하나요?

  • 후킹하려면, 다음과 같이 후킹하려는 메서드를 재정의하면 됩니다.
protected override void PreStart()
{

}

가장 일반적으로 사용되는 라이프사이클 메서드는 무엇인가요?

PreStart

  • PreStart는 가장 많이 사용되는 후크 메서드입니다.
  • 액터의 초기 상태를 설정하고 액터에 필요한 사용자 지정 초기화 로직을 실행하는데 사용됩니다.

PostStop

  • 두 번째 메서드는 사용자 지정 정리 논리를 수행하는 PostStop 입니다.
  • 예를 들어, 액터가 종료하기 전에 파일 시스템 핸들이나 시스템에서 소비하고 있는 다른 리소스를 해제하도록 할 수 있습니다.

PreRestart

  • PreRestart는 위의 메서드보다 1/3 정도 떨어지지만 가끔 사용합니다.
  • PreRestart를 사용하는 것은 액터가 하는 일에 크게 의존하지만, 한 가지 일반적인 경우는 메시지를 숨기거나 액터가 다시 시작되면 재 처리를 하기 위한 조치를 취하는 것입니다.

감시(supervision)와 관련이 있나요?

  • 액터가 실수로 충돌하는 경우(즉, 처리되지 않은 예외가 발생하는 경우) 액터의 감독자는 액터의 메일함에 남아있는 메시지를 잃지 않고 처음부터 액터의 라이프 사이클을 자동으로 다시 시작합니다.
  • 레슨4에서 액터계층/감시에 대해 언급했듯이 처리되지 않은 오류에 대한 동작은 부모 액터의 감시 지쳄에 의해 결정됩니다.
  • 부모 액터는 자식 액터에게 오류에 대해 종료, 다시 시작 또는 무시하도록 지시하고 중단 된 부분부터 다시 시작할 수 있습니다.
  • 기본값은 다시 시작하는 것이므로 잘못된 상태가 사라지고 액터가 깨끗하게 시작됩니다.
  • 재시작이 저렴한 방법입니다.

실습

  • ConsoleReaderActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class ConsoleReaderActor : UntypedActor
    {
        public const string StartCommand = "start";
        public const string ExitCommand = "exit";

        protected override void OnReceive(object message)
        {
            if(message.Equals(StartCommand))
            {
                DoPrintInstructions();
            }

            //GetAndValidateInput();

            Context.ActorSelection("akka://MyActorSystem/user/validatorActor").Tell(message);
        }

        private void DoPrintInstructions()
        {
            Console.WriteLine("Please provide that URI of a log file on disk.\n");
        }

        private void GetAndValidateInput()
        {
            var message = Console.ReadLine();

            if(!string.IsNullOrEmpty(message) &&
                string.Equals(message, ExitCommand, StringComparison.OrdinalIgnoreCase))
            {   
                //if user typed ExitCommand, shut down the entire actor
                //system (allows the process to exit)
                Context.System.Terminate();
                return;   
            }   
        }
    }
}
  • ConsoleWriterActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class ConsoleWriterActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            if(message is Messages.InputError)
            {
                var msg = message as Messages.InputError;
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine(msg.Reason);
            }
            else if(message is Messages.InputSuccess)
            {
                var msg = message as Messages.InputSuccess;
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine(msg.Reason);
            }
            else
            {
                Console.WriteLine(message);
            }

            Console.ResetColor();
        }
    }
}
  • FileObserver.cs
using System;
using System.IO;
using Akka.Actor;

namespace Akka_Basic
{
    public class FileObserver : IDisposable
    {
        private readonly IActorRef _tailActor;
        private readonly string _absoluteFilePath;
        private FileSystemWatcher _watcher;
        private readonly string _fileDir;
        private readonly string _fileNameOnly;

        public FileObserver(IActorRef tailActor, string absoluteFilePath)
        {
            _tailActor = tailActor;
            _absoluteFilePath = absoluteFilePath;
            _fileDir = Path.GetDirectoryName(absoluteFilePath);
            _fileNameOnly = Path.GetFileName(absoluteFilePath);
        }

        /// <summary>
        /// Begin monitoring file.
        /// </summary>
        public void Start()
        {
            // Need this for Mono 3.12.0 workaround
            // uncomment this line if you're running on Mono!
            // Environment.SetEnvironmentVariable("MONO_MANAGED_WATCHER", "enabled");

            // make watcher to observe our specific file
            _watcher = new FileSystemWatcher(_fileDir, _fileNameOnly);

            // watch our file for changes to the file name, or new messages being written to file
            _watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;

            // assign callbacks for event types
            _watcher.Changed += OnFileChanged;
            _watcher.Error += OnFileError;

            // start watching
            _watcher.EnableRaisingEvents = true;

        }

        /// <summary>
        /// Stop monitoring file.
        /// </summary>
        public void Dispose()
        {
            _watcher.Dispose();
        }

        /// <summary>
        /// Callback for <see cref="FileSystemWatcher"/> file error events.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void OnFileError(object sender, ErrorEventArgs e)
        {
            _tailActor.Tell(new TailActor.FileError(_fileNameOnly, e.GetException().Message), ActorRefs.NoSender);
        }

        /// <summary>
        /// Callback for <see cref="FileSystemWatcher"/> file change events.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void OnFileChanged(object sender, FileSystemEventArgs e)
        {
            if (e.ChangeType == WatcherChangeTypes.Changed)
            {
                // here we use a special ActorRefs.NoSender
                // since this event can happen many times, this is a little microoptimization
                _tailActor.Tell(new TailActor.FileWrite(e.Name), ActorRefs.NoSender);
            }
        }
    }
}
  • FileValidatorActor.cs
using System.IO;
using Akka.Actor;

namespace Akka_Basic
{
    //Actor that validates user input and signals result to others
    public class FileValidatorActor : UntypedActor
    {
        private readonly IActorRef _consoleWriterActor;

        public FileValidatorActor(IActorRef consoleWriterActor)
        {
            _consoleWriterActor = consoleWriterActor;
        }

        protected override void OnReceive(object message)
        {
            var msg = message as string;

            if(string.IsNullOrEmpty(msg))
            {
                //signal that the user needs to supply an input
                _consoleWriterActor.Tell(new Messages.NullInputError("Input as blank. Please type again.\n"));

                //tell sender to continue doing its thing
                Sender.Tell(new Messages.ContinueProcessing());
            }
            else
            {
                var valid = IsFileUri(msg);

                if(valid)
                {
                    //signal successful input
                    _consoleWriterActor.Tell(new Messages.InputSuccess($"starting processing for {msg}"));

                    Context.ActorSelection("akka://MyActorSystem/user/tailCoordinatorActor").Tell(new TailCoordinatorActor.StartTail(msg, _consoleWriterActor));
                }
                else
                {
                    //singal that input was bad
                    _consoleWriterActor.Tell(new Messages.ValidationError($"{msg} is not an existing URI on disk."));

                    //tell sender to continue doing its thing
                    Sender.Tell(new Messages.ContinueProcessing());
                }
            }
        }

        private static bool IsFileUri(string path)
        {
            return File.Exists(path);
        }
    }
}
  • Messages.cs
namespace Akka_Basic
{
    public class Messages
    {
        #region Neutral/system messages
        public class ContinueProcessing { }
        #endregion

        #region Success messages
        public class InputSuccess
        {
            public string Reason { get; private set; }
            public InputSuccess(string reason)
            {
                Reason = reason;
            }
        }
        #endregion

        #region Error messages
        public class InputError
        {
            public string Reason { get; private set; }
            public InputError(string reason)
            {
                Reason = reason;
            }
        }

        public class NullInputError : InputError
        {
            public NullInputError(string reason) : base(reason) { }
        }

        public class ValidationError : InputError
        {
            public ValidationError(string reason) : base(reason) { }
        }
        #endregion
    }
}
  • Program.cs
using Akka.Actor;

namespace Akka_Basic
{
    class Program
    {
        static void Main(string[] args)
        {
            //initialize MyActorSystem
            ActorSystem MyActorSystem = ActorSystem.Create("MyActorSystem");

            //Create Actor
            IActorRef consoleWriterActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleWriterActor()), "consoleWriterActor");
            IActorRef consoleReaderActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleReaderActor()), "consoleReaderActor");
            IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(Props.Create(() => new TailCoordinatorActor()), "tailCoordinatorActor");
            IActorRef fileValidatorActor = MyActorSystem.ActorOf(Props.Create(() => new FileValidatorActor(consoleWriterActor)), "validatorActor");

            fileValidatorActor.Tell(@"C:\Users\bh.cho\Desktop\test.txt");

            //blocks the main thread from exiting until the actor system is shut down
            MyActorSystem.WhenTerminated.Wait();
        }
    }
}
  • TailActor.cs
using System.IO;
using Akka.Actor;
using System.Text;

namespace Akka_Basic
{
    public class TailActor : UntypedActor
    {
        public class FileWrite
        {
            public string FileName { get; private set; }
            public FileWrite(string fileName)
            {
                FileName = fileName;
            }
        }

        public class FileError
        {
            public string FileName { get; private set; }
            public string Reason { get; private set; }
            public FileError(string fileName, string reason)
            {
                FileName = fileName;
                Reason = reason;
            }
        }

        public class InitialRead
        {
            public string FileName { get; private set; }
            public string Text { get; private set; }
            public InitialRead(string fileName, string text)
            {
                FileName = fileName;
                Text = text;
            }
        }

        private string _filePath;
        private IActorRef _reporterActor;
        private FileObserver _observer;
        private Stream _fileStream;
        private StreamReader _fileStreamReader;

        public TailActor(IActorRef reporterActor, string filePath)
        {
            _reporterActor = reporterActor;
            _filePath = filePath;
        }

        protected override void PreStart()
        {
            //start watching file for changes
            _observer = new FileObserver(Self, Path.GetFullPath(_filePath));
            _observer.Start();

            //open the file stream with shared read/write permissions
            _fileStream = new FileStream(Path.GetFullPath(_filePath), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            _fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);

            //read the initial contents of the file and send it to console as first message
            var text = _fileStreamReader.ReadToEnd();
            Self.Tell(new InitialRead(_filePath, text));
        }

        protected override void PostStop()
        {
            _observer.Dispose();
            _observer = null;
            _fileStreamReader.Close();
            _fileStreamReader.Dispose();
            base.PostStop();
        }

        protected override void OnReceive(object message)
        {
            if (message is FileWrite)
            {
                // move file cursor forward
                // pull results from cursor to end of file and write to output
                // (this is assuming a log file type format that is append-only)
                var text = _fileStreamReader.ReadToEnd();
                if(!string.IsNullOrEmpty(text))
                {
                    _reporterActor.Tell(text);
                }
            }
            else if (message is FileError)
            {
                var fe = message as FileError;
                _reporterActor.Tell(string.Format("Tail error: {0}", fe.Reason));
            }
            else if (message is InitialRead)
            {
                var ir = message as InitialRead;
                _reporterActor.Tell(ir.Text);
            }
        }
    }
}
  • TailCoordinatorActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class TailCoordinatorActor : UntypedActor
    {
        #region Message Type

        public class StartTail
        {
            public string FilePath { get; private set; }
            public IActorRef ReporterActor { get; private set; }
            public StartTail(string filePath, IActorRef reporterActor)
            {
                FilePath = filePath;
                ReporterActor = reporterActor;
            }
        }

        public class StopTail
        {
            public string FilePath { get; private set; }
            public StopTail(string filePath)
            {
                FilePath = filePath;
            }
        }

        #endregion

        protected override void OnReceive(object message)
        {
            if(message is StartTail)
            {
                var msg = message as StartTail;

                Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
            }
        }

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(
                10, //maxNumberOfretries
                TimeSpan.FromSeconds(30), //withinTimeRange
                x => //localOnlyDecider
                {
                    if(x is ArithmeticException) return Directive.Resume;

                    else if(x is NotSupportedException) return Directive.Stop;

                    else return Directive.Restart;
                }
            );
        }
    }
}
728x90

이 글을 공유하기

댓글(0)

Designed by JB FACTORY