aboutsummaryrefslogtreecommitdiff
path: root/tool/sqlite3_rsync.c
diff options
context:
space:
mode:
Diffstat (limited to 'tool/sqlite3_rsync.c')
-rw-r--r--tool/sqlite3_rsync.c577
1 files changed, 473 insertions, 104 deletions
diff --git a/tool/sqlite3_rsync.c b/tool/sqlite3_rsync.c
index 34faaf0fd..760559aec 100644
--- a/tool/sqlite3_rsync.c
+++ b/tool/sqlite3_rsync.c
@@ -46,9 +46,11 @@ struct SQLiteRsync {
const char *zOrigin; /* Name of the origin */
const char *zReplica; /* Name of the replica */
const char *zErrFile; /* Append error messages to this file */
+ const char *zDebugFile; /* Append debugging messages to this file */
FILE *pOut; /* Transmit to the other side */
FILE *pIn; /* Receive from the other side */
FILE *pLog; /* Duplicate output here if not NULL */
+ FILE *pDebug; /* Write debug info here if not NULL */
sqlite3 *db; /* Database connection */
int nErr; /* Number of errors encountered */
int nWrErr; /* Number of failed attempts to write on the pipe */
@@ -70,25 +72,31 @@ struct SQLiteRsync {
/* The version number of the protocol. Sent in the *_BEGIN message
** to verify that both sides speak the same dialect.
*/
-#define PROTOCOL_VERSION 1
+#define PROTOCOL_VERSION 2
/* Magic numbers to identify particular messages sent over the wire.
*/
+/**** Baseline: protocol version 1 ****/
#define ORIGIN_BEGIN 0x41 /* Initial message */
#define ORIGIN_END 0x42 /* Time to quit */
#define ORIGIN_ERROR 0x43 /* Error message from the remote */
#define ORIGIN_PAGE 0x44 /* New page data */
#define ORIGIN_TXN 0x45 /* Transaction commit */
#define ORIGIN_MSG 0x46 /* Informational message */
+/**** Added in protocol version 2 ****/
+#define ORIGIN_DETAIL 0x47 /* Request finer-grain hash info */
+#define ORIGIN_READY 0x48 /* Ready for next round of hash exchanges */
+/**** Baseline: protocol version 1 ****/
#define REPLICA_BEGIN 0x61 /* Welcome message */
#define REPLICA_ERROR 0x62 /* Error. Report and quit. */
#define REPLICA_END 0x63 /* Replica wants to stop */
#define REPLICA_HASH 0x64 /* One or more pages hashes to report */
#define REPLICA_READY 0x65 /* Read to receive page content */
#define REPLICA_MSG 0x66 /* Informational message */
-
+/**** Added in protocol version 2 ****/
+#define REPLICA_CONFIG 0x67 /* Hash exchange configuration */
/****************************************************************************
** Beginning of the popen2() implementation copied from Fossil *************
@@ -796,11 +804,49 @@ static void hashFunc(
sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT);
}
+/*
+** Implementation of the agghash(X) function.
+**
+** Return a 160-bit BLOB which is the hash of the concatenation
+** of all X inputs.
+*/
+static void agghashStep(
+ sqlite3_context *context,
+ int argc,
+ sqlite3_value **argv
+){
+ HashContext *pCx;
+ int eType = sqlite3_value_type(argv[0]);
+ int nByte = sqlite3_value_bytes(argv[0]);
+ if( eType==SQLITE_NULL ) return;
+ pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx));
+ if( pCx==0 ) return;
+ if( pCx->iSize==0 ) HashInit(pCx, 160);
+ if( eType==SQLITE_BLOB ){
+ HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte);
+ }else{
+ HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte);
+ }
+}
+static void agghashFinal(sqlite3_context *context){
+ HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0);
+ if( pCx ){
+ sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT);
+ }
+}
+
/* Register the hash function */
static int hashRegister(sqlite3 *db){
- return sqlite3_create_function(db, "hash", 1,
+ int rc;
+ rc = sqlite3_create_function(db, "hash", 1,
SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
0, hashFunc, 0, 0);
+ if( rc==SQLITE_OK ){
+ rc = sqlite3_create_function(db, "agghash", 1,
+ SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
+ 0, 0, agghashStep, agghashFinal);
+ }
+ return rc;
}
/* End of the hashing logic
@@ -838,6 +884,25 @@ static void logError(SQLiteRsync *p, const char *zFormat, ...){
p->nErr++;
}
+/*
+** Append text to the debugging mesage file, if an that file is
+** specified.
+*/
+static void debugMessage(SQLiteRsync *p, const char *zFormat, ...){
+ if( p->zDebugFile ){
+ if( p->pDebug==0 ){
+ p->pDebug = fopen(p->zDebugFile, "wb");
+ }
+ if( p->pDebug ){
+ va_list ap;
+ va_start(ap, zFormat);
+ vfprintf(p->pDebug, zFormat, ap);
+ va_end(ap);
+ fflush(p->pDebug);
+ }
+ }
+}
+
/* Read a single big-endian 32-bit unsigned integer from the input
** stream. Return 0 on success and 1 if there are any errors.
@@ -1192,6 +1257,13 @@ static void closeDb(SQLiteRsync *p){
** nPage, and szPage. Then enter a loop responding to message from
** the replica:
**
+** REPLICA_BEGIN iProtocol
+**
+** An optional message sent by the replica in response to the
+** prior ORIGIN_BEGIN with a counter-proposal for the protocol
+** level. If seen, try to reduce the protocol level to what is
+** requested and send a new ORGIN_BEGIN.
+**
** REPLICA_ERROR size text
**
** Report an error from the replica and quit
@@ -1202,30 +1274,42 @@ static void closeDb(SQLiteRsync *p){
**
** REPLICA_HASH hash
**
-** The argument is the 20-byte SHA1 hash for the next page
-** page hashes appear in sequential order with no gaps.
+** The argument is the 20-byte SHA1 hash for the next page or
+** block of pages. Hashes appear in sequential order with no gaps,
+** unless there is an intervening REPLICA_CONFIG message.
+**
+** REPLICA_CONFIG pgno cnt
+**
+** Set counters used by REPLICA_HASH. The next hash will start
+** on page pgno and all subsequent hashes will cover cnt pages
+** each. Note that for a multi-page hash, the hash value is
+** actually a hash of the individual page hashes.
**
** REPLICA_READY
**
** The replica has sent all the hashes that it intends to send.
** This side (the origin) can now start responding with page
-** content for pages that do not have a matching hash.
+** content for pages that do not have a matching hash or with
+** ORIGIN_DETAIL messages with requests for more detail.
*/
static void originSide(SQLiteRsync *p){
int rc = 0;
int c = 0;
unsigned int nPage = 0;
- unsigned int iPage = 0;
+ unsigned int iHash = 1; /* Pgno for next hash to receive */
+ unsigned int nHash = 1; /* Number of pages per hash received */
+ unsigned int mxHash = 0; /* Maximum hash value received */
unsigned int lockBytePage = 0;
unsigned int szPg = 0;
- sqlite3_stmt *pCkHash = 0;
- sqlite3_stmt *pInsHash = 0;
+ sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */
+ sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */
+ sqlite3_stmt *pInsHash = 0; /* Record a bad hash */
char buf[200];
p->isReplica = 0;
if( p->bCommCheck ){
infoMsg(p, "origin zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d",
- p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION);
+ p->zOrigin, p->zReplica, p->isRemote, p->iProtocol);
writeByte(p, ORIGIN_END);
fflush(p->pOut);
}else{
@@ -1251,13 +1335,15 @@ static void originSide(SQLiteRsync *p){
if( p->nErr==0 ){
/* Send the ORIGIN_BEGIN message */
writeByte(p, ORIGIN_BEGIN);
- writeByte(p, PROTOCOL_VERSION);
+ writeByte(p, p->iProtocol);
writePow2(p, szPg);
writeUint32(p, nPage);
fflush(p->pOut);
+ if( p->zDebugFile ){
+ debugMessage(p, "-> ORIGIN_BEGIN %u %u %u\n", p->iProtocol,szPg,nPage);
+ }
p->nPage = nPage;
p->szPage = szPg;
- p->iProtocol = PROTOCOL_VERSION;
lockBytePage = (1<<30)/szPg + 1;
}
}
@@ -1270,11 +1356,24 @@ static void originSide(SQLiteRsync *p){
** that is larger than what it knows about. The replica sends back
** a counter-proposal of an earlier protocol which the origin can
** accept by resending a new ORIGIN_BEGIN. */
- p->iProtocol = readByte(p);
- writeByte(p, ORIGIN_BEGIN);
- writeByte(p, p->iProtocol);
- writePow2(p, p->szPage);
- writeUint32(p, p->nPage);
+ u8 newProtocol = readByte(p);
+ if( p->zDebugFile ){
+ debugMessage(p, "<- REPLICA_BEGIN %d\n", (int)newProtocol);
+ }
+ if( newProtocol < p->iProtocol ){
+ p->iProtocol = newProtocol;
+ writeByte(p, ORIGIN_BEGIN);
+ writeByte(p, p->iProtocol);
+ writePow2(p, p->szPage);
+ writeUint32(p, p->nPage);
+ fflush(p->pOut);
+ if( p->zDebugFile ){
+ debugMessage(p, "-> ORIGIN_BEGIN %d %d %u\n", p->iProtocol,
+ p->szPage, p->nPage);
+ }
+ }else{
+ reportError(p, "Invalid REPLICA_BEGIN reply");
+ }
break;
}
case REPLICA_MSG:
@@ -1282,25 +1381,73 @@ static void originSide(SQLiteRsync *p){
readAndDisplayMessage(p, c);
break;
}
+ case REPLICA_CONFIG: {
+ readUint32(p, &iHash);
+ readUint32(p, &nHash);
+ if( p->zDebugFile ){
+ debugMessage(p, "<- REPLICA_CONFIG %u %u\n", iHash, nHash);
+ }
+ break;
+ }
case REPLICA_HASH: {
+ int bMatch = 0;
if( pCkHash==0 ){
- runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
+ runSql(p, "CREATE TEMP TABLE badHash("
+ " pgno INTEGER PRIMARY KEY,"
+ " sz INT)");
pCkHash = prepareStmt(p,
- "SELECT pgno FROM sqlite_dbpage('main')"
- " WHERE pgno=?1 AND hash(data)!=?2"
+ "SELECT hash(data)==?3 FROM sqlite_dbpage('main')"
+ " WHERE pgno=?1"
);
if( pCkHash==0 ) break;
- pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)");
+ pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)");
if( pInsHash==0 ) break;
}
p->nHashSent++;
- iPage++;
- sqlite3_bind_int64(pCkHash, 1, iPage);
readBytes(p, 20, buf);
- sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC);
- rc = sqlite3_step(pCkHash);
- if( rc==SQLITE_ROW ){
- sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0));
+ if( nHash>1 ){
+ if( pCkHashN==0 ){
+ pCkHashN = prepareStmt(p,
+ "WITH c(n) AS "
+ " (VALUES(?1) UNION ALL SELECT n+1 FROM c WHERE n<?2)"
+ "SELECT agghash(hash(data))==?3"
+ " FROM c CROSS JOIN sqlite_dbpage('main') ON pgno=n"
+ );
+ if( pCkHashN==0 ) break;
+ }
+ sqlite3_bind_int64(pCkHashN, 1, iHash);
+ sqlite3_bind_int64(pCkHashN, 2, iHash + nHash - 1);
+ sqlite3_bind_blob(pCkHashN, 3, buf, 20, SQLITE_STATIC);
+ rc = sqlite3_step(pCkHashN);
+ if( rc==SQLITE_ROW ){
+ bMatch = sqlite3_column_int(pCkHashN,0);
+ }else if( rc==SQLITE_ERROR ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pCkHashN);
+ }else{
+ sqlite3_bind_int64(pCkHash, 1, iHash);
+ sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
+ rc = sqlite3_step(pCkHash);
+ if( rc==SQLITE_ERROR ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
+ }else if( rc==SQLITE_ROW && sqlite3_column_int(pCkHash,0) ){
+ bMatch = 1;
+ }
+ sqlite3_reset(pCkHash);
+ }
+ if( p->zDebugFile ){
+ debugMessage(p, "<- REPLICA_HASH %u %u %s %08x...\n",
+ iHash, nHash,
+ bMatch ? "match" : "fail",
+ *(unsigned int*)buf
+ );
+ }
+ if( !bMatch ){
+ sqlite3_bind_int64(pInsHash, 1, iHash);
+ sqlite3_bind_int64(pInsHash, 2, nHash);
rc = sqlite3_step(pInsHash);
if( rc!=SQLITE_DONE ){
reportError(p, "SQL statement [%s] failed: %s",
@@ -1308,42 +1455,74 @@ static void originSide(SQLiteRsync *p){
}
sqlite3_reset(pInsHash);
}
- else if( rc!=SQLITE_DONE ){
- reportError(p, "SQL statement [%s] failed: %s",
- sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
- }
- sqlite3_reset(pCkHash);
+ if( iHash+nHash>mxHash ) mxHash = iHash+nHash;
+ iHash += nHash;
break;
}
case REPLICA_READY: {
+ int nMulti = 0;
sqlite3_stmt *pStmt;
- sqlite3_finalize(pCkHash);
- sqlite3_finalize(pInsHash);
- pCkHash = 0;
- pInsHash = 0;
- if( iPage+1<p->nPage ){
- runSql(p, "WITH RECURSIVE c(n) AS"
- " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
- " INSERT INTO badHash SELECT n FROM c",
- iPage+1, p->nPage);
+ if( p->zDebugFile ){
+ debugMessage(p, "<- REPLICA_READY\n");
}
- runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
- pStmt = prepareStmt(p,
- "SELECT pgno, data"
- " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
+ pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1");
if( pStmt==0 ) break;
- while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
+ while( sqlite3_step(pStmt)==SQLITE_ROW ){
unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
- const void *pContent = sqlite3_column_blob(pStmt, 1);
- writeByte(p, ORIGIN_PAGE);
+ unsigned int cnt = (unsigned int)sqlite3_column_int64(pStmt,1);
+ writeByte(p, ORIGIN_DETAIL);
writeUint32(p, pgno);
- writeBytes(p, szPg, pContent);
- p->nPageSent++;
+ writeUint32(p, cnt);
+ nMulti++;
+ if( p->zDebugFile ){
+ debugMessage(p, "-> ORIGIN_DETAIL %u %u\n", pgno, cnt);
+ }
}
sqlite3_finalize(pStmt);
- writeByte(p, ORIGIN_TXN);
- writeUint32(p, nPage);
- writeByte(p, ORIGIN_END);
+ if( nMulti ){
+ runSql(p, "DELETE FROM badHash WHERE sz>1");
+ writeByte(p, ORIGIN_READY);
+ if( p->zDebugFile ) debugMessage(p, "-> ORIGIN_READY\n");
+ }else{
+ sqlite3_stmt *pStmt;
+ sqlite3_finalize(pCkHash);
+ sqlite3_finalize(pCkHashN);
+ sqlite3_finalize(pInsHash);
+ pCkHash = 0;
+ pInsHash = 0;
+ if( mxHash<p->nPage ){
+ runSql(p, "WITH RECURSIVE c(n) AS"
+ " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+ " INSERT INTO badHash SELECT n, 1 FROM c",
+ mxHash, p->nPage);
+ }
+ runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
+ pStmt = prepareStmt(p,
+ "SELECT pgno, data"
+ " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
+ if( pStmt==0 ) break;
+ while( sqlite3_step(pStmt)==SQLITE_ROW
+ && p->nErr==0
+ && p->nWrErr==0
+ ){
+ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
+ const void *pContent = sqlite3_column_blob(pStmt, 1);
+ writeByte(p, ORIGIN_PAGE);
+ writeUint32(p, pgno);
+ writeBytes(p, szPg, pContent);
+ p->nPageSent++;
+ if( p->zDebugFile ){
+ debugMessage(p, "-> ORIGIN_PAGE %u\n", pgno);
+ }
+ }
+ sqlite3_finalize(pStmt);
+ writeByte(p, ORIGIN_TXN);
+ writeUint32(p, nPage);
+ if( p->zDebugFile ){
+ debugMessage(p, "-> ORIGIN_TXN %u\n", nPage);
+ }
+ writeByte(p, ORIGIN_END);
+ }
fflush(p->pOut);
break;
}
@@ -1361,6 +1540,102 @@ static void originSide(SQLiteRsync *p){
}
/*
+** Send a REPLICA_HASH message for each entry in the sendHash table.
+** The sendHash table looks like this:
+**
+** CREATE TABLE sendHash(
+** fpg INTEGER PRIMARY KEY, -- Page number of the hash
+** npg INT -- Number of pages in this hash
+** );
+**
+** If iHash is page number for the next page that the origin will
+** be expecting, and nHash is the number of pages that the origin will
+** be expecting in the hash that follows. Send a REPLICA_CONFIG message
+** if either of these values if not correct.
+*/
+static void sendHashMessages(
+ SQLiteRsync *p, /* The replica-side of the sync */
+ unsigned int iHash, /* Next page expected by origin */
+ unsigned int nHash /* Next number of pages expected by origin */
+){
+ sqlite3_stmt *pStmt;
+ pStmt = prepareStmt(p,
+ "SELECT if(npg==1,"
+ " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg),"
+ " (WITH RECURSIVE c(n) AS"
+ " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg-1)"
+ " SELECT agghash(hash(data))"
+ " FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash,"
+ " fpg,"
+ " npg"
+ " FROM sendHash ORDER BY fpg"
+ );
+ while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
+ const unsigned char *a = sqlite3_column_blob(pStmt, 0);
+ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1);
+ unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2);
+ if( pgno!=iHash || npg!=nHash ){
+ writeByte(p, REPLICA_CONFIG);
+ writeUint32(p, pgno);
+ writeUint32(p, npg);
+ if( p->zDebugFile ){
+ debugMessage(p, "-> REPLICA_CONFIG %u %u\n", pgno, npg);
+ }
+ }
+ if( a==0 ){
+ if( p->zDebugFile ){
+ debugMessage(p, "# Oops: No hash for %u %u\n", pgno, npg);
+ }
+ }else{
+ writeByte(p, REPLICA_HASH);
+ writeBytes(p, 20, a);
+ if( p->zDebugFile ){
+ debugMessage(p, "-> REPLICA_HASH %u %u (%08x...)\n",
+ pgno, npg, *(unsigned int*)a);
+ }
+ }
+ p->nHashSent++;
+ iHash = pgno + npg;
+ nHash = npg;
+ }
+ sqlite3_finalize(pStmt);
+ runSql(p, "DELETE FROM sendHash");
+ writeByte(p, REPLICA_READY);
+ fflush(p->pOut);
+ if( p->zDebugFile ) debugMessage(p, "-> REPLICA_READY\n", iHash);
+}
+
+/*
+** Make entries in the sendHash table to send hashes for
+** npg (mnemonic: Number of PaGes) pages starting with fpg
+** (mnemonic: First PaGe).
+*/
+static void subdivideHashRange(
+ SQLiteRsync *p, /* The replica-side of the sync */
+ unsigned int fpg, /* First page of the range */
+ unsigned int npg /* Number of pages */
+){
+ unsigned int nChunk; /* How many pages to request per hash */
+ sqlite3_uint64 iEnd; /* One more than the last page */
+ if( npg<=30 ){
+ nChunk = 1;
+ }else if( npg<=1000 ){
+ nChunk = 30;
+ }else{
+ nChunk = 1000;
+ }
+ iEnd = fpg;
+ iEnd += npg;
+ runSql(p,
+ "WITH RECURSIVE c(n) AS"
+ " (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)"
+ "REPLACE INTO sendHash(fpg,npg)"
+ " SELECT n, min(%llu-n,%u) FROM c",
+ fpg, nChunk, iEnd-nChunk, iEnd, nChunk
+ );
+}
+
+/*
** Run the replica-side protocol. The protocol is passive in the sense
** that it only response to message from the origin side.
**
@@ -1370,15 +1645,35 @@ static void originSide(SQLiteRsync *p){
** each page in the origin database (sent as a single-byte power-of-2),
** and the number of pages in the origin database.
** This procedure checks compatibility, and if everything is ok,
-** it starts sending hashes of pages already present back to the origin.
+** it starts sending hashes back to the origin using REPLICA_HASH
+** and/or REPLICA_CONFIG message, followed by a single REPLICA_READY.
+** REPLICA_CONFIG is only sent if the protocol is 2 or greater.
+**
+** ORIGIN_ERROR size text
+**
+** Report an error and quit.
+**
+** ORIGIN_DETAIL pgno cnt
+**
+** The origin reports that a multi-page hash starting at pgno and
+** spanning cnt pages failed to match. The origin is requesting
+** details (more REPLICA_HASH message with a smaller cnt). The
+** replica must wait on ORIGIN_READY before sending its reply.
**
-** ORIGIN_ERROR size text
+** ORIGIN_READY
**
-** Report the received error and quit.
+** After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY
+** is sent by the origin to indicate that it has finished sending
+** requests for detail and is ready for the replicate to reply
+** with a new round of REPLICA_CONFIG and REPLICA_HASH messages.
**
-** ORIGIN_PAGE pgno content
+** ORIGIN_PAGE pgno content
**
-** Update the content of the given page.
+** Once the origin believes it knows exactly which pages need to be
+** updated in the replica, it starts sending those pages using these
+** messages. These messages will only appear immediately after
+** REPLICA_READY. The origin never mixes ORIGIN_DETAIL and
+** ORIGIN_PAGE messages in the same batch.
**
** ORIGIN_TXN pgno
**
@@ -1399,10 +1694,11 @@ static void replicaSide(SQLiteRsync *p){
p->isReplica = 1;
if( p->bCommCheck ){
infoMsg(p, "replica zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d",
- p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION);
+ p->zOrigin, p->zReplica, p->isRemote, p->iProtocol);
writeByte(p, REPLICA_END);
fflush(p->pOut);
}
+ if( p->iProtocol<=0 ) p->iProtocol = PROTOCOL_VERSION;
/* Respond to message from the origin. The origin will initiate the
** the conversation with an ORIGIN_BEGIN message.
@@ -1418,22 +1714,31 @@ static void replicaSide(SQLiteRsync *p){
unsigned int nOPage = 0;
unsigned int nRPage = 0, szRPage = 0;
int rc = 0;
- sqlite3_stmt *pStmt = 0;
+ u8 iProtocol;
closeDb(p);
- p->iProtocol = readByte(p);
+ iProtocol = readByte(p);
szOPage = readPow2(p);
readUint32(p, &nOPage);
+ if( p->zDebugFile ){
+ debugMessage(p, "<- ORIGIN_BEGIN %d %d %u\n", iProtocol, szOPage,
+ nOPage);
+ }
if( p->nErr ) break;
- if( p->iProtocol>PROTOCOL_VERSION ){
+ if( iProtocol>p->iProtocol ){
/* If the protocol version on the origin side is larger, send back
** a REPLICA_BEGIN message with the protocol version number of the
** replica side. This gives the origin an opportunity to resend
** a new ORIGIN_BEGIN with a reduced protocol version. */
writeByte(p, REPLICA_BEGIN);
- writeByte(p, PROTOCOL_VERSION);
+ writeByte(p, p->iProtocol);
+ fflush(p->pOut);
+ if( p->zDebugFile ){
+ debugMessage(p, "-> REPLICA_BEGIN %u\n", p->iProtocol);
+ }
break;
}
+ p->iProtocol = iProtocol;
p->nPage = nOPage;
p->szPage = szOPage;
rc = sqlite3_open(":memory:", &p->db);
@@ -1458,6 +1763,12 @@ static void replicaSide(SQLiteRsync *p){
closeDb(p);
break;
}
+ runSql(p,
+ "CREATE TABLE sendHash("
+ " fpg INTEGER PRIMARY KEY," /* The page number of hash to send */
+ " npg INT" /* Number of pages in this hash */
+ ")"
+ );
hashRegister(p->db);
if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){
break;
@@ -1484,26 +1795,43 @@ static void replicaSide(SQLiteRsync *p){
"replica is %d bytes", szOPage, szRPage);
break;
}
-
- pStmt = prepareStmt(p,
- "SELECT hash(data) FROM sqlite_dbpage('replica')"
- " WHERE pgno<=min(%d,%d)"
- " ORDER BY pgno", nRPage, nOPage);
- while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
- const unsigned char *a = sqlite3_column_blob(pStmt, 0);
- writeByte(p, REPLICA_HASH);
- writeBytes(p, 20, a);
- p->nHashSent++;
+ if( p->iProtocol<2 || nRPage<=100 ){
+ runSql(p,
+ "WITH RECURSIVE c(n) AS"
+ "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+ "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c",
+ nRPage);
+ }else{
+ runSql(p,"INSERT INTO sendHash VALUES(1,1)");
+ subdivideHashRange(p, 2, nRPage);
}
- sqlite3_finalize(pStmt);
- writeByte(p, REPLICA_READY);
- fflush(p->pOut);
+ sendHashMessages(p, 1, 1);
runSql(p, "PRAGMA writable_schema=ON");
break;
}
+ case ORIGIN_DETAIL: {
+ unsigned int fpg, npg;
+ readUint32(p, &fpg);
+ readUint32(p, &npg);
+ if( p->zDebugFile ){
+ debugMessage(p, "<- ORIGIN_DETAIL %u %u\n", fpg, npg);
+ }
+ subdivideHashRange(p, fpg, npg);
+ break;
+ }
+ case ORIGIN_READY: {
+ if( p->zDebugFile ){
+ debugMessage(p, "<- ORIGIN_READY\n");
+ }
+ sendHashMessages(p, 0, 0);
+ break;
+ }
case ORIGIN_TXN: {
unsigned int nOPage = 0;
readUint32(p, &nOPage);
+ if( p->zDebugFile ){
+ debugMessage(p, "<- ORIGIN_TXN %u\n", nOPage);
+ }
if( pIns==0 ){
/* Nothing has changed */
runSql(p, "COMMIT");
@@ -1531,6 +1859,9 @@ static void replicaSide(SQLiteRsync *p){
unsigned int pgno = 0;
int rc;
readUint32(p, &pgno);
+ if( p->zDebugFile ){
+ debugMessage(p, "<- ORIGIN_PAGE %u\n", pgno);
+ }
if( p->nErr ) break;
if( pIns==0 ){
pIns = prepareStmt(p,
@@ -1678,9 +2009,11 @@ int main(int argc, char const * const *argv){
sqlite3_int64 tmEnd;
sqlite3_int64 tmElapse;
const char *zRemoteErrFile = 0;
+ const char *zRemoteDebugFile = 0;
#define cli_opt_val cmdline_option_value(argc, argv, ++i)
memset(&ctx, 0, sizeof(ctx));
+ ctx.iProtocol = PROTOCOL_VERSION;
for(i=1; i<argc; i++){
const char *z = argv[i];
if( z[0]=='-' && z[1]=='-' && z[2]!=0 ) z++;
@@ -1704,6 +2037,20 @@ int main(int argc, char const * const *argv){
zExe = cli_opt_val;
continue;
}
+ if( strcmp(z, "-wal-only")==0 ){
+ ctx.bWalOnly = 1;
+ continue;
+ }
+ if( strcmp(z, "-version")==0 ){
+ printf("%s\n", sqlite3_sourceid());
+ return 0;
+ }
+ if( strcmp(z, "-help")==0 || strcmp(z, "--help")==0
+ || strcmp(z, "-?")==0
+ ){
+ printf("%s", zUsage);
+ return 0;
+ }
if( strcmp(z, "-logfile")==0 ){
/* DEBUG OPTION: --logfile FILENAME
** Cause all local output traffic to be duplicated in FILENAME */
@@ -1729,39 +2076,49 @@ int main(int argc, char const * const *argv){
zRemoteErrFile = cli_opt_val;
continue;
}
- if( strcmp(z, "-wal-only")==0 ){
- ctx.bWalOnly = 1;
+ if( strcmp(z, "-debugfile")==0 ){
+ /* DEBUG OPTION: --debugfile FILENAME
+ ** Debugging messages on the local side are written into FILENAME */
+ ctx.zDebugFile = cli_opt_val;
continue;
}
- if( strcmp(z, "-help")==0 || strcmp(z, "--help")==0
- || strcmp(z, "-?")==0
- ){
- printf("%s", zUsage);
- return 0;
- }
- if( strcmp(z, "-version")==0 ){
- printf("%s\n", sqlite3_sourceid());
- return 0;
+ if( strcmp(z, "-remote-debugfile")==0 ){
+ /* DEBUG OPTION: --remote-debugfile FILENAME
+ ** Error messages on the remote side are written into FILENAME on
+ ** the remote side. */
+ zRemoteDebugFile = cli_opt_val;
+ continue;
}
- if( z[0]=='-' ){
- if( strcmp(z,"-commcheck")==0 ){ /* DEBUG ONLY */
- /* Run a communication check with the remote side. Do not attempt
- ** to exchange any database connection */
- ctx.bCommCheck = 1;
- continue;
+ if( strcmp(z, "-protocol")==0 ){
+ /* DEBUG OPTION: --protocool N
+ ** Set the protocol version to N */
+ ctx.iProtocol = atoi(cli_opt_val);
+ if( ctx.iProtocol<1 ){
+ ctx.iProtocol = 1;
+ }else if( ctx.iProtocol>PROTOCOL_VERSION ){
+ ctx.iProtocol = PROTOCOL_VERSION;
}
- if( strcmp(z,"-arg-escape-check")==0 ){ /* DEBUG ONLY */
- /* Test the append_escaped_arg() routine by using it to render a
- ** copy of the input command-line, assuming all arguments except
- ** this one are filenames. */
- sqlite3_str *pStr = sqlite3_str_new(0);
- int k;
- for(k=0; k<argc; k++){
- append_escaped_arg(pStr, argv[k], i!=k);
- }
- printf("%s\n", sqlite3_str_value(pStr));
- return 0;
+ continue;
+ }
+ if( strcmp(z,"-commcheck")==0 ){ /* DEBUG ONLY */
+ /* Run a communication check with the remote side. Do not attempt
+ ** to exchange any database connection */
+ ctx.bCommCheck = 1;
+ continue;
+ }
+ if( strcmp(z,"-arg-escape-check")==0 ){ /* DEBUG ONLY */
+ /* Test the append_escaped_arg() routine by using it to render a
+ ** copy of the input command-line, assuming all arguments except
+ ** this one are filenames. */
+ sqlite3_str *pStr = sqlite3_str_new(0);
+ int k;
+ for(k=0; k<argc; k++){
+ append_escaped_arg(pStr, argv[k], i!=k);
}
+ printf("%s\n", sqlite3_str_value(pStr));
+ return 0;
+ }
+ if( z[i]=='-' ){
fprintf(stderr,
"unknown option: \"%s\". Use --help for more detail.\n", z);
return 1;
@@ -1838,6 +2195,10 @@ int main(int argc, char const * const *argv){
append_escaped_arg(pStr, "--errorfile", 0);
append_escaped_arg(pStr, zRemoteErrFile, 1);
}
+ if( zRemoteDebugFile ){
+ append_escaped_arg(pStr, "--debugfile", 0);
+ append_escaped_arg(pStr, zRemoteDebugFile, 1);
+ }
if( ctx.bWalOnly ){
append_escaped_arg(pStr, "--wal-only", 0);
}
@@ -1867,6 +2228,10 @@ int main(int argc, char const * const *argv){
append_escaped_arg(pStr, "--errorfile", 0);
append_escaped_arg(pStr, zRemoteErrFile, 1);
}
+ if( zRemoteDebugFile ){
+ append_escaped_arg(pStr, "--debugfile", 0);
+ append_escaped_arg(pStr, zRemoteDebugFile, 1);
+ }
append_escaped_arg(pStr, file_tail(ctx.zOrigin), 1);
append_escaped_arg(pStr, zDiv, 1);
zCmd = sqlite3_str_finish(pStr);
@@ -1888,6 +2253,10 @@ int main(int argc, char const * const *argv){
append_escaped_arg(pStr, "--errorfile", 0);
append_escaped_arg(pStr, zRemoteErrFile, 1);
}
+ if( zRemoteDebugFile ){
+ append_escaped_arg(pStr, "--debugfile", 0);
+ append_escaped_arg(pStr, zRemoteDebugFile, 1);
+ }
append_escaped_arg(pStr, ctx.zOrigin, 1);
append_escaped_arg(pStr, ctx.zReplica, 1);
zCmd = sqlite3_str_finish(pStr);