[C# Actor] Akka 기초 1-4 | 자식 액터, 액터 계층 구조, 그리고 감시(Supervision)
- C#/Akka(Actor)
- 2021. 6. 15. 00:00
참조
소개
- 코드베이스의 기능과 액터 모델이 어떻게 작동하는지 이해하는데 있어 큰 도움이 됩니다.
Key Concepts / background
- 액터 계층 구조에 대해 깊게 들어가기 전에 알아봅시다. 왜 우리는 계층 구조가 필요할까요?
- 계층 구조를 사용하는 두 가지 중요한 키 포인트가 있습니다.
- 작업을 원자화하고 대용량 데이터를 처리하기 쉬운 양으로 변환하기 위해
- 에러를 억제하고 시스템을 회복력 있게 만들기 위해
계층 구조의 원자화 작업
- 계층 구조를 가지는 것은 아주 작은 조각으로 작업을 쪼개고, 서로 다른 계층의 레벨에서 다른 전문 기술을 활용할 수 있게 합니다.
- 액터 시스템에서 이것이 실현되는 일반적인 방법은 큰 데이터 스트림을 원자화 하는 것입니다. 그것이 작고 쉽게 처리할 만한 코드 조각이 될 때까지 계속 원자화를 반복합니다.
- 트위터를 예로 들어봅시다. Akka를 사용하면 그들의 대용량 데이터 수집을 작고, 처리하기 쉬운 정보의 흐름으로 분해할 수 있습니다.
- 트위터의 경우에는 거대한 소방호스를 통해 뿜어져 나오는 것 같은 트윗들을 각 유저들의 타임라인에 작은 물줄기 하나하나로 나누어 분산할 수 있고, Akka를 사용하여 해당 유저의 스트림으로 도착한 메시지를 websocket 등을 통해 푸시할 수도 있습니다.
계층 구조를 통해 복원력 있는 시스템 지원
- 계층 구조를 사용함으로써 위험의 계층와와 전문화가 가능합니다.
- 상위 계층의 액터는 본질적으로 좀 더 감독적이며, 이러한 점은 액터시스템의 리스크를 낮추고 가장 자리로 밀어냅니다. 계층의 가장자리로 위험한 작업을 밀어냄으로써 시스템은 리스크를 고립시키고 전체 시스템의 크레싱 없이 오류로부터 복구할 수 있습니다.
감시(Supervision)란? 왜 신경써야 하나?
- 감시(Supervision) 는 액터 시스템이 오류(failures) 에서 빠르고 신속하게 격리와 회복이 이뤄질 수 있게 하는 기본 개념입니다.
- 모든 액터는 자신을 감시하는 다른 액터를 가지고 있으며, 이 액터들은 에러가 발생했을 때 복구할 수 있도록 돕습니다. 이는 계층 구조의 최상단부터 최하단까지 모두에게 해당합니다.
- 이 감시(Supervision)는 애플리케이션이 예기치 않은 오류가 발생할 경우, 액터 계층 구조에서 해당 계층에만 영향을 받도록 제어합니다.
- 모든 다른 액터들은 아무일도 없었던 것처럼 동작할것입니다. 우리는 이것을 "오류 고립", "봉쇄" 라고 부릅니다.
액터의 계층 구조
- 첫 번째 키 포인트 : 모든 액터는 부모 액터를 가지고 있습니다. 그리고 일부 액터들은 자식을 가지고 있습니다.
- 부모 액터는 자식을 감독(supervise) 합니다.
- 부모 액터가 자식 액터를 감독한다는 것은 모든 액터가 감독자를 가진다는 것이고, 모든 액터가 감독자가 될 수 있음을 말합니다.
- 액터시스템 내에서 액터는 계층 구조로 정리됩니다. ActorSystem 자체에 직접 보고하는 최상위 액터와 다른 액터에게 보고하는 자식 액터가 있음을 의미합니다.
- 전반적인 계층 구조는 다음과 같습니다.
계층 구조의 라벨이란?
- Guardians 는 전체 시스템의 root 액터입니다.
- 계층 구조에서 최상단에 있는 이 3개의 액터를 말합니다.
/ 액터는 액터 시스템 전체의 root 입니다. The Root Guardian 이라고 불립니다. 이 액터는 다른 가디언(/system과 /user 액터) 을 감독합니다.
이 하나를 제외한 모든 액터는 그들의 부모 액터를 필요로 합니다. 이 액터는 일반 액터 시스템의 바깥에 있기 때문에 때때로 bubble-walker 라고 불립니다.
/system 액터는 "시스템 가디언" 이라고 불립니다. 이 액터의 주 역할은 정상적인 방법으로 시스템이 종료되고, 프레임워크 레벨의 기능 및 유틸리티로 구현된 다른 시스템 액터를 유지하고 감독합니다.
시스템 가디언과 시스템 액터 계층 구조에 대해서는 다음에 포스팅 하겠습니다.
/user 액터는 "가디언 액터" 로 불립니다. 사용자 관점에서 볼 때 /user 액터는 시스템의 root 이기 때문에 root actor 라고 부릅니다.
사용자는 가디언에 대해 걱정할 필요가 없습니다.
어떠한 예외도 가디언까지 거품을 일으켜 전체 시스템에 크래시가 발생하지 않도록 /user 아래에서 적절하게 감독합니다.
/user 액터의 계층 구조
- 액터 계층 구조의 주요 포인트입니다.
- 애플리케이션 내에서 당신이 정의한 모든 액터들이 속합니다.
- /user 액터 바로 아래의 자식 액터를 최상위 액터 라고 부릅니다.
- 액터는 항상 다른 액터의 자식 액터로 만들어집니다.
- 액터시스템 자체의 컨텍스트를 활용해 직접 액터를 만들면, 새 액터는 최상위 액터가 됩니다.
//다이어그램에 표현된 최상위 액터를 만듭니다.
IActorRef a1 = MyActorSystem.ActorOf(Props.Create<BasicActor>(), "a1");
IActorRef a2 = MyActorSystem.ActorOf(props.Create<BasicActor>(), "a2");
- a2 컨텍스트 안쪽에서 a2의 자식 액터들을 만들어 봅니다.
//a2의 자식 액터들을 만듭니다.
//a2의 내부에서 진행합니다.
IActorRef b1 = Context.ActorOf(Props.Create<BasicActor>(), "b1");
IActorRef b2 = Context.ActorOf(Props.Create<BasicActor>(), "b2");
액터 주소(Actor Path) == 계층 구조 내에서 액터의 위치
- 모든 액터는 주소를 가집니다.
- 액터에서 다른 액터로 메시지를 보내려면, 주소를 알아야 합니다.
- 완전한 액터 주소를 다음과 같습니다.
Path는 액터가 당신의 액터 계층 구조에서 어디에 위치하는지 알려줍니다. 슬래시('/')로 액터의 계층을 구분합니다.
- 만일 locathost에서 실행했다면, b2 액터의 완전한 주소는 akka.tcp://MyActorSystem@localhost:9001/user/a2/b2 입니다.
- 어떤 액터든 당신의 액터 계층 구조 안에서 어디에나 위치할 수 있습니다.
액터 계층 구조에서 감시가 동작하는 방법
- 액터는 자식 액터를 감독합니다. 하지만, 그들은 액터 계층 구조에서 바로 아래 단계만 감독합니다.
액터는 계층 구조상 바로 아래 단계의 자식만 감독합니다.
언제 감시가 시작되나요? 에러가 발생하면!
- 자식 액터가 unhandled exception 이나 크래시가 발생할 때, 부모 액터에게 도움을 청하고 무엇을 해야 하는지 알려줍니다.
- 구체적으로, 자식 액터는 Failure 클래스의 메시지를 부모 액터에게 보냅니다. 무엇을 어떻게 해야할지 결정하는 것은 부모 액터에게 달려있습니다.
부모 액터는 어떻게 오류를 해결할 수 있을까?
- 자식 액터의 실패요인(자식 액터가 보낸 Failure 메시지에 어떤 타입의 Exception이 포함되었는가)
- 자식 액터의 Failure 에 대한 부모 액터가 실행하는 명령. 이는 부모 액터의 감시 전략에 의해 결정됩니다.
오류가 발생했을 때 이벤트의 순서
- 자식 액터에서 Unhandled exception이 발생하면, 부모 액터에 의해 관리됩니다.
- c1은 작업을 중지합니다.
- 시스템은 Failure 메시지에 Exception을 담아 c1에서 b1으로 전달합니다.
- b1은 c1에게 어떻게 해야하는지 지시합니다.
감시 지침
- 자식 액터에게서 에러를 받으면, 부모 액터는 다음 동작 중 하나를 수행할 수 있습니다.
- 감시전략은 예외 타입에 따라 침을 매핑하므로, 여러 에러의 유형에 따라 적절하게 처리할 수 있습니다.
감시 지침의 종류
- Restart the child : 가장 일반적인 경우이며 기본값입니다.
- Stop the child : 자식 액터를 영구히 종료합니다.
- Escalate the error : 이건 부모 액터가 "뭘 해야 할지 모르겠어! 다 멈추고 내 부모 액터에게 물어봐!" 라고 말하는 겁니다.
- Resume processing : 일반적으로는 사용하지 않습니다. 일단 무시합니다.
여기서 알아야 할 중요한 것은 부모 액터에게 어떤 조치가 취해지든 자식 액터에게 전파된다는 것입니다. 부모 액터가 중단되면 모든 자식 액터가 중단됩니다. 다시 시작하면 모든 자식 액터가 다시 시작됩니다.
감시전략
- 두 가지 기본 감시 전략이 있습니다.
- One-For-One Strategy
- All-For-One-Strategy
- 두 가지의 기본적인 차이점은 에러 해결 지시 효과가 얼마나 널리 퍼지는가 입니다.
- One-For-One 부모 액터의 지시가 실패한 자식 액터에게만 적용됩니다. 실패한 자식 액터의 형제 액터에게 영향을 끼치지 않습니다. 이것은 별도로 지정하지 않는 이상 기본각으로 동작합니다.
- All-For-One 부모 액터의 지시가 실패한 자식 액터와 그 형제 액터에게 적용됩니다.
- 감시전략에서 또 다른 중요한 선택은, 자식 액터가 얼마의 시간 동안에 몇 번의 실패를 허용하는가입니다.
- 다음은 감시 전략의 예시입니다.
public class MyActor : UntypedActor
{
protected override SupervisiorStrategt SupervisorStrategy()
{
return new OnForOneStrategy(
maxNrOfretries: 10,
withinTimeRange: TimeSpan.FromSeconds(30),
localOnlyDecider : x =>
{
// Maybe ArithmeticException is not application critical
// so we just ignore the error and keep going.
if (x is ArithmeticException) return Directive.resume;
// Error that we have no idea what to do with
else if (x is InsanelyBadException) return Directive.Escalate;
// Error that we can't recover from, stop the failing child
else if (x is notSupportedException) return Directive.Stop;
// otherwise restart the failing child
else return Directive.restart;
}
)
}
}
핵심은 봉쇄(Containmnt)
- 감시 전략과 지침의 전체적인 핵심은 시스템 내에서 오류를 포함하고 자가 치유하는 것이므로, 시스템 전체에 크래시가 발생하지 않는것입니다.
- 잠재적으로 위험한 작업을 부모 액터에서 자식 액터에게 전달합니다. 이 작업은 위험한 작업을 수행하는 것입니다.
- 예를 들어, 월드컵 기간동안에 치뤄지는 수많은 경기에서 점수와 선수 통계들을 관리하는 시스템을 운영한다고 생각해 봅시다.
- 월드컵이 진행되는 동안 처리 한계에 다다를 정도의 엄청난 양의 API 요청이 발생할 겁니다. 때로는 크래시가 발생할 수도 있습니다.
- 스코어 키퍼는 게임이 진행되는 동안 정기적으로 데이터를 업데이트해야 합니다.
- 네트워크 호출은 위험합니다. 만약 요청이 오류를 발생시키면, 호출을 시작한 액터는 크래시가 발생합니다. 그러면 어떻게 보호행 할까요?
- 부모 액터가 상태를 보관하고, 형편없는 네트워크 호출은 자식 액터에게 밀어 넣습니다. 그렇게 하면, 자식 액터가 크래시 되더라도 중요한 데이터를 보관하고 있는 부모 액터에게 영향을 끼치지 ㅇ낳습니다.
- 오류를 지역화 하여 시스템 전체에 퍼지는 것을 방지합니다.
- 액터 계층 구조에서 안정성을 이루는 방법의 예시입니다.
- 추적중인 게임마다 하나의 클론으로 이러한 구조를 만들어 병렬로 동작할 수 있음을 기억합니다.
- 새로운 코드를 작성할 필요가 없습니다.
실습
- Program.cs
using Akka.Actor;
namespace Akka_Basic
{
class Program
{
static void Main(string[] args)
{
//initialize MyActorSystem
ActorSystem MyActorSystem = ActorSystem.Create("MyActorSystem");
//Create Actor
IActorRef consoleWriterActor = MyActorSystem.ActorOf(Props.Create(() => new ConsoleWriterActor()), "consoleWriterActor");
IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(Props.Create(() => new TailCoordinatorActor()), "tailCoordinatorActor");
IActorRef fileValidatorActor = MyActorSystem.ActorOf(Props.Create(() => new FileValidatorActor(consoleWriterActor, tailCoordinatorActor)), "validatorActor");
fileValidatorActor.Tell(@"C:\Users\bh.cho\Desktop\test.txt");
//blocks the main thread from exiting until the actor system is shut down
MyActorSystem.WhenTerminated.Wait();
}
}
}
- FileObserver.cs
using System;
using System.IO;
using Akka.Actor;
namespace Akka_Basic
{
public class FileObserver : IDisposable
{
private readonly IActorRef _tailActor;
private readonly string _absoluteFilePath;
private FileSystemWatcher _watcher;
private readonly string _fileDir;
private readonly string _fileNameOnly;
public FileObserver(IActorRef tailActor, string absoluteFilePath)
{
_tailActor = tailActor;
_absoluteFilePath = absoluteFilePath;
_fileDir = Path.GetDirectoryName(absoluteFilePath);
_fileNameOnly = Path.GetFileName(absoluteFilePath);
}
/// <summary>
/// Begin monitoring file.
/// </summary>
public void Start()
{
// Need this for Mono 3.12.0 workaround
// uncomment this line if you're running on Mono!
// Environment.SetEnvironmentVariable("MONO_MANAGED_WATCHER", "enabled");
// make watcher to observe our specific file
_watcher = new FileSystemWatcher(_fileDir, _fileNameOnly);
// watch our file for changes to the file name, or new messages being written to file
_watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;
// assign callbacks for event types
_watcher.Changed += OnFileChanged;
_watcher.Error += OnFileError;
// start watching
_watcher.EnableRaisingEvents = true;
}
/// <summary>
/// Stop monitoring file.
/// </summary>
public void Dispose()
{
_watcher.Dispose();
}
/// <summary>
/// Callback for <see cref="FileSystemWatcher"/> file error events.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnFileError(object sender, ErrorEventArgs e)
{
_tailActor.Tell(new TailActor.FileError(_fileNameOnly, e.GetException().Message), ActorRefs.NoSender);
}
/// <summary>
/// Callback for <see cref="FileSystemWatcher"/> file change events.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnFileChanged(object sender, FileSystemEventArgs e)
{
if (e.ChangeType == WatcherChangeTypes.Changed)
{
// here we use a special ActorRefs.NoSender
// since this event can happen many times, this is a little microoptimization
_tailActor.Tell(new TailActor.FileWrite(e.Name), ActorRefs.NoSender);
}
}
}
}
- FileValidatorActor.cs
using System.IO;
using Akka.Actor;
namespace Akka_Basic
{
//Actor that validates user input and signals result to others
public class FileValidatorActor : UntypedActor
{
private readonly IActorRef _consoleWriterActor;
private readonly IActorRef _tailCoordinatorActor;
public FileValidatorActor(IActorRef consoleWriterActor, IActorRef tailCoordinatorActor)
{
_consoleWriterActor = consoleWriterActor;
_tailCoordinatorActor = tailCoordinatorActor;
}
protected override void OnReceive(object message)
{
var msg = message as string;
if(string.IsNullOrEmpty(msg))
{
//signal that the user needs to supply an input
_consoleWriterActor.Tell(new Messages.NullInputError("Input as blank. Please type again.\n"));
//tell sender to continue doing its thing
Sender.Tell(new Messages.ContinueProcessing());
}
else
{
var valid = IsFileUri(msg);
if(valid)
{
//signal successful input
_consoleWriterActor.Tell(new Messages.InputSuccess($"starting processing for {msg}"));
//start coordinator
_tailCoordinatorActor.Tell(new TailCoordinatorActor.StartTail(msg, _consoleWriterActor));
}
else
{
//singal that input was bad
_consoleWriterActor.Tell(new Messages.ValidationError($"{msg} is not an existing URI on disk."));
//tell sender to continue doing its thing
Sender.Tell(new Messages.ContinueProcessing());
}
}
}
private static bool IsFileUri(string path)
{
return File.Exists(path);
}
}
}
- TailActor.cs
using System.IO;
using Akka.Actor;
using System.Text;
namespace Akka_Basic
{
public class TailActor : UntypedActor
{
public class FileWrite
{
public string FileName { get; private set; }
public FileWrite(string fileName)
{
FileName = fileName;
}
}
public class FileError
{
public string FileName { get; private set; }
public string Reason { get; private set; }
public FileError(string fileName, string reason)
{
FileName = fileName;
Reason = reason;
}
}
public class InitialRead
{
public string FileName { get; private set; }
public string Text { get; private set; }
public InitialRead(string fileName, string text)
{
FileName = fileName;
Text = text;
}
}
private readonly string _filePath;
private readonly IActorRef _reporterActor;
private readonly FileObserver _observer;
private readonly Stream _fileStream;
private readonly StreamReader _fileStreamReader;
public TailActor(IActorRef reporterActor, string filePath)
{
_reporterActor = reporterActor;
_filePath = filePath;
//start watching file fot changes
_observer = new FileObserver(Self, Path.GetFullPath(_filePath));
_observer.Start();
//open the file stream with shared read/write permissions
_fileStream = new FileStream(
Path.GetFullPath(_filePath),
FileMode.Open,
FileAccess.Read,
FileShare.ReadWrite
);
_fileStreamReader = new StreamReader(_fileStream, Encoding.UTF8);
//read the initial contents of the file and send it to console as first message
var text = _fileStreamReader.ReadToEnd();
Self.Tell(new InitialRead(_filePath, text));
}
protected override void OnReceive(object message)
{
if (message is FileWrite)
{
// move file cursor forward
// pull results from cursor to end of file and write to output
// (this is assuming a log file type format that is append-only)
var text = _fileStreamReader.ReadToEnd();
if(!string.IsNullOrEmpty(text))
{
_reporterActor.Tell(text);
}
}
else if (message is FileError)
{
var fe = message as FileError;
_reporterActor.Tell(string.Format("Tail error: {0}", fe.Reason));
}
else if (message is InitialRead)
{
var ir = message as InitialRead;
_reporterActor.Tell(ir.Text);
}
}
}
}
- TailCoordinatorActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class TailCoordinatorActor : UntypedActor
{
#region Message Type
public class StartTail
{
public string FilePath { get; private set; }
public IActorRef ReporterActor { get; private set; }
public StartTail(string filePath, IActorRef reporterActor)
{
FilePath = filePath;
ReporterActor = reporterActor;
}
}
public class StopTail
{
public string FilePath { get; private set; }
public StopTail(string filePath)
{
FilePath = filePath;
}
}
#endregion
protected override void OnReceive(object message)
{
if(message is StartTail)
{
var msg = message as StartTail;
Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
}
}
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
10, //maxNumberOfretries
TimeSpan.FromSeconds(30), //withinTimeRange
x => //localOnlyDecider
{
if(x is ArithmeticException) return Directive.Resume;
else if(x is NotSupportedException) return Directive.Stop;
else return Directive.Restart;
}
);
}
}
}
- ConsoleWriterActor.cs
using System;
using Akka.Actor;
namespace Akka_Basic
{
public class ConsoleWriterActor : UntypedActor
{
protected override void OnReceive(object message)
{
if(message is Messages.InputError)
{
var msg = message as Messages.InputError;
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(msg.Reason);
}
else if(message is Messages.InputSuccess)
{
var msg = message as Messages.InputSuccess;
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine(msg.Reason);
}
else
{
Console.WriteLine(message);
}
Console.ResetColor();
}
}
}
- Messages.cs
namespace Akka_Basic
{
public class Messages
{
#region Neutral/system messages
public class ContinueProcessing { }
#endregion
#region Success messages
public class InputSuccess
{
public string Reason { get; private set; }
public InputSuccess(string reason)
{
Reason = reason;
}
}
#endregion
#region Error messages
public class InputError
{
public string Reason { get; private set; }
public InputError(string reason)
{
Reason = reason;
}
}
public class NullInputError : InputError
{
public NullInputError(string reason) : base(reason) { }
}
public class ValidationError : InputError
{
public ValidationError(string reason) : base(reason) { }
}
#endregion
}
}
실행 결과
728x90
'C# > Akka(Actor)' 카테고리의 다른 글
[C# Akka] Akka 기초 1-6 | 액터 라이프사이클(The Actor Lifecycle) (0) | 2021.06.17 |
---|---|
[C# Actor] Akka 기초 1-5 | ActorSelection과 함께 주소로 액터 찾기 (0) | 2021.06.16 |
[C# Actor] Akka 기초 1-3 | Props와 IActorRef (0) | 2021.06.14 |
[C# Actor] Akka 기초 1-2 | 메시지 정의 및 핸들링 (0) | 2021.06.13 |
[C# Actor] 액터와 액터시스템(ActorSystem) (0) | 2021.06.12 |
이 글을 공유하기