Snowflake Connector for MySQL のレプリケーションの構成

注釈

Snowflake Connector for MySQL は コネクタ規約 に従います。

Snowflake Connector for MySQL のレプリケーションを構成するプロセスは、次のステップに従います。

関連項目:

データソースを追加する

データソースは、単一の MySQL サーバーを表します。 Snowflake Connector for MySQL は複数のデータソースからデータをレプリケートできます。レプリケーションを開始する前に、少なくとも1つのデータソースを追加する必要があります。

Snowflake Connector for MySQL は、各データソースからSnowflakeの個別の宛先データベースにデータをレプリケートします。同じ宛先データベースを複数のデータソースで使用することはできません。

データソースを追加するには、次のコマンドを実行します。

CALL PUBLIC.ADD_DATA_SOURCE('<data_source_name>', '<dest_db>');
Copy

条件:

data_source_name

データソースの一意の名前を指定します。この名前は、エージェント構成で定義されたデータソースの名前と一致する必要があります。選択した名前が次の要件に準拠していることを確認してください。

  • 名前に大文字(A~Z)と10進数(0~9)のみが含まれる。

  • 名前が50文字を超えていない。

dest_db

Snowflakeの宛先データベースの名前を指定します。データベースが存在しない場合は、プロシージャによって自動的に作成されます。それ以外の場合は、コネクタは既存のデータベースを使用します。その場合、データソースを追加する前に、データベースに対する権限をコネクタに付与する必要があります。

注釈

一度追加したデータソースの名前を変更または削除することはできません。

(オプション)宛先データベースに対する権限の付与

既存のデータベースを宛先データベースとして使用するには、 Snowflake Connector for MySQL にそのデータベースに対する CREATE SCHEMA 権限が必要です。コネクタは、インジェストされた MySQL データを含むスキーマとテーブルの所有者です。

CREATE SCHEMA 権限を付与するには、次のコマンドを実行します。

GRANT CREATE SCHEMA ON DATABASE <dest_db> TO APPLICATION <app_db_name>;
Copy

条件:

dest_db

データソースからのデータの宛先データベースの名前を指定します。

app_db_name

コネクタデータベースの名前を指定します。

他のデータソースを追加する

いつでも新しいデータソースを追加できます。エージェントの実行中に新しいデータソースを追加するには、次を実行します。

  1. データソースを追加する

  2. エージェントが停止していることを確認します。

  3. 新しいデータソースへのエージェント接続を構成する

  4. エージェントのDockerコンテナを実行する

レプリケーションのソーステーブルを追加する

レプリケーションのソーステーブルを追加するには、次のコマンドを実行します。

CALL PUBLIC.ADD_TABLES('<data_source_name>', '<schema_name>', <table_names_array>);
Copy

条件:

data_source_name

ソーステーブルを含むデータソースの名前を指定します。

schema_name

ソーステーブルのスキーマの名前を指定します。

table_names_array
テーブル名の配列を指定します。

ARRAY_CONSTRUCT('<table_name>', '<other_table_name>', ...)

ソーステーブルを追加すると、次のような効果があります。

  • schema_nametable_name は、ソースデータベースからソースデータをレプリケートするためのスキーマ名とテーブル名としてそれぞれ使用されます。

注釈

1回のプロシージャ呼び出しで、同じデータソースとスキーマから多数のテーブルを追加できます。

注釈

スキーマ名とテーブル名は一致する必要があります

ソースデータベースで定義されているとおりに、大文字と小文字を含めて正確なテーブル名とスキーマ名を使用する必要があります。指定した名前は、ソースデータベースで SELECT クエリを生成するためにそのまま使用されます。MySQL サーバー名では大文字と小文字が区別されるため、大文字と小文字が異なると「テーブルが存在しません」という例外が発生する可能性があります。

最近削除されたテーブル

テーブルが最近削除された場合(レプリケーションからテーブルを削除する)、構成のこの時点ではテーブルを再度追加できない可能性があります。 Tables are not ready to be re-added のメッセージが表示されるエラーが発生した場合は、数分待ってから再試行してください。

列フィルターを含むソーステーブルを追加する

フィルターされた列を含むソーステーブルを追加するには、次のコマンドを実行します。

CALL PUBLIC.ADD_TABLE_WITH_COLUMNS('<data_source_name>', '<schema_name>', '<table_name>', <included_columns_array>, <excluded_columns_array>);
Copy

条件:

data_source_name

ソーステーブルを含むデータソースの名前を指定します。

schema_name

ソーステーブルのスキーマの名前を指定します。

table_name

ソーステーブルの名前を指定します。

included_columns
レプリケートする列名の配列を指定します。

ARRAY_CONSTRUCT('<column_name>', '<other_column_name>', ...)

excluded_columns
無視する列名の配列を指定します。

ARRAY_CONSTRUCT('<column_name>', '<other_column_name>', ...)

注意

プロシージャに渡される列名は、ソースデータベースで表現されているとおりに、大文字と小文字が区別される必要があります。

上記のプロシージャには次のルールが適用されます。

  • フィルタリングは、データがSnowflakeにインジェストされる前に実行されます。スナップショットロードと増分ロードの両方で、選択した列のデータのみがSnowflakeにストリーミングされます。

  • included_columnsexcluded_columns は、単なるマスクとして機能します。この方法では、指定された列が存在しない場合でもコネクタはエラーをスローしません。存在しない列のマスクは単純に無視されます。

  • included_columnsexcluded_columns の両方を提供することはできません。 included_columns をリストする場合は、 excluded_columns を空にする必要があり、その逆も同様です。

  • 両方の配列が空でなく、競合する列がない場合、 included_columnsexcluded_columns より優先されます。

  • 列が included_columnsexcluded_columns の両方に表示される場合、プロシージャはエラーをスローします。

  • included_columnsexcluded_columns の両方が空の配列の場合、利用可能なすべての列がインジェストされます。

  • 構成に関係なく、主キー列は常にレプリケートされます。

たとえば、A、B、C、Dという列を持つソーステーブルがあり、Aが主キー列であるとします。

含まれる列

除外される列

期待される結果

[]

[]

[A, B, C, D]

[A, B]

[]

[A, B]

[B]

[]

[A, B]

[]

[C, D]

[A, B]

[]

[A, B]

[A, C, D]

[A, B, Z]

[]

[A, B]

[A]

[A]

エラー

レプリケーションからテーブルを削除する

レプリケーションからソーステーブルを削除するには、次のコマンドを実行します。

CALL PUBLIC.REMOVE_TABLE('<data_source_name>', '<schema_name>', '<table_name>');
Copy

条件:

data_source_name

ソーステーブルを含むデータソースの名前を指定します。

schema_name

ソーステーブルのスキーマの名前を指定します。

table_name

ソーステーブルの名前を指定します。

注釈

レプリケーションからテーブルを削除するプロセスには数分かかります。完了すると、テーブルはコネクタの PUBLIC.REPLICATION_STATE ビューから消えます(Snowflake Connector for MySQL のモニター を参照)。そうすることで、レプリケーションを再度有効にできるようになります。

この時点では、宛先テーブルはまだ コネクタアプリケーションによって所有されています。宛先テーブルを削除または変更する場合は、まずその所有権をアカウント内のロールに譲渡する必要があります。 ACCOUNTADMIN として次のクエリを実行します。

GRANT OWNERSHIP ON TABLE <destination_database_name>.<schema_name>.<table_name>
  TO ROLE <role_name>
  REVOKE CURRENT GRANTS;
Copy

注釈

レプリケーションからテーブルを削除してその FAILED 状態を修正する場合は、レプリケーションを再度有効にする前に、宛先テーブルの名前を変更するか手動で削除する必要もあります。

スケジュールされたレプリケーションを構成する

コネクタは、連続モードとスケジュールモードの2つのモードでデータをレプリケートできます。デフォルトは連続モードです。

連続モードでは、可能な限り高速にデータをレプリケートします。運用ウェアハウスを24時間365日稼働させる必要があり、進行中のレプリケーションがなくても不要なコストが発生する可能性があります。

スケジュールモードでは、構成されたスケジュールに従ってデータがレプリケートされます。データを連続でレプリケートする必要がない場合や、データ量が少ない場合(コネクタがほとんどの時間アイドル状態になる)に、レプリケーションコストを削減することを目的としています。

スケジュールモードでは、レプリケーション完了の概念が導入されます。スナップショットレプリケーションは、 SELECT <列> FROM <TABLE> クエリの実行が開始されると開始され、データが宛先テーブルにレプリケートされると終了します。増分レプリケーションは、以前に保存された変更データキャプチャ(CDC)ポインタから始まりますが、データは連続でインジェストされるため、終了はありません。したがって、コネクタは、以前に保存された CDC ポインタから最新の CDC ポインタ(レプリケーションの開始時に決定)までデータをレプリケートします。このようにして、コネクタはスケジュールされたモードでレプリケーションの完了を提供します。

スケジュールモードでは、運用ウェアハウスを一時停止することでレプリケーションコストが削減されます。各ソーステーブルのレプリケーションが完了すると、ウェアハウスを一時停止できます。スケジュールに従って、レプリケーションの次の実行までウェアハウスは一時停止されたままになります。

注釈

一度に1つのレプリケーションのみを実行できます。次の実行スケジュールが開始する際に、レプリケーションがまだ実行されている場合、そのスケジュールされた時間はスキップされます。

スケジュールモードを有効にするには、次のコマンドを実行します。

CALL PUBLIC.ENABLE_SCHEDULED_REPLICATION('<data_source_name>', '<schedule>');
Copy

条件:

data_source_name

データソースの名前を指定します。

schedule

コネクタがデータソースのレプリケーションを実行するスケジュールまたは頻度を指定します。許容される最小頻度は10分です。スケジュールまたは頻度の指定の詳細については、 SCHEDULE パラメーター をご参照ください。

スケジュールの例:

  • 60 MINUTE

    レプリケーションを60分ごとにスケジュールします。

  • USING CRON 0 2 * * * UTC

    レプリケーションを毎日午前2時(UTC)にスケジュールします。

スケジュールモードを無効にするには、次のコマンドを実行します。

CALL PUBLIC.DISABLE_SCHEDULED_REPLICATION('<data_source_name>');
Copy

条件:

data_source_name

データソースの名前を指定します。

現在のスケジュールを確認するには、 データソースの表示 をご参照ください。

注釈

運用ウェアハウスは、すべてのデータソースからのレプリケーションを処理します。各データソースからの各ソーステーブルのレプリケーションが完了した場合にのみ、ウェアハウスを一時停止できます。つまり、自動停止が適切に機能するには、すべてのデータソースに対してスケジュールモードを有効にする必要があります。

警告

スケジュールされた実行を完了し、運用ウェアハウスを一時停止するには、コネクタがソースデータベースで安定したアクティビティを実行する必要があります。データベースにデータを挿入、削除、更新するすべての操作でアクティビティが生成されます。この操作は、レプリケーションに追加されたテーブルに限らず、任意のテーブルに適用できます。挿入または更新された行の数は関係ありません。

次のステップ

これらの手順を完了したら、 Snowflakeで MySQL データを表示する のステップに従います。