Friday, November 25, 2011

Synchronizing the File Staging Area with Talend Open Studio

Every filesystem can be seen as a non-relational DBMS, in which data are stored in disk "blocks", logically organized into a hierarchy (or, in the presence of hard links, a graph) and referenced through API pointers. Using the filesystem is the most "stupid", but also the most efficient, way to handle and store data with little interdependency and few mutual relationships, i.e., non-relational data.

Most existing Data Warehousing solutions include a Data Staging Area, often implemented as a collection of flat files stored at filesystem level. Data are first "brutally" downloaded from the source systems and stored in flat files, and only later processed by the ETL jobs.

The Data Staging Area can therefore be seen as an intermediate storage area between the information sources and the Data Warehouse itself, providing the following benefits:
  • Reduced disk and network I/O: many ETL transformations access and process the same source table or resource several times. Often those accesses are simple sequential reads without any transformation, but they are repeated on the same resource and involve huge amounts of data. If we store this "raw" data in the filesystem once, we can load it into the ETL server memory a single time and access it as many times as we want, without overloading the network;
  • Improved reliability: as previously said, in this initial ETL phase we deal with huge amounts of "raw" data. All the subsequent, more complex transformations can then take place without interfering with the initial "brutal" load operation. In case of an I/O failure while storing the staging files, it's much easier to restart the single download process than to restart each full job;
  • Easier historization: we can store a sequential set of data (for example, the list of customers from SAP) and perform a fast and simple comparison between the staging data file and our data mart relational table, in order to detect changes against the current DWH values. This makes sense for both fact and dimension tables;
  • More flexibility: many production ETL or Data Integration tools lack components for specific, often legacy, source systems. We can therefore use our beloved Talend to build small "patch" jobs between the legacy system and the filesystem staging area, which our production ETL solution can then easily access. In this case, I suggest storing the "patch" File Staging data directly in the standard CSV format, which is universally supported. By the way, Talend itself internally uses CSV as the data flow format between components.
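As a rough illustration of the "Easier historization" point, the following Java sketch compares a staging file with the current DWH values and reports the keys that are new or changed. It is only a sketch under assumptions: the two-column `key;value` layout, the `;` separator and the `StagingDiff` class are hypothetical, and the DWH side is simulated by an in-memory `Map` instead of a query on the data mart table.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: compares a "key;value" staging file with current DWH values.
final class StagingDiff {

    private StagingDiff() {}

    /** Returns the keys that are new or whose value differs from the DWH snapshot. */
    static List<String> changedKeys(Path stagingFile, Map<String, String> dwhValues)
            throws IOException {
        List<String> changed = new ArrayList<>();
        for (String line : Files.readAllLines(stagingFile, StandardCharsets.UTF_8)) {
            if (line.isBlank()) {
                continue; // ignore empty lines
            }
            // Assumed layout: the first ';' separates the key from the value
            String[] fields = line.split(";", 2);
            String key = fields[0];
            String value = fields.length > 1 ? fields[1] : "";
            // "Changed" means: missing from the DWH, or present with a different value
            if (!value.equals(dwhValues.get(key))) {
                changed.add(key);
            }
        }
        return changed;
    }
}
```

Because the staging file is read sequentially once, the comparison stays cheap even when the same file is later reused by other transformations.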

In our scenario, taken from the environmental data monitoring project at my university, we have a collection of different environmental measuring stations acting as data sources, each of which generates a series of data files in different formats, containing all the environmental measures for each sensor, with the corresponding timestamp.

One of the source systems is the meteorological FTP site of Hortus Ltd. The job "job_fact_measure_hortus" is responsible for loading the data coming from two different "hortus" source stations, whose measures are periodically (every 15 minutes) "published" to the remote FTP site in the form of flat files.


Complete Data Flow of the entire environmental system.


The "transformation & load" part of the job simply works on every new, incoming file found in the "/var/staging/hortus/2load/" directory; once parsed, transformed and loaded, the file is moved into the "/var/staging/hortus/loaded/" directory. In this way, if we want a set of data files to be reloaded, we simply move them back into the incoming staging directory "/var/staging/hortus/2load/": a fast, easy and efficient solution.
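The "process, then move" convention can be sketched in plain Java with `java.nio.file`. This is an illustration under assumptions, not the actual Talend job: `StagingLoader` and `loadPending` are hypothetical names, the `Consumer<Path>` stands in for the whole "transformation & load" phase, and the two directories are passed in instead of being hard-coded.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the "process, then move" staging convention.
final class StagingLoader {

    private StagingLoader() {}

    /**
     * Processes every regular file in toLoadDir, then moves it into loadedDir.
     * Returns the names of the processed files.
     */
    static List<String> loadPending(Path toLoadDir, Path loadedDir, Consumer<Path> transformAndLoad)
            throws IOException {
        // Collect the file list first, so that moving files does not disturb the iteration
        List<Path> pending = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(toLoadDir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    pending.add(p);
                }
            }
        }
        List<String> processed = new ArrayList<>();
        for (Path file : pending) {
            transformAndLoad.accept(file); // parse, transform and load into the DWH
            // Only after the load step returns is the file moved out of the incoming directory
            Files.move(file, loadedDir.resolve(file.getFileName()));
            processed.add(file.getFileName().toString());
        }
        return processed;
    }
}
```

Reloading a file is then just the reverse move, e.g. `Files.move(loadedDir.resolve(name), toLoadDir.resolve(name))`. Since `Files.move` is called without `REPLACE_EXISTING`, a name clash in "loaded/" raises an exception instead of silently overwriting a file.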

With this approach, however, our file staging area is split across two different directories: how can we keep it synchronized with the "hortus" source system? Unfortunately, we cannot simply synchronize the remote FTP directory with the "/var/staging/hortus/2load/" directory: doing so, we would also download (and re-process) the already-processed data files, since they have been moved into the "../loaded/" directory.

Once again, the easiest and simplest way is to work at filesystem level. We therefore implement a subjob, part of the "job_fact_measure_hortus" job, which runs before the transformation and load phase. Here is the complete picture of the subjob:

The complete file staging area synchronizing subjob.

The logic is simple: we scan the remote source FTP directory using the tFTPFileList Talend component, which provides an iterate link together with a global map variable holding the current file name of the iteration.
For each file in the remote FTP source, we check (by comparing file names) the "2load/" directory: if the data file is already present, it means we have already downloaded it (but not yet parsed, transformed and loaded it into the DWH); in that case we abort the current iteration and move on to the next one. If the file is not present, there are two possibilities:
  1. The data file has already been downloaded, parsed, transformed and loaded into the DWH: in this case, we will find it in the "loaded/" directory;
  2. The data file has not been downloaded yet.
We therefore repeat the same check on the "loaded/" directory. If we find the data file there, it means we have already downloaded and processed it: no download is needed, so we abort the current iteration and pass to the next file in the tFTPFileList iteration. If we can't find the file in the "loaded/" directory, it means we still have to download and process it: we download it into the "2load/" directory, and the subsequent "transformation & load" core subjobs will find it and work on it.
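The three-way decision described above fits in a small Java method. This is a sketch, not generated Talend code: the `SyncDecision` enum and the `StagingSync` class are hypothetical names, and `Files.exists` on the resolved file name is assumed to be equivalent to the name comparison performed by the two tFileExist components.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Possible outcomes for one remote file (hypothetical names)
enum SyncDecision { SKIP_PENDING, SKIP_LOADED, DOWNLOAD }

final class StagingSync {

    private StagingSync() {}

    /** Decides what to do with one remote file, comparing file names only. */
    static SyncDecision decide(String fileName, Path toLoadDir, Path loadedDir) {
        // Check 1: already downloaded, but not yet transformed and loaded
        if (Files.exists(toLoadDir.resolve(fileName))) {
            return SyncDecision.SKIP_PENDING;
        }
        // Check 2: already downloaded and loaded into the DWH
        if (Files.exists(loadedDir.resolve(fileName))) {
            return SyncDecision.SKIP_LOADED;
        }
        // Found in neither directory: it still has to be downloaded into 2load/
        return SyncDecision.DOWNLOAD;
    }
}
```

The order of the checks mirrors the subjob: the cheaper "still pending" case is tested first, and only files missing from both directories reach the download step.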

We therefore begin by scanning our remote FTP directory. For debugging purposes, it's useful to print the file name from a tJava component:

// Print the current iteration file name (for debugging)
System.out.println(globalMap.get("tFTPFileList_1_CURRENT_FILE"));

If we want to print the entire file path instead, we can use the tFTPFileList_1_CURRENT_FILEPATH global variable.

If we want or need to, we can also specify a filemask in the tFTPFileList component, restricting the list of files to work with to those whose names match a given pattern:

Filemask in the tFTPFileList component.
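To get a feel for how a file-name pattern narrows the list, the sketch below filters names with a Java glob `PathMatcher`. This is an analogy only: we assume a simple wildcard mask such as `*.txt`, the sample names are invented, and the exact mask syntax accepted by tFTPFileList should be checked in its documentation.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper illustrating wildcard file masks with a glob PathMatcher
final class FileMaskFilter {

    private FileMaskFilter() {}

    /** Keeps only the names matching a glob-style mask such as "*.txt". */
    static List<String> filter(List<String> names, String mask) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + mask);
        return names.stream()
                .filter(name -> matcher.matches(Paths.get(name)))
                .collect(Collectors.toList());
    }
}
```

For instance, a mask like `hortus1*.txt` would restrict the iteration to the files of a single station, leaving the rest of the subjob unchanged.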

We can now compare the file name of the current iteration with the content of the "2load/" directory, using the tFileExist component:

The tFileExist Talend component: first step.


In the tFileExist Talend component, we use the current iteration file name, available as a global variable.
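Since Talend component fields accept Java expressions, the tFileExist file name can be built by concatenating the staging directory with the global variable. The sketch below reproduces that expression outside Talend, with a plain `HashMap` standing in for `globalMap`; the class name and sample file name are invented, and the `tFTPFileList_1` suffix assumes the default component name used in this post.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone reproduction of the tFileExist "File Name" expression
final class FileExistPath {

    private FileExistPath() {}

    // The same Java expression one would type into the tFileExist file name field
    static String toLoadPath(Map<String, Object> globalMap) {
        return "/var/staging/hortus/2load/" + ((String) globalMap.get("tFTPFileList_1_CURRENT_FILE"));
    }

    public static void main(String[] args) {
        // A HashMap stands in for Talend's globalMap in this sketch
        Map<String, Object> globalMap = new HashMap<>();
        globalMap.put("tFTPFileList_1_CURRENT_FILE", "station1_sample.txt");
        System.out.println(toLoadPath(globalMap)); // /var/staging/hortus/2load/station1_sample.txt
    }
}
```

The second tFileExist component follows the same pattern, only with "/var/staging/hortus/loaded/" as the directory prefix.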

The tFileExist component provides a global variable of boolean type, "tFileExist_1_EXISTS", which we use inside an "if" link:

The output "if" link of the first tFileExist component.
Using the global boolean variable provided by the first tFileExist component inside an "if" link: "no file exists" case.


In this way, if the file exists, the current subjob iteration simply stops: we don't need to download it. In case the file does not exist:



Using the global boolean variable provided by the first tFileExist component inside an "if" link: "file exists" case.
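The conditions typed into the "if" links are plain boolean Java expressions on the global variable. A common way to write them, sketched here with a `HashMap` in place of Talend's `globalMap`, is a cast to `Boolean` for the "file exists" branch and its negation for the "no file exists" branch; the class and method names are hypothetical.

```java
import java.util.Map;

// Hypothetical wrapper around the two "if" link conditions
final class IfLinkConditions {

    private IfLinkConditions() {}

    // "file exists" branch: nothing to download, the iteration stops here
    static boolean fileExists(Map<String, Object> globalMap) {
        return (Boolean) globalMap.get("tFileExist_1_EXISTS");
    }

    // "no file exists" branch: the flow continues with the next check
    static boolean fileMissing(Map<String, Object> globalMap) {
        return !((Boolean) globalMap.get("tFileExist_1_EXISTS"));
    }
}
```

Note that unboxing a missing entry would throw a `NullPointerException`; inside the job this cannot happen, because tFileExist sets the variable before the "if" link is evaluated.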


The next step follows the same logic, but applied to the "loaded/" directory:

Step 2: check in the "loaded" file staging directory.

If we reach the last component, tFTPGet, it means the file was found in neither of the staging directories, so we need to download it and make the current iteration file available inside the "2load/" directory.
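Putting the pieces together, the whole subjob boils down to the loop below. It is a sketch under loud assumptions: the remote FTP directory is replaced by a local directory (`remoteDir`) so that the example runs without a server, `Files.copy` plays the role of tFTPGet, and `StagingSynchronizer.syncStaging` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical end-to-end sketch of the synchronizing subjob (local "remote" directory)
final class StagingSynchronizer {

    private StagingSynchronizer() {}

    /** Copies into toLoadDir every remote file found in neither staging directory. */
    static List<String> syncStaging(Path remoteDir, Path toLoadDir, Path loadedDir) throws IOException {
        List<String> downloaded = new ArrayList<>();
        // Stand-in for tFTPFileList: iterate over the remote file names
        try (DirectoryStream<Path> remote = Files.newDirectoryStream(remoteDir)) {
            for (Path remoteFile : remote) {
                if (!Files.isRegularFile(remoteFile)) {
                    continue;
                }
                String name = remoteFile.getFileName().toString();
                // Stand-ins for the two tFileExist checks
                if (Files.exists(toLoadDir.resolve(name)) || Files.exists(loadedDir.resolve(name))) {
                    continue; // already pending or already loaded: skip this iteration
                }
                // Stand-in for tFTPGet: fetch the file into the incoming directory
                Files.copy(remoteFile, toLoadDir.resolve(name));
                downloaded.add(name);
            }
        }
        return downloaded;
    }
}
```

Running it twice in a row downloads nothing the second time, which is exactly the property the two-directory check is meant to guarantee.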



This "two-directory" staging filesystem logic can be applied in many different contexts, as will soon be shown in a new post.
