1   Rewind reading records by time

This tutorial shows how to rewind a stream reader to a point in time and read records from there.

1.1   Open a distributedlog manager

1.1.1   Create distributedlog URI

String dlUriStr = ...;
URI uri = URI.create(dlUriStr);
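
As a quick illustration of what the URI carries, the sketch below parses the local URI used later in this tutorial with the standard java.net.URI class. The class name DlogUriParts and the describe helper are hypothetical names introduced only for this example. The port here is the ZooKeeper port passed to `dlog local`, and the path is the namespace location.

```java
import java.net.URI;

class DlogUriParts {
    // Returns "scheme|host|port|path" for a distributedlog-style URI.
    static String describe(String dlUriStr) {
        URI uri = URI.create(dlUriStr);
        return uri.getScheme() + "|" + uri.getHost() + "|" + uri.getPort() + "|" + uri.getPath();
    }

    public static void main(String[] args) {
        // Same URI as the local cluster started later in this tutorial.
        System.out.println(describe("distributedlog://127.0.0.1:7000/messaging/distributedlog"));
        // prints: distributedlog|127.0.0.1|7000|/messaging/distributedlog
    }
}
```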

1.1.2   Create distributedlog configuration

DistributedLogConfiguration conf = new DistributedLogConfiguration();

1.1.3   Build the distributedlog namespace

DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
        .conf(conf)
        .uri(uri)
        .build();

1.1.4   Open the distributedlog manager

DistributedLogManager dlm = namespace.openLog("basic-stream-10");

1.2   Rewind the stream

1.2.1   Position the reader using timestamp

Records written through the write proxy are assigned System.currentTimeMillis() as their TransactionID, so rewinding by time reduces to opening the reader at the TransactionID that corresponds to the target timestamp.

int rewindSeconds = 60; // 60 seconds
long fromTxID = System.currentTimeMillis() -
        TimeUnit.MILLISECONDS.convert(rewindSeconds, TimeUnit.SECONDS);
AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(fromTxID));
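
The rewind position is plain millisecond arithmetic. The sketch below isolates it with a fixed clock value so the result is reproducible. The class RewindMath and the method fromTxId are hypothetical names, and the `now` value is chosen so that a 30-second rewind lands on the start position shown in the sample output at the end of this tutorial.

```java
import java.util.concurrent.TimeUnit;

class RewindMath {
    // TransactionID to start from when rewinding rewindSeconds before nowMillis.
    static long fromTxId(long nowMillis, int rewindSeconds) {
        return nowMillis - TimeUnit.SECONDS.toMillis(rewindSeconds);
    }

    public static void main(String[] args) {
        long now = 1462736727481L; // fixed value, for illustration only
        System.out.println(fromTxId(now, 30)); // prints 1462736697481
    }
}
```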

1.3   Read Records

Read the next available record from the stream. The future is satisfied when the record is available.

Future<LogRecordWithDLSN> readFuture = reader.readNext();

1.3.1   Register the read callback

Register a future listener on read completion. The listener is notified once the record is ready to be consumed.

final FutureEventListener<LogRecordWithDLSN> readListener = new FutureEventListener<LogRecordWithDLSN>() {
    @Override
    public void onFailure(Throwable cause) {
        // executed when read failed.
    }

    @Override
    public void onSuccess(LogRecordWithDLSN record) {
        // process the record
        ...
        // issue read next
        reader.readNext().addEventListener(this);
    }
};
reader.readNext().addEventListener(readListener);
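
The listener above re-issues readNext() from inside its own success callback, which turns a single-shot future into a continuous read loop. The sketch below reproduces that pattern with only the JDK so it can be run without a cluster. FakeReader is a hypothetical in-memory stand-in for AsyncLogReader, and CompletableFuture.whenComplete stands in for addEventListener. Neither is part of the distributedlog API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class ReadLoopSketch {
    // Hypothetical stand-in for AsyncLogReader, backed by an in-memory list.
    static class FakeReader {
        private final Iterator<String> records;

        FakeReader(List<String> records) {
            this.records = records.iterator();
        }

        CompletableFuture<String> readNext() {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (records.hasNext()) {
                future.complete(records.next());
            } else {
                // A real stream keeps waiting; the fake one fails so the demo terminates.
                future.completeExceptionally(new IllegalStateException("end of stream"));
            }
            return future;
        }
    }

    // Reads every record by chaining one read after another from the callback.
    static List<String> drain(List<String> input) {
        final FakeReader reader = new FakeReader(input);
        final List<String> seen = new ArrayList<>();
        BiConsumer<String, Throwable> listener = new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String record, Throwable cause) {
                if (cause != null) {
                    return; // read failed: stop the loop
                }
                seen.add(record);                      // process the record
                reader.readNext().whenComplete(this);  // issue read next
            }
        };
        reader.readNext().whenComplete(listener);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("record-1", "record-2", "record-3")));
        // prints: [record-1, record-2, record-3]
    }
}
```

Because the fake futures are already complete, each callback runs synchronously on the calling thread. With a real asynchronous reader the callbacks run later, on the thread that completes the future.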

1.4   Close the reader

Close the reader after usage.

FutureUtils.result(reader.asyncClose());

1.5   Run the tutorial

Run the example in the following steps:

1.5.1   Start the local bookkeeper cluster

Use the following command to start the distributedlog stack locally. Once the distributedlog cluster is started, you can access it using the distributedlog uri distributedlog://127.0.0.1:7000/messaging/distributedlog.

// dlog local ${zk-port}
./distributedlog-core/bin/dlog local 7000

1.5.2   Start the write proxy

Start the write proxy, listening on port 8000.

// DistributedLogServerApp -p ${service-port} --shard-id ${shard-id} -sp ${stats-port} -u ${distributedlog-uri} -mx -c ${conf-file}
./distributedlog-service/bin/dlog org.apache.distributedlog.service.DistributedLogServerApp -p 8000 --shard-id 1 -sp 8001 -u distributedlog://127.0.0.1:7000/messaging/distributedlog -mx -c ${distributedlog-repo}/distributedlog-service/conf/distributedlog_proxy.conf

1.5.3   Create the stream

Create the stream under the distributedlog uri.

// Create Stream `basic-stream-10`
// dlog tool create -u ${distributedlog-uri} -r ${stream-prefix} -e ${stream-regex}
./distributedlog-core/bin/dlog tool create -u distributedlog://127.0.0.1:7000/messaging/distributedlog -r basic-stream- -e 10

1.5.4   Generate records

Run the RecordGenerator to generate records.

// Write records into stream `basic-stream-10` at 1 request/second
// runner run org.apache.distributedlog.basic.RecordGenerator ${finagle-name} ${stream} ${rate}
./distributedlog-tutorials/distributedlog-basic/bin/runner run org.apache.distributedlog.basic.RecordGenerator 'inet!127.0.0.1:8000' basic-stream-10 1

1.5.5   Rewind the stream

Rewind the stream using StreamRewinder to read records starting from 30 seconds ago.

// Rewind `basic-stream-10`
// runner run org.apache.distributedlog.basic.StreamRewinder ${distributedlog-uri} ${stream} ${seconds-to-rewind}
./distributedlog-tutorials/distributedlog-basic/bin/runner run org.apache.distributedlog.basic.StreamRewinder distributedlog://127.0.0.1:7000/messaging/distributedlog basic-stream-10 30

1.5.6   Check the results

Example output from StreamRewinder.

// Output of `StreamRewinder`
Opening log stream basic-stream-10
Record records starting from 1462736697481 which is 30 seconds ago
Received record DLSN{logSegmentSequenceNo=1, entryId=264, slotId=0}
"""
record-1462736697685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=266, slotId=0}
"""
record-1462736698684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=268, slotId=0}
"""
record-1462736699684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=270, slotId=0}
"""
record-1462736700686
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=272, slotId=0}
"""
record-1462736701685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=274, slotId=0}
"""
record-1462736702684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=276, slotId=0}
"""
record-1462736703683
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=278, slotId=0}
"""
record-1462736704685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=280, slotId=0}
"""
record-1462736705686
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=282, slotId=0}
"""
record-1462736706682
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=284, slotId=0}
"""
record-1462736707685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=286, slotId=0}
"""
record-1462736708686
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=288, slotId=0}
"""
record-1462736709684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=290, slotId=0}
"""
record-1462736710684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=292, slotId=0}
"""
record-1462736711686
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=294, slotId=0}
"""
record-1462736712686
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=296, slotId=0}
"""
record-1462736713684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=298, slotId=0}
"""
record-1462736714682
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=300, slotId=0}
"""
record-1462736715685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=302, slotId=0}
"""
record-1462736716684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=304, slotId=0}
"""
record-1462736717684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=306, slotId=0}
"""
record-1462736718684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=308, slotId=0}
"""
record-1462736719685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=310, slotId=0}
"""
record-1462736720683
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=312, slotId=0}
"""
record-1462736721686
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=314, slotId=0}
"""
record-1462736722685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=316, slotId=0}
"""
record-1462736723683
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=318, slotId=0}
"""
record-1462736724683
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=320, slotId=0}
"""
record-1462736725685
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=322, slotId=0}
"""
record-1462736726686
"""
Reader caught with latest data
Received record DLSN{logSegmentSequenceNo=1, entryId=324, slotId=0}
"""
record-1462736727686
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=326, slotId=0}
"""
record-1462736728684
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=328, slotId=0}
"""
record-1462736729682
"""
Received record DLSN{logSegmentSequenceNo=1, entryId=330, slotId=0}
"""
record-1462736730685
"""
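
One way to sanity-check a run like the one above is to confirm that no payload is older than the rewind position. The sketch below does this for the first few payloads of the sample output. It assumes the payload format record-<milliseconds> seen in that output, and that the embedded number is close to the record's TransactionID. The class RewindOutputCheck and its methods are hypothetical helpers, not part of the tutorial code.

```java
import java.util.Arrays;
import java.util.List;

class RewindOutputCheck {
    // Extracts the millisecond timestamp from a payload such as "record-1462736697685".
    static long timestampOf(String payload) {
        return Long.parseLong(payload.substring("record-".length()));
    }

    // True when every payload timestamp is at or after the rewind position.
    static boolean allAtOrAfter(long fromTxId, List<String> payloads) {
        for (String payload : payloads) {
            if (timestampOf(payload) < fromTxId) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long fromTxId = 1462736697481L; // start position reported in the sample output
        List<String> payloads = Arrays.asList(
                "record-1462736697685",
                "record-1462736698684",
                "record-1462736699684");
        System.out.println(allAtOrAfter(fromTxId, payloads)); // prints true
    }
}
```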