コンテンツにスキップ

Temporal@ワークフロー

はじめに

本サイトにつきまして、以下をご認識のほど宜しくお願いいたします。


01. Temporalの仕組み

アーキテクチャ

Temporalは、Temporalクライアント、Temporalサーバー、ステート用データベース、Temporalワーカー、からなる。

temporal_architecture


Temporalクライアント

Temporalクライアントは、Temporalサーバーをコールし、Temporalワーカーによって登録されたワークフローを実行する。


Temporalサーバー

Temporalサーバーは、内臓するメッセージキューを操作してワークフローの現在のステートを管理し、またステートの履歴をデータベースに永続化する。


ステート用データベース

▼ ステート用データベース

ステート用データベースは、Sagaのステートの履歴を保管する。

Temporalサーバーで処理中に障害が起こった場合でも、ワークフローの途中から処理を実行できるようにする。

▼ PostgreSQLの場合

主に、temporalデータベースとtemporal_visibilityを使用する。

$ psql -U temporal -h temporal-postgresql -p 5432 -d temporal

# 現在はtemporalデータベース
# データベースの一覧
temporal=# \l

                                      List of databases
        Name         |  Owner   | Encoding |  Collate   |   Ctype    |   Access privileges
---------------------+----------+----------+------------+------------+-----------------------
 postgres            | temporal | UTF8     | en_US.utf8 | en_US.utf8 |
 template0           | temporal | UTF8     | en_US.utf8 | en_US.utf8 | =c/temporal          +
                     |          |          |            |            | temporal=CTc/temporal
 template1           | temporal | UTF8     | en_US.utf8 | en_US.utf8 | =c/temporal          +
                     |          |          |            |            | temporal=CTc/temporal
 temporal            | temporal | UTF8     | en_US.utf8 | en_US.utf8 |
 temporal_visibility | temporal | UTF8     | en_US.utf8 | en_US.utf8 |

temporalデータベースには、例えば以下のテーブルがある。(SQLスキーマを参照)

  • namespaces
  • namespace_metadata
  • shards
  • executions
  • current_executions
  • buffered_events
  • tasks
  • ...
$ psql -U temporal -h temporal-postgresql -p 5432 -d temporal

# 現在のデータベースのテーブル一覧
temporal=# \dt
                   List of relations
 Schema |           Name            | Type  |  Owner
--------+---------------------------+-------+----------
 public | activity_info_maps        | table | temporal
 public | buffered_events           | table | temporal
 ...

▼ MySQLの場合

主に、temporalデータベースとtemporal_visibilityを使用する。


Temporalワーカー

『制御の反転』という仕組みになっており、実際にワークフローやアクティビティを実行するのは、TemporalサーバーではなくTemporalワーカーである。

  1. Temporalワーカーは、Temporalサーバーにワークフローやアクティビティを登録する。
  2. Temporalワーカーは、Temporalサーバー内のメッセージキューにポーリングを実施し、ワークフローの現在のステートを取得する。
  3. Temporalクライアントは、ワークフローを開始する。
  4. Temporalワーカーは、ワークフロー内のアクティビティを実行し、結果をTemporalサーバーのメッセージキューに格納する。
  5. Temporalワーカーは、ワークフローの現在のステートに応じて次のアクティビティを実行する。

temporal_worker_architecture


02. ユースケース

Sagaオーケストレーターとして

▼ Sagaオーケストレーターとして

TemporalをSagaパターンのオーケストレーターとして使用する。

なお、Temporalサーバー内にメッセージキューが内臓されている。

そのため、タイムアウト処理、リトライ処理、キャンセル処理などの点でTemporalの外部にメッセージキュー (例:AWS SQS) やメッセージブローカー (例:RebbitMQ) は不要である。

temporal_saga-pattern

▼ Temporalクライアント

Temporalクライアントは、Temporalサーバーのエンドポイントをコールするサーバーとして実装する。

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"

    "documentation-samples-go/yourapp"

    "go.temporal.io/sdk/client"
)

func main() {

    // Temporalサーバーに接続する
    temporalClient, err := client.Dial(client.Options{
        HostPort: client.DefaultHostPort,
    })

    if err != nil {
        log.Fatalln("Unable to create Temporal Client", err)
    }

    defer temporalClient.Close()

    // Temporalクライアントがワークフローを実行するエンドポイント
    http.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) {
        // ワークフローを実行するラッパー関数
        startWorkflowHandler(w, r, temporalClient)
    })

    // サーバーを起動する
    err = http.ListenAndServe(":8091", nil)

    if err != nil {
        log.Fatalln("Unable to run http server", err)
    }
}

func startWorkflowHandler(w http.ResponseWriter, r *http.Request, temporalClient client.Client) {

    workflowOptions := client.StartWorkflowOptions{
        ID:        "your-workflow-id",
        TaskQueue: "your-custom-task-queue-name",
    }

    workflowParams := yourapp.YourWorkflowParam{
        WorkflowParamX: "Hello World!",
        WorkflowParamY: 999,
    }

    // Temporalサーバーでワークフローを実行する
    workflowExecution, err := temporalClient.ExecuteWorkflow(
        context.Background(),
        workflowOptions,
        // ワークフローの定義
        yourapp.YourWorkflowDefinition,
        workflowParams,
    )

    if err != nil {
        log.Fatalln("Unable to execute the Workflow", err)
    }

    log.Println("Started Workflow!")

    log.Println("WorkflowID:", workflowExecution.GetID())

    log.Println("RunID:", workflowExecution.GetRunID())

    var result yourapp.YourWorkflowResultObject

    workflowExecution.Get(context.Background(), &result)

    if err != nil {
        log.Fatalln("Unable to get Workflow result:", err)
    }

    b, err := json.Marshal(result)

    if err != nil {
        log.Fatalln(err)
        return
    }

    log.Println(string(b))
}

▼ Temporalサーバーとステート用データベース

制御が反転しているため、Temporalサーバーはユーザーが何かを実装する必要はない。

▼ アクティビティ定義用のTemporalワーカー

アクティビティ定義用のTemporalワーカーは、ワークフロー定義を登録する。

また、実際にローカルトランザクションを実行する。

package main

import (
    "log"

    "go.temporal.io/sdk/activity"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
    "go.temporal.io/sdk/workflow"

    "documentation-samples-go/yourapp"
)

func main() {


    temporalClient, err := client.Dial(client.Options{})

    if err != nil {
        log.Fatalln("Unable to create client", err)
    }

    defer temporalClient.Close()

    yourWorker := worker.New(temporalClient, "your-custom-task-queue-name", worker.Options{})

    yourWorker.RegisterWorkflow(yourapp.YourWorkflowDefinition)

    registerWFOptions := workflow.RegisterOptions{
        Name: "JustAnotherWorkflow",
    }

    yourWorker.RegisterWorkflowWithOptions(yourapp.YourSimpleWorkflowDefinition, registerWFOptions)

    message := "This could be a connection string or endpoint details"

    number := 100

    activities := &yourapp.YourActivityObject{
        Message: &message,
        Number:  &number,
    }

    yourWorker.RegisterActivity(activities)

    registerAOptions := activity.RegisterOptions{
        Name: "JustAnotherActivity",
    }

    yourWorker.RegisterActivityWithOptions(yourapp.YourSimpleActivityDefinition, registerAOptions)

    err = yourWorker.Run(worker.InterruptCh())

    if err != nil {
        log.Fatalln("Unable to start Worker", err)
    }
}
package activity

import (
    "context"

    "go.temporal.io/sdk/activity"
)

// パラメーター
type YourActivityParam struct {
    ActivityParamX string
    ActivityParamY int
}


type YourActivityObject struct {
    Message *string
    Number  *int
}

// アクティビティ
func (a *YourActivityObject) PrintInfo(ctx context.Context, param YourActivityParam) error {
    logger := activity.GetLogger(ctx)

    logger.Info("The message is:", param.ActivityParamX)

    logger.Info("The number is:", param.ActivityParamY)

    return nil
}

// アクティビティ
func (a *YourActivityObject) GetInfo(ctx context.Context) (*YourActivityResultObject, error) {

    return &YourActivityResultObject{
        ResultFieldX: *a.Message,
        ResultFieldY: *a.Number,
    }, nil
}

▼ ワークフロー定義用のTemporalワーカー

ワークフロー定義用のTemporalワーカーは、ワークフロー定義を登録する。

package workflow

import (
    "time"

    "go.temporal.io/sdk/workflow"
)

func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (*YourWorkflowResultObject, error) {

    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
    }

    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // ワークフローのパラメーターを定義する
    activityParam := YourActivityParam{
        ActivityParamX: param.WorkflowParamX,
        ActivityParamY: param.WorkflowParamY,
    }

    var a *YourActivityObject

    var activityResult YourActivityResultObject

    // (1)
    // ワークフローのステップを実行する
    // パラメーターの入力
    err := workflow.ExecuteActivity(ctx, a.YourActivityDefinition, activityParam).Get(ctx, &activityResult)

    if err != nil {
        return nil, err
    }

    var infoResult *YourActivityResultObject

    // (2)
    // ワークフローのステップを実行する
    // GetInfoアクティビティ
    err = workflow.ExecuteActivity(ctx, a.GetInfo).Get(ctx, &infoResult)

    if err != nil {
        return nil, err
    }

    infoParam := YourActivityParam{
        ActivityParamX: infoResult.ResultFieldX,
        ActivityParamY: infoResult.ResultFieldY,
    }

    // (3)
    // ワークフローのステップを実行する
    // PrintInfoアクティビティ
    err = workflow.ExecuteActivity(ctx, a.PrintInfo, infoParam).Get(ctx, nil)

    if err != nil {
        return nil, err
    }

    workflowResult := &YourWorkflowResultObject{
        WFResultFieldX: activityResult.ResultFieldX,
        WFResultFieldY: activityResult.ResultFieldY,
    }

    // ワークフロー全体の処理結果を返却する
    return workflowResult, nil
}


補償トランザクション

ローカルトランザクションで失敗した場合は、まずそのマイクロサービスが自身のトランザクションをロールバックする。

その後、それまでにコールされたdefer関数を実行し補償トランザクションを実行する。

package saga

import (
    "time"

    "go.uber.org/multierr"

    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/workflow"
)

func TransferMoney(ctx workflow.Context, transferDetails TransferDetails) (err error) {
    retryPolicy := &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    time.Minute,
        MaximumAttempts:    3,
    }

    options := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy:         retryPolicy,
    }

    ctx = workflow.WithActivityOptions(ctx, options)

    err = workflow.ExecuteActivity(ctx, Withdraw, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }

    // 補償トランザクション
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, WithdrawCompensation, transferDetails).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }
    }()

    // ローカルトランザクション
    // 失敗した場合、まずは自身のトランザクションをロールバックする
    // その後、前のdefer関数を実行し、前のローカルトランザクションを元に戻す補償トランザクションを実行する
    err = workflow.ExecuteActivity(ctx, Deposit, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }

    // 補償トランザクション
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, DepositCompensation, transferDetails).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }

        // uncomment to have time to shut down worker to simulate worker rolling update and ensure that compensation sequence preserves after restart
        // workflow.Sleep(ctx, 10*time.Second)
    }()

    // ローカルトランザクション
    // 失敗した場合、まずは自身のトランザクションをロールバックする
    // その後、前のdefer関数を実行し、前のローカルトランザクションを元に戻す補償トランザクションを実行する
    err = workflow.ExecuteActivity(ctx, StepWithError, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }

    return nil
}


仕組み

記入中...

---
title: Sagaオーケストレーション実行の仕組み
---
sequenceDiagram

    autonumber

    フロントエンド ->> API Gateway: 同期リクエスト

    API Gateway ->> ワークフローハンドラー: 同期リクエスト

    ワークフローハンドラー ->> Temporalサーバー: 同期リクエスト<br>(ワークフロー実行)

    ワークフローハンドラー -->> API Gateway: レスポンス

    API Gateway -->> フロントエンド: レスポンス

    フロントエンド -) API Gateway: 非同期リクエスト<br>(ポーリング処理)

    API Gateway -) Sagaステータスチェッカー: 非同期リクエスト<br>(ポーリング処理)


    TemporalワーカーA ->> Temporalサーバー: アクティビティ登録

    Temporalサーバー -) TemporalワーカーA: タスク取得

    TemporalワーカーB ->> Temporalサーバー: アクティビティ登録

    Temporalサーバー -) TemporalワーカーB: アクタスク取得


03. 実装

GetWorkflow

Temporalワーカー (アクティビティを持つマイクロサービス) は、実際にローカルトランザクションを実行するマイクロサービスに相当する。

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"

    "documentation-samples-go/yourapp"

    "go.temporal.io/sdk/client"
)

func main() {

    // Temporalサーバーに接続する
    temporalClient, err := client.Dial(client.Options{
        HostPort: client.DefaultHostPort,
    })

    workflowRun := temporalClient.GetWorkflow(context.Background, "<ワークフローのID>")

    var result workflowResponse

    // ワークフローの結果を取得する
    err = workflowRun.Get(context.Background(), &result)

    if err != nil {
        ...
    }
}