[C# Akka] Akka 기초 1-6 | 액터 라이프사이클(The Actor Lifecycle)
- C#/Akka(Actor)
- 2021. 6. 17. 00:00
참조
소개
- 액터 라이프 사이클에 대해서 학습합니다.
Key Concepts / background
액터 라이프사이클(actor life cycle)이란?
- 액터에는 잘 정의된 라이프사이클이 있습니다.
- 액터가 생성되고 시작 후 대부분의 삶을 메시지를 받는데 보냅니다.
- 액터가 더 이상 필요하지 않은 경우 액터를 종료하거나 중지 할 수 있습니다.
액터 라이프사이클의 단계는 무엇인가요?
- Akka.NET의 액터 라이프 사이클의 5단계가 있습니다.
Starting
Receiving
Stopping
Terminated
Restarting
Starting
- 액터가 ActorSystem 에 의해 초기화 될 때의 초기 상태입니다.
Receiving
- 이제 액터는 메시지를 처리할 수 있습니다.
- MailBox는 처리를 위해 액터의 OnReceive 메서드로 메시지를 전달하기 시작합니다.
Stopping
- 액터는 상태를 정리합니다. 이 단계에서 일어나는 일은 액터를 종료할 지, 다시 시작할지 여부에 따라 다릅니다.
- 액터가 다시 시작될 예정이라면, 액터가 다시 시작된 후, Receiving 상태로 돌아오면 처리할 상태 또는 메시지를 이 단계 동안 저장하는 것이 일반적입니다.
- 액터가 종료될 예정이라면, Mailbox의 모든 메시지가 ActorSystem의 DeadLetters 메일함으로 전송됩니다. DeadLetters 는 보통 액터가 소멸되었기 때문에 배달 할 수 없는 메시지의 저장소입니다.
Terminated
- 액터가 소멸되었습니다.
- IActorRef를 통해 전송 된 모든 메시지는 이제 DeadLetters로 이동합니다.
- 액터는 다시 시작할 수 없지만 같은 주소를 가진 새로운 액터를 생성할 수 있습니다.
Restarting
- 액터가 다시 시작되고 Starting 상태로 돌아갑니다.
라이프사이클(Life cycle) 후크 방법(hook methods)
- 액터 라이프사이클에 연결할 수 있는 4개의 메서드가 있습니다.
PreStart
- 액터가 메시지 수신을 시작하기 전에 PreStart 로직이 실행됩니다.
- 초기화 로직을 배치하는 것이 좋습니다.
- 다시 시작하는 동안에도 호출됩니다.
PreRestart
- 액터가 실패하면 부모 액터가 액터를 다시 시작합니다.
- PreRestart는 액터가 다시 시작되기 전에 정리를 수행하거나 나중에 재 처리하기 위해 현재 메시지를 저장하기 위해 연결할 수 있는 메소드입니다.
PostStop
- PostStop은 액터가 중지되고 더 이상 메시지를 수신하지 않으면 호출됩니다.
- 이것은 정리 로직을 포함하기에 좋은 곳입니다.
- PostStop 은 PreRestart 중에도 호출되지만, 재시작 중에 이 동작을 피하려면 PreRestart를 재정의해서 base.PreRestart를 호출하지 않아도 됩니다.
- DeathWatch는 액터가 구독이 종료되면 알림을 받도록 구독 한 다른 액터에게 알릴 때도 사용됩니다.
- DeathWatch는 모든 액터가 다른 액터의 종료에 대해 알림을 받을 수 있도록 프레임 워크에 내장 된 게시 / 구독 시스템입니다.
PostRestart
- PostRestart는 PreRestart 후 재시작 중 PreStart 이전에 호출됩니다.
- Akka.NET이 이미 수행한 작업을 넘어서 액터 충돌을 유발한 오류에 대한 추가보고 또는 진단을 수행할 수 있는 좋은 메서드입니다.
- 다음은 후크 메서드가 라이프 사이클 단계에 적합한 위치입니다.
라이프사이클을 어떻게 후킹하나요?
- 후킹하려면, 다음과 같이 후킹하려는 메서드를 재정의하면 됩니다.
protected override void PreStart()
{
}
가장 일반적으로 사용되는 라이프사이클 메서드는 무엇인가요?
PreStart
- PreStart는 가장 많이 사용되는 후크 메서드입니다.
- 액터의 초기 상태를 설정하고 액터에 필요한 사용자 지정 초기화 로직을 실행하는데 사용됩니다.
PostStop
- 두 번째 메서드는 사용자 지정 정리 논리를 수행하는 PostStop 입니다.
- 예를 들어, 액터가 종료하기 전에 파일 시스템 핸들이나 시스템에서 소비하고 있는 다른 리소스를 해제하도록 할 수 있습니다.
PreRestart
- PreRestart는 위의 메서드보다 1/3 정도 떨어지지만 가끔 사용합니다.
- PreRestart를 사용하는 것은 액터가 하는 일에 크게 의존하지만, 한 가지 일반적인 경우는 메시지를 숨기거나 액터가 다시 시작되면 재 처리를 하기 위한 조치를 취하는 것입니다.
감시(supervision)와 관련이 있나요?
- 액터가 실수로 충돌하는 경우(즉, 처리되지 않은 예외가 발생하는 경우) 액터의 감독자는 액터의 메일함에 남아있는 메시지를 잃지 않고 처음부터 액터의 라이프 사이클을 자동으로 다시 시작합니다.
- 레슨4에서 액터계층/감시에 대해 언급했듯이 처리되지 않은 오류에 대한 동작은 부모 액터의 감시 지쳄에 의해 결정됩니다.
- 부모 액터는 자식 액터에게 오류에 대해 종료, 다시 시작 또는 무시하도록 지시하고 중단 된 부분부터 다시 시작할 수 있습니다.
- 기본값은 다시 시작하는 것이므로 잘못된 상태가 사라지고 액터가 깨끗하게 시작됩니다.
- 재시작이 저렴한 방법입니다.
실습
- ConsoleReaderActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class ConsoleReaderActor : UntypedActor
{
public const string StartCommand = "start";
public const string ExitCommand = "exit";
protected override void OnReceive(object message)
{
if(message.Equals(StartCommand))
{
DoPrintInstructions();
}
//GetAndValidateInput();
Context.ActorSelection("akka://MyActorSystem/user/validatorActor").Tell(message);
}
private void DoPrintInstructions()
{
Console.WriteLine("Please provide that URI of a log file on disk.\n");
}
private void GetAndValidateInput()
{
var message = Console.ReadLine();
if(!string.IsNullOrEmpty(message) &&
string.Equals(message, ExitCommand, StringComparison.OrdinalIgnoreCase))
{
//if user typed ExitCommand, shut down the entire actor
//system (allows the process to exit)
Context.System.Terminate();
return;
}
}
}
}
- ConsoleWriterActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class ConsoleWriterActor : UntypedActor
{
protected override void OnReceive(object message)
{
if(message is Messages.InputError)
{
var msg = message as Messages.InputError;
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(msg.Reason);
}
else if(message is Messages.InputSuccess)
{
var msg = message as Messages.InputSuccess;
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine(msg.Reason);
}
else
{
Console.WriteLine(message);
}
Console.ResetColor();
}
}
}
- FileObserver.cs
using System;
using System.IO;
using Akka.Actor;
namespace Akka_Basic
{
public class FileObserver : IDisposable
{
private readonly IActorRef _tailActor;
private readonly string _absoluteFilePath;
private FileSystemWatcher _watcher;
private readonly string _fileDir;
private readonly string _fileNameOnly;
public FileObserver(IActorRef tailActor, string absoluteFilePath)
{
_tailActor = tailActor;
_absoluteFilePath = absoluteFilePath;
_fileDir = Path.GetDirectoryName(absoluteFilePath);
_fileNameOnly = Path.GetFileName(absoluteFilePath);
}
/// <summary>
/// Begin monitoring file.
/// </summary>
public void Start()
{
// Need this for Mono 3.12.0 workaround
// uncomment this line if you're running on Mono!
// Environment.SetEnvironmentVariable("MONO_MANAGED_WATCHER", "enabled");
// make watcher to observe our specific file
_watcher = new FileSystemWatcher(_fileDir, _fileNameOnly);
// watch our file for changes to the file name, or new messages being written to file
_watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;
// assign callbacks for event types
_watcher.Changed += OnFileChanged;
_watcher.Error += OnFileError;
// start watching
_watcher.EnableRaisingEvents = true;
}
/// <summary>
/// Stop monitoring file.
/// </summary>
public void Dispose()
{
_watcher.Dispose();
}
/// <summary>
/// Callback for <see cref="FileSystemWatcher"/> file error events.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnFileError(object sender, ErrorEventArgs e)
{
_tailActor.Tell(new TailActor.FileError(_fileNameOnly, e.GetException().Message), ActorRefs.NoSender);
}
/// <summary>
/// Callback for <see cref="FileSystemWatcher"/> file change events.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnFileChanged(object sender, FileSystemEventArgs e)
{
if (e.ChangeType == WatcherChangeTypes.Changed)
{
// here we use a special ActorRefs.NoSender
// since this event can happen many times, this is a little microoptimization
_tailActor.Tell(new TailActor.FileWrite(e.Name), ActorRefs.NoSender);
}
}
}
}
- FileValidatorActor.cs
using System.IO;
using Akka.Actor;
namespace Akka_Basic
{
//Actor that validates user input and signals result to others
public class FileValidatorActor : UntypedActor
{
private readonly IActorRef _consoleWriterActor;
public FileValidatorActor(IActorRef consoleWriterActor)
{
_consoleWriterActor = consoleWriterActor;
}
protected override void OnReceive(object message)
{
var msg = message as string;
if(string.IsNullOrEmpty(msg))
{
//signal that the user needs to supply an input
_consoleWriterActor.Tell(new Messages.NullInputError("Input as blank. Please type again.\n"));
//tell sender to continue doing its thing
Sender.Tell(new Messages.ContinueProcessing());
}
else
{
var valid = IsFileUri(msg);
if(valid)
{
//signal successful input
_consoleWriterActor.Tell(new Messages.InputSuccess($"starting processing for {msg}"));
Context.ActorSelection("akka://MyActorSystem/user/tailCoordinatorActor").Tell(new TailCoordinatorActor.StartTail(msg, _consoleWriterActor));
}
else
{
//singal that input was bad
_consoleWriterActor.Tell(new Messages.ValidationError($"{msg} is not an existing URI on disk."));
//tell sender to continue doing its thing
Sender.Tell(new Messages.ContinueProcessing());
}
}
}
private static bool IsFileUri(string path)
{
return File.Exists(path);
}
}
}
- Messages.cs
namespace Akka_Basic
{
public class Messages
{
#region Neutral/system messages
public class ContinueProcessing { }
#endregion
#region Success messages
public class InputSuccess
{
public string Reason { get; private set; }
public InputSuccess(string reason)
{
Reason = reason;
}
}
#endregion
#region Error messages
public class InputError
{
public string Reason { get; private set; }
public InputError(string reason)
{
Reason = reason;
}
}
public class NullInputError : InputError
{
public NullInputError(string reason) : base(reason) { }
}
public class ValidationError : InputError
{
public ValidationError(string reason) : base(reason) { }
}
#endregion
}
}
- Program.cs
using Akka.Actor;
namespace Akka_Basic
{
class Program
{
static void Main(string[] args)
{
//initialize MyActorSystem
ActorSystem MyActorSystem = ActorSystem.Create("MyActorSystem");
//Create Actor
IActorRef consoleWriterActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleWriterActor()), "consoleWriterActor");
IActorRef consoleReaderActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleReaderActor()), "consoleReaderActor");
IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(Props.Create(() => new TailCoordinatorActor()), "tailCoordinatorActor");
IActorRef fileValidatorActor = MyActorSystem.ActorOf(Props.Create(() => new FileValidatorActor(consoleWriterActor)), "validatorActor");
fileValidatorActor.Tell(@"C:\Users\bh.cho\Desktop\test.txt");
//blocks the main thread from exiting until the actor system is shut down
MyActorSystem.WhenTerminated.Wait();
}
}
}
- TailActor.cs
using System.IO;
using Akka.Actor;
using System.Text;
namespace Akka_Basic
{
public class TailActor : UntypedActor
{
public class FileWrite
{
public string FileName { get; private set; }
public FileWrite(string fileName)
{
FileName = fileName;
}
}
public class FileError
{
public string FileName { get; private set; }
public string Reason { get; private set; }
public FileError(string fileName, string reason)
{
FileName = fileName;
Reason = reason;
}
}
public class InitialRead
{
public string FileName { get; private set; }
public string Text { get; private set; }
public InitialRead(string fileName, string text)
{
FileName = fileName;
Text = text;
}
}
private string _filePath;
private IActorRef _reporterActor;
private FileObserver _observer;
private Stream _fileStream;
private StreamReader _fileStreamReader;
public TailActor(IActorRef reporterActor, string filePath)
{
_reporterActor = reporterActor;
_filePath = filePath;
}
protected override void PreStart()
{
//start watching file for changes
_observer = new FileObserver(Self, Path.GetFullPath(_filePath));
_observer.Start();
//open the file stream with shared read/write permissions
_fileStream = new FileStream(Path.GetFullPath(_filePath), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
_fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);
//read the initial contents of the file and send it to console as first message
var text = _fileStreamReader.ReadToEnd();
Self.Tell(new InitialRead(_filePath, text));
}
protected override void PostStop()
{
_observer.Dispose();
_observer = null;
_fileStreamReader.Close();
_fileStreamReader.Dispose();
base.PostStop();
}
protected override void OnReceive(object message)
{
if (message is FileWrite)
{
// move file cursor forward
// pull results from cursor to end of file and write to output
// (this is assuming a log file type format that is append-only)
var text = _fileStreamReader.ReadToEnd();
if(!string.IsNullOrEmpty(text))
{
_reporterActor.Tell(text);
}
}
else if (message is FileError)
{
var fe = message as FileError;
_reporterActor.Tell(string.Format("Tail error: {0}", fe.Reason));
}
else if (message is InitialRead)
{
var ir = message as InitialRead;
_reporterActor.Tell(ir.Text);
}
}
}
}
- TailCoordinatorActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class TailCoordinatorActor : UntypedActor
{
#region Message Type
public class StartTail
{
public string FilePath { get; private set; }
public IActorRef ReporterActor { get; private set; }
public StartTail(string filePath, IActorRef reporterActor)
{
FilePath = filePath;
ReporterActor = reporterActor;
}
}
public class StopTail
{
public string FilePath { get; private set; }
public StopTail(string filePath)
{
FilePath = filePath;
}
}
#endregion
protected override void OnReceive(object message)
{
if(message is StartTail)
{
var msg = message as StartTail;
Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
10, //maxNumberOfretries
TimeSpan.FromSeconds(30), //withinTimeRange
x => //localOnlyDecider
{
if(x is ArithmeticException) return Directive.Resume;
else if(x is NotSupportedException) return Directive.Stop;
else return Directive.Restart;
}
);
}
}
}
728x90
'C# > Akka(Actor)' 카테고리의 다른 글
[Akka.NET] 액터 생성하기 (1) | 2022.01.11 |
---|---|
Akka.Cluster 란? (0) | 2022.01.11 |
[C# Actor] Akka 기초 1-5 | ActorSelection과 함께 주소로 액터 찾기 (0) | 2021.06.16 |
[C# Actor] Akka 기초 1-4 | 자식 액터, 액터 계층 구조, 그리고 감시(Supervision) (0) | 2021.06.15 |
[C# Actor] Akka 기초 1-3 | Props와 IActorRef (0) | 2021.06.14 |
이 글을 공유하기