チャネルの使用
これまで、Redux Storeとの通信には `take` および `put` エフェクトを使用してきました。チャネルは、これらのエフェクトを一般化して、外部イベントソースとの通信やSaga間の通信を可能にします。また、Storeからの特定のアクションをキューに入れるためにも使用できます。
このセクションでは、以下について説明します。
Storeからの特定のアクションをバッファリングするために、`yield actionChannel` エフェクトを使用する方法。
外部イベントソースに `take` エフェクトを接続するために、`eventChannel` ファクトリ関数を使用する方法。
汎用的な `channel` ファクトリ関数を使用してチャネルを作成し、それを `take` / `put` エフェクトで使用して2つのSaga間で通信する方法。
`actionChannel` エフェクトの使用
典型的な例を見てみましょう
import { take, fork, ... } from 'redux-saga/effects'
function* watchRequests() {
while (true) {
const {payload} = yield take('REQUEST')
yield fork(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
上記の例は、典型的な *watch-and-fork* パターンを示しています。`watchRequests` Sagaは、ブロッキングを回避し、Storeからのアクションを見逃さないように `fork` を使用しています。`handleRequest` タスクは、`REQUEST` アクションごとに作成されます。したがって、多くのアクションが急速に発生した場合、多くの `handleRequest` タスクが同時に実行される可能性があります。
ここで、要件が次のようになったと想像してください。`REQUEST` をシリアルに処理したいとします。任意の時点で4つのアクションがある場合、最初のアクション `REQUEST` を処理し、このアクションが完了してからのみ、2番目のアクションを処理します。以下同様です。
したがって、未処理のアクションをすべて *キュー* に入れ、現在のリクエストの処理が完了したら、キューから次のメッセージを取得したいと考えます。
Redux-Sagaは、これを処理できる小さなヘルパーエフェクト `actionChannel` を提供します。これを使用して、前の例をどのように書き直すことができるかを見てみましょう
import { take, actionChannel, call, ... } from 'redux-saga/effects'
function* watchRequests() {
// 1- Create a channel for request actions
const requestChan = yield actionChannel('REQUEST')
while (true) {
// 2- take from the channel
const {payload} = yield take(requestChan)
// 3- Note that we're using a blocking call
yield call(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
まず、アクションチャネルを作成します。`yield actionChannel(pattern)` を使用します。ここで、パターンは、以前に `take(pattern)` で説明したのと同じルールを使用して解釈されます。2つの形式の違いは、Sagaがそれらを取得する準備ができていない場合(たとえば、API呼び出しでブロックされている場合)、`actionChannel` は **受信メッセージをバッファリングできる** ことです。
次は `yield take(requestChan)` です。Redux Storeから特定のアクションを取得するために `pattern` を使用することに加えて、`take` はチャネルでも使用できます(上記では、特定Reduxアクションからチャネルオブジェクトを作成しました)。`take` は、チャネルでメッセージが利用可能になるまでSagaをブロックします。また、基になるバッファにメッセージが保存されている場合は、すぐに再開できます。
重要な注意点は、ブロッキング `call` をどのように使用しているかということです。Sagaは、`call(handleRequest)` が戻るまでブロックされたままになります。しかし、その間、Sagaがまだブロックされている間に他の `REQUEST` アクションがディスパッチされた場合、それらは `requestChan` によって内部的にキューに入れられます。Sagaが `call(handleRequest)` から再開し、次の `yield take(requestChan)` を実行すると、takeはキューに入れられたメッセージで解決します。
デフォルトでは、`actionChannel` はすべての受信メッセージを無制限にバッファリングします。バッファリングをより詳細に制御したい場合は、エフェクトクリエーターに Buffer 引数を指定できます。Redux-Sagaは、いくつかの一般的なバッファ(none、dropping、sliding)を提供していますが、独自のバッファ実装を提供することもできます。詳細については、APIドキュメントを参照してください。
たとえば、最新の5つのアイテムのみを処理する場合は、次のように使用できます。
import { buffers } from 'redux-saga'
import { actionChannel } from 'redux-saga/effects'
function* watchRequests() {
const requestChan = yield actionChannel('REQUEST', buffers.sliding(5))
...
}
外部イベントに接続するための `eventChannel` ファクトリーの使用
`actionChannel`(エフェクト)のように、`eventChannel`(エフェクトではなくファクトリー関数)は、Redux Store以外のイベントソースからのイベントのチャネルを作成します。
この基本的な例では、インターバルからチャネルを作成します。
import { eventChannel, END } from 'redux-saga'
function countdown(secs) {
return eventChannel(emitter => {
const iv = setInterval(() => {
secs -= 1
if (secs > 0) {
emitter(secs)
} else {
// this causes the channel to close
emitter(END)
}
}, 1000);
// The subscriber must return an unsubscribe function
return () => {
clearInterval(iv)
}
}
)
}
`eventChannel` の最初の引数は、*サブスクライバー* 関数です。サブスクライバーの役割は、外部イベントソースを初期化し(上記では `setInterval` を使用)、提供された `emitter` を呼び出すことによって、ソースからのすべての受信イベントをチャネルにルーティングすることです。上記の例では、毎秒 `emitter` を呼び出しています。
注意:イベントチャネルを介してnullまたは未定義を渡さないように、イベントソースをサニタイズする必要があります。数値を渡すことは問題ありませんが、Reduxアクションのようにイベントチャネルデータを構造化することをお勧めします。`number` よりも `{ number }` のように。
`emitter(END)` の呼び出しにも注意してください。これは、チャネルが閉じられたことをチャネルコンシューマーに通知するために使用します。つまり、このチャネルを介して他のメッセージは送信されません。
Sagaからこのチャネルをどのように使用できるかを見てみましょう。(これは、リポジトリのキャンセル可能なカウンターの例から引用されています。)
import { take, put, call } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'
// creates an event Channel from an interval of seconds
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
// take(END) will cause the saga to terminate by jumping to the finally block
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
console.log('countdown terminated')
}
}
したがって、Sagaは `take(chan)` を生成しています。これにより、メッセージがチャネルに送信されるまでSagaがブロックされます。上記の例では、`emitter(secs)` を呼び出したときに対応します。また、`try/finally` ブロック内で `while (true) {...}` ループ全体を実行していることにも注意してください。インターバルが終了すると、カウントダウン関数は `emitter(END)` を呼び出してイベントチャネルを閉じます。チャネルを閉じると、そのチャネルからの `take` でブロックされたすべてのSagaが終了します。上記の例では、Sagaを終了すると、その `finally` ブロックにジャンプします(提供されている場合。それ以外の場合は、Sagaが終了します)。
サブスクライバーは `unsubscribe` 関数を返します。これは、イベントソースが完了する前にサブスクライブを解除するためにチャネルによって使用されます。イベントチャネルからメッセージを消費するSaga内で、イベントソースが完了する前に *早期に終了* したい場合(たとえば、Sagaがキャンセルされた場合)、`chan.close()` を呼び出してチャネルを閉じ、ソースからサブスクライブを解除できます。
たとえば、Sagaがキャンセルをサポートできるようにすることができます
import { take, put, call, cancelled } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'
// creates an event Channel from an interval of seconds
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
if (yield cancelled()) {
chan.close()
console.log('countdown cancelled')
}
}
}
これは、イベントチャネルを使用してWebSocketイベントをSagaに渡す方法の別の例です(たとえば、socket.ioライブラリを使用)。サーバーメッセージ `ping` を待って、しばらく遅れて `pong` メッセージで応答するとします。
import { take, put, call, apply, delay } from 'redux-saga/effects'
import { eventChannel } from 'redux-saga'
import { createWebSocketConnection } from './socketConnection'
// this function creates an event channel from a given socket
// Setup subscription to incoming `ping` events
function createSocketChannel(socket) {
// `eventChannel` takes a subscriber function
// the subscriber function takes an `emit` argument to put messages onto the channel
return eventChannel(emit => {
const pingHandler = (event) => {
// puts event payload into the channel
// this allows a Saga to take this payload from the returned channel
emit(event.payload)
}
const errorHandler = (errorEvent) => {
// create an Error object and put it into the channel
emit(new Error(errorEvent.reason))
}
// setup the subscription
socket.on('ping', pingHandler)
socket.on('error', errorHandler)
// the subscriber must return an unsubscribe function
// this will be invoked when the saga calls `channel.close` method
const unsubscribe = () => {
socket.off('ping', pingHandler)
}
return unsubscribe
})
}
// reply with a `pong` message by invoking `socket.emit('pong')`
function* pong(socket) {
yield delay(5000)
yield apply(socket, socket.emit, ['pong']) // call `emit` as a method with `socket` as context
}
export function* watchOnPings() {
const socket = yield call(createWebSocketConnection)
const socketChannel = yield call(createSocketChannel, socket)
while (true) {
try {
// An error from socketChannel will cause the saga jump to the catch block
const payload = yield take(socketChannel)
yield put({ type: INCOMING_PONG_PAYLOAD, payload })
yield fork(pong, socket)
} catch(err) {
console.error('socket error:', err)
// socketChannel is still open in catch block
// if we want end the socketChannel, we need close it explicitly
// socketChannel.close()
}
}
}
注意:eventChannelのメッセージはデフォルトでバッファリングされません。チャネルのバッファリング戦略を指定するには、eventChannelファクトリーにバッファを提供する必要があります(例:`eventChannel(subscriber, buffer)`)。詳細については、APIドキュメントを参照してください。
このWebSocketの例では、ソケットエラーが発生すると、socketChannelがエラーを発生させ、このイベントチャネルで待機している `yield take(socketChannel)` を中止します。エラーを発生させてもデフォルトではチャネルは中止されないことに注意してください。エラー後にチャネルを終了する場合は、明示的にチャネルを閉じる必要があります。
Saga間で通信するためのチャネルの使用
アクションチャネルとイベントチャネルに加えて、デフォルトでどのソースにも接続されていないチャネルを直接作成することもできます。その後、チャネルに手動で `put` できます。これは、チャネルを使用してSaga間で通信する場合に便利です。
説明するために、リクエスト処理の前の例を見てみましょう。
import { take, fork, ... } from 'redux-saga/effects'
function* watchRequests() {
while (true) {
const {payload} = yield take('REQUEST')
yield fork(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
watch-and-forkパターンを使用すると、同時に実行されるワーカータスクの数に制限なく、複数のリクエストを同時に処理できることがわかりました。次に、`actionChannel` エフェクトを使用して、同時実行数を一度に1つのタスクに制限しました。
したがって、要件を同時に実行される最大3つのタスクにしたいとします。リクエストを取得し、実行中のタスクが3つ未満の場合は、リクエストをすぐに処理します。それ以外の場合は、タスクをキューに入れ、3つの *スロット* のいずれかが空になるのを待ちます。
以下は、チャネルを使用したソリューションの例です。
import { channel } from 'redux-saga'
import { take, fork, ... } from 'redux-saga/effects'
function* watchRequests() {
// create a channel to queue incoming requests
const chan = yield call(channel)
// create 3 worker 'threads'
for (var i = 0; i < 3; i++) {
yield fork(handleRequest, chan)
}
while (true) {
const {payload} = yield take('REQUEST')
yield put(chan, payload)
}
}
function* handleRequest(chan) {
while (true) {
const payload = yield take(chan)
// process the request
}
}
上記の例では、`channel` ファクトリを使用してチャネルを作成します。デフォルトですべてのメッセージをバッファリングするチャネルが返されます(保留中のテーカーがいる場合を除く。この場合、テーカーはメッセージですぐに再開されます)。
`watchRequests` Sagaは、3つのワーカーSagaをフォークします。作成されたチャネルは、すべてのフォークされたSagaに提供されることに注意してください。`watchRequests` は、このチャネルを使用して、3つのワーカーSagaに作業を *ディスパッチ* します。`REQUEST` アクションごとに、Sagaはペイロードをチャネルに配置します。次に、ペイロードは *空いている* ワーカーによって取得されます。それ以外の場合は、ワーカーSagaが取得する準備が整うまでチャネルによってキューに入れられます。
3つすべてのワーカーは、典型的なwhileループを実行します。各反復で、ワーカーは次のリクエストを取得するか、メッセージが利用可能になるまでブロックします。このメカニズムは、3人のワーカー間の自動負荷分散を提供することに注意してください。高速なワーカーは、低速なワーカーによって速度が低下することはありません。
異なるワーカーと通信するための `multicastChannel` の使用
上記のセクションでは、`channels` を使用して、複数回 *フォーク* された *同じ* ワーカー間でリクエストの負荷を分散する方法を説明しました。チャネルにアクションを `put` し、それを消費する *いくつかの異なる* ワーカーを使用する必要がある場合はどうでしょうか。
異なる副作用を実行するために、着信リクエストを異なるワーカーに渡す必要がある場合があります。
これは、問題を確認できる `channels` を使用した例です。`yield put(chan, payload)` を使用して `channel` に `put` すると、常に *両方* ではなく、*1つ* のワーカー (`logWorker` または `mainWorker`) のみ実行されます。
import { channel } from 'redux-saga'
import { take, fork, call, put } from 'redux-saga/effects'
function* watchRequests() {
// create a channel to queue incoming requests
const chan = yield call(channel)
// fork both workers
yield fork(logWorker, chan)
yield fork(mainWorker, chan)
while (true) {
const { payload } = yield take('REQUEST')
// put here will reach only one worker, not both!
yield put(chan, payload)
}
}
function* logWorker(channel) {
while (true) {
const payload = yield take(channel)
// Log the request somewhere..
console.log('logWorker:', payload)
}
}
function* mainWorker(channel) {
while (true) {
const payload = yield take(channel)
// Process the request
console.log('mainWorker', payload)
}
}
問題を解決するには、すべてワーカーに同時にアクションを *ブロードキャスト* する `multicastChannel` を使用する必要があります。
`multicastChannel` で `take` を使用するには、`take` するアクションをフィルタリングするために使用できる追加の引数 *pattern* を渡す必要があることに注意してください。
以下の例を参照してください
import { multicastChannel } from 'redux-saga'
import { take, fork, call, put } from 'redux-saga/effects'
function* watchRequests() {
// create a multicastChannel to queue incoming requests
const channel = yield call(multicastChannel)
// fork different workers
yield fork(logWorker, channel)
yield fork(mainWorker, channel)
while (true) {
const { payload } = yield take('REQUEST')
yield put(channel, payload)
}
}
function* logWorker(channel) {
while (true) {
// Pattern '*' for simplicity
const payload = yield take(channel, '*')
// Log the request somewhere..
console.log('logWorker:', payload)
}
}
function* mainWorker(channel) {
while (true) {
// Pattern '*' for simplicity
const payload = yield take(channel, '*')
// Process the request
console.log('mainWorker', payload)
}
}