InfluxStreamSharp —— 流式读写InfluxDB

简介 / Introduction

适用于有大量数据持续性写入的业务场景,内置写入队列,按照一定的时间间隔将数据分批写入。建议写入时带入时间戳,这样读取时时间才准确。

当需要读取时,支持重现写入时的场景,举例:若在 t1 时刻写入了数据记录 a1,在 t2 = t1 + 1s 时刻写入了数据 a2,在 t2 + 5 s 时刻又写入了数据 a3,则在读取时,QueryManager会在返回 a1 记录之后1秒返回 a2,再等5秒返回 a3。就好像播放视频录像一样,进行历史再现。此功能内置懒加载功能和缓冲队列,不用担心时间段过长、数据过多造成InfluxDB查询卡死等问题。

当然若不希望使用这种查询方式,可以使用传统的查询模式,即给定时间范围,一次性将所有数据取出来。

读取和写入数据都借鉴了 EntityFramework 的思路,即采用实体对象进行映射,极大地简化了数据库读取、序列化和反序列化的步骤。在构建实体对象时,需要采用特性(Attribute)对字段中的Tag、Value和Timestamp进行标记。

Github

https://github.com/XingKongSync/InfluxStreamSharp

用法 / Usage

static void Main(string[] args)
{
	//Init a InfluxDB writing buffer
	WriteService.Instance.Value.Start();

	//Create a database if not exist
	var influx = InfluxService.Instance.Value;
	influx.InitAsync(
		DB_Url,
		DB_UserName,
		DB_Pwd,
		DB_DbName,
		DB_RetentionHours
		).Wait();

	//write data with buffering
	TestStreamingWrite();
	//Read all data by buffering and timing
	TestStreamingRead();

	Console.ReadKey();
}

static void TestStreamingWrite()
{
	for (int i = 0; i < 10; i++)
	{
		var testModel = new DataModel.Test();
		testModel.DeviceId = i.ToString();
		testModel.x = i;
		testModel.y = i;
		testModel.LocalTime = DateTime.Now.AddMinutes(-1 * i);

		//Convert custom data model to influx model
		var point = ModelTransformer.Convert(testModel);
		//Send the data to the writing queue, the data will be buffered and send to InfluxDB
		WriteService.Instance.Value.Enqueue(point);
	}
}

static void TestStreamingRead()
{
	//Build a query statement
	InfluxQLTemplet templet = new InfluxQLTemplet();
	templet.Measurement = ModelTransformer.GetMeasurement(typeof(DataModel.Test));
	//Add query reqirement
	//templet.WhereEqual("DeviceId", "0");//Only query data which DeviceId equals to 0

	//Construct query manager for streaming read
	QueryManager manager = new QueryManager(DateTime.Now.AddMinutes(-30), DateTime.Now);//Query data within 30 miniutes
	//If you want do muliti queries, please add more QueryTemplet
	manager.AddInfluxQueryTemplet<DataModel.Test>(templet);
	//Handle receveied data
	manager.DataReceived += (object data) =>
	{
		if (data is DataModel.Test t)
		{
			Console.WriteLine($"CurrentPlayTime: {manager.CurrentPlayTime.ToString("yyyy-MM-dd HH:mm:ss")}, id: {t.DeviceId}, x: {t.x}, y: {t.y}");
		}
	};
	//Start query data
	manager.Start();
}

发表评论

邮箱地址不会被公开。 必填项已用*标注