benthos/코드 분석

input - generate

justbagmeg 2023. 6. 28. 00:27

컨텍스트 없이 실행되는 [Bloblang](/docs/guides/bloblang/ about) 매핑을 사용해 지정된 interval마다 메시지를 생성한다.

generateReader는 다음처럼 생겼다.

type generateReader struct {  
    remaining   int  
    batchSize   int  
    limited     bool  
    firstIsFree bool  
    exec        *mapping.Executor  
    timer       *time.Ticker  
    schedule    *cron.Schedule  
    location    *time.Location  
}

generateReader를 생성하는 newGenerateReader함수의 프로토타입.
conf는 default 값 + 설정 값이 적용된 상태.

func newGenerateReader(mgr bundle.NewManagement, conf input.GenerateConfig) (*generateReader, error) 

Interval을 파싱하는 부분.
이 부분이 나는 대단하다고 느껴진 부분이다. 왜? 나라면 숫자만 떡하니 적어서 초단위 아니면 마이크로초 단위로 동작하게 구현했을거야. 그런데 이 여기서는 UTC timezone 사용, @yearly @every, cron 표현식 등을 모두 사용할 수 있도록 구현했다.
그래서 아래와 같은 것들이 다 가능하다.

interval: '@every 1s'
interval: 0,*/2 * * * * *
interval: 2m

cron 표현이 헷갈리는데, [초,분,시,dom,month,dow] 순이다. 그리고 dow는 [일,월,화,수,목,금,토] 순이다. 일주일의 시작은 일요일 ....!!!

if len(conf.Interval) > 0 {
    if duration, err = time.ParseDuration(conf.Interval); err != nil {
        // interval is not a duration so try to parse as a cron expression
        var cerr error
        if schedule, location, cerr = parseCronExpression(conf.Interval); cerr != nil {
            return nil, fmt.Errorf("failed to parse interval as duration string: %v, or as cron expression: %w", err, cerr)
        }
        firstIsFree = false
        duration = getDurationTillNextSchedule(*schedule, location)
        // 다음 스케줄 시간까지의 시간 차.
    }
    if duration > 0 {
        timer = time.NewTicker(duration)
    }
}

line138 부터 분석 재개...

'benthos > 코드 분석' 카테고리의 다른 글

manager  (0) 2023.02.01
package import  (0) 2023.01.28