본문 바로가기

빅데이터/Kafka

카프카 커넥트의 태스크 밸런싱 로직, DistributedHerder(양치기) 그리고 IncrementalCooperativeAssignor 내부 동작 소개

Herder; 명사 1. 양치기, 목부

카프카 커넥트는 워커, 커넥터, 태스크로 이루어져 있습니다. 워커는 카프카 커넥트 프로세스를 뜻하며 커넥터와 태스크를 실행시키기 위한 프로세스입니다. 커넥터는 태스크를 실행하는 관리도구로서 여러 태스크를 하나의 파이프라인으로 라이프 사이클을 관리합니다. 태스크는 데이터를 실질적으로 처리하는 부분이라고 볼 수 있습니다. 커넥터에는 1개 이상의 태스크가 포함되며 각 태스크는 프로듀서 또는 컨슈머 역할을 수행합니다.

 

일반적으로 분산모드 커넥트를 운영할 때 커넥터를 실행할 경우 태스크는 여러 워커에서 분산해서 실행됩니다. 예를 들어 5개의 워커로 이루어진 분산 모드 커넥트를 실행하고 7개의 태스크를 가진 커넥터를 실행하면 다음과 같이 할당됩니다.

[worker-0] - [task-0,task-1]
[worker-1] - [task-2,task-3]
[worker-2] - [task-4,task-5]
[worker-3] - [task-6]
[worker-4] - [task-7]

문제는 어떤 로직으로 태스크들이 워커들에 분배가 되는지 입니다. 하나의 커넥트 클러스터에서 하나의 커넥터만 운영 하는 경우는 거의 없다고 볼 수 있습니다. 예를 들어 태스크가 3개인 커넥터 2개를 동시에 띄우게 되는 경우가 있겠죠. 이런 경우 어떻게 분배하는 것이 가장 효율적인 방식일까요?

 

카프카 커넥트 소스를 살펴보면 AbstractHerder라는 가상 클래스가 있는 것을 확인할 수 있습니다. AbstractHerder는 Connector과 task의 라이프사이클을 트래킹하며 훅을 통해 추가 처리를 수행하기도 합니다. AbstracteHerder는 StandaloneHerder와 DistributedHerder로 상속되어 구현되어 있습니다. StandaloneHerder는 단일모드 커넥트를 운영할 때 사용됩니다.

/**
 * <p>
 * The herder interface tracks and manages workers and connectors. It is the main interface for external components
 * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
 * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
 * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
 * of the workers.
 * </p>
 * <p>
 * This class must implement all the actions that can be taken on the cluster (add/remove connectors, pause/resume tasks,
 * get state of connectors and tasks, etc). The non-Java interfaces to the cluster (REST API and CLI) are very simple
 * wrappers of the functionality provided by this interface.
 * </p>
 * <p>
 * In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case,
 * the implementation will mainly be delegating tasks directly to other components. For example, when creating a new
 * connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the
 * same process, so the standalone herder implementation can immediately instantiate and start the connector and its
 * tasks.
 * </p>
 */
public interface Herder {
...

 

DistributedHerder는 분산 모드 커넥트로 여러 워커를 동시에 운영할 때 사용됩니다. DistributedHerder를 통해 워커와 커넥터는 기본적으로 Round-robin으로 할당하게 되어 있으나 일부 케이스(sticky assignment 적용)에서는 다른 방식으로 동작합니다. 분산 할당하는 기능에 추가하여 각 커넥터와 태스크에 대한 라이프사이클을 관리하는데 로직들도 작성되어 있습니다. 즉, 커넥터의 생애주기에 따라 항상 DistributedHerder가 호출되는 것을 알 수 있습니다. 

 

커넥터와 태스크가 해당 워커에서 실행되어야 되는지를 담은 정보는 ExtendedAssignment 클래스입니다. 해당 클래스는 ConnectProtocol 클래스 내부의 Assignment 스태틱 클래스를 상속받은 클래스인데 쉽게 말해 워커에 해당하는 커넥터 id와 태스크 id를 담고 있다고 보면 됩니다. 

/**
 * The basic assignment of connectors and tasks introduced with V0 version of the Connect protocol.
 */
public static class Assignment {
    public static final short NO_ERROR = 0;
    // Configuration offsets mismatched in a way that the leader could not resolve. Workers should read to the end
    // of the config log and try to re-join
    public static final short CONFIG_MISMATCH = 1;

    private final short error;
    private final String leader;
    private final String leaderUrl;
    private final long offset;
    private final Collection<String> connectorIds;
    private final Collection<ConnectorTaskId> taskIds;
...

이렇게 정의된 ExtendedAssignment는 워커가 실행될 때 태스크의 번호를 토대로 실행됩니다. 참고로 assign되는 태스크의 설정이 변경되지 않아도 되는 일부 태스크는 그대로 남겨두는 로직도 포함되어 있음을 알 수 있습니다. 이 방식은 IncrementalCooperativeAssignor과 연관이 있습니다. IncrementalCooperativeAssignor는 카프카 커넥트 2.3 이후 버전에서 적용된 태스크, 파티션 어싸인 방식입니다. 기존 리밸런싱 때와 같이 모든 파티션 할당이 해제되는 것과 다르게 IncrementalCooperativeAssignor는 기존 데이터 처리는 지속적으로 하고 나머지 변경이 있는 파티션만 재할당과정을 거치는 것입니다.

private void startWork() {
    // Start assigned connectors and tasks
    List<Callable<Void>> callables = new ArrayList<>();

    // The sets in runningAssignment may change when onRevoked is called voluntarily by this
    // herder (e.g. when a broker coordinator failure is detected). Otherwise the
    // runningAssignment is always replaced by the assignment here.
    synchronized (this) {
        log.info("Starting connectors and tasks using config offset {}", assignment.offset());
        log.debug("Received assignment: {}", assignment);
        log.debug("Currently running assignment: {}", runningAssignment);

        for (String connectorName : assignmentDifference(assignment.connectors(), runningAssignment.connectors())) {
            callables.add(getConnectorStartingCallable(connectorName));
        }

        // These tasks have been stopped by this worker due to task reconfiguration. In order to
        // restart them, they are removed just before the overall task startup from the set of
        // currently running tasks. Therefore, they'll be restarted only if they are included in
        // the assignment that was just received after rebalancing.
        log.debug("Tasks to restart from currently running assignment: {}", tasksToRestart);
        runningAssignment.tasks().removeAll(tasksToRestart);
        tasksToRestart.clear();
        for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
            callables.add(getTaskStartingCallable(taskId));
        }
    }
...

이런 할당 데이터는 IncrementalCooperativeAssignor 클래스 내부에서 이루어집니다. IncrementalCooperativeAssignor를 살펴보면 다음과 같은 2개의 메서드가 있는 것을 확인할 수 있습니다.

/**
 * Perform a round-robin assignment of connectors to workers with existing worker load. This
 * assignment tries to balance the load between workers, by assigning connectors to workers
 * that have equal load, starting with the least loaded workers.
 *
 * @param workerAssignment the current worker assignment; assigned connectors are added to this list
 * @param connectors the connectors to be assigned
 */
protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<String> connectors) {
    workerAssignment.sort(WorkerLoad.connectorComparator());
    WorkerLoad first = workerAssignment.get(0);

    Iterator<String> load = connectors.iterator();
...


/**
 * Perform a round-robin assignment of tasks to workers with existing worker load. This
 * assignment tries to balance the load between workers, by assigning tasks to workers that
 * have equal load, starting with the least loaded workers.
 *
 * @param workerAssignment the current worker assignment; assigned tasks are added to this list
 * @param tasks the tasks to be assigned
 */
protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
    workerAssignment.sort(WorkerLoad.taskComparator());
    WorkerLoad first = workerAssignment.get(0);
...

assignConnectors와 assignTasks인데요. 각각 커넥터와 태스크를 어느 워커로 할당할지에 대한 로직이 작성되어 잇는 것을 확인할 수 있습니다. 기본적으로 워커들의 부하를 확인하고 최대한 동일하게 부하를 가져가도록 설정되어 있는 것을 확인할 수 있습니다. 물론 기본 할당방식은 앞서 말한 대로 round-robin이며 워커의 부하를 참고하여 결정합니다. 

 

워커의 부하란 무엇일까요? 내부 코드를 보면 유추할 수 있습니다. WorkerLoad라는 클래스가 새롭게 등장하는 것을 확인할 수 있는데요.  WorkerLoad는 해당 워커가 가지고 있는(실행하고 있는) 태스크와 커넥터 Collection을 가지고 있는 객체입니다.

public static class WorkerLoad {
    private final String worker;
    private final Collection<String> connectors;
    private final Collection<ConnectorTaskId> tasks;

    private WorkerLoad(
            String worker,
            Collection<String> connectors,
            Collection<ConnectorTaskId> tasks
    ) {
        this.worker = worker;
        this.connectors = connectors;
        this.tasks = tasks;
    }
...

assignTasks에서는 해당 워커의 부하를 tasksSize로 측정합니다. 즉, 태스크의 개수에 따라 달라짐을 확인할 수 있습니다.

protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
    workerAssignment.sort(WorkerLoad.taskComparator());
    WorkerLoad first = workerAssignment.get(0);

    Iterator<ConnectorTaskId> load = tasks.iterator();
    while (load.hasNext()) {
        int firstLoad = first.tasksSize();
        int upTo = IntStream.range(0, workerAssignment.size())
                .filter(i -> workerAssignment.get(i).tasksSize() > firstLoad)
                .findFirst()
                .orElse(workerAssignment.size());
        for (WorkerLoad worker : workerAssignment.subList(0, upTo)) {
            ConnectorTaskId task = load.next();
            log.debug("Assigning task {} to {}", task, worker.worker());
            worker.assign(task);
            if (!load.hasNext()) {
                break;
            }
        }
    }
}

예를 들어 3개의 워커와 11개의 커넥터, 6개의 태스크를 가진 커넥트가 있다고 가정해봅시다.

{ worker=worker0, taskIds=[task-0, task-1]}
{ worker=worker1, taskIds=[task-2, task-3]}
{ worker=worker2, taskIds=[task-4, task-5]}

이때, 신규 태스크 5개(task-6~10)가 추가된다고 하면 다음과 같이 수행됩니다.

- firstLoad는 2 : 0번 워커는 2개의 태스크를 가지고 있으므로

- upTo는 3 : 3개의 워커에 동일하게 어싸인 가능하므로

- 이후 라운드 로빈으로 태스크 할당 진행

최종 할당된 태스크는 다음과 같습니다.

{ worker=worker0, taskIds=[task-0, task-1, task-6, task-9]}
{ worker=worker1, taskIds=[task-2, task-3, task-7, task-10]}
{ worker=worker2, taskIds=[task-4, task-5, task-8]}

또 다른 예로 다음과 같이 할당되어 있는 경우는 어떻게 될까요? 워커가 3개이고 태스크 3개가 비균등 분배되어 있다고 가정해봅시다.

{ worker=worker0, taskIds=[]}
{ worker=worker1, taskIds=[task-1]}
{ worker=worker2, taskIds=[task-2, task-3]}

이때 3개의 추가 태스크가 들어오면 다음과 같이 분배됩니다.

{ worker=worker0, taskIds=[task-4, task-5]}
{ worker=worker1, taskIds=[task-1, task-6]}
{ worker=worker2, taskIds=[task-2, task-3]}

Round-robin과 함께 더 적은 숫자의 태스크를 가진 워커에 태스크가 분배되는 것을 볼 수 있습니다. 이 방식은 직관적이긴 하지만 한편으로는 아쉽습니다. 커스텀 커넥터를 개발해보신 분들은 아시겠지만 모든 커넥터들이 동일한 성능을 가지지는 않습니다. 그러므로 일부 태스크는 다른 태스크에 비해 더 많은 리소스(cpu, ram)을 소모할 수도 있습니다. 이런 경우에는 이런 태스크 분배 방식은 적합하지 않을 수 있습니다. 왜냐면 리소스 사용량이 많은 태스크가 일부 워커에 몰릴 수 있기 때문입니다. 앞으로는 이런 부분이 개선되었으면 좋겠다는 생각이 듭니다.


관련 동작을 더 자세히 알고 싶다면 IncrementalCooperativeAssignorTest의 testAssignTasksWhenBalanced 메서드를 참고하면 됩니다.

 

반응형