[C# Actor] Akka 기초 1-5 | ActorSelection과 함께 주소로 액터 찾기
- C#/Akka(Actor)
- 2021. 6. 16. 00:00
참조
소개
- 액터들을 서로 조금씩 분리하는 방법과 액터들 간의 새로운 소통 방법에 대해서 배워봅니다.
Key Concepts / background
- ActorSelection은 지난 수업에서 다룬 액터 계층 구조와 자연스럽게 연결됩니다.
- 이제 우리는 액터들이 계층 구조속에서 있다는 것을 이해하게 되면서, 이런 의문을 품게 됩니다.
- 액터들은 모두 같은 수준에 있지 않습니다. 이것이 액터들의 의사소통 방식을 변화시킬까요?
- 액터에게 메시지를 보내고 일을 시키기 위해 handle이 필요하다는 것을 알고 있습니다.
- 이제 우리는 모든 계층의 액터들을 가지고 있지만, 메시지를 보내고자 하는 액터에게 항상 직접적인 링크(IActorRef) 가 있는 것은 아닙니다.
ActorSelection 이란?
- ActorSelection 은 IActorRef를 저장해 두지 않은 상태에서 액터에 메시지를 보낼 수 있도록 액터의 handle을 찾기 위해 액터 주소를 이용한 것에 불과합니다.
- 액터가 알고 있는 IActorRef 통해 액터를 생성하거나 소멸시키는 작업을 하는 대신, ActorPath를 통해 액터의 handle을 찾아서 할 수 있습니다.
- ActorSelection이 IActorRef를 찾는 방식이지만, 본질적으로 단일 액터를 1대1로 찾는 것은 아니라는 점을 유념해야 합니다.
- 기술적으로, 조회할 때 표시되는 ActorSelection 객체는 특정 IActorRef를 가리키지 않습니다. 검색한 표현식과 일치하는 모든 IActorRef를 가르키는 handle 입니다. 이 표현식에서는 와일드 카드가 지원되므로 0개 이상의 액터가 선택될 수 있습니다.
- ActorSelection에 의해 이름이 같은 첫번쨰 액터가 소멸된 후 같은 이름으로 다시 생성한 서로 다른 두 IActorRef가 매칭될 수 있습니다.
오브젝트(object) 인가요? 프로세스(process)? 둘 다?
- ActorSelect이 프로세스와 오브젝트 둘 모두라고 생각합니다.
- ActorPath로 액터를 찾는 프로세스와 그 과정에서 되돌아온 오브젝트는 우리가 찾던 표현과 일치하는 액터에게 메시지를 보낼 수 있게 해줍니다.
왜 ActorSelection에 대해 신경써야 하나요?
- 일반적으로 항상 IActorRef를 대신 사용해야 합니다.
- 그러나 ActorSelection이 작업에 적합한 도구인 몇가지 시나리오가 있으며 여기에서 더 자세히 다룹니다.
동적 행동
- 동적 행동은 Unit2 초반에 파고드는 상급 개념이지만, 지금은 주어진 액터의 행동이 매우 유연할 수 있다는 것만 알아두시면 됩니다.
- 이를 통해 액터는 유한 상태 기계와 같은 것을 쉽게 표현하여 작은 코드 설치 공간으로 복잡한 상황을 쉽게 처리할 수 있다.
- ActorSelection은 어디에서 활약할까요? 당신이 매우 역동적이고 적응력이 뛰어난 시스템을 원한다면, 아마도 많은 액터가 계층 구조에서 들어오고 나가는 가운데 그 모두에게 핸들을 저장/전달 하는 것은 정말 고통스러울 것입니다.
- ActorSelection을 사용하면 통신해야 하는 키 액터의 잘 알려진 주소로 메시지를 쉽게 보낼 수 있고, 필요한 항목에 대한 핸들을 가져오거나 전달, 저장하는 것에 대해 걱정할 필요가 없습니다.
- 또한 ActorSelection을 수행하는데 필요한 ActorPath 조차 하드 코딩되지 않는 대신에 액터에 전달되는 메시지로 대표 될 수 있는 극도로 동적인 액터를 빌드할 수 있습니다.
주의 : ActorSelection을 전달하지 마십시오.
- IActorRef 처럼 ActorSelection을 매개 변수로 전달하지 않는 것을 권합니다.
- ActorSelection이 절대적이 아니라 상대적 일 수 있기 때문인데, 이 경우 계층 구조에서 다른 위치를 가진 액터에 전달 될 때 의도한 효과를 내지 못할 것입니다.
ActorSelection은 어떻게 만드나요?
- ActorSelection을 만드는 것은 매우 간단합니다.
var selection = Context.ActorSelection("/path/to/actorName");
NOTE : 액터 경로에는 액터 클래스명이 아닌 액터를 인스턴스화 할때 액터에 할당한 이름을 사용합니다. 액터를 만들 때 이름을 지정하지 않으면 시스템에서 고유한 이름을 자동으로 생성합니다.
class FooActor : UntypedActor {}
Props props = Props.Create<FooActor>();
IActorRef myFooActor = MyActorSystem.ActorOf(props, "barBazActor");
IActorRef myFooActor = MyActorSystem.ActorOf(props);
ActorSelection과 IActorRef에 다르게 메시지를 보내나요?
- ActorSelection도 IActorRef와 똑같이 메시지를 보낼때 Tell()을 사용합니다.
var selection = Context.ActorSelection("/path/to/actorName");
selection.Tell(message);
실습
- ConsoleReaderActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class ConsoleReaderActor : UntypedActor
{
public const string StartCommand = "start";
public const string ExitCommand = "exit";
protected override void OnReceive(object message)
{
if(message.Equals(StartCommand))
{
DoPrintInstructions();
}
//GetAndValidateInput();
Context.ActorSelection("akka://MyActorSystem/user/validatorActor").Tell(message);
}
private void DoPrintInstructions()
{
Console.WriteLine("Please provide that URI of a log file on disk.\n");
}
private void GetAndValidateInput()
{
var message = Console.ReadLine();
if(!string.IsNullOrEmpty(message) &&
string.Equals(message, ExitCommand, StringComparison.OrdinalIgnoreCase))
{
//if user typed ExitCommand, shut down the entire actor
//system (allows the process to exit)
Context.System.Terminate();
return;
}
}
}
}
- ConsoleWriterActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class ConsoleWriterActor : UntypedActor
{
protected override void OnReceive(object message)
{
if(message is Messages.InputError)
{
var msg = message as Messages.InputError;
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(msg.Reason);
}
else if(message is Messages.InputSuccess)
{
var msg = message as Messages.InputSuccess;
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine(msg.Reason);
}
else
{
Console.WriteLine(message);
}
Console.ResetColor();
}
}
}
- FileObserver.cs
using System;
using System.IO;
using Akka.Actor;
namespace Akka_Basic
{
public class FileObserver : IDisposable
{
private readonly IActorRef _tailActor;
private readonly string _absoluteFilePath;
private FileSystemWatcher _watcher;
private readonly string _fileDir;
private readonly string _fileNameOnly;
public FileObserver(IActorRef tailActor, string absoluteFilePath)
{
_tailActor = tailActor;
_absoluteFilePath = absoluteFilePath;
_fileDir = Path.GetDirectoryName(absoluteFilePath);
_fileNameOnly = Path.GetFileName(absoluteFilePath);
}
/// <summary>
/// Begin monitoring file.
/// </summary>
public void Start()
{
// Need this for Mono 3.12.0 workaround
// uncomment this line if you're running on Mono!
// Environment.SetEnvironmentVariable("MONO_MANAGED_WATCHER", "enabled");
// make watcher to observe our specific file
_watcher = new FileSystemWatcher(_fileDir, _fileNameOnly);
// watch our file for changes to the file name, or new messages being written to file
_watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;
// assign callbacks for event types
_watcher.Changed += OnFileChanged;
_watcher.Error += OnFileError;
// start watching
_watcher.EnableRaisingEvents = true;
}
/// <summary>
/// Stop monitoring file.
/// </summary>
public void Dispose()
{
_watcher.Dispose();
}
/// <summary>
/// Callback for <see cref="FileSystemWatcher"/> file error events.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnFileError(object sender, ErrorEventArgs e)
{
_tailActor.Tell(new TailActor.FileError(_fileNameOnly, e.GetException().Message), ActorRefs.NoSender);
}
/// <summary>
/// Callback for <see cref="FileSystemWatcher"/> file change events.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnFileChanged(object sender, FileSystemEventArgs e)
{
if (e.ChangeType == WatcherChangeTypes.Changed)
{
// here we use a special ActorRefs.NoSender
// since this event can happen many times, this is a little microoptimization
_tailActor.Tell(new TailActor.FileWrite(e.Name), ActorRefs.NoSender);
}
}
}
}
- FileValidatorActor.cs
using System.IO;
using Akka.Actor;
namespace Akka_Basic
{
//Actor that validates user input and signals result to others
public class FileValidatorActor : UntypedActor
{
private readonly IActorRef _consoleWriterActor;
public FileValidatorActor(IActorRef consoleWriterActor)
{
_consoleWriterActor = consoleWriterActor;
}
protected override void OnReceive(object message)
{
var msg = message as string;
if(string.IsNullOrEmpty(msg))
{
//signal that the user needs to supply an input
_consoleWriterActor.Tell(new Messages.NullInputError("Input as blank. Please type again.\n"));
//tell sender to continue doing its thing
Sender.Tell(new Messages.ContinueProcessing());
}
else
{
var valid = IsFileUri(msg);
if(valid)
{
//signal successful input
_consoleWriterActor.Tell(new Messages.InputSuccess($"starting processing for {msg}"));
Context.ActorSelection("akka://MyActorSystem/user/tailCoordinatorActor").Tell(new TailCoordinatorActor.StartTail(msg, _consoleWriterActor));
}
else
{
//singal that input was bad
_consoleWriterActor.Tell(new Messages.ValidationError($"{msg} is not an existing URI on disk."));
//tell sender to continue doing its thing
Sender.Tell(new Messages.ContinueProcessing());
}
}
}
private static bool IsFileUri(string path)
{
return File.Exists(path);
}
}
}
- Messages.cs
namespace Akka_Basic
{
public class Messages
{
#region Neutral/system messages
public class ContinueProcessing { }
#endregion
#region Success messages
public class InputSuccess
{
public string Reason { get; private set; }
public InputSuccess(string reason)
{
Reason = reason;
}
}
#endregion
#region Error messages
public class InputError
{
public string Reason { get; private set; }
public InputError(string reason)
{
Reason = reason;
}
}
public class NullInputError : InputError
{
public NullInputError(string reason) : base(reason) { }
}
public class ValidationError : InputError
{
public ValidationError(string reason) : base(reason) { }
}
#endregion
}
}
- Program.cs
using Akka.Actor;
namespace Akka_Basic
{
class Program
{
static void Main(string[] args)
{
//initialize MyActorSystem
ActorSystem MyActorSystem = ActorSystem.Create("MyActorSystem");
//Create Actor
IActorRef consoleWriterActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleWriterActor()), "consoleWriterActor");
IActorRef consoleReaderActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleReaderActor()), "consoleReaderActor");
IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(Props.Create(() => new TailCoordinatorActor()), "tailCoordinatorActor");
IActorRef fileValidatorActor = MyActorSystem.ActorOf(Props.Create(() => new FileValidatorActor(consoleWriterActor)), "validatorActor");
fileValidatorActor.Tell(@"C:\Users\bh.cho\Desktop\test.txt");
//blocks the main thread from exiting until the actor system is shut down
MyActorSystem.WhenTerminated.Wait();
}
}
}
- TailActor.cs
using System.IO;
using Akka.Actor;
using System.Text;
namespace Akka_Basic
{
public class TailActor : UntypedActor
{
public class FileWrite
{
public string FileName { get; private set; }
public FileWrite(string fileName)
{
FileName = fileName;
}
}
public class FileError
{
public string FileName { get; private set; }
public string Reason { get; private set; }
public FileError(string fileName, string reason)
{
FileName = fileName;
Reason = reason;
}
}
public class InitialRead
{
public string FileName { get; private set; }
public string Text { get; private set; }
public InitialRead(string fileName, string text)
{
FileName = fileName;
Text = text;
}
}
private readonly string _filePath;
private readonly IActorRef _reporterActor;
private readonly FileObserver _observer;
private readonly Stream _fileStream;
private readonly StreamReader _fileStreamReader;
public TailActor(IActorRef reporterActor, string filePath)
{
_reporterActor = reporterActor;
_filePath = filePath;
//start watching file fot changes
_observer = new FileObserver(Self, Path.GetFullPath(_filePath));
_observer.Start();
//open the file stream with shared read/write permissions
_fileStream = new FileStream(
Path.GetFullPath(_filePath),
FileMode.Open,
FileAccess.Read,
FileShare.ReadWrite
);
_fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);
//read the initial contents of the file and send it to console as first message
var text = _fileStreamReader.ReadToEnd();
Self.Tell(new InitialRead(_filePath, text));
}
protected override void OnReceive(object message)
{
if (message is FileWrite)
{
// move file cursor forward
// pull results from cursor to end of file and write to output
// (this is assuming a log file type format that is append-only)
var text = _fileStreamReader.ReadToEnd();
if(!string.IsNullOrEmpty(text))
{
_reporterActor.Tell(text);
}
}
else if (message is FileError)
{
var fe = message as FileError;
_reporterActor.Tell(string.Format("Tail error: {0}", fe.Reason));
}
else if (message is InitialRead)
{
var ir = message as InitialRead;
_reporterActor.Tell(ir.Text);
}
}
}
}
- TailCoordinatorActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class TailCoordinatorActor : UntypedActor
{
#region Message Type
public class StartTail
{
public string FilePath { get; private set; }
public IActorRef ReporterActor { get; private set; }
public StartTail(string filePath, IActorRef reporterActor)
{
FilePath = filePath;
ReporterActor = reporterActor;
}
}
public class StopTail
{
public string FilePath { get; private set; }
public StopTail(string filePath)
{
FilePath = filePath;
}
}
#endregion
protected override void OnReceive(object message)
{
if(message is StartTail)
{
var msg = message as StartTail;
Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
10, //maxNumberOfretries
TimeSpan.FromSeconds(30), //withinTimeRange
x => //localOnlyDecider
{
if(x is ArithmeticException) return Directive.Resume;
else if(x is NotSupportedException) return Directive.Stop;
else return Directive.Restart;
}
);
}
}
}
728x90
'C# > Akka(Actor)' 카테고리의 다른 글
Akka.Cluster 란? (0) | 2022.01.11 |
---|---|
[C# Akka] Akka 기초 1-6 | 액터 라이프사이클(The Actor Lifecycle) (0) | 2021.06.17 |
[C# Actor] Akka 기초 1-4 | 자식 액터, 액터 계층 구조, 그리고 감시(Supervision) (0) | 2021.06.15 |
[C# Actor] Akka 기초 1-3 | Props와 IActorRef (0) | 2021.06.14 |
[C# Actor] Akka 기초 1-2 | 메시지 정의 및 핸들링 (0) | 2021.06.13 |
이 글을 공유하기