[C# Actor] Akka 기초 1-4 | 자식 액터, 액터 계층 구조, 그리고 감시(Supervision)

참조

소개

  • 코드베이스의 기능과 액터 모델이 어떻게 작동하는지 이해하는데 있어 큰 도움이 됩니다.

Key Concepts / background

  • 액터 계층 구조에 대해 깊게 들어가기 전에 알아봅시다. 왜 우리는 계층 구조가 필요할까요?
  • 계층 구조를 사용하는 두 가지 중요한 키 포인트가 있습니다.
    • 작업을 원자화하고 대용량 데이터를 처리하기 쉬운 양으로 변환하기 위해
    • 에러를 억제하고 시스템을 회복력 있게 만들기 위해

계층 구조의 원자화 작업

  • 계층 구조를 가지는 것은 아주 작은 조각으로 작업을 쪼개고, 서로 다른 계층의 레벨에서 다른 전문 기술을 활용할 수 있게 합니다.
  • 액터 시스템에서 이것이 실현되는 일반적인 방법은 큰 데이터 스트림을 원자화 하는 것입니다. 그것이 작고 쉽게 처리할 만한 코드 조각이 될 때까지 계속 원자화를 반복합니다.
  • 트위터를 예로 들어봅시다. Akka를 사용하면 그들의 대용량 데이터 수집을 작고, 처리하기 쉬운 정보의 흐름으로 분해할 수 있습니다.
  • 트위터의 경우에는 거대한 소방호스를 통해 뿜어져 나오는 것 같은 트윗들을 각 유저들의 타임라인에 작은 물줄기 하나하나로 나누어 분산할 수 있고, Akka를 사용하여 해당 유저의 스트림으로 도착한 메시지를 websocket 등을 통해 푸시할 수도 있습니다.

계층 구조를 통해 복원력 있는 시스템 지원

  • 계층 구조를 사용함으로써 위험의 계층와와 전문화가 가능합니다.
  • 상위 계층의 액터는 본질적으로 좀 더 감독적이며, 이러한 점은 액터시스템의 리스크를 낮추고 가장 자리로 밀어냅니다. 계층의 가장자리로 위험한 작업을 밀어냄으로써 시스템은 리스크를 고립시키고 전체 시스템의 크레싱 없이 오류로부터 복구할 수 있습니다.

감시(Supervision)란? 왜 신경써야 하나?

  • 감시(Supervision) 는 액터 시스템이 오류(failures) 에서 빠르고 신속하게 격리와 회복이 이뤄질 수 있게 하는 기본 개념입니다.
  • 모든 액터는 자신을 감시하는 다른 액터를 가지고 있으며, 이 액터들은 에러가 발생했을 때 복구할 수 있도록 돕습니다. 이는 계층 구조의 최상단부터 최하단까지 모두에게 해당합니다.
  • 이 감시(Supervision)는 애플리케이션이 예기치 않은 오류가 발생할 경우, 액터 계층 구조에서 해당 계층에만 영향을 받도록 제어합니다.
  • 모든 다른 액터들은 아무일도 없었던 것처럼 동작할것입니다. 우리는 이것을 "오류 고립", "봉쇄" 라고 부릅니다.

액터의 계층 구조

  • 첫 번째 키 포인트 : 모든 액터는 부모 액터를 가지고 있습니다. 그리고 일부 액터들은 자식을 가지고 있습니다.
  • 부모 액터는 자식을 감독(supervise) 합니다.
  • 부모 액터가 자식 액터를 감독한다는 것은 모든 액터가 감독자를 가진다는 것이고, 모든 액터가 감독자가 될 수 있음을 말합니다.
  • 액터시스템 내에서 액터는 계층 구조로 정리됩니다. ActorSystem 자체에 직접 보고하는 최상위 액터와 다른 액터에게 보고하는 자식 액터가 있음을 의미합니다.
  • 전반적인 계층 구조는 다음과 같습니다.

계층 구조의 라벨이란?

  • Guardians 는 전체 시스템의 root 액터입니다.
  • 계층 구조에서 최상단에 있는 이 3개의 액터를 말합니다.

  • / 액터는 액터 시스템 전체의 root 입니다. The Root Guardian 이라고 불립니다. 이 액터는 다른 가디언(/system과 /user 액터) 을 감독합니다.

  • 이 하나를 제외한 모든 액터는 그들의 부모 액터를 필요로 합니다. 이 액터는 일반 액터 시스템의 바깥에 있기 때문에 때때로 bubble-walker 라고 불립니다.

  • /system 액터는 "시스템 가디언" 이라고 불립니다. 이 액터의 주 역할은 정상적인 방법으로 시스템이 종료되고, 프레임워크 레벨의 기능 및 유틸리티로 구현된 다른 시스템 액터를 유지하고 감독합니다.

  • 시스템 가디언과 시스템 액터 계층 구조에 대해서는 다음에 포스팅 하겠습니다.

  • /user 액터는 "가디언 액터" 로 불립니다. 사용자 관점에서 볼 때 /user 액터는 시스템의 root 이기 때문에 root actor 라고 부릅니다.

  • 사용자는 가디언에 대해 걱정할 필요가 없습니다.

  • 어떠한 예외도 가디언까지 거품을 일으켜 전체 시스템에 크래시가 발생하지 않도록 /user 아래에서 적절하게 감독합니다.

/user 액터의 계층 구조

  • 액터 계층 구조의 주요 포인트입니다.
  • 애플리케이션 내에서 당신이 정의한 모든 액터들이 속합니다.

  • /user 액터 바로 아래의 자식 액터를 최상위 액터 라고 부릅니다.
  • 액터는 항상 다른 액터의 자식 액터로 만들어집니다.
  • 액터시스템 자체의 컨텍스트를 활용해 직접 액터를 만들면, 새 액터는 최상위 액터가 됩니다.
//다이어그램에 표현된 최상위 액터를 만듭니다.
IActorRef a1 = MyActorSystem.ActorOf(Props.Create<BasicActor>(), "a1");
IActorRef a2 = MyActorSystem.ActorOf(props.Create<BasicActor>(), "a2");
  • a2 컨텍스트 안쪽에서 a2의 자식 액터들을 만들어 봅니다.
//a2의 자식 액터들을 만듭니다.
//a2의 내부에서 진행합니다.
IActorRef b1 = Context.ActorOf(Props.Create<BasicActor>(), "b1");
IActorRef b2 = Context.ActorOf(Props.Create<BasicActor>(), "b2");

액터 주소(Actor Path) == 계층 구조 내에서 액터의 위치

  • 모든 액터는 주소를 가집니다.
  • 액터에서 다른 액터로 메시지를 보내려면, 주소를 알아야 합니다.
  • 완전한 액터 주소를 다음과 같습니다.

Path는 액터가 당신의 액터 계층 구조에서 어디에 위치하는지 알려줍니다. 슬래시('/')로 액터의 계층을 구분합니다.

  • 만일 locathost에서 실행했다면, b2 액터의 완전한 주소는 akka.tcp://MyActorSystem@localhost:9001/user/a2/b2 입니다.
  • 어떤 액터든 당신의 액터 계층 구조 안에서 어디에나 위치할 수 있습니다.

액터 계층 구조에서 감시가 동작하는 방법

  • 액터는 자식 액터를 감독합니다. 하지만, 그들은 액터 계층 구조에서 바로 아래 단계만 감독합니다.

    액터는 계층 구조상 바로 아래 단계의 자식만 감독합니다.

언제 감시가 시작되나요? 에러가 발생하면!

  • 자식 액터가 unhandled exception 이나 크래시가 발생할 때, 부모 액터에게 도움을 청하고 무엇을 해야 하는지 알려줍니다.
  • 구체적으로, 자식 액터는 Failure 클래스의 메시지를 부모 액터에게 보냅니다. 무엇을 어떻게 해야할지 결정하는 것은 부모 액터에게 달려있습니다.

부모 액터는 어떻게 오류를 해결할 수 있을까?

  1. 자식 액터의 실패요인(자식 액터가 보낸 Failure 메시지에 어떤 타입의 Exception이 포함되었는가)
  2. 자식 액터의 Failure 에 대한 부모 액터가 실행하는 명령. 이는 부모 액터의 감시 전략에 의해 결정됩니다.

오류가 발생했을 때 이벤트의 순서

  1. 자식 액터에서 Unhandled exception이 발생하면, 부모 액터에 의해 관리됩니다.
  2. c1은 작업을 중지합니다.
  3. 시스템은 Failure 메시지에 Exception을 담아 c1에서 b1으로 전달합니다.
  4. b1은 c1에게 어떻게 해야하는지 지시합니다.

감시 지침

  • 자식 액터에게서 에러를 받으면, 부모 액터는 다음 동작 중 하나를 수행할 수 있습니다.
  • 감시전략은 예외 타입에 따라 침을 매핑하므로, 여러 에러의 유형에 따라 적절하게 처리할 수 있습니다.

감시 지침의 종류

  • Restart the child : 가장 일반적인 경우이며 기본값입니다.
  • Stop the child : 자식 액터를 영구히 종료합니다.
  • Escalate the error : 이건 부모 액터가 "뭘 해야 할지 모르겠어! 다 멈추고 내 부모 액터에게 물어봐!" 라고 말하는 겁니다.
  • Resume processing : 일반적으로는 사용하지 않습니다. 일단 무시합니다.

여기서 알아야 할 중요한 것은 부모 액터에게 어떤 조치가 취해지든 자식 액터에게 전파된다는 것입니다. 부모 액터가 중단되면 모든 자식 액터가 중단됩니다. 다시 시작하면 모든 자식 액터가 다시 시작됩니다.

감시전략

  • 두 가지 기본 감시 전략이 있습니다.
    • One-For-One Strategy
    • All-For-One-Strategy
  • 두 가지의 기본적인 차이점은 에러 해결 지시 효과가 얼마나 널리 퍼지는가 입니다.
  • One-For-One 부모 액터의 지시가 실패한 자식 액터에게만 적용됩니다. 실패한 자식 액터의 형제 액터에게 영향을 끼치지 않습니다. 이것은 별도로 지정하지 않는 이상 기본각으로 동작합니다.
  • All-For-One 부모 액터의 지시가 실패한 자식 액터와 그 형제 액터에게 적용됩니다.
  • 감시전략에서 또 다른 중요한 선택은, 자식 액터가 얼마의 시간 동안에 몇 번의 실패를 허용하는가입니다.
  • 다음은 감시 전략의 예시입니다.
public class MyActor : UntypedActor
{
    protected override SupervisiorStrategt SupervisorStrategy()
    {
        return new OnForOneStrategy(
            maxNrOfretries: 10,
            withinTimeRange: TimeSpan.FromSeconds(30),
            localOnlyDecider : x =>
            {
                // Maybe ArithmeticException is not application critical
                // so we just ignore the error and keep going.
                if (x is ArithmeticException) return Directive.resume;

                // Error that we have no idea what to do with
                else if (x is InsanelyBadException) return Directive.Escalate;

                // Error that we can't recover from, stop the failing child
                else if (x is notSupportedException) return Directive.Stop;

                // otherwise restart the failing child
                else return Directive.restart;   
            }
        )
    }
}

핵심은 봉쇄(Containmnt)

  • 감시 전략과 지침의 전체적인 핵심은 시스템 내에서 오류를 포함하고 자가 치유하는 것이므로, 시스템 전체에 크래시가 발생하지 않는것입니다.
  • 잠재적으로 위험한 작업을 부모 액터에서 자식 액터에게 전달합니다. 이 작업은 위험한 작업을 수행하는 것입니다.
  • 예를 들어, 월드컵 기간동안에 치뤄지는 수많은 경기에서 점수와 선수 통계들을 관리하는 시스템을 운영한다고 생각해 봅시다.
  • 월드컵이 진행되는 동안 처리 한계에 다다를 정도의 엄청난 양의 API 요청이 발생할 겁니다. 때로는 크래시가 발생할 수도 있습니다.
  • 스코어 키퍼는 게임이 진행되는 동안 정기적으로 데이터를 업데이트해야 합니다.
  • 네트워크 호출은 위험합니다. 만약 요청이 오류를 발생시키면, 호출을 시작한 액터는 크래시가 발생합니다. 그러면 어떻게 보호행 할까요?
  • 부모 액터가 상태를 보관하고, 형편없는 네트워크 호출은 자식 액터에게 밀어 넣습니다. 그렇게 하면, 자식 액터가 크래시 되더라도 중요한 데이터를 보관하고 있는 부모 액터에게 영향을 끼치지 ㅇ낳습니다.
  • 오류를 지역화 하여 시스템 전체에 퍼지는 것을 방지합니다.
  • 액터 계층 구조에서 안정성을 이루는 방법의 예시입니다.

  • 추적중인 게임마다 하나의 클론으로 이러한 구조를 만들어 병렬로 동작할 수 있음을 기억합니다.
  • 새로운 코드를 작성할 필요가 없습니다.

실습

  • Program.cs
using Akka.Actor;

namespace Akka_Basic
{
    class Program
    {
        static void Main(string[] args)
        {
            //initialize MyActorSystem
            ActorSystem MyActorSystem = ActorSystem.Create("MyActorSystem");

            //Create Actor
            IActorRef consoleWriterActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleWriterActor()), "consoleWriterActor");
            IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(Props.Create(() => new TailCoordinatorActor()), "tailCoordinatorActor");
            IActorRef fileValidatorActor = MyActorSystem.ActorOf(Props.Create(() => new FileValidatorActor(consoleWriterActor, tailCoordinatorActor)), "validatorActor");

            fileValidatorActor.Tell(@"C:\Users\bh.cho\Desktop\test.txt");

            //blocks the main thread from exiting until the actor system is shut down
            MyActorSystem.WhenTerminated.Wait();
        }
    }
}
  • FileObserver.cs
using System;
using System.IO;
using Akka.Actor;

namespace Akka_Basic
{
    public class FileObserver : IDisposable
    {
        private readonly IActorRef _tailActor;
        private readonly string _absoluteFilePath;
        private FileSystemWatcher _watcher;
        private readonly string _fileDir;
        private readonly string _fileNameOnly;

        public FileObserver(IActorRef tailActor, string absoluteFilePath)
        {
            _tailActor = tailActor;
            _absoluteFilePath = absoluteFilePath;
            _fileDir = Path.GetDirectoryName(absoluteFilePath);
            _fileNameOnly = Path.GetFileName(absoluteFilePath);
        }

        /// <summary>
        /// Begin monitoring file.
        /// </summary>
        public void Start()
        {
            // Need this for Mono 3.12.0 workaround
            // uncomment this line if you're running on Mono!
            // Environment.SetEnvironmentVariable("MONO_MANAGED_WATCHER", "enabled");

            // make watcher to observe our specific file
            _watcher = new FileSystemWatcher(_fileDir, _fileNameOnly);

            // watch our file for changes to the file name, or new messages being written to file
            _watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;

            // assign callbacks for event types
            _watcher.Changed += OnFileChanged;
            _watcher.Error += OnFileError;

            // start watching
            _watcher.EnableRaisingEvents = true;

        }

        /// <summary>
        /// Stop monitoring file.
        /// </summary>
        public void Dispose()
        {
            _watcher.Dispose();
        }

        /// <summary>
        /// Callback for <see cref="FileSystemWatcher"/> file error events.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void OnFileError(object sender, ErrorEventArgs e)
        {
            _tailActor.Tell(new TailActor.FileError(_fileNameOnly, e.GetException().Message), ActorRefs.NoSender);
        }

        /// <summary>
        /// Callback for <see cref="FileSystemWatcher"/> file change events.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void OnFileChanged(object sender, FileSystemEventArgs e)
        {
            if (e.ChangeType == WatcherChangeTypes.Changed)
            {
                // here we use a special ActorRefs.NoSender
                // since this event can happen many times, this is a little microoptimization
                _tailActor.Tell(new TailActor.FileWrite(e.Name), ActorRefs.NoSender);
            }
        }
    }
}
  • FileValidatorActor.cs
using System.IO;
using Akka.Actor;

namespace Akka_Basic
{
    //Actor that validates user input and signals result to others
    public class FileValidatorActor : UntypedActor
    {
        private readonly IActorRef _consoleWriterActor;
        private readonly IActorRef _tailCoordinatorActor;

        public FileValidatorActor(IActorRef consoleWriterActor, IActorRef tailCoordinatorActor)
        {
            _consoleWriterActor = consoleWriterActor;
            _tailCoordinatorActor = tailCoordinatorActor;
        }

        protected override void OnReceive(object message)
        {
            var msg = message as string;

            if(string.IsNullOrEmpty(msg))
            {
                //signal that the user needs to supply an input
                _consoleWriterActor.Tell(new Messages.NullInputError("Input as blank. Please type again.\n"));

                //tell sender to continue doing its thing
                Sender.Tell(new Messages.ContinueProcessing());
            }
            else
            {
                var valid = IsFileUri(msg);

                if(valid)
                {
                    //signal successful input
                    _consoleWriterActor.Tell(new Messages.InputSuccess($"starting processing for {msg}"));

                    //start coordinator
                    _tailCoordinatorActor.Tell(new TailCoordinatorActor.StartTail(msg, _consoleWriterActor));
                }
                else
                {
                    //singal that input was bad
                    _consoleWriterActor.Tell(new Messages.ValidationError($"{msg} is not an existing URI on disk."));

                    //tell sender to continue doing its thing
                    Sender.Tell(new Messages.ContinueProcessing());
                }
            }
        }

        private static bool IsFileUri(string path)
        {
            return File.Exists(path);
        }
    }
}
  • TailActor.cs
using System.IO;
using Akka.Actor;
using System.Text;

namespace Akka_Basic
{
    public class TailActor : UntypedActor
    {
        public class FileWrite
        {
            public string FileName { get; private set; }
            public FileWrite(string fileName)
            {
                FileName = fileName;
            }
        }

        public class FileError
        {
            public string FileName { get; private set; }
            public string Reason { get; private set; }
            public FileError(string fileName, string reason)
            {
                FileName = fileName;
                Reason = reason;
            }
        }

        public class InitialRead
        {
            public string FileName { get; private set; }
            public string Text { get; private set; }
            public InitialRead(string fileName, string text)
            {
                FileName = fileName;
                Text = text;
            }
        }

        private readonly string _filePath;
        private readonly IActorRef _reporterActor;
        private readonly FileObserver _observer;
        private readonly Stream _fileStream;
        private readonly StreamReader _fileStreamReader;

        public TailActor(IActorRef reporterActor, string filePath)
        {
            _reporterActor = reporterActor;
            _filePath = filePath;

            //start watching file fot changes
            _observer = new FileObserver(Self, Path.GetFullPath(_filePath));
            _observer.Start();

            //open the file stream with shared read/write permissions
             _fileStream = new FileStream(
                Path.GetFullPath(_filePath), 
                FileMode.Open, 
                FileAccess.Read, 
                FileShare.ReadWrite
            );
            _fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);

            //read the initial contents of the file and send it to console as first message
            var text = _fileStreamReader.ReadToEnd();
            Self.Tell(new InitialRead(_filePath, text));
        }

        protected override void OnReceive(object message)
        {
            if (message is FileWrite)
            {
                // move file cursor forward
                // pull results from cursor to end of file and write to output
                // (this is assuming a log file type format that is append-only)
                var text = _fileStreamReader.ReadToEnd();
                if(!string.IsNullOrEmpty(text))
                {
                    _reporterActor.Tell(text);
                }
            }
            else if (message is FileError)
            {
                var fe = message as FileError;
                _reporterActor.Tell(string.Format("Tail error: {0}", fe.Reason));
            }
            else if (message is InitialRead)
            {
                var ir = message as InitialRead;
                _reporterActor.Tell(ir.Text);
            }
        }
    }
}
  • TailCoordinatorActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class TailCoordinatorActor : UntypedActor
    {
        #region Message Type

        public class StartTail
        {
            public string FilePath { get; private set; }
            public IActorRef ReporterActor { get; private set; }
            public StartTail(string filePath, IActorRef reporterActor)
            {
                FilePath = filePath;
                ReporterActor = reporterActor;
            }
        }

        public class StopTail
        {
            public string FilePath { get; private set; }
            public StopTail(string filePath)
            {
                FilePath = filePath;
            }
        }

        #endregion

        protected override void OnReceive(object message)
        {
            if(message is StartTail)
            {
                var msg = message as StartTail;

                Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
            }
        }

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(
                10, //maxNumberOfretries
                TimeSpan.FromSeconds(30), //withinTimeRange
                x => //localOnlyDecider
                {
                    if(x is ArithmeticException) return Directive.Resume;

                    else if(x is NotSupportedException) return Directive.Stop;

                    else return Directive.Restart;
                }
            );
        }
    }
}
  • ConsoleWriterActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class ConsoleWriterActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            if(message is Messages.InputError)
            {
                var msg = message as Messages.InputError;
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine(msg.Reason);
            }
            else if(message is Messages.InputSuccess)
            {
                var msg = message as Messages.InputSuccess;
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine(msg.Reason);
            }
            else
            {
                Console.WriteLine(message);
            }

            Console.ResetColor();
        }
    }
}
  • Messages.cs
namespace Akka_Basic
{
    public class Messages
    {
        #region Neutral/system messages
        public class ContinueProcessing { }
        #endregion

        #region Success messages
        public class InputSuccess
        {
            public string Reason { get; private set; }
            public InputSuccess(string reason)
            {
                Reason = reason;
            }
        }
        #endregion

        #region Error messages
        public class InputError
        {
            public string Reason { get; private set; }
            public InputError(string reason)
            {
                Reason = reason;
            }
        }

        public class NullInputError : InputError
        {
            public NullInputError(string reason) : base(reason) { }
        }

        public class ValidationError : InputError
        {
            public ValidationError(string reason) : base(reason) { }
        }
        #endregion
    }
}

실행 결과

728x90

이 글을 공유하기

댓글

Designed by JB FACTORY