[C# Actor] Akka 기초 1-5 | ActorSelection과 함께 주소로 액터 찾기

참조

소개

  • 액터들을 서로 조금씩 분리하는 방법과 액터들 간의 새로운 소통 방법에 대해서 배워봅니다.

Key Concepts / background

  • ActorSelection은 지난 수업에서 다룬 액터 계층 구조와 자연스럽게 연결됩니다.
  • 이제 우리는 액터들이 계층 구조속에서 있다는 것을 이해하게 되면서, 이런 의문을 품게 됩니다.
  • 액터들은 모두 같은 수준에 있지 않습니다. 이것이 액터들의 의사소통 방식을 변화시킬까요?
  • 액터에게 메시지를 보내고 일을 시키기 위해 handle이 필요하다는 것을 알고 있습니다.
  • 이제 우리는 모든 계층의 액터들을 가지고 있지만, 메시지를 보내고자 하는 액터에게 항상 직접적인 링크(IActorRef) 가 있는 것은 아닙니다.

ActorSelection 이란?

  • ActorSelection 은 IActorRef를 저장해 두지 않은 상태에서 액터에 메시지를 보낼 수 있도록 액터의 handle을 찾기 위해 액터 주소를 이용한 것에 불과합니다.
  • 액터가 알고 있는 IActorRef 통해 액터를 생성하거나 소멸시키는 작업을 하는 대신, ActorPath를 통해 액터의 handle을 찾아서 할 수 있습니다.
  • ActorSelection이 IActorRef를 찾는 방식이지만, 본질적으로 단일 액터를 1대1로 찾는 것은 아니라는 점을 유념해야 합니다.
  • 기술적으로, 조회할 때 표시되는 ActorSelection 객체는 특정 IActorRef를 가리키지 않습니다. 검색한 표현식과 일치하는 모든 IActorRef를 가르키는 handle 입니다. 이 표현식에서는 와일드 카드가 지원되므로 0개 이상의 액터가 선택될 수 있습니다.
  • ActorSelection에 의해 이름이 같은 첫번쨰 액터가 소멸된 후 같은 이름으로 다시 생성한 서로 다른 두 IActorRef가 매칭될 수 있습니다.

오브젝트(object) 인가요? 프로세스(process)? 둘 다?

  • ActorSelect이 프로세스와 오브젝트 둘 모두라고 생각합니다.
  • ActorPath로 액터를 찾는 프로세스와 그 과정에서 되돌아온 오브젝트는 우리가 찾던 표현과 일치하는 액터에게 메시지를 보낼 수 있게 해줍니다.

왜 ActorSelection에 대해 신경써야 하나요?

  • 일반적으로 항상 IActorRef를 대신 사용해야 합니다.
  • 그러나 ActorSelection이 작업에 적합한 도구인 몇가지 시나리오가 있으며 여기에서 더 자세히 다룹니다.

동적 행동

  • 동적 행동은 Unit2 초반에 파고드는 상급 개념이지만, 지금은 주어진 액터의 행동이 매우 유연할 수 있다는 것만 알아두시면 됩니다.
  • 이를 통해 액터는 유한 상태 기계와 같은 것을 쉽게 표현하여 작은 코드 설치 공간으로 복잡한 상황을 쉽게 처리할 수 있다.
  • ActorSelection은 어디에서 활약할까요? 당신이 매우 역동적이고 적응력이 뛰어난 시스템을 원한다면, 아마도 많은 액터가 계층 구조에서 들어오고 나가는 가운데 그 모두에게 핸들을 저장/전달 하는 것은 정말 고통스러울 것입니다.
  • ActorSelection을 사용하면 통신해야 하는 키 액터의 잘 알려진 주소로 메시지를 쉽게 보낼 수 있고, 필요한 항목에 대한 핸들을 가져오거나 전달, 저장하는 것에 대해 걱정할 필요가 없습니다.
  • 또한 ActorSelection을 수행하는데 필요한 ActorPath 조차 하드 코딩되지 않는 대신에 액터에 전달되는 메시지로 대표 될 수 있는 극도로 동적인 액터를 빌드할 수 있습니다.

주의 : ActorSelection을 전달하지 마십시오.

  • IActorRef 처럼 ActorSelection을 매개 변수로 전달하지 않는 것을 권합니다.
  • ActorSelection이 절대적이 아니라 상대적 일 수 있기 때문인데, 이 경우 계층 구조에서 다른 위치를 가진 액터에 전달 될 때 의도한 효과를 내지 못할 것입니다.

ActorSelection은 어떻게 만드나요?

  • ActorSelection을 만드는 것은 매우 간단합니다.
var selection = Context.ActorSelection("/path/to/actorName");

NOTE : 액터 경로에는 액터 클래스명이 아닌 액터를 인스턴스화 할때 액터에 할당한 이름을 사용합니다. 액터를 만들 때 이름을 지정하지 않으면 시스템에서 고유한 이름을 자동으로 생성합니다.

class FooActor : UntypedActor {}
Props props = Props.Create<FooActor>();

IActorRef myFooActor = MyActorSystem.ActorOf(props, "barBazActor");

IActorRef myFooActor = MyActorSystem.ActorOf(props);

ActorSelection과 IActorRef에 다르게 메시지를 보내나요?

  • ActorSelection도 IActorRef와 똑같이 메시지를 보낼때 Tell()을 사용합니다.
var selection = Context.ActorSelection("/path/to/actorName");
selection.Tell(message);

실습

  • ConsoleReaderActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class ConsoleReaderActor : UntypedActor
    {
        public const string StartCommand = "start";
        public const string ExitCommand = "exit";

        protected override void OnReceive(object message)
        {
            if(message.Equals(StartCommand))
            {
                DoPrintInstructions();
            }

            //GetAndValidateInput();

            Context.ActorSelection("akka://MyActorSystem/user/validatorActor").Tell(message);
        }

        private void DoPrintInstructions()
        {
            Console.WriteLine("Please provide that URI of a log file on disk.\n");
        }

        private void GetAndValidateInput()
        {
            var message = Console.ReadLine();

            if(!string.IsNullOrEmpty(message) &&
                string.Equals(message, ExitCommand, StringComparison.OrdinalIgnoreCase))
            {   
                //if user typed ExitCommand, shut down the entire actor
                //system (allows the process to exit)
                Context.System.Terminate();
                return;   
            }   
        }
    }
}
  • ConsoleWriterActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class ConsoleWriterActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            if(message is Messages.InputError)
            {
                var msg = message as Messages.InputError;
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine(msg.Reason);
            }
            else if(message is Messages.InputSuccess)
            {
                var msg = message as Messages.InputSuccess;
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine(msg.Reason);
            }
            else
            {
                Console.WriteLine(message);
            }

            Console.ResetColor();
        }
    }
}
  • FileObserver.cs
using System;
using System.IO;
using Akka.Actor;

namespace Akka_Basic
{
    public class FileObserver : IDisposable
    {
        private readonly IActorRef _tailActor;
        private readonly string _absoluteFilePath;
        private FileSystemWatcher _watcher;
        private readonly string _fileDir;
        private readonly string _fileNameOnly;

        public FileObserver(IActorRef tailActor, string absoluteFilePath)
        {
            _tailActor = tailActor;
            _absoluteFilePath = absoluteFilePath;
            _fileDir = Path.GetDirectoryName(absoluteFilePath);
            _fileNameOnly = Path.GetFileName(absoluteFilePath);
        }

        /// <summary>
        /// Begin monitoring file.
        /// </summary>
        public void Start()
        {
            // Need this for Mono 3.12.0 workaround
            // uncomment this line if you're running on Mono!
            // Environment.SetEnvironmentVariable("MONO_MANAGED_WATCHER", "enabled");

            // make watcher to observe our specific file
            _watcher = new FileSystemWatcher(_fileDir, _fileNameOnly);

            // watch our file for changes to the file name, or new messages being written to file
            _watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;

            // assign callbacks for event types
            _watcher.Changed += OnFileChanged;
            _watcher.Error += OnFileError;

            // start watching
            _watcher.EnableRaisingEvents = true;

        }

        /// <summary>
        /// Stop monitoring file.
        /// </summary>
        public void Dispose()
        {
            _watcher.Dispose();
        }

        /// <summary>
        /// Callback for <see cref="FileSystemWatcher"/> file error events.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void OnFileError(object sender, ErrorEventArgs e)
        {
            _tailActor.Tell(new TailActor.FileError(_fileNameOnly, e.GetException().Message), ActorRefs.NoSender);
        }

        /// <summary>
        /// Callback for <see cref="FileSystemWatcher"/> file change events.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        void OnFileChanged(object sender, FileSystemEventArgs e)
        {
            if (e.ChangeType == WatcherChangeTypes.Changed)
            {
                // here we use a special ActorRefs.NoSender
                // since this event can happen many times, this is a little microoptimization
                _tailActor.Tell(new TailActor.FileWrite(e.Name), ActorRefs.NoSender);
            }
        }
    }
}
  • FileValidatorActor.cs
using System.IO;
using Akka.Actor;

namespace Akka_Basic
{
    //Actor that validates user input and signals result to others
    public class FileValidatorActor : UntypedActor
    {
        private readonly IActorRef _consoleWriterActor;

        public FileValidatorActor(IActorRef consoleWriterActor)
        {
            _consoleWriterActor = consoleWriterActor;
        }

        protected override void OnReceive(object message)
        {
            var msg = message as string;

            if(string.IsNullOrEmpty(msg))
            {
                //signal that the user needs to supply an input
                _consoleWriterActor.Tell(new Messages.NullInputError("Input as blank. Please type again.\n"));

                //tell sender to continue doing its thing
                Sender.Tell(new Messages.ContinueProcessing());
            }
            else
            {
                var valid = IsFileUri(msg);

                if(valid)
                {
                    //signal successful input
                    _consoleWriterActor.Tell(new Messages.InputSuccess($"starting processing for {msg}"));

                    Context.ActorSelection("akka://MyActorSystem/user/tailCoordinatorActor").Tell(new TailCoordinatorActor.StartTail(msg, _consoleWriterActor));
                }
                else
                {
                    //singal that input was bad
                    _consoleWriterActor.Tell(new Messages.ValidationError($"{msg} is not an existing URI on disk."));

                    //tell sender to continue doing its thing
                    Sender.Tell(new Messages.ContinueProcessing());
                }
            }
        }

        private static bool IsFileUri(string path)
        {
            return File.Exists(path);
        }
    }
}
  • Messages.cs
namespace Akka_Basic
{
    public class Messages
    {
        #region Neutral/system messages
        public class ContinueProcessing { }
        #endregion

        #region Success messages
        public class InputSuccess
        {
            public string Reason { get; private set; }
            public InputSuccess(string reason)
            {
                Reason = reason;
            }
        }
        #endregion

        #region Error messages
        public class InputError
        {
            public string Reason { get; private set; }
            public InputError(string reason)
            {
                Reason = reason;
            }
        }

        public class NullInputError : InputError
        {
            public NullInputError(string reason) : base(reason) { }
        }

        public class ValidationError : InputError
        {
            public ValidationError(string reason) : base(reason) { }
        }
        #endregion
    }
}
  • Program.cs
using Akka.Actor;

namespace Akka_Basic
{
    class Program
    {
        static void Main(string[] args)
        {
            //initialize MyActorSystem
            ActorSystem MyActorSystem = ActorSystem.Create("MyActorSystem");

            //Create Actor
            IActorRef consoleWriterActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleWriterActor()), "consoleWriterActor");
            IActorRef consoleReaderActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleReaderActor()), "consoleReaderActor");
            IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(Props.Create(() => new TailCoordinatorActor()), "tailCoordinatorActor");
            IActorRef fileValidatorActor = MyActorSystem.ActorOf(Props.Create(() => new FileValidatorActor(consoleWriterActor)), "validatorActor");

            fileValidatorActor.Tell(@"C:\Users\bh.cho\Desktop\test.txt");

            //blocks the main thread from exiting until the actor system is shut down
            MyActorSystem.WhenTerminated.Wait();
        }
    }
}
  • TailActor.cs
using System.IO;
using Akka.Actor;
using System.Text;

namespace Akka_Basic
{
    public class TailActor : UntypedActor
    {
        public class FileWrite
        {
            public string FileName { get; private set; }
            public FileWrite(string fileName)
            {
                FileName = fileName;
            }
        }

        public class FileError
        {
            public string FileName { get; private set; }
            public string Reason { get; private set; }
            public FileError(string fileName, string reason)
            {
                FileName = fileName;
                Reason = reason;
            }
        }

        public class InitialRead
        {
            public string FileName { get; private set; }
            public string Text { get; private set; }
            public InitialRead(string fileName, string text)
            {
                FileName = fileName;
                Text = text;
            }
        }

        private readonly string _filePath;
        private readonly IActorRef _reporterActor;
        private readonly FileObserver _observer;
        private readonly Stream _fileStream;
        private readonly StreamReader _fileStreamReader;

        public TailActor(IActorRef reporterActor, string filePath)
        {
            _reporterActor = reporterActor;
            _filePath = filePath;

            //start watching file fot changes
            _observer = new FileObserver(Self, Path.GetFullPath(_filePath));
            _observer.Start();

            //open the file stream with shared read/write permissions
             _fileStream = new FileStream(
                Path.GetFullPath(_filePath), 
                FileMode.Open, 
                FileAccess.Read, 
                FileShare.ReadWrite
            );
            _fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);

            //read the initial contents of the file and send it to console as first message
            var text = _fileStreamReader.ReadToEnd();
            Self.Tell(new InitialRead(_filePath, text));
        }

        protected override void OnReceive(object message)
        {
            if (message is FileWrite)
            {
                // move file cursor forward
                // pull results from cursor to end of file and write to output
                // (this is assuming a log file type format that is append-only)
                var text = _fileStreamReader.ReadToEnd();
                if(!string.IsNullOrEmpty(text))
                {
                    _reporterActor.Tell(text);
                }
            }
            else if (message is FileError)
            {
                var fe = message as FileError;
                _reporterActor.Tell(string.Format("Tail error: {0}", fe.Reason));
            }
            else if (message is InitialRead)
            {
                var ir = message as InitialRead;
                _reporterActor.Tell(ir.Text);
            }
        }
    }
}
  • TailCoordinatorActor.cs
using System;
using Akka.Actor;

namespace Akka_Basic
{
    public class TailCoordinatorActor : UntypedActor
    {
        #region Message Type

        public class StartTail
        {
            public string FilePath { get; private set; }
            public IActorRef ReporterActor { get; private set; }
            public StartTail(string filePath, IActorRef reporterActor)
            {
                FilePath = filePath;
                ReporterActor = reporterActor;
            }
        }

        public class StopTail
        {
            public string FilePath { get; private set; }
            public StopTail(string filePath)
            {
                FilePath = filePath;
            }
        }

        #endregion

        protected override void OnReceive(object message)
        {
            if(message is StartTail)
            {
                var msg = message as StartTail;

                Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
            }
        }

        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(
                10, //maxNumberOfretries
                TimeSpan.FromSeconds(30), //withinTimeRange
                x => //localOnlyDecider
                {
                    if(x is ArithmeticException) return Directive.Resume;

                    else if(x is NotSupportedException) return Directive.Stop;

                    else return Directive.Restart;
                }
            );
        }
    }
}
728x90

이 글을 공유하기

댓글

Designed by JB FACTORY