1. 指南

1.1. 目标

本项目旨在为 Dart 语言生态系统提供一个**高性能**且**直观**的作业调度框架,供全球任何人使用。

该框架的开发理念是遵循软件工程界长期提倡的“DRY”(不要重复)、“KISS”(保持简单)和“YAGNI”(你不会需要它)原则。

1.2. 特性

  • 简单直观的作业调度。
  • 无复杂的配置文件。
  • 支持 Cron 格式的调度。
  • 支持强大的日志记录功能,且可自定义。
  • 支持轻松定义并行进程。
  • 支持作业和步骤的条件分支。
  • 支持在每个事件处进行广泛的回调函数。
  • 支持根据用户定义的条件跳过和重试。

1.3. 入门

1.3.1. 安装库

 dart pub add batch

笔记
在 Pub.dev 上,此库发布时的自动识别标签将其标记为可在 Flutter 中使用,但这完全不适合。

1.3.2. 导入

以下导入将提供使用 Batch.dart 开发作业调度的所有材料。

import 'package:batch/batch.dart';

1.3.3. 基本概念

Batch.dart 将调度处理单元表示为 Event
Event 由以下元素组成。

描述
工作 这个 Job 事件是广义上的批处理单元。Job 包含多个 Step 事件。
ScheduledJob 它表示一个计划的作业。ScheduledJob 包含多个 Step 事件。
步骤 此事件表示顺序处理。每个 Step 都有一个定义单个特定进程的 Task
ParallelStep 此事件表示并行处理。每个 ParallelStep 都包含定义特定进程的 ParallelTask

1.3.4. 调度作业

1.3.4.1. 顺序处理

在定义简单的顺序处理时,只需定义一个继承自 Task 并实现 execute 方法的类。

定义调度作业也很容易:定义一个类来实现 ScheduledJobBuilder,在其中生成 ScheduledJob,然后将其传递给 BatchApplicationjobs 参数。

示例

import 'package:batch/batch.dart';

void main() => BatchApplication(
      jobs: [SayHelloWorldJob()],
    )..run();


class SayHelloWorldJob implements ScheduledJobBuilder {
  @override
  ScheduledJob build() => ScheduledJob(
        name: 'Test Job',
        schedule: CronParser('*/2 * * * *'), // Execute every 2 minutes
        steps: [
          Step(
            name: 'Test Step',
            task: SayHelloWorldTask(),
          )
        ],
      );
}

class SayHelloWorldTask extends Task<SayHelloWorldTask> {
  @override
  void execute(ExecutionContext context) {
    log.info('Hello, World!');
  }
}

您可以在 官方示例 中查看更多示例。

1.3.4.2. 并行处理

Batch.dart 支持强大的并行处理,且易于定义。

定义并行处理时,只需继承 ParallelTask 并在 execute 方法中描述要并行化的过程。

在主线程中设置的 SharedParametersJobParameters 可以通过 ExecutionContext 引用。但请注意,根据当前规范,在并行处理过程中对 ExecutionContext 值的更改不会反映在主线程的 ExecutionContext 中。

示例

import 'dart:async';

import 'package:batch/batch.dart';

void main() => BatchApplication(
      jobs: [DoHeavyProcessJob()],
    )..run();

class DoHeavyProcessJob implements ScheduledJobBuilder {
  @override
  ScheduledJob build() => ScheduledJob(
        name: 'Job',
        schedule: CronParser('*/2 * * * *'), // Execute every 2 minutes
        steps: [
          ParallelStep(
            name: 'Parallel Step',
            tasks: [
              DoHeavyTask(),
              DoHeavyTask(),
              DoHeavyTask(),
              DoHeavyTask(),
            ],
          )
        ],
      );
}


class DoHeavyTask extends ParallelTask<DoHeavyTask> {
  @override
  FutureOr<void> execute(ExecutionContext context) {
    int i = 0;
    while (i < 10000000000) {
      i++;
    }
  }
}

1.3.5. 日志记录

Batch.dart 标准提供了以下著名的日志记录功能。
默认日志级别为 **trace**。

  • trace
  • debug
  • info
  • warn
  • error
  • fatal

Batch.dart 提供的日志记录功能具有广泛的自定义选项。有关更多信息,您可以参考描述 Batch.dart 日志记录的 官方文档

1.3.5.1. 顺序处理

在顺序处理中使用日志记录功能非常简单。

Batch.dart 提供的日志记录方法可以从任何导入 batch.dart 的类中使用。**因此,您无需自己实例化任何 Logger**!

Batch.dart 中,您只需要指定日志的配置,然后在运行 BatchApplication 之前进行设置,Logger 会在 Batch.dart 的生命周期内安全地提供。

示例

import 'package:batch/batch.dart';

class TestLogTask extends Task<TestLogTask> {
  @override
  void execute() {
    log.trace('Test trace');
    log.debug('Test debug');
    log.info('Test info');
    log.warn('Test warning');
    log.error('Test error');
    log.fatal('Test fatal');
  }
}

例如,如果您运行 示例,您会得到以下日志输出。

yyyy-MM-dd 19:25:10.575109 [info ] (_BatchApplication.run:129:11  ) - ??????? The batch process has started! ???????
yyyy-MM-dd 19:25:10.579318 [info ] (_BatchApplication.run:130:11  ) - Logger instance has completed loading
yyyy-MM-dd 19:25:10.580177 [info ] (_BootDiagnostics.run:32:9     ) - Batch application diagnostics have been started
yyyy-MM-dd 19:25:10.583234 [info ] (_BootDiagnostics.run:46:9     ) - Batch application diagnostics have been completed
yyyy-MM-dd 19:25:10.583344 [info ] (_BootDiagnostics.run:47:9     ) - Batch applications can be started securely
yyyy-MM-dd 19:25:10.585729 [info ] (JobScheduler.run:37:9         ) - Started Job scheduling on startup
yyyy-MM-dd 19:25:10.585921 [info ] (JobScheduler.run:38:9         ) - Detected 3 Jobs on the root
yyyy-MM-dd 19:25:10.586023 [info ] (JobScheduler.run:41:11        ) - Scheduling Job [name=Job1]
yyyy-MM-dd 19:25:10.595706 [info ] (JobScheduler.run:41:11        ) - Scheduling Job [name=Job2]
yyyy-MM-dd 19:25:10.597471 [info ] (JobScheduler.run:41:11        ) - Scheduling Job [name=Job4]
yyyy-MM-dd 19:25:10.597692 [info ] (JobScheduler.run:56:9         ) - Job scheduling has been completed and the batch application is now running

笔记
Logger 的设置在执行 BatchApplicationrun 方法时完成。
如果您想在此库的生命周期之外使用日志记录功能,
请务必在执行 BatchApplicationrun 方法之后进行。

1.3.5.2. 并行处理

并行处理无法直接使用上面 描述 的便捷日志记录功能。这是因为 Dart 语言中的并行处理**不共享**任何实例。

取而代之的是,在继承 ParallelTask 的类中使用以下方法进行并行处理:

  • sendMessageAsTrace
  • sendMessageAsDebug
  • sendMessageAsInfo
  • sendMessageAsWarn
  • sendMessageAsError
  • sendMessageAsFatal

示例

class TestParallelTask extends ParallelTask<TestParallelTask> {
  @override
  FutureOr<void> invoke() {
    super.sendMessageAsTrace('Trace');
    super.sendMessageAsDebug('Debug');
    super.sendMessageAsInfo('Info');
    super.sendMessageAsWarn('Warn');
    super.sendMessageAsError('Error');
    super.sendMessageAsFatal('Fatal');
  }
}

需要注意的是,日志输出不会在调用上述 sendMessageAsX 方法时发生。

这只是一个模拟并行处理中日志输出的函数,所有消息将在包含在 Parallel 中的所有并行处理完成后一次性输出。

您可以从并行处理中获得以下日志输出。

yyyy-MM-dd 10:05:06.662561 [trace] (solatedLogMessage.output:36:13) - Received from the isolated thread [message=Trace]
yyyy-MM-dd 10:05:06.662666 [debug] (solatedLogMessage.output:39:13) - Received from the isolated thread [message=Debug]
yyyy-MM-dd 10:05:06.662760 [info ] (solatedLogMessage.output:42:13) - Received from the isolated thread [message=Info]
yyyy-MM-dd 10:05:06.662856 [warn ] (solatedLogMessage.output:45:13) - Received from the isolated thread [message=Warn]
yyyy-MM-dd 10:05:06.662947 [error] (solatedLogMessage.output:48:13) - Received from the isolated thread [message=Error]
yyyy-MM-dd 10:05:06.663039 [fatal] (solatedLogMessage.output:51:13) - Received from the isolated thread [message=Fatal]

1.3.6. 分支

Batch.dart 支持对每个计划的事件进行条件分支(在 Batch.dart 中简称为“**Branch**”)。

Branch 被设计为可以从每个事件(如 JobStep)派生。分支的数量没有限制,并且也可以实现递归嵌套结构。

为每个事件创建分支非常简单。

创建分支

    Step(
      name: 'Step',
      task: SwitchBranchStatusTask(),

      // Each branch can be multiple and nested
      branchesOnSucceeded: [Step(name: 'Step on succeeded', task: doSomethingTask)],
      branchesOnFailed: [Step(name: 'Step on failed', task: doSomethingTask)],
      branchesOnCompleted: [Step(name: 'Step on completed', task: doSomethingTask)],
    );

Batch.dart 的条件分支通过切换 ExecutionContext 中可引用的每个 ExecutionBranchStatus 来控制。
默认分支状态为“completed”。

管理分支

class SwitchBranchStatusTask extends Task<SwitchBranchStatusTask> {
  @override
  void execute(ExecutionContext context) {
    // You can easily manage branch status through methods as below.
    context.jobExecution!.switchBranchToSucceeded();
    context.stepExecution!.switchBranchToFailed();
  }
}

1.4. 更多示例

您可以在 官方示例 中查看更多。

1.5. 贡献

如果您想为 Batch.dart 做出贡献,请创建一个 issue 或创建一个 Pull Request。

作者将尽快响应 issues 并审查 pull requests。

1.6. 支持

表达支持的最简单方法是在 此处 给项目点个赞。

我一直在寻找赞助商来支持这个项目。我并不要求使用此框架的版税,但我需要支持来继续进行开源开发。

赞助者可以是个人或公司,金额可选。

? 点击下方按钮查看更多详情!?

myconsciousness

1.7. 许可证

Batch.dart 的所有资源均根据 BSD-3 许可证提供。

FOSSA Status

笔记
源代码中的许可证通知会根据 .github/header-checker-lint.yml 严格验证。请查看 header-checker-lint.yml 以了解允许的标准。

1.8. 更多信息

Batch.dart 由**Kato Shinya** 设计和实现。

GitHub

查看 Github