package main import ( "context" "flag" "fmt" "log" "log/slog" "math" "math/rand" "net/http" "os" "os/signal" "strings" "syscall" "time" ) var logger *slog.Logger type Response struct { Body []byte Header http.Header Cookies []*http.Cookie } func main() { var baseUrl, logLevel string var duration time.Duration var workerCount int flag.StringVar(&baseUrl, "base-url", "", "") flag.DurationVar(&duration, "duration", 0, "e.g. 10s, 5m [0 = forever]") flag.IntVar(&workerCount, "workers", 1, "") flag.StringVar(&logLevel, "log-level", "INFO", "DEBUG/INFO/ERROR") flag.Parse() if baseUrl == "" { log.Fatalf("required option is missing: run with -h") } baseUrl = strings.TrimSuffix(baseUrl, "/") var loglevel slog.LevelVar switch strings.ToUpper(logLevel) { case "DEBUG": loglevel.Set(slog.LevelDebug) case "INFO": loglevel.Set(slog.LevelInfo) case "ERROR": loglevel.Set(slog.LevelError) } logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: &loglevel})) dt := http.DefaultTransport.(*http.Transport) tr := dt.Clone() tr.MaxIdleConnsPerHost = 20 s := &MyTask{ hc: &http.Client{ Transport: tr, }, baseUrl: baseUrl, } runner := Runner{} ctx := context.Background() ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() if duration != 0 { ctxto, cancel := context.WithTimeout(ctx, duration) defer cancel() ctx = ctxto } errChan := make(chan error) if err := runner.Run(ctx, errChan, workerCount, s.Execute); err != nil { panic(err) } for { select { case <-ctx.Done(): fmt.Println("Finish!!") return case err := <-errChan: if err != nil { fmt.Printf("error occured!! %v", err) } } } } type Task func() error type Runner struct{} func (r *Runner) Run(ctx context.Context, errChan chan<- error, workers int, tasks ...Task) error { totalTaskNum := len(tasks) for workerNum := 0; workerNum < workers; workerNum++ { workerNum := workerNum go func() { index := 0 loopCount := 1 delay := calculateExponentialBackoffWithJitter(workerNum, time.Second, 10*time.Second) time.Sleep(delay) for { select { case <-ctx.Done(): return default: task := tasks[index] time.Sleep(delay) errChan <- r.safeRun(task) index++ if index == totalTaskNum { loopCount++ index = 0 } } } }() } return nil } func (r *Runner) safeRun(task Task) (returnErr error) { defer func() { // don't panic if r := recover(); r != nil { err, ok := r.(error) if !ok { err = fmt.Errorf("%v", r) } returnErr = err } }() return task() } func calculateExponentialBackoffWithJitter(attempt int, baseDelay, maxDelay time.Duration) time.Duration { maxf := float64(maxDelay) basef := float64(baseDelay) durf := basef * math.Pow(2, float64(attempt)) durf = rand.Float64()*(durf-basef) + basef if durf > maxf { durf = maxf } return time.Duration(durf) }