コンテンツにスキップ

トランザクション管理@マイクロサービス領域

はじめに

本サイトにつきまして、以下をご認識のほど宜しくお願いいたします。


01. トランザクションパターン

共有DBパターンの場合

  • パターン不要 (各マイクロサービスの従来のトランザクション)

マイクロサービス別DBの場合

  • 二相コミットパターン
  • Sagaパターン (オーケストレーションベース、コレグラフィベース、並列パイプラインベース)


02. 2フェーズコミットパターン (二相コミットパターン)

2フェーズコミットパターンとは

『二相コミットパターン』ともいう。

非推奨である。


実装パターン

▼ OSSを使用する場合

二相コミットのOSSはなさそう。

▼ クラウドプロバイダーのマネージドサービスを使用する場合

  • Scalar DB
  • Google Spanner


03. Sagaパターン

Sagaパターンとは

各マイクロサービスの永続化の間に依存関係がある場合 (例:受注データの永続化には、配送データや決済データの永続化の結果が必要) に、これらのマイクロサービスの永続化を調整する必要がある。

Sagaオーケストレーターにコールされるマイクロサービスに、永続化とロールバックに関するAPIを実装する。

Sagaオーケストレーターは、これらのマイクロサービスをコールし、ローカルトランザクションを連続的に実行する。

saga-pattern_usecase


SagaパターンとACID


デザインパターン

▼ Stateパターン

Sagaオーケストレーターをステートマシン図やStateパターンでモデリングし、ステートマシンを実装する。


03-02. オーケストレーションベースのSagaパターン

オーケストレーションベースのSagaパターンとは

一連のローカルトランザクションの実行をまとめて制御する責務を持ったSagaオーケストレーター (コーディネーター) と、これをコールする別のマイクロサービスを配置する。

各マイクロサービス間の通信方式は、リクエスト/レスポンスパターンまたはパブリッシュ/サブスクライブパターンのどちらでもよい。

orchestration


実装パターン (リクエスト/レスポンスパターン、パブリッシュ/サブスクライブパターン)

▼ 自前で実装する場合

Sagaオーケストレーターは、ローカルトランザクションの進捗度 (Sagaログ) をDBに都度記録する。

Sagaログから、いずれのローカルトランザクションを実行し終えたかを判断できる。

*実装例*

Sagaオーケストレーターと、これをコールする別のマイクロサービスを配置する。

Sagaオーケストレーターは、各ローカルトランザクションの成否を表すSagaログをDBで管理する。

Sagaオーケストレーターは、Orderサービス (T1) 、Inventoryサービス (T2) 、Paymentサービス (T3) 、のローカルトランザクションを連続して実行する。

例えば、Paymentサービスのローカルトランザクション (T3) が失敗した場合、OrderサービスとPaymentサービスのローカルトランザクションをロールバックする補償トランザクション (C1C2) を実行する。

saga-pattern_orchestrator

▼ OSSを使用する場合

SagaオーケストレーターのOSS (Temporal、Netflix Conductor、Uber Cadenceなど) を使用する。

Sagaオーケストレーターのドメインモデリングにステートソーシングパターンを採用する必要がある。

▼ クラウドプロバイダーのマネージドサービスを使用する場合

クラウドプロバイダー (例:AWS、Google Cloud) が提供するSagaオーケストレーター (例:AWS Step Functions、Google Workflowsなど) を使用する。

各マイクロサービスは、Sagaオーケストレーターをメッセージブローカー (これもクラウドプロバイダーが提供しているものでよい) を介して取得する。


SagaオーケストレーターのDB

通常のオーケストレーションベースのSagaパターンでは、DBにSagaログテーブルを作成する。

Sagaオーケストレーターは、ローカルトランザクションの進捗度 (Sagaログ) をDBに永続化する。

SagaオーケストレーターごとにDBを分割すると良い。

AWS StepFunctionsのステートも設計例として、参考になる。

id order_saga_execution_id order_saga_current_step order_id order_saga_payload order_saga_status order_saga_state order_saga_version start_data end_data
1 9db5b6da-daba-4633-b3cf-9c79f2bcf6f5 CreditApproval 1 "\"order-id\": 1, \"customer-id\": 456, \"payment-due\": 4999, \"credit-card-no\": \"xxxx-yyyy-dddd-9999\"}" SUCCEEDED "{\"creditApproval\":\"SUCCEEDED\"}" 楽観的ロックに使用するバージョン値 (例:最終更新日など) 開始時刻 終了時刻
2 b1f14b72-393d-432b-8ec2-782974a6ed60 Payment 1 "{ \"order-id\": 1, \"customer-id\": 456, ... }" STARTED "{\"creditApproval\":\"SUCCEEDED\",\"payment\":\"STARTED\"}"
3 b38229c6-30df-4166-a725-8b2c578e5ed5 CreditApproval 2 "{ \"order-id\": 2, \"customer-id\": 456, ... }" STARTED "{\"creditApproval\":\"STARTED\"}"
... ... ... ... ... ... ... ... ... ...


Sagaオーケストレーターのステータスチェッカー

▼ Sagaオーケストレーターのステータスチェッカーとは

オーケストレーションベースのSagaパターンにて、Sagaオーケストレーターにリクエストを送信するクライアントは、Sagaオーケストレーターの処理結果を知る必要がある。

▼ ポーリングパターンの場合

ポーリングパターンの場合、Sagaステータスチェッカーを採用する。

Sagaステータスチェッカーは、トランザクションIDを使用してワークフローの処理結果をDBから取得する。

saga-pattern_orchestrator_status-checker


Sagaオーケストレーターのワーカー

▼ ポイントツーポイントの場合

メッセージブローカー (例:Apache Kafka、RabbitMQなど) を経由しないオーケストレーションベースのSagaパターンを実装する。

メッセージブローカーを経由するよりも、Sagaオーケストレーターと各ワーカーの間の結合度が高まってしまうが、Sagaオーケストレーターの実装が簡単になる。

ローカルトランザクションの進捗度に応じて、次のローカルトラザクションや補償トランザクションを実行する。

▼ メッセージキューを経由する場合

メッセージキュー (例:AWS SQSなど) を経由して、Sagaオーケストレーターとワーカーの間で通信する。

▼ メッセージブローカーを経由する場合

メッセージブローカー (例:Apache Kafka、RabbitMQなど) を使い、オーケストレーションベースのSagaパターンを実装する。

Sagaオーケストレーターは、メッセージブローカーに対してパブリッシュとサブスクライブを実行する。

サブスクライブしたメッセージに応じて、次のメッセージをパブリッシュする。

orchestration_message-queue


補償トランザクション

▼ 補償トランザクションとは

ローカルトランザクションを元に戻すトランザクションを逆順に実行し、Sagaパターンによるトランザクションの結果を元に戻す仕組みのこと。

マイクロサービスアーキテクチャでは、トランザクションの通常のロールバック機能を使用した場合に、処理に失敗したマイクロサービスだけでロールバックし、それ以前のマイクロサービスではロールバックが起こらない問題がある。

いずれかのマイクロサービスのローカルトランザクションが失敗した場合に、まずそのマイクロサービスは自身のトランザクションをロールバックする。

その後、それまでのローカルトランザクションを擬似的にロールバックするトランザクションを逆順で実行する。

▼ 設計例

受注に関するトランザクションが異なるマイクロサービスにまたがる例。

saga-pattern_example

補償トランザクションによって、各ローカルトランザクションを元に戻す逆順のトランザクションを実行する。

saga-pattern_compensating_transaction_example

▼ 実装例 (Goのdefer関数)

この例では、Goのdefer関数で補償トランザクションの仕組みを実装している。

ローカルトランザクションで失敗した場合は、まずそのマイクロサービスが自身のトランザクションをロールバックする。

その後、それまでにコールされたdefer関数を実行し補償トランザクションを実行する。

package saga

import (
    "time"

    "go.uber.org/multierr"

    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/workflow"
)

func TransferMoney(ctx workflow.Context, transferDetails TransferDetails) (err error) {
    retryPolicy := &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    time.Minute,
        MaximumAttempts:    3,
    }

    options := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy:         retryPolicy,
    }

    ctx = workflow.WithActivityOptions(ctx, options)

    err = workflow.ExecuteActivity(ctx, Withdraw, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }

    // 補償トランザクション
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, WithdrawCompensation, transferDetails).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }
    }()

    // ローカルトランザクション
    // 失敗した場合、まずは自身のトランザクションをロールバックする
    // その後、前のdefer関数を実行し、前のローカルトランザクションを元に戻す補償トランザクションを実行する
    err = workflow.ExecuteActivity(ctx, Deposit, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }

    // 補償トランザクション
    defer func() {
        if err != nil {
            errCompensation := workflow.ExecuteActivity(ctx, DepositCompensation, transferDetails).Get(ctx, nil)
            err = multierr.Append(err, errCompensation)
        }

        // uncomment to have time to shut down worker to simulate worker rolling update and ensure that compensation sequence preserves after restart
        // workflow.Sleep(ctx, 10*time.Second)
    }()

    // ローカルトランザクション
    // 失敗した場合、まずは自身のトランザクションをロールバックする
    // その後、前のdefer関数を実行し、前のローカルトランザクションを元に戻す補償トランザクションを実行する
    err = workflow.ExecuteActivity(ctx, StepWithError, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }

    return nil
}

▼ 実装例 (Goのslice)

この例では、Goのsliceで補償トランザクションの仕組みを実装している。

スライス内のローカルトランザクションを順番に実行し、どこかで失敗した場合は逆順に補償トランザクションを実行する。

package main

import (
    "fmt"
    "errors"
)

// ローカルトランザクションを表す関数型
type LocalTransaction func() error

// 補償トランザクションを表す関数型
type CompensatingAction func() error

// Sagaオーケストレーターの各ステップを表す構造体
type SagaStep struct {
    Transaction LocalTransaction // ローカルトランザクション
    Compensate  CompensatingAction // 補償トランザクション
}

// Sagaを表す構造体
type Saga struct {
    Steps []SagaStep // 複数のSagaStepから成る
}

// ローカルトランザクションと補償トランザクションを実行する関数
func (s *Saga) Execute() error {

    for _, step := range s.Steps {
        // ローカルトランザクションを順番に実行する
        if err := step.Transaction(); err != nil {
            // 失敗した場合は、補償トランザクションを逆順で実行する
            for i := len(s.Steps) - 1; i >= 0; i-- {
                if err := s.Steps[i].Compensate(); err != nil {
                    // 補償トランザクションが失敗した場合はエラーメッセージを返す
                    return errors.New(fmt.Sprintf("failed to compensate for step %d: %v", i, err))
                }
            }
            // 最初のエラーを返す
            return err
        }
    }
    // 全てのトランザクションが成功した場合、nilを返す
    return nil
}

// ローカルトランザクション
// 資金移動
func transferFunds() error {
    return nil
}

// 補償トランザクション
// 資金移動の取り消し
func reverseTransfer() error {
    return nil
}

func main() {
    // Sagaオーケストレーター
    saga := Saga{
        Steps: []SagaStep{
            SagaStep{
                Transaction: transferFunds, // 1つ目のローカルトランザクション
                Compensate:  reverseTransfer, // 1つ目の補償トランザクション
            },
            SagaStep{
                Transaction: transferFunds, // 2つ目のローカルトランザクション
                Compensate:  reverseTransfer, // 2つ目の補償トランザクション
            },
        },
    }

    // Sagaの実行
    if err := saga.Execute(); err != nil {
        fmt.Println("saga failed:", err) // エラーが発生した場合
    } else {
        fmt.Println("saga succeeded") // 正常に完了した場合
    }
}

▼ 実装例 (TypeScriptの配列)

この例では、AzureのDurable Functionにて、TypeScriptの配列で補償トランザクションの仕組みを実装している。

スライス内のローカルトランザクションを順番に実行し、どこかで失敗した場合は逆順に補償トランザクションを実行する。

import df from "durable-functions";
import {Task} from "durable-functions/lib/src/classes";

// APIError型の定義。ステータスコードとボディを持つ
type APIError = {
  status: 200 | 400 | 500;
  body: object | string;
};

// APIErrorかどうかをチェックする関数
const isAPIError = (arg: any): arg is APIError => {
  // 引数がオブジェクトでない場合は、トランザクション不可とする
  if (typeof arg !== "object") return false;

  // ステータスコードが200, 400, 500のいずれかでない場合は、トランザクション不可とする
  if (!(arg.status && [200, 400, 500].includes(arg.status))) return false;

  // メッセージが文字列でない場合は、トランザクション不可とする
  if (typeof arg.message !== "string") return false;

  // 全ての条件を満たす場合はトランザクション可とする
  return true;
};

// Sagaオーケストレーター
export const saga = df.orchestrator(function* (context) {
  // 補償トランザクションを格納する配列
  const compensatingTransactions: Task[] = [];

  try {
    // Sagaオーケストレーターの入力を取得
    const {input} = context.df.getInput();

    // ローカルトランザクションとして、doActivityAを実行する
    const a = yield context.df.callActivity("doActivityA", input.body);

    // 補償トランザクションとして、rejectActivityAを追加する
    compensatingTransactions.push(
      context.df.callActivity("rejectActivityA", input.body),
    );

    // ローカルトランザクションとして、doActivityBを実行する
    const b = yield context.df.callActivity("doActivityB", a);

    // 補償トランザクションとして、rejectActivityBを追加する
    compensatingTransactions.push(
      context.df.callActivity("rejectActivityB", b),
    );

    // ローカルトランザクションとして、doActivityCを実行する
    const c = yield context.df.callActivity("doActivityC", b);

    // 補償トランザクションとして、rejectActivityCを追加する
    compensatingTransactions.push(
      context.df.callActivity("rejectActivityC", c),
    );

    // Sagaオーケストレーターのクライアントに正常終了のレスポンスを返す
    return {
      status: 200,
      body: "The process has succeeded.",
    };
  } catch (e) {
    // 例外発生時に補償トランザクションをまとめて実行する
    yield context.df.Task.all(compensatingTransactions);

    // 例外がAPIError型の場合、そのまま返す
    if (isAPIError(e)) return e;

    // その他の例外は500エラーとして、返す
    return {
      status: 500,
      body: (e as Error).message,
    };
  }
});

▼ 実装例 (Goのslice)

この例では、アウトボックスパターンでSagaオーケストレーションを実装している。

ちょっと難しいかな...

package saga

import (
    "context"
    "database/sql"
    "fmt"
    "github.com/google/uuid"
    "go.example/saga/pkg/jsonmap"
)

type SagaState struct {
    ID          uuid.UUID
    Version     int8
    Type        string
    Payload     jsonmap.JSONMap
    CurrentStep SagaStep
    StepStatus  jsonmap.JSONMap
    SagaStatus  SagaStatus
}

// Repository
type Repository interface {
    Persist(ctx context.Context, tx *sql.Tx, ss SagaState) error
    Update(ctx context.Context, tx *sql.Tx, ss SagaState) error
    QueryByID(ctx context.Context, tx *sql.Tx, ID string) (*SagaState, error)
}

func NewSaga(sagaType string, payload jsonmap.JSONMap, currentStep SagaStep) SagaState {
    // ステートマシン
    return SagaState{
        ID:          uuid.New(),
        Version:     1,
        Type:        sagaType,
        Payload:     payload,
        CurrentStep: currentStep,
        StepStatus:  jsonmap.JSONMap{string(currentStep): SagaStepStatusStarted},
        SagaStatus:  SagaStatusStarted,
    }
}

// NextSagaStatus evaluate current SagaStepStatuses and set SagaStatus
func (s *SagaState) NextSagaStatus() {
    ss := map[string]bool{}
    for _, v := range s.StepStatus {
        ss[fmt.Sprintf("%v", v)] = true
    }

    if ss[SagaStepStatusSucceeded] && len(ss) == 1 {
        s.SagaStatus = SagaStatusCompleted
    } else if (ss[SagaStepStatusStarted] && len(ss) == 1) || (ss[SagaStepStatusSucceeded] && ss[SagaStepStatusStarted] && len(ss) == 2) {
        s.SagaStatus = SagaStatusStarted
    } else if !ss[SagaStepStatusCompensating] {
        s.SagaStatus = SagaStatusAborted
    } else {
        s.SagaStatus = SagaStatusAborting
    }
}

// IncrementVersion
func (s *SagaState) IncrementVersion() {
    s.Version++
}

// SagaStatus represents the saga status based on steps status
type SagaStatus string

// SagaStatus type
const (
    SagaStatusStarted   = "STARTED"
    SagaStatusAborting  = "ABORTING"
    SagaStatusAborted   = "ABORTED"
    SagaStatusCompleted = "COMPLETED"
)

// SagaStepStatus represent current saga step status
type SagaStepStatus string

// SagaStepStatus type
const (
    SagaStepStatusStarted      = "STARTED"
    SagaStepStatusFailed       = "FAILED"
    SagaStepStatusSucceeded    = "SUCCEEDED"
    SagaStepStatusCompensating = "COMPENSATING"
    SagaStepStatusCompensated  = "COMPENSATED"
)

// SagaStep define saga service step in order to follow
type SagaStep string

// NextSagaStep find saga next step from provided steps and current saga step
func NextSagaStep(steps []SagaStep, currentStep SagaStep) SagaStep {
    if currentStep == "" {
        return steps[0]
    }

    curr := -1
    for i := 0; i < len(steps); i++ {
        if steps[i] == currentStep {
            curr = i
            break
        }
    }

    if curr == -1 || curr+1 == len(steps) {
        return ""
    }

    return steps[curr+1]
}

// PrevSagaStep find saga previous step from provided steps and current saga step
func PrevSagaStep(steps []SagaStep, currentStep SagaStep) SagaStep {
    curr := -1
    for i := 0; i < len(steps); i++ {
        if steps[i] == currentStep {
            curr = i
            break
        }
    }

    if curr == -1 || curr-1 == -1 {
        return ""
    }

    return steps[curr-1]
}
package reservation

import (
    "context"
    "database/sql"
    "fmt"
    "go.example/saga/pkg/saga"
    "go.example/saga/pkg/store/postgres"
    "go.example/saga/reservation/pkg/model"
    "log"
)

...

func (c *Controller) PostReservation(ctx context.Context, cmd model.ReservationCmd) (*model.Reservation, error) {
    r := model.NewReservation(cmd.HotelID, cmd.RoomID, cmd.GuestID, cmd.PaymentDue, cmd.StartDate, cmd.EndDate, cmd.CreditCardNO)

    if _, err := c.store.Transact(ctx, func(tx *sql.Tx) (interface{}, error) {

        // persist reservation
        if err := c.repository.Add(ctx, tx, r); err != nil {
            return nil, err
        }

        payload := r.ToJSONMap()
        currStep := saga.NextSagaStep(sagaSteps, "")
        sagaState := saga.NewSaga(roomReservationSaga, payload, currStep)

        if err := c.sagaRepository.Persist(ctx, tx, sagaState); err != nil {
            return nil, err
        }

        outboxEvent := postgres.NewEvent(sagaState.ID.String(), string(currStep), postgres.RequestEventType, payload)
        if err := outboxEvent.Persist(ctx, tx); err != nil {
            return nil, err
        }

        log.Printf("Started Saga for reservationID %s sagaID %s", r.ID, sagaState.ID)

        return r, nil
    }); err != nil {
        return nil, err
    }

    return r, nil
}

...


Outboxパターン

▼ Outboxパターンとは

Outboxパターンでは、Sagaログテーブルに加えて、Outboxテーブルを作成する。

メッセージリレイを使用して、OutboxテーブルのイベントをSagaオーケストレーターのクライアントやマイクロサービスに通知する。

Id AggregateType AggregateId Type Payload
ec6e Order 123 OrderCreated {"id": 123, ...}
8af8 Order 456 OrderDetailCanceled {"id": 456, ...}
890b Customer 789 InvoiceCreated {"id": 789, ...}

saga-pattern_orchestrator_outbox-pattern

▼ Polling publisherパターンとは

DBのイベントチェッカー (例:Debezium) を使用して、Outboxテーブルのイベントを検知する。

また、検知したイベントをメッセージブローカー (例:Apache Kafka、RabbitMQなど) にパブリッシュする。

Sagaオーケストレーターのクライアントやマイクロサービス側では、これをポーリングする。

saga-pattern_orchestrator_outbox-pattern_polling-publisher

▼ Transaction log tailingパターンとは

トランザクションログ (例:MySQLバイナリログ、PostgreSQL WALなど) を追跡する。


03-03. Choreography (コレオグラフィ) ベースのSagaパターン

コレオグラフィベースのSagaパターンとは

マイクロサービスは、自身のローカルトランザクションを完了させた後に、次のマイクロサービスをコールする。

各マイクロサービス間の通信方式は、パブリッシュ/サブスクライブパターンにする必要がある。

そのために、マイクロサービス間にメッセージキューやメッセージブローカーを配置する。

choreography


実装パターン (パブリッシュ/サブスクライブパターン)

▼ 自前で実装する場合

*実装例*

以下のリポジトリを参考にせよ。

choreography_example

▼ OSSを使用する場合

コレオグラフィのOSS (例:Debezium、Maxwellなど) を使用する。

Sagaオーケストレーターのドメインモデリングにイベントソーシングパターンを採用する必要がある。

▼ クラウドプロバイダーのマネージドサービスを使用する場合

各マイクロサービス間の通信方式は、パブリッシュ/サブスクライブパターンにする必要がある。

  • AWS Lambda、マイクロサービス間のパブリッシュ/サブスクライブAWSリソース (例:AWS EventBridge、AWS SQS、AWS SNS)
  • Google Cloud Run Functions、マイクロサービス間のパブリッシュ/サブスクライブGoogle Cloudリソース (例:Google Eventarc)


後続マイクロサービスとの通信方式

各マイクロサービスにパブリッシュとサブスクライブを処理する責務を持たせる。

choreography_message-queue

saga-pattern


03-04. 並列パイプラインベースのSagaパターン

並列パイプラインベースのSagaパターンとは

オーケストレーションベースとコレオグラフィベースのパターンを組み合わせる。

イベント駆動のマイクロサービスを連続的にコールするルーターサービスを配置する。


実装パターン

▼ OSSを使用する場合

並列パイプラインのOSS (例:Apache Camelなど) を使用する。


04. TCCパターン

TCCパターンとは

各マイクロサービスに各処理フェーズ (Try、Confirm、Cancel) を実行するAPIを実装し、APIを順に実行する。

Tryフェーズでは、ローカルトランザクションを開始する。

Confirmフェーズでは、ローカルトランザクションをコミットする。

Cancelフェーズでは、以前のフェーズで問題が合った場合に、ロールバックを実施する。


05. クエリパターン

API Composition

複数のマイクロサービスにまたがるread処理がある場合に必要になる。

API Compositionサービスは、クライアントからのリクエストを受信し、複数のマイクロサービスに連鎖的にリクエストをルーティングする。

メモリ上で取得結果を結合し、クライアントにレスポンスする。