
Building a real-time data warehouse based on Flink SQL

With the current state of big data, the work is no longer limited to offline analysis and mining the potential value of data; in recent years the timeliness of data has become a necessity, driving real-time processing frameworks such as Storm, Spark Streaming, and Flink. To make a real-time data warehouse feasible, the following points need to be considered: 1) a state mechanism; 2) exactly-once semantics; 3) high throughput; 4) elastic scaling of the application; 5) a fault-tolerance mechanism. Flink delivers all of these, and it also provides the high-level Flink SQL API, which reduces development cost, enables rapid iteration, and is easy to maintain.

Architecture diagram of the offline data warehouse:

Architecture diagram of the real-time data warehouse:

Currently, the real-time dimension tables and DM-layer data are stored in HBase, while the real-time common layer is stored in Kafka and also written to HDFS as rolling logs (mainly used to verify the data). In fact, there is still a lot of work that can be done here: the Kafka cluster, the Flink cluster, and the HBase cluster are independent of each other, which poses a certain challenge to the stability of the entire real-time data warehouse.
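For context, below is a minimal sketch of how a DM-layer result table backed by HBase might be declared in Flink SQL. The connector version, namespace, column family, and field names are all assumptions for illustration, not the actual production DDL.

```sql
-- Hypothetical DM-layer table backed by HBase (names and options are illustrative).
-- Declaring it this way lets aggregated results be written with a plain INSERT INTO.
CREATE TABLE dm_trade_summary (
  rowkey STRING,
  cf ROW<order_cnt BIGINT, order_amt DECIMAL(18, 2)>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',   -- connector name varies by Flink/HBase version
  'table-name' = 'dm:trade_summary',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181'
);

-- e.g. INSERT INTO dm_trade_summary SELECT ... FROM <a DWS aggregation>;
```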

For a data warehouse to become a system and turn into an asset, it cannot do without the division of data domains. So, with reference to the offline data warehouse, we made this exploration for the real-time data warehouse: theoretically, whatever can be realized offline can also be realized in real time. Results have already been achieved, and the current division of data domains is roughly the same as offline, including the traffic domain, transaction domain, marketing domain, and so on. Of course, this also involves the design, development, and implementation of dimension tables, multi-transaction fact tables, accumulating snapshot tables, and periodic snapshot tables.

Dimension tables are also an integral part of the entire real-time data warehouse. From the construction of the real-time data warehouse so far, dimension tables have a large amount of data but change little. We tried to build a platform-wide real-time merchandise dimension table and a real-time membership dimension table, but that type of dimension table is too complex, so it is described separately below. There is another kind of dimension table that is relatively simple: such a dimension may correspond to a single MySQL table of the business system, or can be produced with a simple ETL over only a few tables, and this kind of dimension table can be made real-time. Here are a few key implementation points:

The following is the offline data synchronization architecture diagram:

Real-time data access actually shares the same underlying architecture as offline; the two paths diverge at the Kafka side. The real-time path uses a Flink UDTF for parsing, while the offline path uses Camus to pull data from Kafka to HDFS on a schedule (currently hourly) and then periodically loads the HDFS data into Hive tables, thus realizing offline data access. Real-time data access uses Flink to parse the Kafka data and then writes it back to Kafka.
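As a rough sketch of this access layer, the Flink SQL below reads the raw log topic, explodes each line with a UDTF, and writes the parsed fields back to Kafka. The function class com.example.LogParseUdtf, the topics, and the field names are all assumptions for illustration; the actual parsing logic lives in the UDTF implementation.

```sql
-- Register the (hypothetical) parsing UDTF implemented in Java.
CREATE TEMPORARY FUNCTION parse_log AS 'com.example.LogParseUdtf';

-- Raw log topic: one log line per Kafka record.
CREATE TABLE ods_raw_log (
  line STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_raw_log',
  'properties.bootstrap.servers' = 'kafka1:9092',
  'properties.group.id' = 'rt_dw_access',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'raw'
);

-- Parsed output written back to Kafka for the common layer.
CREATE TABLE dwd_parsed_log (
  event_id STRING,
  user_id  STRING,
  nginx_ts BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_parsed_log',
  'properties.bootstrap.servers' = 'kafka1:9092',
  'format' = 'json'
);

-- Explode each raw line into structured fields with the UDTF.
INSERT INTO dwd_parsed_log
SELECT t.event_id, t.user_id, t.nginx_ts
FROM ods_raw_log,
     LATERAL TABLE(parse_log(line)) AS t(event_id, user_id, nginx_ts);
```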

Since the offline pipeline has been running stably for a long time, the data from real-time access can be compared against the offline data. However, the offline data is hourly Hive data while the real-time data sits in Kafka, so they cannot be compared directly. The workaround is to write the Kafka data to HDFS as rolling logs (using Blink's rolling HDFS writer), and then set up a Hive table with an hourly scheduled job that loads the files from HDFS, so that the real-time data also becomes queryable by hour.
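A sketch of the Hive side (the external table over the rolling-log output and the hourly partition registration); the database, table, column, path names, and partition values are assumed for illustration.

```sql
-- Hourly-partitioned external table over the files the Flink job rolls out to HDFS.
CREATE EXTERNAL TABLE IF NOT EXISTS rt_verify.dwd_parsed_log (
  event_id STRING,
  user_id  STRING,
  nginx_ts BIGINT
)
PARTITIONED BY (dt STRING, hr STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs:///warehouse/rt_verify/dwd_parsed_log';

-- Run hourly by the scheduler: register the previous hour's files as a new partition.
ALTER TABLE rt_verify.dwd_parsed_log
  ADD IF NOT EXISTS PARTITION (dt = '2020-01-01', hr = '10')
  LOCATION 'hdfs:///warehouse/rt_verify/dwd_parsed_log/dt=2020-01-01/hr=10';
```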

With the above two points completed, one more point still needs to be considered: both are hourly tasks, so which field should be used as the hourly cut-off? The first thing to determine is that the offline and real-time tasks must use the same cut-off time field, otherwise there will certainly be problems. Currently, the offline side uses Camus to pull data from Kafka to HDFS in hourly tasks and cuts off on the nginx_ts field, which records the point in time at which the event was reported to the nginx server. The real-time side uses Flink to consume the Kafka data, writes it to HDFS as rolling logs, and then loads those HDFS files into a Hive table. Although this Hive table is also partitioned by day and hour, the offline table is partitioned by nginx_ts, whereas the real-time Hive table is partitioned by the point in time at which the loading task ran. Because of this difference, directly filtering on the partition and comparing with the offline data will show discrepancies. The right practice is to first filter a partition range and then filter on an nginx_ts interval, so that the comparison with the offline data is reasonable.
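An illustrative Hive comparison query under that approach (table names, partition values, and the nginx_ts interval are made up for the example): take a partition range wide enough on both sides, then constrain both sides to the same nginx_ts interval before comparing counts.

```sql
-- Compare record counts for the same nginx_ts window across offline and real-time tables.
SELECT 'offline' AS side, COUNT(*) AS cnt
FROM ods.offline_log
WHERE dt = '2020-01-01' AND hr BETWEEN '09' AND '11'          -- partition range (coarse filter)
  AND nginx_ts >= 1577869200000 AND nginx_ts < 1577872800000  -- same event-time interval
UNION ALL
SELECT 'realtime' AS side, COUNT(*) AS cnt
FROM rt_verify.dwd_parsed_log
WHERE dt = '2020-01-01' AND hr BETWEEN '09' AND '11'
  AND nginx_ts >= 1577869200000 AND nginx_ts < 1577872800000;
```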

Currently, the main latency in the real-time data access layer lies in the UDTF parsing. The UDTF is developed against the reported log format and is responsible for parsing the logs.

The parsing flow chart is as follows:

The parsing rate chart is as follows:

The chart was not captured at the peak data volume; taking the current 800 records/second as the baseline, the parsing rate works out to approximately 1.25 ms per record.

The Flink task is currently allocated one core for parsing. At roughly 1.25 ms per record, a single core can parse about 1 s / 1.25 ms ≈ 800 records per second, so if the peak volume exceeds that, the parallelism of the parsing operator needs to be scaled up accordingly.

To introduce the current situation of the offline dimension tables, take the merchandise dimension table: the full table has nearly one hundred million records, and its computation logic draws on 40-50 ODS-layer tables, so the logic is quite complex. If the real-time dimension table were built by simply porting the offline dimension table, the development and maintenance costs would be very large and it would also be a big technical challenge; moreover, there is currently no requirement that the dimension attributes be 100% accurate in real time. Therefore, a pseudo real-time dimension table is produced instead: it is output by 24:00 of the current day, and on the next day the real-time common layer uses that day's dimension table, i.e., a T-1 model. The computation logic of the pseudo real-time dimension table refers to the offline dimension table, but in order to guarantee output before 24:00, the offline logic is simplified and some uncommon fields are removed, so that the pseudo real-time dimension table can be produced faster.
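To illustrate how the real-time common layer might consume such a T-1 dimension table, here is a hedged Flink SQL sketch of a processing-time lookup join against a merchandise dimension table stored in HBase; all table, topic, and column names are assumptions.

```sql
-- Fact stream from Kafka, with a processing-time attribute for the lookup join.
CREATE TABLE dwd_order (
  order_id  STRING,
  sku_id    STRING,
  amount    DECIMAL(18, 2),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_order',
  'properties.bootstrap.servers' = 'kafka1:9092',
  'properties.group.id' = 'rt_dw_common',
  'format' = 'json'
);

-- T-1 merchandise dimension table in HBase (refreshed once a day).
CREATE TABLE dim_sku (
  rowkey STRING,
  cf ROW<sku_name STRING, category_id STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'dim:sku',
  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181'
);

-- Enrich each order with dimension attributes via a lookup join.
SELECT o.order_id, o.amount, d.cf.sku_name, d.cf.category_id
FROM dwd_order AS o
JOIN dim_sku FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.sku_id = d.rowkey;
```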

Real-time dimension table calculation flowchart:

At present, Flink is the company's mainstream real-time computing engine. We use memory as the state backend, take a checkpoint at a fixed interval of 30 s, and use HDFS as the checkpoint storage; the checkpoints are also the important basis for restoring state after a task restarts. Anyone familiar with Flink knows that with a memory state backend the state lives in JVM heap memory, which is, after all, limited, and improper use easily leads to OOM. The following describes how to complete the regular calculations within this limited memory.
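For reference, the state backend and checkpoint setup described above might look like the following in the Flink SQL client; the configuration keys shown are from recent Flink releases and are assumptions about the actual deployment.

```sql
-- Keep working state on the JVM heap, checkpoint every 30 s, store checkpoints on HDFS.
SET 'state.backend' = 'hashmap';
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
SET 'execution.checkpointing.interval' = '30s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
```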